Commit 58089858 authored by Sebastian Stammler, committed by GitHub

op-node/rollup/derive: Implement Holocene Batch Stage (#12417)

parent f6ca2362
...@@ -27,7 +27,12 @@ import (
// It is internally responsible for making sure that batches with L1 inclusions block outside it's
// working range are not considered or pruned.
type ChannelFlusher interface {
FlushChannel()
}
type NextBatchProvider interface {
ChannelFlusher
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (Batch, error)
}
...@@ -37,9 +42,10 @@ type SafeBlockFetcher interface {
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
}
// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
// The baseBatchStage is a shared implementation of basic channel stage functionality. It is
// currently shared between the legacy BatchQueue, which buffers future batches, and the
// post-Holocene BatchStage, which requires strictly ordered batches.
type baseBatchStage struct {
log log.Logger
config *rollup.Config
prev NextBatchProvider
...@@ -53,18 +59,14 @@ type BatchQueue struct {
// length of l1Blocks never exceeds SequencerWindowSize
l1Blocks []eth.L1BlockRef
// batches in order of when we've first seen them
batches []*BatchWithL1InclusionBlock
// nextSpan is cached SingularBatches derived from SpanBatch
nextSpan []*SingularBatch
l2 SafeBlockFetcher
}
// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
return &BatchQueue{
func newBaseBatchStage(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) baseBatchStage {
return baseBatchStage{
log: log,
config: cfg,
prev: prev,
...@@ -72,80 +74,130 @@ func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l
}
}
func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.prev.Origin()
}
func (bs *baseBatchStage) base() *baseBatchStage {
return bs
}
func (bs *baseBatchStage) Log() log.Logger {
if len(bs.l1Blocks) == 0 {
return bs.log.New("origin", bs.origin.ID())
} else {
return bs.log.New("origin", bs.origin.ID(), "epoch", bs.l1Blocks[0])
}
}
type SingularBatchProvider interface {
ResettableStage
NextBatch(context.Context, eth.L2BlockRef) (*SingularBatch, bool, error)
}
// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
baseBatchStage
// batches in order of when we've first seen them
batches []*BatchWithL1InclusionBlock
}
var _ SingularBatchProvider = (*BatchQueue)(nil)
// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
return &BatchQueue{baseBatchStage: newBaseBatchStage(log, cfg, prev, l2)}
}
func (bs *baseBatchStage) Origin() eth.L1BlockRef {
return bs.prev.Origin()
}
// popNextBatch pops the next batch from the current queued up span-batch nextSpan.
// The queue must be non-empty, or the function will panic.
func (bq *BatchQueue) popNextBatch(parent eth.L2BlockRef) *SingularBatch {
func (bs *baseBatchStage) popNextBatch(parent eth.L2BlockRef) *SingularBatch {
if len(bq.nextSpan) == 0 {
if len(bs.nextSpan) == 0 {
panic("popping non-existent span-batch, invalid state")
}
nextBatch := bq.nextSpan[0]
nextBatch := bs.nextSpan[0]
bq.nextSpan = bq.nextSpan[1:]
bs.nextSpan = bs.nextSpan[1:]
// Must set ParentHash before return. we can use parent because the parentCheck is verified in CheckBatch().
nextBatch.ParentHash = parent.Hash
bq.log.Debug("pop next batch from the cached span batch")
bs.log.Debug("pop next batch from the cached span batch")
return nextBatch
}
// NextBatch return next valid batch upon the given safe head.
// It also returns the boolean that indicates if the batch is the last block in the batch.
func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) {
func (bs *baseBatchStage) nextFromSpanBatch(parent eth.L2BlockRef) (*SingularBatch, bool) {
if len(bq.nextSpan) > 0 {
if len(bs.nextSpan) > 0 {
// There are cached singular batches derived from the span batch.
// Check if the next cached batch matches the given parent block.
if bq.nextSpan[0].Timestamp == parent.Time+bq.config.BlockTime {
if bs.nextSpan[0].Timestamp == parent.Time+bs.config.BlockTime {
// Pop first one and return.
nextBatch := bq.popNextBatch(parent)
nextBatch := bs.popNextBatch(parent)
// len(bq.nextSpan) == 0 means it's the last batch of the span.
return nextBatch, len(bq.nextSpan) == 0, nil
return nextBatch, len(bs.nextSpan) == 0
} else {
// Given parent block does not match the next batch. It means the previously returned batch is invalid.
// Drop cached batches and find another batch.
bq.log.Warn("parent block does not match the next batch. dropped cached batches", "parent", parent.ID(), "nextBatchTime", bq.nextSpan[0].GetTimestamp())
bs.log.Warn("parent block does not match the next batch. dropped cached batches", "parent", parent.ID(), "nextBatchTime", bs.nextSpan[0].GetTimestamp())
bq.nextSpan = bq.nextSpan[:0]
bs.nextSpan = bs.nextSpan[:0]
}
}
return nil, false
}
func (bs *baseBatchStage) updateOrigins(parent eth.L2BlockRef) {
// Note: We use the origin that we will have to determine if it's behind. This is important
// because it's the future origin that gets saved into the l1Blocks array.
// We always update the origin of this stage if it is not the same so after the update code
// runs, this is consistent.
originBehind := bq.prev.Origin().Number < parent.L1Origin.Number
originBehind := bs.originBehind(parent)
// Advance origin if needed
// Note: The entire pipeline has the same origin
// We just don't accept batches prior to the L1 origin of the L2 safe head
if bq.origin != bq.prev.Origin() {
if bs.origin != bs.prev.Origin() {
bq.origin = bq.prev.Origin()
bs.origin = bs.prev.Origin()
if !originBehind {
bq.l1Blocks = append(bq.l1Blocks, bq.origin)
bs.l1Blocks = append(bs.l1Blocks, bs.origin)
} else {
// This is to handle the special case of startup. At startup we call Reset & include
// the L1 origin. That is the only time where immediately after `Reset` is called
// originBehind is false.
bq.l1Blocks = bq.l1Blocks[:0]
bs.l1Blocks = bs.l1Blocks[:0]
}
bq.log.Info("Advancing bq origin", "origin", bq.origin, "originBehind", originBehind)
bs.log.Info("Advancing bq origin", "origin", bs.origin, "originBehind", originBehind)
}
// If the epoch is advanced, update bq.l1Blocks
// Advancing epoch must be done after the pipeline successfully apply the entire span batch to the chain.
// Because the span batch can be reverted during processing the batch, then we must preserve existing l1Blocks
// to verify the epochs of the next candidate batch.
// Before Holocene, advancing the epoch must be done after the pipeline successfully applied the entire span batch to the chain.
// This is because the entire span batch can be reverted after finding an invalid batch.
// So we must preserve the existing l1Blocks to verify the epochs of the next candidate batch.
if len(bq.l1Blocks) > 0 && parent.L1Origin.Number > bq.l1Blocks[0].Number {
if len(bs.l1Blocks) > 0 && parent.L1Origin.Number > bs.l1Blocks[0].Number {
for i, l1Block := range bq.l1Blocks {
for i, l1Block := range bs.l1Blocks {
if parent.L1Origin.Number == l1Block.Number {
bq.l1Blocks = bq.l1Blocks[i:]
bs.l1Blocks = bs.l1Blocks[i:]
bq.log.Debug("Advancing internal L1 blocks", "next_epoch", bq.l1Blocks[0].ID(), "next_epoch_time", bq.l1Blocks[0].Time)
bs.log.Debug("Advancing internal L1 blocks", "next_epoch", bs.l1Blocks[0].ID(), "next_epoch_time", bs.l1Blocks[0].Time)
break
}
}
// If we can't find the origin of parent block, we have to advance bq.origin.
}
}
func (bs *baseBatchStage) originBehind(parent eth.L2BlockRef) bool {
return bs.prev.Origin().Number < parent.L1Origin.Number
}
func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) {
// Early return if there are singular batches from a span batch queued up
if batch, last := bq.nextFromSpanBatch(parent); batch != nil {
return batch, last, nil
}
bq.updateOrigins(parent)
originBehind := bq.originBehind(parent)
// Load more data into the batch queue
outOfData := false
if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
...@@ -206,17 +258,21 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
return nextBatch, len(bq.nextSpan) == 0, nil
}
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
func (bs *baseBatchStage) reset(base eth.L1BlockRef) {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.origin = base
bs.origin = base
bq.batches = []*BatchWithL1InclusionBlock{}
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0]
bs.l1Blocks = bs.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, base)
bs.l1Blocks = append(bs.l1Blocks, base)
bq.nextSpan = bq.nextSpan[:0]
bs.nextSpan = bs.nextSpan[:0]
}
func (bq *BatchQueue) Reset(_ context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
bq.baseBatchStage.reset(base)
bq.batches = bq.batches[:0]
return io.EOF
}
...@@ -257,7 +313,6 @@ func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, paren
// Find the first-seen batch that matches all validity conditions.
// We may not have sufficient information to proceed filtering, and then we stop.
// There may be none: in that case we force-create an empty batch
nextTimestamp := parent.Time + bq.config.BlockTime
var nextBatch *BatchWithL1InclusionBlock
// Go over all batches, in order of inclusion, and find the first batch we can accept.
...@@ -296,33 +351,39 @@ batchLoop:
nextBatch.Batch.LogContext(bq.log).Info("Found next batch")
return nextBatch.Batch, nil
}
return bq.deriveNextEmptyBatch(ctx, outOfData, parent)
}
// deriveNextEmptyBatch may derive an empty batch if the sequencing window is expired
func (bs *baseBatchStage) deriveNextEmptyBatch(ctx context.Context, outOfData bool, parent eth.L2BlockRef) (*SingularBatch, error) {
epoch := bs.l1Blocks[0]
// If the current epoch is too old compared to the L1 block we are at,
// i.e. if the sequence window expired, we create empty batches for the current epoch
expiryEpoch := epoch.Number + bq.config.SeqWindowSize
expiryEpoch := epoch.Number + bs.config.SeqWindowSize
forceEmptyBatches := (expiryEpoch == bq.origin.Number && outOfData) || expiryEpoch < bq.origin.Number
forceEmptyBatches := (expiryEpoch == bs.origin.Number && outOfData) || expiryEpoch < bs.origin.Number
firstOfEpoch := epoch.Number == parent.L1Origin.Number+1
nextTimestamp := parent.Time + bs.config.BlockTime
bq.log.Trace("Potentially generating an empty batch",
bs.log.Trace("Potentially generating an empty batch",
"expiryEpoch", expiryEpoch, "forceEmptyBatches", forceEmptyBatches, "nextTimestamp", nextTimestamp,
"epoch_time", epoch.Time, "len_l1_blocks", len(bq.l1Blocks), "firstOfEpoch", firstOfEpoch)
"epoch_time", epoch.Time, "len_l1_blocks", len(bs.l1Blocks), "firstOfEpoch", firstOfEpoch)
if !forceEmptyBatches {
// sequence window did not expire yet, still room to receive batches for the current epoch,
// no need to force-create empty batch(es) towards the next epoch yet.
return nil, io.EOF
}
if len(bq.l1Blocks) < 2 {
if len(bs.l1Blocks) < 2 {
// need next L1 block to proceed towards
return nil, io.EOF
}
nextEpoch := bq.l1Blocks[1]
nextEpoch := bs.l1Blocks[1]
// Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin,
// to preserve that L2 time >= L1 time. If this is the first block of the epoch, always generate a
// batch to ensure that we at least have one batch per epoch.
if nextTimestamp < nextEpoch.Time || firstOfEpoch {
bq.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
bs.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
return &SingularBatch{
ParentHash: parent.Hash,
EpochNum: rollup.Epoch(epoch.Number),
...@@ -334,7 +395,9 @@ batchLoop:
// At this point we have auto generated every batch for the current epoch
// that we can, so we can advance to the next epoch.
bq.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time)
bq.l1Blocks = bq.l1Blocks[1:]
// TODO(12444): Instead of manually advancing the epoch here, it may be better to generate a
// batch for the next epoch, so that updateOrigins then properly advances the origin.
bs.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time)
bs.l1Blocks = bs.l1Blocks[1:]
return nil, io.EOF
}
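As an editorial aside (not part of this commit): every NextBatchProvider now has to satisfy the new ChannelFlusher interface, so the Holocene BatchStage can discard the remainder of the current channel after it drops an invalid batch. A minimal, hypothetical provider implementing that contract might look like the sketch below; the replayBatchSource type and its fields are invented for illustration, only the derive interfaces and types are taken from the diff.

```go
// Hypothetical example, not from the commit: a minimal NextBatchProvider
// that also satisfies the new ChannelFlusher requirement.
package example

import (
	"context"
	"io"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
	"github.com/ethereum-optimism/optimism/op-service/eth"
)

type replayBatchSource struct {
	origin  eth.L1BlockRef
	batches []derive.Batch // remaining batches of the "current channel"
}

// Compile-time check that the provider contract (incl. ChannelFlusher) is met.
var _ derive.NextBatchProvider = (*replayBatchSource)(nil)

func (s *replayBatchSource) Origin() eth.L1BlockRef { return s.origin }

func (s *replayBatchSource) NextBatch(ctx context.Context) (derive.Batch, error) {
	if len(s.batches) == 0 {
		return nil, io.EOF
	}
	next := s.batches[0]
	s.batches = s.batches[1:]
	return next, nil
}

// FlushChannel drops whatever is left of the current channel.
func (s *replayBatchSource) FlushChannel() { s.batches = nil }
```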
...@@ -32,6 +32,12 @@ func (f *fakeBatchQueueInput) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeBatchQueueInput) FlushChannel() {
f.batches = nil
f.errors = nil
f.i = 0
}
func (f *fakeBatchQueueInput) NextBatch(ctx context.Context) (Batch, error) {
if f.i >= len(f.batches) {
return nil, io.EOF
...@@ -141,33 +147,66 @@ func TestBatchQueue(t *testing.T) {
name string
f func(t *testing.T, batchType int)
}{
{"BatchQueueNewOrigin", BatchQueueNewOrigin},
{"BatchQueueEager", BatchQueueEager},
{"Missing", testBatchQueue_Missing},
{"Shuffle", testBatchQueue_Shuffle},
{"BatchQueueInvalidInternalAdvance", BatchQueueInvalidInternalAdvance},
{"BatchQueueMissing", BatchQueueMissing},
{"BatchQueueAdvancedEpoch", BatchQueueAdvancedEpoch},
{"BatchQueueShuffle", BatchQueueShuffle},
{"BatchQueueResetOneBlockBeforeOrigin", BatchQueueResetOneBlockBeforeOrigin},
}
for _, test := range tests {
test := test
t.Run(test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, SingularBatchType)
})
t.Run(test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, SpanBatchType)
})
}
}
type testableBatchStageFactory func(log.Logger, *rollup.Config, NextBatchProvider, SafeBlockFetcher) testableBatchStage
type testableBatchStage interface {
SingularBatchProvider
base() *baseBatchStage
}
func TestBatchStages(t *testing.T) {
newBatchQueue := func(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) testableBatchStage {
return NewBatchQueue(log, cfg, prev, l2)
}
newBatchStage := func(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) testableBatchStage {
return NewBatchStage(log, cfg, prev, l2)
}
tests := []struct {
name string
f func(*testing.T, int, testableBatchStageFactory)
}{
{"NewOrigin", testBatchStage_NewOrigin},
{"Eager", testBatchStage_Eager},
{"InvalidInternalAdvance", testBatchStage_InvalidInternalAdvance},
{"AdvancedEpoch", testBatchStage_AdvancedEpoch},
{"ResetOneBlockBeforeOrigin", testBatchStage_ResetOneBlockBeforeOrigin},
}
for _, test := range tests {
test := test
t.Run(test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, SpanBatchType)
t.Run("BatchQueue_"+test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, SingularBatchType, newBatchQueue)
})
t.Run("BatchQueue_"+test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, SpanBatchType, newBatchQueue)
})
t.Run("BatchStage_"+test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, SingularBatchType, newBatchStage)
})
t.Run("BatchStage_"+test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, SpanBatchType, newBatchStage)
})
}
}
// BatchQueueNewOrigin tests that the batch queue properly saves the new origin
// testBatchStage_NewOrigin tests that the batch queue properly saves the new origin
// when the safehead's origin is ahead of the pipeline's origin (as is after a reset).
// This issue was fixed in https://github.com/ethereum-optimism/optimism/pull/3694
func BatchQueueNewOrigin(t *testing.T, batchType int) {
func testBatchStage_NewOrigin(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) {
log := testlog.Logger(t, log.LevelCrit)
l1 := L1Chain([]uint64{10, 15, 20, 25})
safeHead := eth.L2BlockRef{
...@@ -194,17 +233,18 @@ func BatchQueueNewOrigin(t *testing.T, batchType int) {
origin: l1[0],
}
bq := NewBatchQueue(log, cfg, input, nil)
bq := newBatchStage(log, cfg, input, nil)
bqb := bq.base()
_ = bq.Reset(context.Background(), l1[0], eth.SystemConfig{})
require.Equal(t, []eth.L1BlockRef{l1[0]}, bq.l1Blocks)
require.Equal(t, []eth.L1BlockRef{l1[0]}, bqb.l1Blocks)
// Prev Origin: 0; Safehead Origin: 2; Internal Origin: 0
// Should return no data but keep the same origin
data, _, err := bq.NextBatch(context.Background(), safeHead)
require.Nil(t, data)
require.Equal(t, io.EOF, err)
require.Equal(t, []eth.L1BlockRef{l1[0]}, bq.l1Blocks)
require.Equal(t, []eth.L1BlockRef{l1[0]}, bqb.l1Blocks)
require.Equal(t, l1[0], bq.origin)
require.Equal(t, l1[0], bqb.origin)
// Prev Origin: 1; Safehead Origin: 2; Internal Origin: 0
// Should wipe l1blocks + advance internal origin
...@@ -212,8 +252,8 @@ func BatchQueueNewOrigin(t *testing.T, batchType int) {
data, _, err = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, data)
require.Equal(t, io.EOF, err)
require.Empty(t, bq.l1Blocks)
require.Empty(t, bqb.l1Blocks)
require.Equal(t, l1[1], bq.origin)
require.Equal(t, l1[1], bqb.origin)
// Prev Origin: 2; Safehead Origin: 2; Internal Origin: 1
// Should add to l1Blocks + advance internal origin
...@@ -221,14 +261,14 @@ func BatchQueueNewOrigin(t *testing.T, batchType int) {
data, _, err = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, data)
require.Equal(t, io.EOF, err)
require.Equal(t, []eth.L1BlockRef{l1[2]}, bq.l1Blocks)
require.Equal(t, []eth.L1BlockRef{l1[2]}, bqb.l1Blocks)
require.Equal(t, l1[2], bq.origin)
require.Equal(t, l1[2], bqb.origin)
}
// BatchQueueResetOneBlockBeforeOrigin tests that the batch queue properly
// testBatchStage_ResetOneBlockBeforeOrigin tests that the batch queue properly
// prunes the l1Block recorded as part of a reset when the starting origin
// is exactly one block prior to the safe head origin.
func BatchQueueResetOneBlockBeforeOrigin(t *testing.T, batchType int) {
func testBatchStage_ResetOneBlockBeforeOrigin(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) {
log := testlog.Logger(t, log.LevelTrace)
l1 := L1Chain([]uint64{10, 15, 20, 25})
safeHead := eth.L2BlockRef{
...@@ -255,17 +295,18 @@ func BatchQueueResetOneBlockBeforeOrigin(t *testing.T, batchType int) {
origin: l1[0],
}
bq := NewBatchQueue(log, cfg, input, nil)
bq := newBatchStage(log, cfg, input, nil)
bqb := bq.base()
_ = bq.Reset(context.Background(), l1[0], eth.SystemConfig{})
require.Equal(t, []eth.L1BlockRef{l1[0]}, bq.l1Blocks)
require.Equal(t, []eth.L1BlockRef{l1[0]}, bqb.l1Blocks)
// Prev Origin: 0; Safehead Origin: 1; Internal Origin: 0
// Should return no data but keep the same origin
data, _, err := bq.NextBatch(context.Background(), safeHead)
require.Nil(t, data)
require.Equal(t, io.EOF, err)
require.Equal(t, []eth.L1BlockRef{l1[0]}, bq.l1Blocks)
require.Equal(t, []eth.L1BlockRef{l1[0]}, bqb.l1Blocks)
require.Equal(t, l1[0], bq.origin)
require.Equal(t, l1[0], bqb.origin)
// Prev Origin: 1; Safehead Origin: 1; Internal Origin: 0
// Should record new l1 origin in l1blocks, prune block 0 and advance internal origin
...@@ -273,8 +314,8 @@ func BatchQueueResetOneBlockBeforeOrigin(t *testing.T, batchType int) {
data, _, err = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, data)
require.Equalf(t, io.EOF, err, "expected io.EOF but got %v", err)
require.Equal(t, []eth.L1BlockRef{l1[1]}, bq.l1Blocks)
require.Equal(t, []eth.L1BlockRef{l1[1]}, bqb.l1Blocks)
require.Equal(t, l1[1], bq.origin)
require.Equal(t, l1[1], bqb.origin)
// Prev Origin: 2; Safehead Origin: 1; Internal Origin: 1
// Should add to l1Blocks + advance internal origin
...@@ -282,13 +323,13 @@ func BatchQueueResetOneBlockBeforeOrigin(t *testing.T, batchType int) {
data, _, err = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, data)
require.Equal(t, io.EOF, err)
require.Equal(t, []eth.L1BlockRef{l1[1], l1[2]}, bq.l1Blocks)
require.Equal(t, []eth.L1BlockRef{l1[1], l1[2]}, bqb.l1Blocks)
require.Equal(t, l1[2], bq.origin)
require.Equal(t, l1[2], bqb.origin)
}
// BatchQueueEager adds a bunch of contiguous batches and asserts that
// testBatchStage_Eager adds a bunch of contiguous batches and asserts that
// enough calls to `NextBatch` return all of those batches.
func BatchQueueEager(t *testing.T, batchType int) {
func testBatchStage_Eager(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) {
log := testlog.Logger(t, log.LevelCrit)
l1 := L1Chain([]uint64{10, 20, 30})
chainId := big.NewInt(1234)
...@@ -344,7 +385,7 @@ func BatchQueueEager(t *testing.T, batchType int) {
origin: l1[0],
}
bq := NewBatchQueue(log, cfg, input, nil)
bq := newBatchStage(log, cfg, input, nil)
_ = bq.Reset(context.Background(), l1[0], eth.SystemConfig{})
// Advance the origin
input.origin = l1[1]
...@@ -364,11 +405,11 @@ func BatchQueueEager(t *testing.T, batchType int) {
}
}
// BatchQueueInvalidInternalAdvance asserts that we do not miss an epoch when generating batches.
// testBatchStage_InvalidInternalAdvance asserts that we do not miss an epoch when generating batches.
// This is a regression test for CLI-3378.
func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
func testBatchStage_InvalidInternalAdvance(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) {
log := testlog.Logger(t, log.LevelTrace)
l1 := L1Chain([]uint64{10, 15, 20, 25, 30})
l1 := L1Chain([]uint64{5, 10, 15, 20, 25, 30})
chainId := big.NewInt(1234)
safeHead := eth.L2BlockRef{
Hash: mockHash(10, 2),
...@@ -416,17 +457,26 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
}
}
// prepend a nil batch so we can load the safe head's epoch
input := &fakeBatchQueueInput{
batches: inputBatches,
batches: append([]Batch{nil}, inputBatches...),
errors: inputErrors,
errors: append([]error{io.EOF}, inputErrors...),
origin: l1[0],
}
bq := NewBatchQueue(log, cfg, input, nil)
bq := newBatchStage(log, cfg, input, nil)
_ = bq.Reset(context.Background(), l1[0], eth.SystemConfig{})
// first load base epoch
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
require.Nil(t, b)
// then advance to origin 1 with batches
input.origin = l1[1]
// Load continuous batches for epoch 0
for i := 0; i < len(expectedOutputBatches); i++ {
t.Logf("Iteration %d", i)
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, expectedOutputErrors[i])
if b == nil {
...@@ -440,14 +490,7 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
}
}
// Advance to origin 1. No forced batches yet.
// Advance to origin 2. No forced batches yet.
input.origin = l1[1]
b, _, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
require.Nil(t, b)
// Advance to origin 2. No forced batches yet because we are still on epoch 0
// & have batches for epoch 0.
input.origin = l1[2]
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
...@@ -456,7 +499,7 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
// Advance to origin 3. Should generate one empty batch.
input.origin = l1[3]
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.NoError(t, e)
require.NotNil(t, b)
require.Equal(t, safeHead.Time+2, b.Timestamp)
require.Equal(t, rollup.Epoch(1), b.EpochNum)
...@@ -471,7 +514,7 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
// Advance to origin 4. Should generate one empty batch.
input.origin = l1[4]
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.NoError(t, e)
require.NotNil(t, b)
require.Equal(t, rollup.Epoch(2), b.EpochNum)
require.Equal(t, safeHead.Time+2, b.Timestamp)
...@@ -482,10 +525,9 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) {
b, _, e = bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
require.Nil(t, b)
}
func BatchQueueMissing(t *testing.T, batchType int) {
func testBatchQueue_Missing(t *testing.T, batchType int) {
log := testlog.Logger(t, log.LevelCrit)
l1 := L1Chain([]uint64{10, 15, 20, 25})
chainId := big.NewInt(1234)
...@@ -600,9 +642,9 @@ func BatchQueueMissing(t *testing.T, batchType int) {
require.Equal(t, rollup.Epoch(1), b.EpochNum)
}
// BatchQueueAdvancedEpoch tests that batch queue derives consecutive valid batches with advancing epochs.
// testBatchStage_AdvancedEpoch tests that batch queue derives consecutive valid batches with advancing epochs.
// Batch queue's l1blocks list should be updated along epochs.
func BatchQueueAdvancedEpoch(t *testing.T, batchType int) {
func testBatchStage_AdvancedEpoch(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) {
log := testlog.Logger(t, log.LevelCrit)
l1 := L1Chain([]uint64{0, 6, 12, 18, 24}) // L1 block time: 6s
chainId := big.NewInt(1234)
...@@ -664,7 +706,7 @@ func BatchQueueAdvancedEpoch(t *testing.T, batchType int) {
origin: l1[inputOriginNumber],
}
bq := NewBatchQueue(log, cfg, input, nil)
bq := newBatchStage(log, cfg, input, nil)
_ = bq.Reset(context.Background(), l1[1], eth.SystemConfig{})
for i := 0; i < len(expectedOutputBatches); i++ {
...@@ -688,8 +730,8 @@ func BatchQueueAdvancedEpoch(t *testing.T, batchType int) {
}
}
// BatchQueueShuffle tests batch queue can reorder shuffled valid batches
// testBatchQueue_Shuffle tests batch queue can reorder shuffled valid batches
func BatchQueueShuffle(t *testing.T, batchType int) {
func testBatchQueue_Shuffle(t *testing.T, batchType int) {
log := testlog.Logger(t, log.LevelCrit)
l1 := L1Chain([]uint64{0, 6, 12, 18, 24}) // L1 block time: 6s
chainId := big.NewInt(1234)
...
package derive
import (
"context"
"errors"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
type BatchStage struct {
baseBatchStage
}
func NewBatchStage(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchStage {
return &BatchStage{baseBatchStage: newBaseBatchStage(log, cfg, prev, l2)}
}
func (bs *BatchStage) Reset(_ context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
bs.reset(base)
return io.EOF
}
func (bs *BatchStage) FlushChannel() {
bs.nextSpan = bs.nextSpan[:0]
bs.prev.FlushChannel()
}
func (bs *BatchStage) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) {
// with Holocene, we can always update (and prune) the origins because we don't backwards-invalidate.
bs.updateOrigins(parent)
// If origin behind (or at parent), we drain previous stage(s), and then return.
// Note that a channel from the parent's L1 origin block can only contain past batches, so we
// can just skip them.
// TODO(12444): we may be able to change the definition of originBehind to include equality,
// also for the pre-Holocene BatchQueue. This may also allow us to remove the edge case in
// updateOrigins.
if bs.originBehind(parent) || parent.L1Origin.Number == bs.origin.Number {
if _, err := bs.prev.NextBatch(ctx); err != nil {
// includes io.EOF and NotEnoughData
return nil, false, err
}
// continue draining
return nil, false, NotEnoughData
}
if len(bs.l1Blocks) < 2 {
// This can only happen if derivation erroneously doesn't start at a safe head.
// By now, the L1 origin of the first safe head and the following L1 block must be in the
// l1Blocks.
return nil, false, NewCriticalError(fmt.Errorf(
"unexpected low buffered origin count, origin: %v, parent: %v", bs.origin, parent))
}
// Note: epoch origin can now be one block ahead of the L2 Safe Head
// This is in the case where we auto generate all batches in an epoch & advance the epoch in
// deriveNextEmptyBatch but don't advance the L2 Safe Head's epoch
if epoch := bs.l1Blocks[0]; parent.L1Origin != epoch.ID() && parent.L1Origin.Number != epoch.Number-1 {
return nil, false, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head origin %s", epoch, parent.L1Origin))
}
batch, err := bs.nextSingularBatchCandidate(ctx, parent)
if err == io.EOF {
// We only consider empty batch generation after we've drained all batches from the local
// span batch queue and the previous stage.
empty, err := bs.deriveNextEmptyBatch(ctx, true, parent)
return empty, false, err
} else if err != nil {
return nil, false, err
}
// check candidate validity
validity := checkSingularBatch(bs.config, bs.Log(), bs.l1Blocks, parent, batch, bs.origin)
switch validity {
case BatchAccept: // continue
batch.LogContext(bs.Log()).Debug("Found next singular batch")
return batch, len(bs.nextSpan) == 0, nil
case BatchPast:
batch.LogContext(bs.Log()).Warn("Dropping past singular batch")
// NotEnoughData to read in next batch until we're through all past batches
return nil, false, NotEnoughData
case BatchDrop: // drop, flush, move onto next channel
batch.LogContext(bs.Log()).Warn("Dropping invalid singular batch, flushing channel")
bs.FlushChannel()
// NotEnoughData will cause derivation from previous stages until they're empty, at which
// point empty batch derivation will happen.
return nil, false, NotEnoughData
case BatchUndecided: // l2 fetcher error, try again
batch.LogContext(bs.Log()).Warn("Undecided span batch")
return nil, false, NotEnoughData
case BatchFuture: // panic, can't happen
return nil, false, NewCriticalError(fmt.Errorf("impossible batch validity: %v", validity))
default:
return nil, false, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
}
}
func (bs *BatchStage) nextSingularBatchCandidate(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, error) {
// First check for next span-derived batch
nextBatch, _ := bs.nextFromSpanBatch(parent)
if nextBatch != nil {
return nextBatch, nil
}
// If the next batch is a singular batch, we forward it as the candidate.
// If it is a span batch, we check its validity and then forward its first singular batch.
batch, err := bs.prev.NextBatch(ctx)
if err != nil { // includes io.EOF
return nil, err
}
switch typ := batch.GetBatchType(); typ {
case SingularBatchType:
singularBatch, ok := batch.AsSingularBatch()
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
return singularBatch, nil
case SpanBatchType:
spanBatch, ok := batch.AsSpanBatch()
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
}
validity, _ := checkSpanBatchPrefix(ctx, bs.config, bs.Log(), bs.l1Blocks, parent, spanBatch, bs.origin, bs.l2)
switch validity {
case BatchAccept: // continue
spanBatch.LogContext(bs.Log()).Info("Found next valid span batch")
case BatchPast:
spanBatch.LogContext(bs.Log()).Warn("Dropping past span batch")
// NotEnoughData to read in next batch until we're through all past batches
return nil, NotEnoughData
case BatchDrop: // drop, try next
spanBatch.LogContext(bs.Log()).Warn("Dropping invalid span batch, flushing channel")
bs.FlushChannel()
return nil, NotEnoughData
case BatchUndecided: // l2 fetcher error, try again
spanBatch.LogContext(bs.Log()).Warn("Undecided span batch")
return nil, NotEnoughData
case BatchFuture: // can't happen with Holocene
return nil, NewCriticalError(errors.New("impossible future batch validity"))
}
// If next batch is SpanBatch, convert it to SingularBatches.
// TODO(12444): maybe create iterator here instead, save to nextSpan
// Need to make sure this doesn't error where the iterator wouldn't,
// otherwise this wouldn't be correctly implementing partial span batch invalidation.
// From what I can tell, it is fine because the only error case is if the l1Blocks are
// missing a block, which would be a logic error. Although, if the node restarts mid-way
// through a span batch and the sync start only goes back one channel timeout from the
// mid-way safe block, it may actually miss l1 blocks! Need to check.
// We could fix this by fast-dropping past batches from the span batch.
singularBatches, err := spanBatch.GetSingularBatches(bs.l1Blocks, parent)
if err != nil {
return nil, NewCriticalError(err)
}
bs.nextSpan = singularBatches
// span-batches are non-empty, so the below pop is safe.
return bs.popNextBatch(parent), nil
default:
return nil, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", typ))
}
}
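For orientation, here is a hypothetical usage sketch (not part of this commit) of the new stage defined above: construct it over an existing provider, reset it to an L1 origin, and poll it until it signals io.EOF. The drainBatches function and its parameters are invented for illustration; only NewBatchStage, Reset, NextBatch and the derive/eth types are taken from the diff.

```go
// Hypothetical wiring sketch, assuming the derive package API shown in the diff.
package example

import (
	"context"
	"errors"
	"io"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum/go-ethereum/log"
)

func drainBatches(ctx context.Context, lgr log.Logger, cfg *rollup.Config,
	prev derive.NextBatchProvider, l2 derive.SafeBlockFetcher,
	l1Origin eth.L1BlockRef, safeHead eth.L2BlockRef) error {
	stage := derive.NewBatchStage(lgr, cfg, prev, l2)
	_ = stage.Reset(ctx, l1Origin, eth.SystemConfig{}) // Reset returns io.EOF by convention
	for {
		batch, lastInSpan, err := stage.NextBatch(ctx, safeHead)
		if errors.Is(err, io.EOF) {
			return nil // no more data for this origin; wait for more L1 input
		} else if err != nil {
			return err // NotEnoughData, reset, or critical errors bubble up to the caller
		}
		_ = batch      // hand the singular batch to the next stage
		_ = lastInSpan // true once the cached span batch is exhausted
	}
}
```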
...@@ -26,6 +26,9 @@ const (
BatchUndecided
// BatchFuture indicates that the batch may be valid, but cannot be processed yet and should be checked again later
BatchFuture
// BatchPast indicates that the batch is from the past, i.e. its timestamp is smaller or equal
// to the safe head's timestamp.
BatchPast
)
// CheckBatch checks if the given batch can be applied on top of the given l2SafeHead, given the contextual L1 blocks the batch was included in.
...@@ -69,11 +72,18 @@ func checkSingularBatch(cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1Blo
nextTimestamp := l2SafeHead.Time + cfg.BlockTime
if batch.Timestamp > nextTimestamp {
if cfg.IsHolocene(l1InclusionBlock.Time) {
log.Warn("dropping future batch", "next_timestamp", nextTimestamp)
return BatchDrop
}
log.Trace("received out-of-order batch for future processing after next batch", "next_timestamp", nextTimestamp)
return BatchFuture
}
if batch.Timestamp < nextTimestamp {
log.Warn("dropping batch with old timestamp", "min_timestamp", nextTimestamp)
log.Warn("dropping past batch with old timestamp", "min_timestamp", nextTimestamp)
if cfg.IsHolocene(l1InclusionBlock.Time) {
return BatchPast
}
return BatchDrop
}
...@@ -166,17 +176,19 @@ func checkSingularBatch(cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1Blo
return BatchAccept
}
// checkSpanBatch implements SpanBatch validation rule.
func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef, l2SafeHead eth.L2BlockRef,
batch *SpanBatch, l1InclusionBlock eth.L1BlockRef, l2Fetcher SafeBlockFetcher,
) BatchValidity {
// checkSpanBatchPrefix performs the span batch prefix rules for Holocene.
// Next to the validity, it also returns the parent L2 block as determined during the checks for
// further consumption.
func checkSpanBatchPrefix(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef, l2SafeHead eth.L2BlockRef,
batch *SpanBatch, l1InclusionBlock eth.L1BlockRef, l2Fetcher SafeBlockFetcher,
) (BatchValidity, eth.L2BlockRef) {
// add details to the log
log = batch.LogContext(log)
// sanity check we have consistent inputs
if len(l1Blocks) == 0 {
log.Warn("missing L1 block input, cannot proceed with batch checking")
return BatchUndecided
return BatchUndecided, eth.L2BlockRef{}
}
epoch := l1Blocks[0]
...@@ -185,64 +197,70 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B
if startEpochNum == batchOrigin.Number+1 {
if len(l1Blocks) < 2 {
log.Info("eager batch wants to advance epoch, but could not without more L1 blocks", "current_epoch", epoch.ID())
return BatchUndecided
return BatchUndecided, eth.L2BlockRef{}
}
batchOrigin = l1Blocks[1]
}
if !cfg.IsDelta(batchOrigin.Time) {
log.Warn("received SpanBatch with L1 origin before Delta hard fork", "l1_origin", batchOrigin.ID(), "l1_origin_time", batchOrigin.Time)
return BatchDrop
return BatchDrop, eth.L2BlockRef{}
}
nextTimestamp := l2SafeHead.Time + cfg.BlockTime
if batch.GetTimestamp() > nextTimestamp {
if cfg.IsHolocene(l1InclusionBlock.Time) {
log.Warn("dropping future span batch", "next_timestamp", nextTimestamp)
return BatchDrop, eth.L2BlockRef{}
}
log.Trace("received out-of-order batch for future processing after next batch", "next_timestamp", nextTimestamp)
return BatchFuture
return BatchFuture, eth.L2BlockRef{}
}
if batch.GetBlockTimestamp(batch.GetBlockCount()-1) < nextTimestamp {
log.Warn("span batch has no new blocks after safe head")
return BatchDrop
if cfg.IsHolocene(l1InclusionBlock.Time) {
return BatchPast, eth.L2BlockRef{}
}
return BatchDrop, eth.L2BlockRef{}
}
// finding parent block of the span batch.
// if the span batch does not overlap the current safe chain, parentBLock should be l2SafeHead.
parentNum := l2SafeHead.Number
parentBlock := l2SafeHead
if batch.GetTimestamp() < nextTimestamp {
if batch.GetTimestamp() > l2SafeHead.Time {
// batch timestamp cannot be between safe head and next timestamp
log.Warn("batch has misaligned timestamp, block time is too short")
return BatchDrop
return BatchDrop, eth.L2BlockRef{}
}
if (l2SafeHead.Time-batch.GetTimestamp())%cfg.BlockTime != 0 {
log.Warn("batch has misaligned timestamp, not overlapped exactly")
return BatchDrop
return BatchDrop, eth.L2BlockRef{}
}
parentNum = l2SafeHead.Number - (l2SafeHead.Time-batch.GetTimestamp())/cfg.BlockTime - 1
parentNum := l2SafeHead.Number - (l2SafeHead.Time-batch.GetTimestamp())/cfg.BlockTime - 1
var err error
parentBlock, err = l2Fetcher.L2BlockRefByNumber(ctx, parentNum)
if err != nil {
log.Warn("failed to fetch L2 block", "number", parentNum, "err", err)
// unable to validate the batch for now. retry later.
return BatchUndecided
return BatchUndecided, eth.L2BlockRef{}
}
}
if !batch.CheckParentHash(parentBlock.Hash) {
log.Warn("ignoring batch with mismatching parent hash", "parent_block", parentBlock.Hash)
return BatchDrop
return BatchDrop, parentBlock
}
// Filter out batches that were included too late.
if startEpochNum+cfg.SeqWindowSize < l1InclusionBlock.Number {
log.Warn("batch was included too late, sequence window expired")
return BatchDrop
return BatchDrop, parentBlock
}
// Check the L1 origin of the batch
if startEpochNum > parentBlock.L1Origin.Number+1 {
log.Warn("batch is for future epoch too far ahead, while it has the next timestamp, so it must be invalid", "current_epoch", epoch.ID())
return BatchDrop
return BatchDrop, parentBlock
}
endEpochNum := batch.GetBlockEpochNum(batch.GetBlockCount() - 1)
...@@ -252,7 +270,7 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B
if l1Block.Number == endEpochNum {
if !batch.CheckOriginHash(l1Block.Hash) {
log.Warn("batch is for different L1 chain, epoch hash does not match", "expected", l1Block.Hash)
return BatchDrop
return BatchDrop, parentBlock
}
originChecked = true
break
...@@ -260,13 +278,26 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B
}
if !originChecked {
log.Info("need more l1 blocks to check entire origins of span batch")
return BatchUndecided
return BatchUndecided, parentBlock
}
if startEpochNum < parentBlock.L1Origin.Number {
log.Warn("dropped batch, epoch is too old", "minimum", parentBlock.ID())
return BatchDrop
return BatchDrop, parentBlock
}
return BatchAccept, parentBlock
}
// checkSpanBatch performs the full SpanBatch validation rules.
func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef, l2SafeHead eth.L2BlockRef,
batch *SpanBatch, l1InclusionBlock eth.L1BlockRef, l2Fetcher SafeBlockFetcher,
) BatchValidity {
prefixValidity, parentBlock := checkSpanBatchPrefix(ctx, cfg, log, l1Blocks, l2SafeHead, batch, l1InclusionBlock, l2Fetcher)
if prefixValidity != BatchAccept {
return prefixValidity
}
startEpochNum := uint64(batch.GetStartEpochNum())
originIdx := 0
originAdvanced := startEpochNum == parentBlock.L1Origin.Number+1
...@@ -334,6 +365,8 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B
}
}
parentNum := parentBlock.Number
nextTimestamp := l2SafeHead.Time + cfg.BlockTime
// Check overlapped blocks
if batch.GetTimestamp() < nextTimestamp {
for i := uint64(0); i < l2SafeHead.Number-parentNum; i++ {
...
...@@ -43,15 +43,16 @@ func deltaAt(t *uint64) func(*rollup.Config) {
func fjordAt(t *uint64) func(*rollup.Config) {
return func(c *rollup.Config) {
c.DeltaTime = &zero64
c.FjordTime = t
}
}
func multiMod[T any](mods ...func(T)) func(T) {
return func(x T) {
for _, mod := range mods {
mod(x)
}
}
}
func holoceneAt(t *uint64) func(*rollup.Config) {
return func(c *rollup.Config) {
c.DeltaTime = &zero64
c.FjordTime = &zero64
c.HoloceneTime = t
}
}
...@@ -263,6 +264,23 @@ func TestValidBatch(t *testing.T) {
},
Expected: BatchFuture,
},
{
Name: "future timestamp with Holocene at L1 inc",
L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C},
L2SafeHead: l2A0,
Batch: BatchWithL1InclusionBlock{
L1InclusionBlock: l1B,
Batch: &SingularBatch{
ParentHash: l2A1.ParentHash,
EpochNum: rollup.Epoch(l2A1.L1Origin.Number),
EpochHash: l2A1.L1Origin.Hash,
Timestamp: l2A1.Time + 1, // 1 too high
},
},
Expected: BatchDrop,
ExpectedLog: "dropping future batch",
ConfigMod: holoceneAt(&l1B.Time),
},
{
Name: "old timestamp",
L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C},
...@@ -279,6 +297,23 @@ func TestValidBatch(t *testing.T) {
},
Expected: BatchDrop,
},
{
Name: "past timestamp with Holocene at L1 inc",
L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C},
L2SafeHead: l2A0,
Batch: BatchWithL1InclusionBlock{
L1InclusionBlock: l1B,
Batch: &SingularBatch{
ParentHash: l2A1.ParentHash,
EpochNum: rollup.Epoch(l2A1.L1Origin.Number),
EpochHash: l2A1.L1Origin.Hash,
Timestamp: l2A0.Time, // repeating the same time
},
},
Expected: BatchPast,
ExpectedLog: "dropping past batch with old timestamp",
ConfigMod: holoceneAt(&l1B.Time),
},
{
Name: "misaligned timestamp",
L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C},
...@@ -636,6 +671,26 @@ func TestValidBatch(t *testing.T) {
ExpectedLog: "received out-of-order batch for future processing after next batch",
ConfigMod: deltaAtGenesis,
},
{
Name: "future timestamp with Holocene at L1 inc",
L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C},
L2SafeHead: l2A0,
Batch: BatchWithL1InclusionBlock{
L1InclusionBlock: l1B,
Batch: initializedSpanBatch([]*SingularBatch{
{
ParentHash: l2A1.ParentHash,
EpochNum: rollup.Epoch(l2A1.L1Origin.Number),
EpochHash: l2A1.L1Origin.Hash,
Timestamp: l2A1.Time + 1, // 1 too high
Transactions: nil,
},
}, uint64(0), big.NewInt(0)),
},
Expected: BatchDrop,
ExpectedLog: "dropping future span batch",
ConfigMod: holoceneAt(&l1B.Time),
},
{
Name: "misaligned timestamp",
L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C},
...@@ -873,7 +928,7 @@ func TestValidBatch(t *testing.T) {
}, uint64(0), big.NewInt(0)),
},
Expected: BatchAccept,
ConfigMod: multiMod(deltaAtGenesis, fjordAt(&l1A.Time)),
ConfigMod: fjordAt(&l1A.Time),
},
{
Name: "sequencer time drift on same epoch with non-empty txs - long span",
...@@ -1277,6 +1332,33 @@ func TestValidBatch(t *testing.T) {
ExpectedLog: "span batch has no new blocks after safe head",
ConfigMod: deltaAtGenesis,
},
{
Name: "fully overlapping batch with Holocene",
L1Blocks: []eth.L1BlockRef{l1A, l1B},
L2SafeHead: l2A2,
Batch: BatchWithL1InclusionBlock{
L1InclusionBlock: l1B,
Batch: initializedSpanBatch([]*SingularBatch{
{
ParentHash: l2A0.Hash,
EpochNum: rollup.Epoch(l2A1.L1Origin.Number),
EpochHash: l2A1.L1Origin.Hash,
Timestamp: l2A1.Time,
Transactions: nil,
},
{
ParentHash: l2A1.Hash,
EpochNum: rollup.Epoch(l2A2.L1Origin.Number),
EpochHash: l2A2.L1Origin.Hash,
Timestamp: l2A2.Time,
Transactions: nil,
},
}, uint64(0), big.NewInt(0)),
},
Expected: BatchPast,
ExpectedLog: "span batch has no new blocks after safe head",
ConfigMod: holoceneAt(&l1B.Time),
},
{
Name: "overlapping batch with invalid parent hash",
L1Blocks: []eth.L1BlockRef{l1A, l1B},
...
...@@ -25,7 +25,10 @@ type ChannelInReader struct {
metrics Metrics
}
var _ ResettableStage = (*ChannelInReader)(nil)
var (
_ ResettableStage = (*ChannelInReader)(nil)
_ ChannelFlusher = (*ChannelInReader)(nil)
)
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(cfg *rollup.Config, log log.Logger, prev *ChannelBank, metrics Metrics) *ChannelInReader {
...@@ -122,3 +125,8 @@ func (cr *ChannelInReader) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.Sy
cr.nextBatchFn = nil
return io.EOF
}
func (cr *ChannelInReader) FlushChannel() {
cr.nextBatchFn = nil
// TODO(12157): cr.prev.FlushChannel() - when we do wiring with ChannelStage
}