Commit 910c9ade authored by George Knee's avatar George Knee Committed by GitHub

batcher: add batchSubmitter.checkExpectedProgress (#12430)

* implement batchSubmitter.checkExpectedProgress

* remove buffer variable

* add warning logs when calling waitNodeSyncAndClearState

* push method down into channel manager and add test

* clarify SyncStatus documentation

* improve TestChannelManager_CheckExpectedProgress

make parameters "tighter" / more realistic and check an extra case
parent 8e0b89c1
......@@ -549,3 +549,19 @@ func (s *channelManager) PendingDABytes() int64 {
}
return int64(f)
}
// CheckExpectedProgress uses the supplied syncStatus to infer
// whether the node providing the status has made the expected
// safe head progress given fully submitted channels held in
// state.
func (m *channelManager) CheckExpectedProgress(syncStatus eth.SyncStatus) error {
	for _, ch := range m.channelQueue {
		// Only fully submitted, non-timed-out channels carry an
		// expectation of safe head progress. Full submission implies
		// a number of L1 confirmations has already passed, depending
		// on how the txmgr was configured.
		if !ch.isFullySubmitted() || ch.isTimedOut() {
			continue
		}
		// The node must have derived strictly past the channel's max
		// inclusion block before we can expect its blocks to be safe.
		if syncStatus.CurrentL1.Number <= ch.maxInclusionBlock {
			continue
		}
		if syncStatus.SafeL2.Number < ch.LatestL2().Number {
			return errors.New("safe head did not make expected progress")
		}
	}
	return nil
}
......@@ -627,3 +627,57 @@ func TestChannelManager_ChannelOutFactory(t *testing.T) {
require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.channelBuilder.co)
}
func TestChannelManager_CheckExpectedProgress(t *testing.T) {
	l := testlog.Logger(t, log.LevelCrit)
	cfg := channelManagerTestConfig(100, derive.SingularBatchType)
	cfg.InitNoneCompressor()
	m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)

	maxInclusionBlock := uint64(3)
	latestSafeBlock := uint64(11)

	// Build a dummy, fully submitted channel whose max inclusion block
	// and latest safe L2 block number match the values above.
	ch, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
	require.NoError(t, err)
	rng := rand.New(rand.NewSource(123))
	blk := derivetest.RandomL2BlockWithChainId(rng, 1, defaultTestRollupConfig.L2ChainID)
	blk = blk.WithSeal(&types.Header{Number: big.NewInt(int64(latestSafeBlock))})
	_, err = ch.AddBlock(blk)
	require.NoError(t, err)
	ch.maxInclusionBlock = maxInclusionBlock
	ch.Close()
	ch.channelBuilder.frames = nil
	ch.channelBuilder.frameCursor = 0
	require.True(t, ch.isFullySubmitted())
	m.channelQueue = append(m.channelQueue, ch)

	// CurrentL1 past the channel's max inclusion block implies the
	// channel should have been derived from, so a safe head already at
	// latestSafeBlock means no error.
	ss := eth.SyncStatus{
		CurrentL1: eth.L1BlockRef{Number: maxInclusionBlock + 1},
		SafeL2:    eth.L2BlockRef{Number: latestSafeBlock},
	}
	require.NoError(t, m.CheckExpectedProgress(ss))

	// Same CurrentL1, but a safe head short of latestSafeBlock should
	// produce an error.
	ss.SafeL2 = eth.L2BlockRef{Number: latestSafeBlock - 1}
	require.Error(t, m.CheckExpectedProgress(ss))

	// With CurrentL1 _equal_ to the max inclusion block that block is
	// still being derived from, so no error is expected even though the
	// safe head has not reached latestSafeBlock yet.
	ss.CurrentL1 = eth.L1BlockRef{Number: maxInclusionBlock}
	require.NoError(t, m.CheckExpectedProgress(ss))
}
......@@ -471,17 +471,20 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR
l.state.pruneSafeBlocks(syncStatus.SafeL2)
l.state.pruneChannels(syncStatus.SafeL2)
err = l.state.CheckExpectedProgress(*syncStatus)
if err != nil {
l.Log.Warn("error checking expected progress, clearing state and waiting for node sync", "err", err)
l.waitNodeSyncAndClearState()
continue
}
if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) {
// Wait for any in flight transactions
// to be ingested by the node before
// we start loading blocks again.
err := l.waitNodeSync()
if err != nil {
l.Log.Warn("error waiting for node sync", "err", err)
}
l.clearState(l.shutdownCtx)
l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err)
l.waitNodeSyncAndClearState()
continue
}
l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)
case <-ctx.Done():
l.Log.Warn("main loop returning")
......@@ -579,6 +582,17 @@ func (l *BatchSubmitter) throttlingLoop(ctx context.Context) {
}
}
// waitNodeSyncAndClearState waits for any in-flight transactions to be
// ingested by the node before we start loading blocks again, then clears
// the batcher state. A sync failure is logged as a warning but does not
// prevent the state from being cleared.
func (l *BatchSubmitter) waitNodeSyncAndClearState() {
	if err := l.waitNodeSync(); err != nil {
		l.Log.Warn("error waiting for node sync", "err", err)
	}
	l.clearState(l.shutdownCtx)
}
// waitNodeSync checks to see whether a batcher tx was sent recently that
// still needs more block confirmations before being considered finalized.
func (l *BatchSubmitter) waitNodeSync() error {
......
......@@ -5,7 +5,7 @@ package eth
type SyncStatus struct {
// CurrentL1 is the L1 block that the derivation process is last idled at.
// This may not be fully derived into L2 data yet.
// The safe L2 blocks were produced/included fully from the L1 chain up to and including this L1 block.
// The safe L2 blocks were produced/included fully from the L1 chain up to _but excluding_ this L1 block.
// If the node is synced, this matches the HeadL1, minus the verifier confirmation distance.
CurrentL1 L1BlockRef `json:"current_l1"`
// CurrentL1Finalized is a legacy sync-status attribute. This is deprecated.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment