Commit a3084daa authored by Joshua Gutow's avatar Joshua Gutow

op-batcher: Split loading blocks into it's own function

More work in reducing the size of the state loop
parent 829f3c2c
...@@ -168,92 +168,83 @@ func (l *BatchSubmitter) Stop() { ...@@ -168,92 +168,83 @@ func (l *BatchSubmitter) Stop() {
l.wg.Wait() l.wg.Wait()
} }
// loadBlocksIntoState loads all blocks since the previous submitted block
func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
syncStatus, err := l.cfg.RollupNode.SyncStatus(ctx)
if err != nil {
l.log.Warn("issue fetching L2 head", "err", err)
return
}
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
l.log.Info("Rollup node has no L1 head info yet")
return
}
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1)
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
l.log.Trace("No unsubmitted blocks from sequencer")
return
}
// If we just started, start at safe-head
if l.lastSubmittedBlock == (eth.BlockID{}) {
l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
// If it's lagging behind, catch it up.
if l.lastSubmittedBlock.Number < syncStatus.SafeL2.Number {
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
prevID := l.lastSubmittedBlock
// Add all blocks to "state"
for i := l.lastSubmittedBlock.Number + 1; i <= syncStatus.UnsafeL2.Number; i++ {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
cancel()
if err != nil {
l.log.Error("issue fetching L2 block", "err", err)
return
}
if block.ParentHash() != prevID.Hash {
l.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
return
}
if err := l.state.AddL2Block(block); err != nil {
return
}
prevID = eth.ToBlockID(block)
l.lastSubmittedBlock = prevID
l.log.Info("added L2 block to local state", "block", prevID, "tx_count", len(block.Transactions()), "time", block.Time())
}
}
func (l *BatchSubmitter) loop() { func (l *BatchSubmitter) loop() {
defer l.wg.Done() defer l.wg.Done()
ticker := time.NewTicker(l.cfg.PollInterval) ticker := time.NewTicker(l.cfg.PollInterval)
defer ticker.Stop() defer ticker.Stop()
mainLoop:
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
// Do the simplest thing of one channel per range of blocks since the iteration of this loop. l.loadBlocksIntoState(l.ctx)
// The channel is closed at the end of this loop (to avoid lifecycle management of the channel).
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
syncStatus, err := l.cfg.RollupNode.SyncStatus(ctx)
cancel()
if err != nil {
l.log.Warn("issue fetching L2 head", "err", err)
continue
}
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
l.log.Info("Rollup node has no L1 head info yet")
continue
}
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1)
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
l.log.Trace("No unsubmitted blocks from sequencer")
continue
}
// If we just started, start at safe-head
if l.lastSubmittedBlock == (eth.BlockID{}) {
l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
// If it's lagging behind, catch it up.
if l.lastSubmittedBlock.Number < syncStatus.SafeL2.Number {
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
prevID := l.lastSubmittedBlock
maxBlocksPerChannel := uint64(100)
// Hacky min() here to ensure that we don't batch submit more than 100 blocks per channel.
// TODO: use proper channel size here instead.
upToBlockNumber := syncStatus.UnsafeL2.Number
if l.lastSubmittedBlock.Number+1+maxBlocksPerChannel < upToBlockNumber {
upToBlockNumber = l.lastSubmittedBlock.Number + 1 + maxBlocksPerChannel
}
for i := l.lastSubmittedBlock.Number + 1; i <= upToBlockNumber; i++ {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
cancel()
if err != nil {
l.log.Error("issue fetching L2 block", "err", err)
continue mainLoop
}
if block.ParentHash() != prevID.Hash {
l.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
continue mainLoop
}
if err := l.state.AddL2Block(block); err != nil {
l.log.Error("issue adding L2 Block to the channel", "err", err)
continue mainLoop
}
prevID = eth.BlockID{Hash: block.Hash(), Number: block.NumberU64()}
l.log.Info("added L2 block to channel", "block", prevID, "tx_count", len(block.Transactions()), "time", block.Time())
}
// Hand role do-while loop to fully pull all frames out of the channel // Empty the state after loading into it on every iteration.
for { for {
// Collect the output frame // Collect the output frame
data, _, err := l.state.TxData(syncStatus.HeadL1) data, _, err := l.state.TxData(eth.L1BlockRef{})
if err == io.EOF { if err == io.EOF {
break // local for loop break // local for loop
} else if err != nil { } else if err != nil {
l.log.Error("unable to get tx data", "err", err) l.log.Error("unable to get tx data", "err", err)
continue mainLoop break
} }
_ = l.submitTransaction(data) _ = l.submitTransaction(data)
} }
// TODO: if we exit to the mainLoop early on an error,
// it would be nice if we can determine which blocks are still readable from the partially submitted data.
// We can open a channel-in-reader, parse the data up to which we managed to submit it,
// and then take the block hash (if we remember which blocks we put in the channel)
//
// Now we just continue batch submission from the end of the channel.
l.lastSubmittedBlock = prevID
case <-l.done: case <-l.done:
return return
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment