Commit 32f09d19 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #3597 from ethereum-optimism/jg/batchqueue

op-node: Switch batch queue to be pull based
parents 94958ab1 102a9c22
...@@ -22,58 +22,60 @@ import ( ...@@ -22,58 +22,60 @@ import (
// This stage can be reset by clearing it's batch buffer. // This stage can be reset by clearing it's batch buffer.
// This stage does not need to retain any references to L1 blocks. // This stage does not need to retain any references to L1 blocks.
type AttributesQueueOutput interface {
AddSafeAttributes(attributes *eth.PayloadAttributes)
SafeL2Head() eth.L2BlockRef
StageProgress
}
type AttributesQueue struct { type AttributesQueue struct {
log log.Logger log log.Logger
config *rollup.Config config *rollup.Config
dl L1ReceiptsFetcher dl L1ReceiptsFetcher
next AttributesQueueOutput prev *BatchQueue
progress Progress batch *BatchData
batches []*BatchData
} }
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput) *AttributesQueue { func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue {
return &AttributesQueue{ return &AttributesQueue{
log: log, log: log,
config: cfg, config: cfg,
dl: l1Fetcher, dl: l1Fetcher,
next: next, prev: prev,
} }
} }
func (aq *AttributesQueue) AddBatch(batch *BatchData) { func (aq *AttributesQueue) Origin() eth.L1BlockRef {
aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions)) return aq.prev.Origin()
aq.batches = append(aq.batches, batch)
} }
func (aq *AttributesQueue) Progress() Progress { func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
return aq.progress // Get a batch if we need it
} if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { if err != nil {
if changed, err := aq.progress.Update(outer); err != nil || changed { return nil, err
return err }
aq.batch = batch
} }
if len(aq.batches) == 0 {
return io.EOF // Actually generate the next attributes
if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil {
return nil, err
} else {
// Clear out the local state once we will succeed
aq.batch = nil
return attrs, nil
} }
batch := aq.batches[0]
safeL2Head := aq.next.SafeL2Head() }
// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash // sanity check parent hash
if batch.ParentHash != safeL2Head.Hash { if batch.ParentHash != l2SafeHead.Hash {
return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash)) return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
} }
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second) fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel() defer cancel()
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch()) attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch())
if err != nil { if err != nil {
return err return nil, err
} }
// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool // we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
...@@ -83,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { ...@@ -83,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp) aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp)
// Slice off the batch once we are guaranteed to succeed return attrs, nil
aq.batches = aq.batches[1:]
aq.next.AddSafeAttributes(attrs)
return nil
} }
func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error {
aq.batches = aq.batches[:0]
aq.progress = aq.next.Progress()
return io.EOF return io.EOF
} }
func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef {
return aq.next.SafeL2Head()
}
...@@ -2,7 +2,6 @@ package derive ...@@ -2,7 +2,6 @@ package derive
import ( import (
"context" "context"
"io"
"math/big" "math/big"
"math/rand" "math/rand"
"testing" "testing"
...@@ -17,29 +16,10 @@ import ( ...@@ -17,29 +16,10 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
type MockAttributesQueueOutput struct { // TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function
MockOriginStage // (which is well tested) and that it properly sets NoTxPool and adds in the candidate
} // transactions.
func TestAttributesQueue(t *testing.T) {
func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.MethodCalled("AddSafeAttributes", attributes)
}
func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.On("AddSafeAttributes", attributes).Once().Return()
}
func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef {
return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef)
}
func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) {
m.Mock.On("SafeL2Head").Once().Return(head)
}
var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil)
func TestAttributesQueue_Step(t *testing.T) {
// test config, only init the necessary fields // test config, only init the necessary fields
cfg := &rollup.Config{ cfg := &rollup.Config{
BlockTime: 2, BlockTime: 2,
...@@ -56,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) { ...@@ -56,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) {
l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil) l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil)
out := &MockAttributesQueueOutput{}
out.progress = Progress{
Origin: l1Info.BlockRef(),
Closed: false,
}
defer out.AssertExpectations(t)
safeHead := testutils.RandomL2BlockRef(rng) safeHead := testutils.RandomL2BlockRef(rng)
safeHead.L1Origin = l1Info.ID() safeHead.L1Origin = l1Info.ID()
out.ExpectSafeL2Head(safeHead)
batch := &BatchData{BatchV1{ batch := &BatchData{BatchV1{
ParentHash: safeHead.Hash, ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum), EpochNum: rollup.Epoch(l1Info.InfoNum),
...@@ -85,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) { ...@@ -85,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) {
Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")}, Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")},
NoTxPool: true, NoTxPool: true,
} }
out.ExpectAddSafeAttributes(&attrs)
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out) aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil)
require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1))
aq.AddBatch(batch) actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)
require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet") require.Nil(t, err)
require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches") require.Equal(t, attrs, *actual)
} }
...@@ -40,11 +40,10 @@ type NextBatchProvider interface { ...@@ -40,11 +40,10 @@ type NextBatchProvider interface {
// BatchQueue contains a set of batches for every L1 block. // BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs. // L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct { type BatchQueue struct {
log log.Logger log log.Logger
config *rollup.Config config *rollup.Config
next BatchQueueOutput prev NextBatchProvider
prev NextBatchProvider origin eth.L1BlockRef
progress Progress
l1Blocks []eth.L1BlockRef l1Blocks []eth.L1BlockRef
...@@ -53,102 +52,91 @@ type BatchQueue struct { ...@@ -53,102 +52,91 @@ type BatchQueue struct {
} }
// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use. // NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput, prev NextBatchProvider) *BatchQueue { func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
return &BatchQueue{ return &BatchQueue{
log: log, log: log,
config: cfg, config: cfg,
next: next,
prev: prev, prev: prev,
} }
} }
func (bq *BatchQueue) Progress() Progress { func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.progress return bq.prev.Origin()
} }
func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error { func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
originBehind := bq.origin.Number < safeL2Head.L1Origin.Number
originBehind := bq.progress.Origin.Number < bq.next.SafeL2Head().L1Origin.Number
// Advance origin if needed // Advance origin if needed
// Note: The entire pipeline has the same origin // Note: The entire pipeline has the same origin
// We just don't accept batches prior to the L1 origin of the L2 safe head // We just don't accept batches prior to the L1 origin of the L2 safe head
if bq.progress.Origin != bq.prev.Origin() { if bq.origin != bq.prev.Origin() {
bq.progress.Closed = false bq.origin = bq.prev.Origin()
bq.progress.Origin = bq.prev.Origin()
if !originBehind { if !originBehind {
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) bq.l1Blocks = append(bq.l1Blocks, bq.origin)
}
bq.log.Info("Advancing bq origin", "origin", bq.progress.Origin)
return nil
}
if !bq.progress.Closed {
if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
bq.log.Info("Closing batch queue origin")
bq.progress.Closed = true
return nil
} else if err != nil {
return err
} else { } else {
bq.log.Info("have batch") // This is to handle the special case of startup. At startup we call Reset & include
if !originBehind { // the L1 origin. That is the only time where immediately after `Reset` is called
bq.AddBatch(batch) // originBehind is false.
} else { bq.l1Blocks = bq.l1Blocks[:0]
bq.log.Warn("Skipping old batch")
}
} }
bq.log.Info("Advancing bq origin", "origin", bq.origin)
}
// Load more data into the batch queue
outOfData := false
if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
outOfData = true
} else if err != nil {
return nil, err
} else if !originBehind {
bq.AddBatch(batch, safeL2Head)
} }
// Skip adding batches / blocks to the internal state until they are from the same L1 origin // Skip adding data unless we are up to date with the origin, but do fully
// as the current safe head. // empty the previous stages
if originBehind { if originBehind {
if bq.progress.Closed { if outOfData {
return io.EOF return nil, io.EOF
} else { } else {
// Immediately close the stage return nil, NotEnoughData
bq.progress.Closed = true
return nil
} }
} }
batch, err := bq.deriveNextBatch(ctx) // Finally attempt to derive more batches
if err == io.EOF { batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
bq.log.Info("no more batches in deriveNextBatch") if err == io.EOF && outOfData {
if bq.progress.Closed { return nil, io.EOF
return io.EOF } else if err == io.EOF {
} else { return nil, NotEnoughData
return nil
}
} else if err != nil { } else if err != nil {
return err return nil, err
} }
bq.next.AddBatch(batch) return batch, nil
return nil
} }
func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error {
// Copy over the Origin from the next stage // Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress // It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.progress = bq.next.Progress() bq.origin = base
bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock) bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
// Include the new origin as an origin to build on // Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0] bq.l1Blocks = bq.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin) bq.l1Blocks = append(bq.l1Blocks, base)
return io.EOF return io.EOF
} }
func (bq *BatchQueue) AddBatch(batch *BatchData) { func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
if bq.progress.Closed {
panic("write batch while closed")
}
if len(bq.l1Blocks) == 0 { if len(bq.l1Blocks) == 0 {
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)) panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
} }
data := BatchWithL1InclusionBlock{ data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.progress.Origin, L1InclusionBlock: bq.origin,
Batch: batch, Batch: batch,
} }
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data) validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
if validity == BatchDrop { if validity == BatchDrop {
return // if we do drop the batch, CheckBatch will log the drop reason with WARN level. return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
} }
...@@ -159,12 +147,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) { ...@@ -159,12 +147,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) {
// following the validity rules imposed on consecutive batches, // following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information. // based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned. // If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) { func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
if len(bq.l1Blocks) == 0 { if len(bq.l1Blocks) == 0 {
return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared")) return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
} }
epoch := bq.l1Blocks[0] epoch := bq.l1Blocks[0]
l2SafeHead := bq.next.SafeL2Head()
if l2SafeHead.L1Origin != epoch.ID() { if l2SafeHead.L1Origin != epoch.ID() {
return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead)) return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead))
...@@ -229,8 +216,8 @@ batchLoop: ...@@ -229,8 +216,8 @@ batchLoop:
// i.e. if the sequence window expired, we create empty batches // i.e. if the sequence window expired, we create empty batches
expiryEpoch := epoch.Number + bq.config.SeqWindowSize expiryEpoch := epoch.Number + bq.config.SeqWindowSize
forceNextEpoch := forceNextEpoch :=
(expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) || (expiryEpoch == bq.origin.Number && outOfData) ||
expiryEpoch < bq.progress.Origin.Number expiryEpoch < bq.origin.Number
if !forceNextEpoch { if !forceNextEpoch {
// sequence window did not expire yet, still room to receive batches for the current epoch, // sequence window did not expire yet, still room to receive batches for the current epoch,
......
This diff is collapsed.
...@@ -17,6 +17,11 @@ import ( ...@@ -17,6 +17,11 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
type NextAttributesProvider interface {
Origin() eth.L1BlockRef
NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error)
}
type Engine interface { type Engine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error) GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
...@@ -64,8 +69,6 @@ type EngineQueue struct { ...@@ -64,8 +69,6 @@ type EngineQueue struct {
finalizedL1 eth.BlockID finalizedL1 eth.BlockID
progress Progress
safeAttributes []*eth.PayloadAttributes safeAttributes []*eth.PayloadAttributes
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps
...@@ -73,14 +76,15 @@ type EngineQueue struct { ...@@ -73,14 +76,15 @@ type EngineQueue struct {
finalityData []FinalityData finalityData []FinalityData
engine Engine engine Engine
prev NextAttributesProvider
progress Progress // only used for pipeline resets
metrics Metrics metrics Metrics
} }
var _ AttributesQueueOutput = (*EngineQueue)(nil)
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue { func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue {
return &EngineQueue{ return &EngineQueue{
log: log, log: log,
cfg: cfg, cfg: cfg,
...@@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M ...@@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
MaxSize: maxUnsafePayloadsMemory, MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize, SizeFn: payloadMemSize,
}, },
prev: prev,
} }
} }
...@@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 { ...@@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 {
return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp) return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp)
} }
func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error { func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error {
if changed, err := eq.progress.Update(outer); err != nil || changed {
return err
}
if len(eq.safeAttributes) > 0 { if len(eq.safeAttributes) > 0 {
return eq.tryNextSafeAttributes(ctx) return eq.tryNextSafeAttributes(ctx)
} }
outOfData := false
if len(eq.safeAttributes) == 0 {
if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF {
outOfData = true
} else if err != nil {
return err
} else {
eq.safeAttributes = append(eq.safeAttributes, next)
return NotEnoughData
}
}
if eq.unsafePayloads.Len() > 0 { if eq.unsafePayloads.Len() > 0 {
return eq.tryNextUnsafePayload(ctx) return eq.tryNextUnsafePayload(ctx)
} }
return io.EOF
if outOfData {
return io.EOF
} else {
return nil
}
} }
// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized, // tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized,
...@@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() { ...@@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...) eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...)
} }
// remember the last L2 block that we fully derived from the given finality data // remember the last L2 block that we fully derived from the given finality data
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number { if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number {
// append entry for new L1 block // append entry for new L1 block
eq.finalityData = append(eq.finalityData, FinalityData{ eq.finalityData = append(eq.finalityData, FinalityData{
L2Block: eq.safeHead, L2Block: eq.safeHead,
L1Block: eq.progress.Origin.ID(), L1Block: eq.prev.Origin().ID(),
}) })
} else { } else {
// if it's a now L2 block that was derived from the same latest L1 block, then just update the entry // if it's a now L2 block that was derived from the same latest L1 block, then just update the entry
...@@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { ...@@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_safe", eq.safeHead, "l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead, "l2_unsafe", eq.unsafeHead,
"l2_time", eq.unsafeHead.Time, "l2_time", eq.unsafeHead.Time,
"l1_derived", eq.progress.Origin, "l1_derived", eq.prev.Origin(),
) )
} }
...@@ -415,7 +433,6 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error ...@@ -415,7 +433,6 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
// note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads. // note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.progress = Progress{ eq.progress = Progress{
Origin: pipelineOrigin, Origin: pipelineOrigin,
Closed: false,
} }
eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe) eq.metrics.RecordL2Ref("l2_safe", safe)
......
package derive package derive
import ( import (
"context"
"io"
"math/rand" "math/rand"
"testing" "testing"
...@@ -14,6 +16,20 @@ import ( ...@@ -14,6 +16,20 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
type fakeAttributesQueue struct {
origin eth.L1BlockRef
}
func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) {
return nil, io.EOF
}
var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)
func TestEngineQueue_Finalize(t *testing.T) { func TestEngineQueue_Finalize(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
...@@ -211,8 +227,10 @@ func TestEngineQueue_Finalize(t *testing.T) { ...@@ -211,8 +227,10 @@ func TestEngineQueue_Finalize(t *testing.T) {
l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil) l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil)
l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil) l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil)
eq := NewEngineQueue(logger, cfg, eng, metrics) prev := &fakeAttributesQueue{}
require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20))
eq := NewEngineQueue(logger, cfg, eng, metrics, prev)
require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B") require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B")
...@@ -220,20 +238,19 @@ func TestEngineQueue_Finalize(t *testing.T) { ...@@ -220,20 +238,19 @@ func TestEngineQueue_Finalize(t *testing.T) {
// now say C1 was included in D and became the new safe head // now say C1 was included in D and became the new safe head
eq.progress.Origin = refD eq.progress.Origin = refD
prev.origin = refD
eq.safeHead = refC1 eq.safeHead = refC1
eq.postProcessSafeL2() eq.postProcessSafeL2()
// now say D0 was included in E and became the new safe head // now say D0 was included in E and became the new safe head
eq.progress.Origin = refE eq.progress.Origin = refE
prev.origin = refE
eq.safeHead = refD0 eq.safeHead = refD0
eq.postProcessSafeL2() eq.postProcessSafeL2()
// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E) // let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
eq.Finalize(refD.ID()) eq.Finalize(refD.ID())
// Now a few steps later, without consuming any additional L1 inputs,
// we should be able to resolve that B1 is now finalized, since it was included in finalized L1 block C
require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10))
require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized") require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized")
l1F.AssertExpectations(t) l1F.AssertExpectations(t)
......
...@@ -100,14 +100,14 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch ...@@ -100,14 +100,14 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
bank := NewChannelBank(log, cfg, l1Src, l1Fetcher) bank := NewChannelBank(log, cfg, l1Src, l1Fetcher)
chInReader := NewChannelInReader(log, bank) chInReader := NewChannelInReader(log, bank)
batchQueue := NewBatchQueue(log, cfg, chInReader)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue)
// Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages) // Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages)
eng := NewEngineQueue(log, cfg, engine, metrics) eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng)
batchQueue := NewBatchQueue(log, cfg, attributesQueue, chInReader)
stages := []Stage{eng, attributesQueue, batchQueue} stages := []Stage{eng}
pullStages := []PullStage{chInReader, bank, l1Src, l1Traversal} pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}
return &DerivationPipeline{ return &DerivationPipeline{
log: log, log: log,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment