Commit d8144c4e authored by mergify[bot], committed by GitHub

Merge pull request #3815 from ethereum-optimism/jg/split_batcher_start_v2

op-batcher: Split up main loop
parents 927b4321 f1dda46e
package op_batcher

import (
    "bytes"
    "io"

    "github.com/ethereum-optimism/optimism/op-node/eth"
    "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
    "github.com/ethereum/go-ethereum/core/types"
)
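// txID identifies a single frame of a channel: the channel it belongs to plus
// the frame's index within that channel. It is the handle returned to the
// caller alongside each piece of tx data (and received again by the
// TxConfirmed stub below once the tx lands on L1).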
type txID struct {
    chID        derive.ChannelID
    frameNumber uint16
}
type taggedData struct {
    data []byte
    id   txID
}

type channelManager struct {
    // All blocks since the last request for new tx data
    blocks []*types.Block
    datas  []taggedData
}
// func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
// // todo: implement
// }
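// A purely illustrative shape for the stub above — the confirmed-frame
// bookkeeping field is hypothetical and does not exist in this diff:
//
//	func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
//	    // Record the L1 block that included this frame so it is not re-submitted.
//	    s.confirmed[id] = inclusionBlock // hypothetical map[txID]eth.BlockID field
//	}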
// TxData returns the next tx.data that should be submitted to L1.
// It is currently very simple & ignores the provided l1Head (this will change).
// Note that it may buffer very large channels in memory.
func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
    // Note: l1Head is not actually used in this function yet.
    // Return a pre-existing frame if we have one.
    if len(s.datas) != 0 {
        r := s.datas[0]
        s.datas = s.datas[1:]
        return r.data, r.id, nil
    }
    // Return io.EOF if we have no blocks to create a channel from.
    if len(s.blocks) == 0 {
        return nil, txID{}, io.EOF
    }
    // Add all pending blocks to a channel
    ch, err := derive.NewChannelOut()
    if err != nil {
        return nil, txID{}, err
    }
    // TODO: use a peek/pop paradigm here instead of manually slicing
    // (see the sketch after this function).
    i := 0
    // Cap the channel at 100 blocks.
    l := len(s.blocks)
    if l > 100 {
        l = 100
    }
    for ; i < l; i++ {
        if err := ch.AddBlock(s.blocks[i]); err == derive.ErrTooManyRLPBytes {
            break
        } else if err != nil {
            return nil, txID{}, err
        }
        // TODO: limit the RLP size of the channel to be lower than the limit to enable
        // channels to be fully submitted on time.
    }
    if err := ch.Close(); err != nil {
        return nil, txID{}, err
    }
    var t []taggedData
    frameNumber := uint16(0)
    for {
        var buf bytes.Buffer
        buf.WriteByte(derive.DerivationVersion0)
        err := ch.OutputFrame(&buf, 120_000)
        if err != io.EOF && err != nil {
            return nil, txID{}, err
        }
        t = append(t, taggedData{
            data: buf.Bytes(),
            id:   txID{ch.ID(), frameNumber},
        })
        frameNumber += 1
        if err == io.EOF {
            break
        }
    }
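    // Each entry in t is one L1 tx payload: a one-byte derivation version
    // prefix (DerivationVersion0, written above) followed by a channel frame
    // of at most 120,000 bytes. OutputFrame returns io.EOF once the final
    // frame of the channel has been written, which is what ends the loop above.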
    s.datas = append(s.datas, t...)
    // Say i = 0, 1 were added to the channel, but i = 2 returned ErrTooManyRLPBytes:
    // we broke out with i = 2, so s.blocks[i:] keeps block 2 for the next channel.
    // Say all blocks were added: i equals len(s.blocks) after exiting the loop
    // (though never inside it), so the slice below is emptied.
    s.blocks = s.blocks[i:]
    if len(s.datas) == 0 {
        return nil, txID{}, io.EOF // TODO: return a not-enough-data error instead
    }
    r := s.datas[0]
    s.datas = s.datas[1:]
    return r.data, r.id, nil
}
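A possible shape for the peek/pop refactor named in the TODO above (a sketch only; the blockQueue type and its methods are hypothetical and not part of this diff):

type blockQueue struct {
    blocks []*types.Block
}

// Peek returns the next block without consuming it, or nil if the queue is empty.
func (q *blockQueue) Peek() *types.Block {
    if len(q.blocks) == 0 {
        return nil
    }
    return q.blocks[0]
}

// Pop consumes and returns the next block, or nil if the queue is empty.
func (q *blockQueue) Pop() *types.Block {
    b := q.Peek()
    if b != nil {
        q.blocks = q.blocks[1:]
    }
    return b
}

With this, the block-adding loop in TxData could Peek a block, try ch.AddBlock, and only Pop on success, removing the index bookkeeping that the comments around s.blocks[i:] have to justify.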
// TODO: Continuity check here? (a possible shape is sketched just below)
// Invariants about what's on L1?
func (s *channelManager) AddL2Block(block *types.Block) error {
    s.blocks = append(s.blocks, block)
    return nil
}
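A minimal sketch of the continuity check hinted at by the TODO above (hypothetical and not part of this diff — today the reorg check lives in loadBlocksIntoState below; assumes an errors import):

// addL2BlockChecked is a hypothetical variant of AddL2Block that rejects a
// block that does not extend the last buffered block.
func (s *channelManager) addL2BlockChecked(block *types.Block) error {
    if n := len(s.blocks); n > 0 && s.blocks[n-1].Hash() != block.ParentHash() {
        return errors.New("block does not extend the last buffered block")
    }
    s.blocks = append(s.blocks, block)
    return nil
}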
package op_batcher

import (
    "bytes"
    "context"
    "crypto/ecdsa"
    "errors"
@@ -13,11 +12,9 @@ import (
    "github.com/ethereum-optimism/optimism/op-batcher/sequencer"
    "github.com/ethereum-optimism/optimism/op-node/eth"
    "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
    "github.com/ethereum-optimism/optimism/op-proposer/txmgr"
    "github.com/ethereum/go-ethereum/accounts"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/log"
    hdwallet "github.com/miguelmota/go-ethereum-hdwallet"
@@ -38,7 +35,7 @@ type BatchSubmitter struct {
    lastSubmittedBlock eth.BlockID

    ch    *derive.ChannelOut
    state *channelManager
}

// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
@@ -151,6 +148,7 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
        txMgr: txmgr.NewSimpleTxManager("batcher", txManagerConfig, l1Client),
        done:  make(chan struct{}),
        log:   l,
        state: new(channelManager),
        // TODO: this context only exists because the event loop doesn't reach done
        // if the tx manager is blocking forever due to e.g. insufficient balance.
        ctx: ctx,
@@ -170,147 +168,83 @@ func (l *BatchSubmitter) Stop() {
    l.wg.Wait()
}
// loadBlocksIntoState loads into the local state all blocks since the previously submitted block.
func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
    ctx, cancel := context.WithTimeout(ctx, time.Second*10)
    defer cancel()
    syncStatus, err := l.cfg.RollupNode.SyncStatus(ctx)
    if err != nil {
        l.log.Warn("issue fetching L2 head", "err", err)
        return
    }
    if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
        l.log.Info("Rollup node has no L1 head info yet")
        return
    }
    l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1)
    if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
        l.log.Trace("No unsubmitted blocks from sequencer")
        return
    }
    // If we just started, start at the safe head.
    if l.lastSubmittedBlock == (eth.BlockID{}) {
        l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
        l.lastSubmittedBlock = syncStatus.SafeL2.ID()
    }
    // If it's lagging behind, catch it up.
    if l.lastSubmittedBlock.Number < syncStatus.SafeL2.Number {
        l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
        l.lastSubmittedBlock = syncStatus.SafeL2.ID()
    }
    prevID := l.lastSubmittedBlock
    // Add all blocks up to the unsafe head to the local state.
    for i := l.lastSubmittedBlock.Number + 1; i <= syncStatus.UnsafeL2.Number; i++ {
        ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
        block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
        cancel()
        if err != nil {
            l.log.Error("issue fetching L2 block", "err", err)
            return
        }
        if block.ParentHash() != prevID.Hash {
            l.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
            l.lastSubmittedBlock = syncStatus.SafeL2.ID()
            return
        }
        if err := l.state.AddL2Block(block); err != nil {
            return
        }
        prevID = eth.ToBlockID(block)
        l.lastSubmittedBlock = prevID
        l.log.Info("added L2 block to local state", "block", prevID, "tx_count", len(block.Transactions()), "time", block.Time())
    }
}
func (l *BatchSubmitter) loop() {
    defer l.wg.Done()

    ticker := time.NewTicker(l.cfg.PollInterval)
    defer ticker.Stop()
mainLoop:
    for {
        select {
        case <-ticker.C:
            // Do the simplest thing of one channel per range of blocks since the last iteration of this loop.
            // The channel is closed at the end of this loop iteration (to avoid lifecycle management of the channel).
            ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
            syncStatus, err := l.cfg.RollupNode.SyncStatus(ctx)
            cancel()
            if err != nil {
                l.log.Warn("issue fetching L2 head", "err", err)
                continue
            }
            if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
                l.log.Info("Rollup node has no L1 head info yet")
                continue
            }
            l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1)
            if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
                l.log.Trace("No unsubmitted blocks from sequencer")
                continue
            }
            // If we just started, start at the safe head.
            if l.lastSubmittedBlock == (eth.BlockID{}) {
                l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
                l.lastSubmittedBlock = syncStatus.SafeL2.ID()
            }
            // If it's lagging behind, catch it up.
            if l.lastSubmittedBlock.Number < syncStatus.SafeL2.Number {
                l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
                l.lastSubmittedBlock = syncStatus.SafeL2.ID()
            }
            if ch, err := derive.NewChannelOut(); err != nil {
                l.log.Error("Error creating channel", "err", err)
                continue
            } else {
                l.ch = ch
            }
            prevID := l.lastSubmittedBlock
            maxBlocksPerChannel := uint64(100)
            // Hacky min() here to ensure that we don't batch-submit more than 100 blocks per channel.
            // TODO: use a proper channel size limit here instead.
            upToBlockNumber := syncStatus.UnsafeL2.Number
            if l.lastSubmittedBlock.Number+1+maxBlocksPerChannel < upToBlockNumber {
                upToBlockNumber = l.lastSubmittedBlock.Number + 1 + maxBlocksPerChannel
            }
            for i := l.lastSubmittedBlock.Number + 1; i <= upToBlockNumber; i++ {
                ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
                block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
                cancel()
                if err != nil {
                    l.log.Error("issue fetching L2 block", "err", err)
                    continue mainLoop
                }
                if block.ParentHash() != prevID.Hash {
                    l.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
                    l.lastSubmittedBlock = syncStatus.SafeL2.ID()
                    continue mainLoop
                }
                if err := l.ch.AddBlock(block); err != nil {
                    l.log.Error("issue adding L2 block to the channel", "err", err, "channel_id", l.ch.ID())
                    continue mainLoop
                }
                prevID = eth.BlockID{Hash: block.Hash(), Number: block.NumberU64()}
                l.log.Info("added L2 block to channel", "block", prevID, "channel_id", l.ch.ID(), "tx_count", len(block.Transactions()), "time", block.Time())
            }
            if err := l.ch.Close(); err != nil {
                l.log.Error("issue closing the channel", "err", err)
                continue
            }
            // Hand-rolled do-while loop to fully pull all frames out of the channel.
            l.loadBlocksIntoState(l.ctx)
            // Empty the state after loading into it, on every iteration.
            for {
                // Collect the output frame.
                data := new(bytes.Buffer)
                data.WriteByte(derive.DerivationVersion0)
                done := false
                // Subtract one to account for the version byte.
                if err := l.ch.OutputFrame(data, l.cfg.MaxL1TxSize-1); err == io.EOF {
                    done = true
                data, _, err := l.state.TxData(eth.L1BlockRef{})
                if err == io.EOF {
                    break // local for loop
                } else if err != nil {
                    l.log.Error("error outputting frame", "err", err)
                    continue mainLoop
                }
                // Query for the submitter's current nonce.
                walletAddr := crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey)
                ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
                nonce, err := l.cfg.L1Client.NonceAt(ctx, walletAddr, nil)
                cancel()
                if err != nil {
                    l.log.Error("unable to get current nonce", "err", err)
                    continue mainLoop
                }
                // Create the transaction.
                ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
                tx, err := l.CraftTx(ctx, data.Bytes(), nonce)
                cancel()
                if err != nil {
                    l.log.Error("unable to craft tx", "err", err)
                    continue mainLoop
                }
                // Construct a closure that will update the txn with the current gas prices.
                updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
                    l.log.Debug("updating batch tx gas price")
                    return l.UpdateGasPrice(ctx, tx)
                }
                // Wait until one of our submitted transactions confirms. If no
                // receipt is received it's likely our gas price was too low.
                // TODO: does the tx manager nicely replace the tx?
                // (submit a new one that's within the channel timeout but with a higher fee than the previously submitted tx? Or use a cheap cancel tx?)
                ctx, cancel = context.WithTimeout(l.ctx, time.Second*time.Duration(l.cfg.ChannelTimeout))
                receipt, err := l.txMgr.Send(ctx, updateGasPrice, l.cfg.L1Client.SendTransaction)
                cancel()
                if err != nil {
                    l.log.Warn("unable to publish tx", "err", err)
                    continue mainLoop
                }
                // The transaction was successfully submitted.
                l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "channel_id", l.ch.ID())
                // If `ch.OutputFrame` returned io.EOF we don't need to submit any more frames for this channel.
                if done {
                    break // local do-while loop
                    l.log.Error("unable to get tx data", "err", err)
                    break
                }
                _ = l.submitTransaction(data)
            }
            // TODO: if we exit to the mainLoop early on an error,
            // it would be nice if we could determine which blocks are still readable from the partially submitted data.
            // We could open a channel-in-reader, parse the data up to the point we managed to submit it,
            // and then take the block hash (if we remember which blocks we put in the channel).
            //
            // For now we just continue batch submission from the end of the channel.
            l.lastSubmittedBlock = prevID
        case <-l.done:
            return
......
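Because the added and removed lines are interleaved in the hunk above, the post-merge shape of the loop is easy to miss. Pieced together from the added lines alone, the split loop appears to reduce to roughly the following (a reconstruction for readability, not verbatim from the diff):

func (l *BatchSubmitter) loop() {
    defer l.wg.Done()
    ticker := time.NewTicker(l.cfg.PollInterval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            // Load any new unsafe L2 blocks into the channel manager ...
            l.loadBlocksIntoState(l.ctx)
            // ... then drain it: one L1 tx per frame, until io.EOF.
            for {
                data, _, err := l.state.TxData(eth.L1BlockRef{})
                if err == io.EOF {
                    break // no more tx data to submit this tick
                } else if err != nil {
                    l.log.Error("unable to get tx data", "err", err)
                    break
                }
                _ = l.submitTransaction(data)
            }
        case <-l.done:
            return
        }
    }
}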
@@ -2,6 +2,7 @@ package op_batcher

import (
    "context"
    "time"

    "github.com/ethereum-optimism/optimism/op-proposer/txmgr"
    "github.com/ethereum/go-ethereum/core"
@@ -9,6 +10,48 @@ import (
    "github.com/ethereum/go-ethereum/crypto"
)
func (l *BatchSubmitter) submitTransaction(data []byte) error {
    // Query for the submitter's current nonce.
    ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
    nonce, err := l.cfg.L1Client.NonceAt(ctx, l.addr, nil)
    cancel()
    if err != nil {
        l.log.Error("unable to get current nonce", "err", err)
        return err
    }
    // Create the transaction.
    ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
    tx, err := l.CraftTx(ctx, data, nonce)
    cancel()
    if err != nil {
        l.log.Error("unable to craft tx", "err", err)
        return err
    }
    // Construct a closure that will update the txn with the current gas prices.
    updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
        l.log.Debug("updating batch tx gas price")
        return l.UpdateGasPrice(ctx, tx)
    }
    // Wait until one of our submitted transactions confirms. If no
    // receipt is received it's likely our gas price was too low.
    // TODO: does the tx manager nicely replace the tx?
    // (submit a new one that's within the channel timeout but with a higher fee than the previously submitted tx? Or use a cheap cancel tx?)
    ctx, cancel = context.WithTimeout(l.ctx, time.Second*time.Duration(l.cfg.ChannelTimeout))
    receipt, err := l.txMgr.Send(ctx, updateGasPrice, l.cfg.L1Client.SendTransaction)
    cancel()
    if err != nil {
        l.log.Warn("unable to publish tx", "err", err)
        return err
    }
    // The transaction was successfully submitted.
    l.log.Info("tx successfully published", "tx_hash", receipt.TxHash)
    return nil
}
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) CraftTx(ctx context.Context, data []byte, nonce uint64) (*types.Transaction, error) {
    gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx)
......