Commit f9eaf1fc authored by George Knee, committed by GitHub

op-batcher: extract state pruning, block fetching and progress checking into a single pure function (#13060)

* remove lastStoredBlock and lastL1Tip from BatchSubmitter state

We can use the channelManager's state to infer lastStoredBlock. And lastL1Tip is actually unused.

* change log line wording

* fix typo

* remove unnecessary method

* WIP first pass at computeSyncActions

* computeSyncAction takes a ChannelStatuser interface

also report fully inclusive range of blocks to load

* add happy path test case

* clearState is a pointer

we can use nil value to signal no state clearing should be performed

* add more test cases

* add another test case

* computeSyncActions only takes prevCurrentL1, not prevSyncStatus

* add batcher restart case

* safe chain reorg case

* failed to make progress case

* simplify log messages, print entire struct

* add godoc

* wire up computeSyncActions

* cache prevCurrentL1 on BatchSubmitter

* document stages

* fix loadBlocksIntoState range interpretation

* pass syncStatus, not pointer to syncStatus and add test case for no progress

* check unsafe status before trying to get more blocks

* do not panic on invalid block ranges

return an error instead. This error is ultimately swallowed, matching existing behaviour.

* test: add assertions and mock data about blockID passed to clearState

* add readme section on max channel duration

* add back unit tests for pruning methods

* fix pruneBlocks behaviour when blockCursor pointed at block which is now pruned

* rename waitForNodeSync to sequencerOutOfSync

* Introduce SeqOutOfSyncError

* move SyncActions code to a separate file

* ChannelStatuser -> channelStatuser

* SeqOutOfSyncError -> ErrSeqOutOfSync

* move ctx to first position in fn signature

* do not update cached prevCurrentL1 value if there is an ErrSeqOutOfSync

* Always warn log when computeSyncActions returns an error

* move sync actions test to separate file

* computeSyncActions returns a bool, not an error

There is only ever one kind of error returned

* SyncActions -> syncActions

* define local variables to aid readability

* organise computeSyncActions and introduce startAfresh syncAction

Add comments explaining logical flow: the checks get increasingly deep and we return early where possible.

* undo changes to submodule

* move test utils to sync_actions_test.go file

* ensure pruneChannels clears currentChannel when appropriate

* fix submodule

* don't try to get number of block if none exists

* improve log

* Update op-batcher/batcher/driver.go
Co-authored-by: Sebastian Stammler <seb@oplabs.co>

* use struct for block range, not array

* use startAfresh in one more place

* add test case for multiple channels

also set HeadL1 to more realistic values (generally ahead of currentL1 due to nonzero confirmation depth)

* print value of *struct  in Stringer

* add test case when there are no blocks in state

* Update op-batcher/batcher/sync_actions.go
Co-authored-by: Sebastian Stammler <seb@oplabs.co>

* tighten up log messages and test descriptions

---------
Co-authored-by: Sebastian Stammler <seb@oplabs.co>
parent ad868c53
......@@ -217,3 +217,7 @@ func (c *channel) OldestL2() eth.BlockID {
func (c *channel) Close() {
c.channelBuilder.Close()
}
func (c *channel) MaxInclusionBlock() uint64 {
return c.maxInclusionBlock
}
......@@ -464,78 +464,30 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info *derive.L1BlockInfo
var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager")
// pruneSafeBlocks dequeues blocks from the internal blocks queue
// if they have now become safe.
func (s *channelManager) pruneSafeBlocks(newSafeHead eth.L2BlockRef) {
oldestBlock, ok := s.blocks.Peek()
// pruneSafeBlocks dequeues the provided number of blocks from the internal blocks queue
func (s *channelManager) pruneSafeBlocks(num int) {
_, ok := s.blocks.DequeueN(int(num))
if !ok {
// no blocks to prune
return
panic("tried to prune more blocks than available")
}
if newSafeHead.Number+1 == oldestBlock.NumberU64() {
// no blocks to prune
return
}
if newSafeHead.Number+1 < oldestBlock.NumberU64() {
// This could happen if there was an L1 reorg.
// Or if the sequencer restarted.
s.log.Warn("safe head reversed, clearing channel manager state",
"oldestBlock", eth.ToBlockID(oldestBlock),
"newSafeBlock", newSafeHead)
// We should restart work from the new safe head,
// and therefore prune all the blocks.
s.Clear(newSafeHead.L1Origin)
return
}
numBlocksToDequeue := newSafeHead.Number + 1 - oldestBlock.NumberU64()
if numBlocksToDequeue > uint64(s.blocks.Len()) {
// This could happen if the batcher restarted.
// The sequencer may have derived the safe chain
// from channels sent by a previous batcher instance.
s.log.Warn("safe head above unsafe head, clearing channel manager state",
"unsafeBlock", eth.ToBlockID(s.blocks[s.blocks.Len()-1]),
"newSafeBlock", newSafeHead)
// We should restart work from the new safe head,
// and therefore prune all the blocks.
s.Clear(newSafeHead.L1Origin)
return
}
if s.blocks[numBlocksToDequeue-1].Hash() != newSafeHead.Hash {
s.log.Warn("safe chain reorg, clearing channel manager state",
"existingBlock", eth.ToBlockID(s.blocks[numBlocksToDequeue-1]),
"newSafeBlock", newSafeHead)
// We should restart work from the new safe head,
// and therefore prune all the blocks.
s.Clear(newSafeHead.L1Origin)
return
}
// This shouldn't return an error because
// We already checked numBlocksToDequeue <= s.blocks.Len()
_, _ = s.blocks.DequeueN(int(numBlocksToDequeue))
s.blockCursor -= int(numBlocksToDequeue)
s.blockCursor -= int(num)
if s.blockCursor < 0 {
panic("negative blockCursor")
s.blockCursor = 0
}
}
// pruneChannels dequeues channels from the internal channels queue
// if they were built using blocks which are now safe
func (s *channelManager) pruneChannels(newSafeHead eth.L2BlockRef) {
i := 0
for _, ch := range s.channelQueue {
if ch.LatestL2().Number > newSafeHead.Number {
break
// pruneChannels dequeues the provided number of channels from the internal channels queue
func (s *channelManager) pruneChannels(num int) {
clearCurrentChannel := false
for i := 0; i < num; i++ {
if s.channelQueue[i] == s.currentChannel {
clearCurrentChannel = true
}
i++
}
s.channelQueue = s.channelQueue[i:]
s.channelQueue = s.channelQueue[num:]
if clearCurrentChannel {
s.currentChannel = nil
}
}
// PendingDABytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2
......
......@@ -463,14 +463,12 @@ func TestChannelManager_handleChannelInvalidated(t *testing.T) {
}
func TestChannelManager_PruneBlocks(t *testing.T) {
l := testlog.Logger(t, log.LevelDebug)
cfg := channelManagerTestConfig(100, derive.SingularBatchType)
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
cfg.InitNoneCompressor()
a := types.NewBlock(&types.Header{
Number: big.NewInt(0),
}, nil, nil, nil)
b := types.NewBlock(&types.Header{ // This will shortly become the safe head
b := types.NewBlock(&types.Header{
Number: big.NewInt(1),
ParentHash: a.Hash(),
}, nil, nil, nil)
......@@ -479,132 +477,157 @@ func TestChannelManager_PruneBlocks(t *testing.T) {
ParentHash: b.Hash(),
}, nil, nil, nil)
require.NoError(t, m.AddL2Block(a))
m.blockCursor += 1
require.NoError(t, m.AddL2Block(b))
m.blockCursor += 1
require.NoError(t, m.AddL2Block(c))
m.blockCursor += 1
// Normal path
m.pruneSafeBlocks(eth.L2BlockRef{
Hash: b.Hash(),
Number: b.NumberU64(),
})
require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks)
// Safe chain didn't move, nothing to prune
m.pruneSafeBlocks(eth.L2BlockRef{
Hash: b.Hash(),
Number: b.NumberU64(),
})
require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks)
// Safe chain moved beyond the blocks we had
// state should be cleared
m.pruneSafeBlocks(eth.L2BlockRef{
Hash: c.Hash(),
Number: uint64(99),
})
require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
// No blocks to prune, NOOP
m.pruneSafeBlocks(eth.L2BlockRef{
Hash: c.Hash(),
Number: c.NumberU64(),
})
require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
// Put another block in
d := types.NewBlock(&types.Header{
Number: big.NewInt(3),
ParentHash: c.Hash(),
}, nil, nil, nil)
require.NoError(t, m.AddL2Block(d))
m.blockCursor += 1
// Safe chain reorg
// state should be cleared
m.pruneSafeBlocks(eth.L2BlockRef{
Hash: a.Hash(),
Number: uint64(3),
})
require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
// Put another block in
require.NoError(t, m.AddL2Block(d))
m.blockCursor += 1
type testCase struct {
name string
initialQ queue.Queue[*types.Block]
initialBlockCursor int
numChannelsToPrune int
expectedQ queue.Queue[*types.Block]
expectedBlockCursor int
}
// Safe chain reversed
// state should be cleared
m.pruneSafeBlocks(eth.L2BlockRef{
Hash: a.Hash(), // unused
Number: uint64(1),
for _, tc := range []testCase{
{
name: "[A,B,C]*+1->[B,C]*", // * denotes the cursor
initialQ: queue.Queue[*types.Block]{a, b, c},
initialBlockCursor: 3,
numChannelsToPrune: 1,
expectedQ: queue.Queue[*types.Block]{b, c},
expectedBlockCursor: 2,
},
{
name: "[A,B,C*]+1->[B,C*]",
initialQ: queue.Queue[*types.Block]{a, b, c},
initialBlockCursor: 2,
numChannelsToPrune: 1,
expectedQ: queue.Queue[*types.Block]{b, c},
expectedBlockCursor: 1,
},
{
name: "[A,B,C]*+2->[C]*",
initialQ: queue.Queue[*types.Block]{a, b, c},
initialBlockCursor: 3,
numChannelsToPrune: 2,
expectedQ: queue.Queue[*types.Block]{c},
expectedBlockCursor: 1,
},
{
name: "[A,B,C*]+2->[C*]",
initialQ: queue.Queue[*types.Block]{a, b, c},
initialBlockCursor: 2,
numChannelsToPrune: 2,
expectedQ: queue.Queue[*types.Block]{c},
expectedBlockCursor: 0,
},
{
name: "[A*,B,C]+1->[B*,C]",
initialQ: queue.Queue[*types.Block]{a, b, c},
initialBlockCursor: 0,
numChannelsToPrune: 1,
expectedQ: queue.Queue[*types.Block]{b, c},
expectedBlockCursor: 0,
},
{
name: "[A,B,C]+3->[]",
initialQ: queue.Queue[*types.Block]{a, b, c},
initialBlockCursor: 3,
numChannelsToPrune: 3,
expectedQ: queue.Queue[*types.Block]{},
expectedBlockCursor: 0,
},
{
name: "[A,B,C]*+4->panic",
initialQ: queue.Queue[*types.Block]{a, b, c},
initialBlockCursor: 3,
numChannelsToPrune: 4,
expectedQ: nil, // declare that the prune method should panic
expectedBlockCursor: 0,
},
} {
t.Run(tc.name, func(t *testing.T) {
l := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
m.blocks = tc.initialQ
m.blockCursor = tc.initialBlockCursor
if tc.expectedQ != nil {
m.pruneSafeBlocks(tc.numChannelsToPrune)
require.Equal(t, tc.expectedQ, m.blocks)
} else {
require.Panics(t, func() { m.pruneSafeBlocks(tc.numChannelsToPrune) })
}
})
require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
}
}
func TestChannelManager_PruneChannels(t *testing.T) {
l := testlog.Logger(t, log.LevelCrit)
cfg := channelManagerTestConfig(100, derive.SingularBatchType)
cfg.InitNoneCompressor()
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
A, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
require.NoError(t, err)
B, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
require.NoError(t, err)
C, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
require.NoError(t, err)
m.channelQueue = []*channel{A, B, C}
numTx := 1
rng := rand.New(rand.NewSource(123))
a0 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
a0 = a0.WithSeal(&types.Header{Number: big.NewInt(0)})
a1 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
a1 = a1.WithSeal(&types.Header{Number: big.NewInt(1)})
b2 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b2 = b2.WithSeal(&types.Header{Number: big.NewInt(2)})
b3 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b3 = b3.WithSeal(&types.Header{Number: big.NewInt(3)})
c4 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
c4 = c4.WithSeal(&types.Header{Number: big.NewInt(4)})
_, err = A.AddBlock(a0)
require.NoError(t, err)
_, err = A.AddBlock(a1)
A, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0)
require.NoError(t, err)
_, err = B.AddBlock(b2)
require.NoError(t, err)
_, err = B.AddBlock(b3)
B, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0)
require.NoError(t, err)
_, err = C.AddBlock(c4)
C, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0)
require.NoError(t, err)
m.pruneChannels(eth.L2BlockRef{
Number: uint64(3),
})
require.Equal(t, []*channel{C}, m.channelQueue)
m.pruneChannels(eth.L2BlockRef{
Number: uint64(4),
})
require.Equal(t, []*channel{}, m.channelQueue)
type testCase struct {
name string
initialQ []*channel
initialCurrentChannel *channel
numChannelsToPrune int
expectedQ []*channel
expectedCurrentChannel *channel
}
m.pruneChannels(eth.L2BlockRef{
Number: uint64(4),
for _, tc := range []testCase{
{
name: "[A,B,C]+1->[B,C]",
initialQ: []*channel{A, B, C},
numChannelsToPrune: 1,
expectedQ: []*channel{B, C},
},
{
name: "[A,B,C]+3->[] + currentChannel=C",
initialQ: []*channel{A, B, C},
initialCurrentChannel: C,
numChannelsToPrune: 3,
expectedQ: []*channel{},
expectedCurrentChannel: nil,
},
{
name: "[A,B,C]+2->[C]",
initialQ: []*channel{A, B, C},
numChannelsToPrune: 2,
expectedQ: []*channel{C},
},
{
name: "[A,B,C]+3->[]",
initialQ: []*channel{A, B, C},
numChannelsToPrune: 3,
expectedQ: []*channel{},
},
{
name: "[A,B,C]+4->panic",
initialQ: []*channel{A, B, C},
numChannelsToPrune: 4,
expectedQ: nil, // declare that the prune method should panic
},
} {
t.Run(tc.name, func(t *testing.T) {
l := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
m.channelQueue = tc.initialQ
m.currentChannel = tc.initialCurrentChannel
if tc.expectedQ != nil {
m.pruneChannels(tc.numChannelsToPrune)
require.Equal(t, tc.expectedQ, m.channelQueue)
require.Equal(t, tc.expectedCurrentChannel, m.currentChannel)
} else {
require.Panics(t, func() { m.pruneChannels(tc.numChannelsToPrune) })
}
})
require.Equal(t, []*channel{}, m.channelQueue)
}
}
func TestChannelManager_ChannelOutFactory(t *testing.T) {
type ChannelOutWrapper struct {
derive.ChannelOut
......
......@@ -115,6 +115,7 @@ type BatchSubmitter struct {
txpoolBlockedBlob bool
state *channelManager
prevCurrentL1 eth.L1BlockRef // cached CurrentL1 from the last syncStatus
}
// NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup
......@@ -241,28 +242,15 @@ func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error {
return nil
}
// loadBlocksIntoState loads all blocks since the previous stored block
// It does the following:
// 1. Fetch the sync status of the sequencer
// 2. Check if the sync status is valid or if we are all the way up to date
// 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?)
// 4. Load all new blocks into the local state.
// 5. Dequeue blocks from local state which are now safe.
//
// If there is a reorg, it will reset the last stored block but not clear the internal state so
// the state can be flushed to L1.
func (l *BatchSubmitter) loadBlocksIntoState(syncStatus eth.SyncStatus, ctx context.Context) error {
start, end, err := l.calculateL2BlockRangeToStore(syncStatus)
if err != nil {
l.Log.Warn("Error calculating L2 block range", "err", err)
return err
} else if start.Number >= end.Number {
return errors.New("start number is >= end number")
// loadBlocksIntoState loads the blocks between start and end (inclusive).
// If there is a reorg, it will return an error.
func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context, start, end uint64) error {
if end <= start {
return fmt.Errorf("start number is >= end number %d,%d", start, end)
}
var latestBlock *types.Block
// Add all blocks to "state"
for i := start.Number + 1; i < end.Number+1; i++ {
for i := start; i <= end; i++ {
block, err := l.loadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) {
l.Log.Warn("Found L2 reorg", "block_number", i)
......@@ -358,34 +346,6 @@ func (l *BatchSubmitter) getSyncStatus(ctx context.Context) (*eth.SyncStatus, er
return syncStatus, nil
}
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
func (l *BatchSubmitter) calculateL2BlockRangeToStore(syncStatus eth.SyncStatus) (eth.BlockID, eth.BlockID, error) {
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
return eth.BlockID{}, eth.BlockID{}, errors.New("empty sync status")
}
// Check if we should even attempt to load any blocks. TODO: May not need this check
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("L2 safe head(%d) >= L2 unsafe head(%d)", syncStatus.SafeL2.Number, syncStatus.UnsafeL2.Number)
}
lastStoredBlock := l.state.LastStoredBlock()
start := lastStoredBlock
end := syncStatus.UnsafeL2.ID()
// Check last stored block to see if it is empty or has lagged behind.
// It lagging implies that the op-node processed some batches that
// were submitted prior to the current instance of the batcher being alive.
if lastStoredBlock == (eth.BlockID{}) {
l.Log.Info("Resuming batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
start = syncStatus.SafeL2.ID()
} else if lastStoredBlock.Number < syncStatus.SafeL2.Number {
l.Log.Warn("Last stored block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", lastStoredBlock, "safe", syncStatus.SafeL2)
start = syncStatus.SafeL2.ID()
}
return start, end, nil
}
// The following things occur:
// New L2 block (reorg or not)
// L1 transaction is confirmed
......@@ -464,21 +424,35 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR
continue
}
l.state.pruneSafeBlocks(syncStatus.SafeL2)
l.state.pruneChannels(syncStatus.SafeL2)
// Decide appropriate actions
syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.state.blocks, l.state.channelQueue, l.Log)
err = l.state.CheckExpectedProgress(*syncStatus)
if err != nil {
l.Log.Warn("error checking expected progress, clearing state and waiting for node sync", "err", err)
l.waitNodeSyncAndClearState()
if outOfSync {
// If the sequencer is out of sync
// do nothing and wait to see if it has
// got in sync on the next tick.
l.Log.Warn("Sequencer is out of sync, retrying next tick.")
continue
}
if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) {
l.prevCurrentL1 = syncStatus.CurrentL1
// Manage existing state / garbage collection
if syncActions.clearState != nil {
l.state.Clear(*syncActions.clearState)
} else {
l.state.pruneSafeBlocks(syncActions.blocksToPrune)
l.state.pruneChannels(syncActions.channelsToPrune)
}
if syncActions.blocksToLoad != nil {
// Get fresh unsafe blocks
if err := l.loadBlocksIntoState(l.shutdownCtx, syncActions.blocksToLoad.start, syncActions.blocksToLoad.end); errors.Is(err, ErrReorg) {
l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err)
l.waitNodeSyncAndClearState()
continue
}
}
l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)
case <-ctx.Done():
......
package batcher
import (
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type channelStatuser interface {
isFullySubmitted() bool
isTimedOut() bool
LatestL2() eth.BlockID
MaxInclusionBlock() uint64
}
type inclusiveBlockRange struct{ start, end uint64 }
type syncActions struct {
clearState *eth.BlockID
blocksToPrune int
channelsToPrune int
blocksToLoad *inclusiveBlockRange // the blocks that should be loaded into the local state.
// NOTE this range is inclusive on both ends, which is a change to previous behaviour.
}
func (s syncActions) String() string {
return fmt.Sprintf(
"SyncActions{blocksToPrune: %d, channelsToPrune: %d, clearState: %v, blocksToLoad: %v}", s.blocksToPrune, s.channelsToPrune, s.clearState, s.blocksToLoad)
}
// computeSyncActions determines the actions that should be taken based on the inputs provided. The inputs are the current
// state of the batcher (blocks and channels), the new sync status, and the previous current L1 block. The actions are returned
// in a struct specifying the number of blocks to prune, the number of channels to prune, whether to wait for node sync, the block
// range to load into the local state, and whether to clear the state entirely. Returns a boolean indicating if the sequencer is out of sync.
func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCurrentL1 eth.L1BlockRef, blocks queue.Queue[*types.Block], channels []T, l log.Logger) (syncActions, bool) {
// PART 1: Initial checks on the sync status
if newSyncStatus.HeadL1 == (eth.L1BlockRef{}) {
l.Warn("empty sync status")
return syncActions{}, true
}
if newSyncStatus.CurrentL1.Number < prevCurrentL1.Number {
// This can happen when the sequencer restarts
l.Warn("sequencer currentL1 reversed")
return syncActions{}, true
}
// PART 2: checks involving only the oldest block in the state
oldestBlockInState, hasBlocks := blocks.Peek()
oldestUnsafeBlockNum := newSyncStatus.SafeL2.Number + 1
youngestUnsafeBlockNum := newSyncStatus.UnsafeL2.Number
if !hasBlocks {
s := syncActions{
blocksToLoad: &inclusiveBlockRange{oldestUnsafeBlockNum, youngestUnsafeBlockNum},
}
l.Info("no blocks in state", "syncActions", s)
return s, false
}
// These actions apply in multiple unhappy scenarios below, where
// we detect that the existing state is invalidated
// and we need to start over from the sequencer's oldest
// unsafe (and not safe) block.
startAfresh := syncActions{
clearState: &newSyncStatus.SafeL2.L1Origin,
blocksToLoad: &inclusiveBlockRange{oldestUnsafeBlockNum, youngestUnsafeBlockNum},
}
oldestBlockInStateNum := oldestBlockInState.NumberU64()
if oldestUnsafeBlockNum < oldestBlockInStateNum {
l.Warn("oldest unsafe block is below oldest block in state", "syncActions", startAfresh, "oldestBlockInState", oldestBlockInState, "newSafeBlock", newSyncStatus.SafeL2)
return startAfresh, false
}
// PART 3: checks involving all blocks in state
newestBlockInState := blocks[blocks.Len()-1]
newestBlockInStateNum := newestBlockInState.NumberU64()
numBlocksToDequeue := oldestUnsafeBlockNum - oldestBlockInStateNum
if numBlocksToDequeue > uint64(blocks.Len()) {
// This could happen if the batcher restarted.
// The sequencer may have derived the safe chain
// from channels sent by a previous batcher instance.
l.Warn("oldest unsafe block above newest block in state, clearing channel manager state",
"oldestUnsafeBlockNum", oldestUnsafeBlockNum,
"newestBlockInState", eth.ToBlockID(newestBlockInState),
"syncActions",
startAfresh)
return startAfresh, false
}
if numBlocksToDequeue > 0 && blocks[numBlocksToDequeue-1].Hash() != newSyncStatus.SafeL2.Hash {
l.Warn("safe chain reorg, clearing channel manager state",
"existingBlock", eth.ToBlockID(blocks[numBlocksToDequeue-1]),
"newSafeBlock", newSyncStatus.SafeL2,
"syncActions", startAfresh)
return startAfresh, false
}
// PART 4: checks involving channels
for _, ch := range channels {
if ch.isFullySubmitted() &&
!ch.isTimedOut() &&
newSyncStatus.CurrentL1.Number > ch.MaxInclusionBlock() &&
newSyncStatus.SafeL2.Number < ch.LatestL2().Number {
// Safe head did not make the expected progress
// for a fully submitted channel. This indicates
// that the derivation pipeline may have stalled
// e.g. because of Holocene strict ordering rules.
l.Warn("sequencer did not make expected progress",
"existingBlock", eth.ToBlockID(blocks[numBlocksToDequeue-1]),
"newSafeBlock", newSyncStatus.SafeL2,
"syncActions", startAfresh)
return startAfresh, false
}
}
// PART 5: happy path
numChannelsToPrune := 0
for _, ch := range channels {
if ch.LatestL2().Number > newSyncStatus.SafeL2.Number {
// If the channel has blocks which are not yet safe
// we do not want to prune it.
break
}
numChannelsToPrune++
}
start := newestBlockInStateNum + 1
end := youngestUnsafeBlockNum
return syncActions{
blocksToPrune: int(numBlocksToDequeue),
channelsToPrune: numChannelsToPrune,
blocksToLoad: &inclusiveBlockRange{start, end},
}, false
}
package batcher
import (
"math/big"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type testChannelStatuser struct {
latestL2 eth.BlockID
inclusionBlock uint64
fullySubmitted, timedOut bool
}
func (tcs testChannelStatuser) LatestL2() eth.BlockID {
return tcs.latestL2
}
func (tcs testChannelStatuser) MaxInclusionBlock() uint64 {
return tcs.inclusionBlock
}
func (tcs testChannelStatuser) isFullySubmitted() bool {
return tcs.fullySubmitted
}
func (tcs testChannelStatuser) isTimedOut() bool {
return tcs.timedOut
}
func TestBatchSubmitter_computeSyncActions(t *testing.T) {
block101 := types.NewBlockWithHeader(&types.Header{Number: big.NewInt(101)})
block102 := types.NewBlockWithHeader(&types.Header{Number: big.NewInt(102)})
block103 := types.NewBlockWithHeader(&types.Header{Number: big.NewInt(103)})
channel103 := testChannelStatuser{
latestL2: eth.ToBlockID(block103),
inclusionBlock: 1,
fullySubmitted: true,
timedOut: false,
}
block104 := types.NewBlockWithHeader(&types.Header{Number: big.NewInt(104)})
channel104 := testChannelStatuser{
latestL2: eth.ToBlockID(block104),
inclusionBlock: 1,
fullySubmitted: false,
timedOut: false,
}
type TestCase struct {
name string
// inputs
newSyncStatus eth.SyncStatus
prevCurrentL1 eth.L1BlockRef
blocks queue.Queue[*types.Block]
channels []channelStatuser
// expectations
expected syncActions
expectedSeqOutOfSync bool
expectedLogs []string
}
testCases := []TestCase{
{name: "empty sync status",
// This can happen when the sequencer recovers from a reorg
newSyncStatus: eth.SyncStatus{},
expected: syncActions{},
expectedSeqOutOfSync: true,
expectedLogs: []string{"empty sync status"},
},
{name: "current l1 reversed",
// This can happen when the sequencer restarts or is switched
// to a backup sequencer:
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 2},
CurrentL1: eth.BlockRef{Number: 1},
},
prevCurrentL1: eth.BlockRef{Number: 2},
expected: syncActions{},
expectedSeqOutOfSync: true,
expectedLogs: []string{"sequencer currentL1 reversed"},
},
{name: "gap between safe chain and state",
// This can happen if there is an L1 reorg:
// although the sequencer has derived up to the same
// L1 block height, it derived fewer safe L2 blocks.
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 6},
CurrentL1: eth.BlockRef{Number: 1},
SafeL2: eth.L2BlockRef{Number: 100, L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block102, block103}, // note absence of block101
channels: []channelStatuser{channel103},
expected: syncActions{
clearState: &eth.BlockID{Number: 1},
blocksToLoad: &inclusiveBlockRange{101, 109},
},
expectedLogs: []string{"oldest unsafe block is below oldest block in state"},
},
{name: "unexpectedly good progress",
// This can happen if another batcher instance got some blocks
// included in the safe chain:
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 6},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 104, L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
channels: []channelStatuser{channel103},
expected: syncActions{
clearState: &eth.BlockID{Number: 1},
blocksToLoad: &inclusiveBlockRange{105, 109},
},
expectedLogs: []string{"oldest unsafe block above newest block in state"},
},
{name: "safe chain reorg",
// This can happen if there is an L1 reorg, the safe chain is at an acceptable
// height but it does not descend from the blocks in state:
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}}, // note hash mismatch
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
channels: []channelStatuser{channel103},
expected: syncActions{
clearState: &eth.BlockID{Number: 1},
blocksToLoad: &inclusiveBlockRange{104, 109},
},
expectedLogs: []string{"safe chain reorg"},
},
{name: "failed to make expected progress",
// This could happen if the batcher unexpectedly violates the
// Holocene derivation rules:
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 3},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
channels: []channelStatuser{channel103},
expected: syncActions{
clearState: &eth.BlockID{Number: 1},
blocksToLoad: &inclusiveBlockRange{102, 109},
},
expectedLogs: []string{"sequencer did not make expected progress"},
},
{name: "no progress",
// This can happen if we have a long channel duration
// and we didn't submit or have any txs confirmed since
// the last sync.
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 4},
CurrentL1: eth.BlockRef{Number: 1},
SafeL2: eth.L2BlockRef{Number: 100},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
channels: []channelStatuser{channel103},
expected: syncActions{
blocksToLoad: &inclusiveBlockRange{104, 109},
},
},
{name: "no blocks",
// This happens when the batcher is starting up for the first time
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{},
channels: []channelStatuser{},
expected: syncActions{
blocksToLoad: &inclusiveBlockRange{104, 109},
},
},
{name: "happy path",
// This happens when the safe chain is being progressed as expected:
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103},
channels: []channelStatuser{channel103},
expected: syncActions{
blocksToPrune: 3,
channelsToPrune: 1,
blocksToLoad: &inclusiveBlockRange{104, 109},
},
},
{name: "happy path + multiple channels",
newSyncStatus: eth.SyncStatus{
HeadL1: eth.BlockRef{Number: 5},
CurrentL1: eth.BlockRef{Number: 2},
SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()},
UnsafeL2: eth.L2BlockRef{Number: 109},
},
prevCurrentL1: eth.BlockRef{Number: 1},
blocks: queue.Queue[*types.Block]{block101, block102, block103, block104},
channels: []channelStatuser{channel103, channel104},
expected: syncActions{
blocksToPrune: 3,
channelsToPrune: 1,
blocksToLoad: &inclusiveBlockRange{105, 109},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
l, h := testlog.CaptureLogger(t, log.LevelDebug)
result, outOfSync := computeSyncActions(
tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, l,
)
require.Equal(t, tc.expected, result)
require.Equal(t, tc.expectedSeqOutOfSync, outOfSync)
for _, e := range tc.expectedLogs {
r := h.FindLog(testlog.NewMessageContainsFilter(e))
require.NotNil(t, r, "could not find log message containing '%s'", e)
}
})
}
}
......@@ -32,11 +32,14 @@ The philosophy behind the current architecture is:
### Happy path
In the happy path, the batcher periodically:
0. Queries the sequencer's syncStatus and
a. (optionally) waits for it to ingest more L1 data before taking action
b. prunes blocks and channels from its internal state which are no longer required
1. Enqueues unsafe blocks and dequeues safe blocks from the sequencer to its internal state.
2. Enqueues a new channel, if necessary.
3. Processes some unprocessed blocks into the current channel, triggers the compression of the block data and the creation of frames.
4. Sends frames from the channel queue to the DA layer (e.g. to Ethereum L1 as calldata or blob transactions).
5. If there is more transaction data to send, go to 2. Else wait for a tick and go to 1.
5. If there is more transaction data to send, go to 2. Else wait for a tick and go to 0.
The `blockCursor` state variable tracks the next unprocessed block.
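
Because pruning removes blocks from the front of the queue, the cursor has to shift by the same amount, and it is clamped at zero when the block it pointed at has itself been pruned. The following is a minimal, self-contained sketch of that bookkeeping. `pruneFront` is a hypothetical helper written for illustration and is not part of op-batcher; unlike `pruneSafeBlocks` in this commit, which panics when asked to prune more blocks than are available, the sketch returns an error.

```go
package main

import "fmt"

// pruneFront drops the first n items from q and shifts cursor by the same
// amount. If the cursor pointed at an item that has been pruned, it is
// clamped to zero so that it points at the new front of the queue.
// Hypothetical helper for illustration only.
func pruneFront[T any](q []T, cursor, n int) ([]T, int, error) {
	if n < 0 || n > len(q) {
		return q, cursor, fmt.Errorf("cannot prune %d of %d items", n, len(q))
	}
	cursor -= n
	if cursor < 0 {
		cursor = 0
	}
	return q[n:], cursor, nil
}

func main() {
	// Queue [A,B,C] with the cursor at C (index 2); prune one item.
	q, cursor, err := pruneFront([]string{"A", "B", "C"}, 2, 1)
	fmt.Println(q, cursor, err) // prints "[B C] 1 <nil>"
}
```

This mirrors the `[A,B,C*]+1->[B,C*]` case in `TestChannelManager_PruneBlocks`, where the cursor moves from 2 to 1.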
......@@ -57,7 +60,6 @@ At the current time, the batcher should be optimized for correctness, simplicity
The batcher can almost always recover from unforeseen situations by being restarted.
Some complexity is permitted, however, for handling data availability switching, so that the batcher is not wasting money for longer periods of time.
### Data Availability Backlog
......@@ -79,6 +81,9 @@ transaction. But in the case of a DA backlog (as defined by OP_BATCHER_THROTTLE_
block builder to instead impose a (tighter) block level limit of OP_BATCHER_THROTTLE_BLOCK_SIZE, and a single
transaction limit of OP_BATCHER_THROTTLE_TRANSACTION_SIZE.
### Max Channel Duration
The batcher tries to ensure that batches are posted at a minimum frequency specified by `MAX_CHANNEL_DURATION`. To achieve this, it caches the L1 origin of the last submitted channel, and will force close a channel if the timestamp of the L1 head moves beyond the timestamp of that L1 origin plus `MAX_CHANNEL_DURATION`. When clearing its state, e.g. following the detection of a reorg, the batcher will not clear the cached L1 origin: this way, the regular posting of batches will not be disturbed by events like reorgs.
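
The rule in this section can be sketched as follows. This is an illustrative model, not the batcher's code: the type `durationTracker`, its fields and its methods are hypothetical names, and the sketch assumes the two timestamps and the duration are expressed in the same unit.

```go
package main

import "fmt"

// durationTracker is a hypothetical model of the state described above.
type durationTracker struct {
	pendingBlocks      []uint64 // stand-in for blocks and channels held in state
	lastOriginTime     uint64   // timestamp of the cached origin of the last submitted channel
	maxChannelDuration uint64   // assumed to share a unit with the timestamps
}

// shouldForceClose reports whether the head timestamp has moved beyond the
// cached origin timestamp plus the maximum channel duration.
func (d *durationTracker) shouldForceClose(headTime uint64) bool {
	return headTime > d.lastOriginTime+d.maxChannelDuration
}

// clear drops pending work, e.g. after a reorg is detected, but deliberately
// keeps lastOriginTime so that the posting cadence is not reset.
func (d *durationTracker) clear() {
	d.pendingBlocks = nil
}

func main() {
	d := &durationTracker{
		pendingBlocks:      []uint64{101, 102},
		lastOriginTime:     1000,
		maxChannelDuration: 60,
	}
	fmt.Println(d.shouldForceClose(1060)) // prints "false": not yet beyond the limit
	d.clear()
	fmt.Println(d.shouldForceClose(1061)) // prints "true": clearing kept the cached origin
}
```

Keeping the cached origin across `clear` is the design choice the section describes: a reorg discards pending work but does not postpone the next forced channel close.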
## Known issues and future work
Link to [open issues with the `op-batcher` tag](https://github.com/ethereum-optimism/optimism/issues?q=is%3Aopen+is%3Aissue+label%3AA-op-batcher).
......