Commit 8ee03877 authored by George Knee's avatar George Knee Committed by GitHub

batcher: fix state inconsistency (#12981)

* remove lastStoredBlock and lastL1Tip from BatchSubmitter state

We can use the channelManager's state to infer lastStoredBlock. And lastL1Tip is actually unused.

* change log line wording

* fix typo

* remove unnecessary method

* clean up driver.calculateL2BlockRangeToStore

* some typos

* tiny tweak
parent 451c52b0
......@@ -480,6 +480,7 @@ func (s *channelManager) pruneSafeBlocks(newSafeHead eth.L2BlockRef) {
if newSafeHead.Number+1 < oldestBlock.NumberU64() {
// This could happen if there was an L1 reorg.
// Or if the sequencer restarted.
s.log.Warn("safe head reversed, clearing channel manager state",
"oldestBlock", eth.ToBlockID(oldestBlock),
"newSafeBlock", newSafeHead)
......@@ -565,3 +566,10 @@ func (m *channelManager) CheckExpectedProgress(syncStatus eth.SyncStatus) error
}
return nil
}
// LastStoredBlock returns the ID of the most recent block loaded into the
// channel manager, or the zero eth.BlockID when no blocks are queued.
func (m *channelManager) LastStoredBlock() eth.BlockID {
	if n := m.blocks.Len(); n > 0 {
		return eth.ToBlockID(m.blocks[n-1])
	}
	return eth.BlockID{}
}
......@@ -114,10 +114,6 @@ type BatchSubmitter struct {
txpoolState TxPoolState
txpoolBlockedBlob bool
// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
lastStoredBlock eth.BlockID
lastL1Tip eth.L1BlockRef
state *channelManager
}
......@@ -147,7 +143,6 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
l.clearState(l.shutdownCtx)
l.lastStoredBlock = eth.BlockID{}
if err := l.waitForL2Genesis(); err != nil {
return fmt.Errorf("error waiting for L2 genesis: %w", err)
......@@ -271,13 +266,11 @@ func (l *BatchSubmitter) loadBlocksIntoState(syncStatus eth.SyncStatus, ctx cont
block, err := l.loadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) {
l.Log.Warn("Found L2 reorg", "block_number", i)
l.lastStoredBlock = eth.BlockID{}
return err
} else if err != nil {
l.Log.Warn("Failed to load block into state", "err", err)
return err
}
l.lastStoredBlock = eth.ToBlockID(block)
latestBlock = block
}
......@@ -366,29 +359,31 @@ func (l *BatchSubmitter) getSyncStatus(ctx context.Context) (*eth.SyncStatus, er
}
// calculateL2BlockRangeToStore determines the range (start,end] that should be
// loaded into the local state.
//
// The start of the range is the channel manager's last stored block. When the
// manager is empty, or its last stored block has fallen behind the L2 safe
// head, the range instead starts at the safe head. The end of the range is
// always the L2 unsafe head. An error is returned when the sync status is
// empty or the safe head has caught up with (or passed) the unsafe head.
func (l *BatchSubmitter) calculateL2BlockRangeToStore(syncStatus eth.SyncStatus) (eth.BlockID, eth.BlockID, error) {
	if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
		return eth.BlockID{}, eth.BlockID{}, errors.New("empty sync status")
	}

	// Check if we should even attempt to load any blocks. TODO: May not need this check
	if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
		return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("L2 safe head(%d) >= L2 unsafe head(%d)", syncStatus.SafeL2.Number, syncStatus.UnsafeL2.Number)
	}

	lastStoredBlock := l.state.LastStoredBlock()
	start := lastStoredBlock
	end := syncStatus.UnsafeL2.ID()

	// Check the last stored block to see if it is empty or has lagged behind the
	// safe head. A lag implies that the op-node processed some batches that were
	// submitted prior to the current instance of the batcher being alive.
	if lastStoredBlock == (eth.BlockID{}) {
		l.Log.Info("Resuming batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
		start = syncStatus.SafeL2.ID()
	} else if lastStoredBlock.Number < syncStatus.SafeL2.Number {
		l.Log.Warn("Last stored block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", lastStoredBlock, "safe", syncStatus.SafeL2)
		start = syncStatus.SafeL2.ID()
	}

	return start, end, nil
}
// The following things occur:
......@@ -704,7 +699,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
l.Log.Error("Failed to query L1 tip", "err", err)
return err
}
l.recordL1Tip(l1tip)
l.Metr.RecordLatestL1Block(l1tip)
// Collect next transaction data. This pulls data out of the channel, so we need to make sure
// to put it back if ever da or txmgr requests fail, by calling l.recordFailedDARequest/recordFailedTx.
......@@ -886,14 +881,6 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
}
}
// recordL1Tip updates the cached L1 tip and records the latest-L1-block
// metric, doing nothing when the tip is unchanged since the last call.
func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
	if l.lastL1Tip != l1tip {
		l.lastL1Tip = l1tip
		l.Metr.RecordLatestL1Block(l1tip)
	}
}
func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
if err != nil {
l.Log.Warn("DA request failed", logFields(id, err)...)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment