Commit 6a5fbf1b authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #3583 from ethereum-optimism/jg/l1traversal

op-node: Switch L1 Traversal to a pull based model
parents cfec0345 6890e775
......@@ -22,58 +22,60 @@ import (
// This stage can be reset by clearing it's batch buffer.
// This stage does not need to retain any references to L1 blocks.
type AttributesQueueOutput interface {
AddSafeAttributes(attributes *eth.PayloadAttributes)
SafeL2Head() eth.L2BlockRef
StageProgress
}
type AttributesQueue struct {
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
next AttributesQueueOutput
progress Progress
batches []*BatchData
prev *BatchQueue
batch *BatchData
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput) *AttributesQueue {
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue {
return &AttributesQueue{
log: log,
config: cfg,
dl: l1Fetcher,
next: next,
prev: prev,
}
}
func (aq *AttributesQueue) AddBatch(batch *BatchData) {
aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions))
aq.batches = append(aq.batches, batch)
}
func (aq *AttributesQueue) Progress() Progress {
return aq.progress
func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}
func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := aq.progress.Update(outer); err != nil || changed {
return err
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// Get a batch if we need it
if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
if err != nil {
return nil, err
}
if len(aq.batches) == 0 {
return io.EOF
aq.batch = batch
}
batch := aq.batches[0]
safeL2Head := aq.next.SafeL2Head()
// Actually generate the next attributes
if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil {
return nil, err
} else {
// Clear out the local state once we will succeed
aq.batch = nil
return attrs, nil
}
}
// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != safeL2Head.Hash {
return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash))
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
}
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch())
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch())
if err != nil {
return err
return nil, err
}
// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
......@@ -83,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp)
// Slice off the batch once we are guaranteed to succeed
aq.batches = aq.batches[1:]
aq.next.AddSafeAttributes(attrs)
return nil
return attrs, nil
}
func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
aq.batches = aq.batches[:0]
aq.progress = aq.next.Progress()
func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error {
return io.EOF
}
func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef {
return aq.next.SafeL2Head()
}
......@@ -2,7 +2,6 @@ package derive
import (
"context"
"io"
"math/big"
"math/rand"
"testing"
......@@ -17,29 +16,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type MockAttributesQueueOutput struct {
MockOriginStage
}
func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.MethodCalled("AddSafeAttributes", attributes)
}
func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.On("AddSafeAttributes", attributes).Once().Return()
}
func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef {
return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef)
}
func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) {
m.Mock.On("SafeL2Head").Once().Return(head)
}
var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil)
func TestAttributesQueue_Step(t *testing.T) {
// TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function
// (which is well tested) and that it properly sets NoTxPool and adds in the candidate
// transactions.
func TestAttributesQueue(t *testing.T) {
// test config, only init the necessary fields
cfg := &rollup.Config{
BlockTime: 2,
......@@ -56,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) {
l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil)
out := &MockAttributesQueueOutput{}
out.progress = Progress{
Origin: l1Info.BlockRef(),
Closed: false,
}
defer out.AssertExpectations(t)
safeHead := testutils.RandomL2BlockRef(rng)
safeHead.L1Origin = l1Info.ID()
out.ExpectSafeL2Head(safeHead)
batch := &BatchData{BatchV1{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
......@@ -85,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) {
Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")},
NoTxPool: true,
}
out.ExpectAddSafeAttributes(&attrs)
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out)
require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1))
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil)
aq.AddBatch(batch)
actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)
require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet")
require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches")
require.Nil(t, err)
require.Equal(t, attrs, *actual)
}
......@@ -32,13 +32,18 @@ type BatchQueueOutput interface {
SafeL2Head() eth.L2BlockRef
}
type NextBatchProvider interface {
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (*BatchData, error)
}
// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
log log.Logger
config *rollup.Config
next BatchQueueOutput
progress Progress
prev NextBatchProvider
origin eth.L1BlockRef
l1Blocks []eth.L1BlockRef
......@@ -47,62 +52,91 @@ type BatchQueue struct {
}
// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput) *BatchQueue {
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
return &BatchQueue{
log: log,
config: cfg,
next: next,
prev: prev,
}
}
func (bq *BatchQueue) Progress() Progress {
return bq.progress
func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.prev.Origin()
}
func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := bq.progress.Update(outer); err != nil {
return err
} else if changed {
if !bq.progress.Closed { // init inputs if we moved to a new open origin
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
originBehind := bq.origin.Number < safeL2Head.L1Origin.Number
// Advance origin if needed
// Note: The entire pipeline has the same origin
// We just don't accept batches prior to the L1 origin of the L2 safe head
if bq.origin != bq.prev.Origin() {
bq.origin = bq.prev.Origin()
if !originBehind {
bq.l1Blocks = append(bq.l1Blocks, bq.origin)
} else {
// This is to handle the special case of startup. At startup we call Reset & include
// the L1 origin. That is the only time where immediately after `Reset` is called
// originBehind is false.
bq.l1Blocks = bq.l1Blocks[:0]
}
return nil
bq.log.Info("Advancing bq origin", "origin", bq.origin)
}
batch, err := bq.deriveNextBatch(ctx)
if err == io.EOF {
// very noisy, commented for now, or we should bump log level from trace to debug
// bq.log.Trace("need more L1 data before deriving next batch", "progress", bq.progress.Origin)
return io.EOF
// Load more data into the batch queue
outOfData := false
if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
outOfData = true
} else if err != nil {
return err
return nil, err
} else if !originBehind {
bq.AddBatch(batch, safeL2Head)
}
bq.next.AddBatch(batch)
return nil
// Skip adding data unless we are up to date with the origin, but do fully
// empty the previous stages
if originBehind {
if outOfData {
return nil, io.EOF
} else {
return nil, NotEnoughData
}
}
// Finally attempt to derive more batches
batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
if err == io.EOF && outOfData {
return nil, io.EOF
} else if err == io.EOF {
return nil, NotEnoughData
} else if err != nil {
return nil, err
}
return batch, nil
}
func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.progress = bq.next.Progress()
bq.origin = base
bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
bq.l1Blocks = append(bq.l1Blocks, base)
return io.EOF
}
func (bq *BatchQueue) AddBatch(batch *BatchData) {
if bq.progress.Closed {
panic("write batch while closed")
}
func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
if len(bq.l1Blocks) == 0 {
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
}
data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.progress.Origin,
L1InclusionBlock: bq.origin,
Batch: batch,
}
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data)
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
if validity == BatchDrop {
return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
}
......@@ -113,12 +147,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) {
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) {
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
if len(bq.l1Blocks) == 0 {
return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
}
epoch := bq.l1Blocks[0]
l2SafeHead := bq.next.SafeL2Head()
if l2SafeHead.L1Origin != epoch.ID() {
return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead))
......@@ -183,8 +216,8 @@ batchLoop:
// i.e. if the sequence window expired, we create empty batches
expiryEpoch := epoch.Number + bq.config.SeqWindowSize
forceNextEpoch :=
(expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) ||
expiryEpoch < bq.progress.Origin.Number
(expiryEpoch == bq.origin.Number && outOfData) ||
expiryEpoch < bq.origin.Number
if !forceNextEpoch {
// sequence window did not expire yet, still room to receive batches for the current epoch,
......
......@@ -7,8 +7,6 @@ import (
"math/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
......@@ -16,44 +14,28 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// fakeBatchQueueOutput fakes the next stage (receive only) for the batch queue
// It tracks the progress state of the next stage.
// Upon receiving a batch, relevant characteristic of safeL2Head are immediately advanced.
type fakeBatchQueueOutput struct {
progress Progress
type fakeBatchQueueInput struct {
i int
batches []*BatchData
safeL2Head eth.L2BlockRef
errors []error
origin eth.L1BlockRef
}
var _ BatchQueueOutput = (*fakeBatchQueueOutput)(nil)
func (f *fakeBatchQueueOutput) AddBatch(batch *BatchData) {
f.batches = append(f.batches, batch)
if batch.ParentHash != f.safeL2Head.Hash {
panic("batch has wrong parent hash")
}
newEpoch := f.safeL2Head.L1Origin.Hash != batch.EpochHash
// Advance SafeL2Head
f.safeL2Head.Time = batch.Timestamp
f.safeL2Head.L1Origin.Number = uint64(batch.EpochNum)
f.safeL2Head.L1Origin.Hash = batch.EpochHash
if newEpoch {
f.safeL2Head.SequenceNumber = 0
} else {
f.safeL2Head.SequenceNumber += 1
}
f.safeL2Head.ParentHash = batch.ParentHash
f.safeL2Head.Hash = mockHash(batch.Timestamp, 2)
func (f *fakeBatchQueueInput) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeBatchQueueOutput) SafeL2Head() eth.L2BlockRef {
return f.safeL2Head
}
func (f *fakeBatchQueueOutput) Progress() Progress {
return f.progress
func (f *fakeBatchQueueInput) NextBatch(ctx context.Context) (*BatchData, error) {
if f.i >= len(f.batches) {
return nil, io.EOF
}
b := f.batches[f.i]
e := f.errors[f.i]
f.i += 1
return b, e
}
func mockHash(time uint64, layer uint8) common.Hash {
......@@ -90,22 +72,18 @@ func L1Chain(l1Times []uint64) []eth.L1BlockRef {
return out
}
// TestBatchQueueEager adds a bunch of contiguous batches and asserts that
// enough calls to `NextBatch` return all of those batches.
func TestBatchQueueEager(t *testing.T) {
log := testlog.Logger(t, log.LvlTrace)
log := testlog.Logger(t, log.LvlCrit)
l1 := L1Chain([]uint64{10, 20, 30})
next := &fakeBatchQueueOutput{
safeL2Head: eth.L2BlockRef{
safeHead := eth.L2BlockRef{
Hash: mockHash(10, 2),
Number: 0,
ParentHash: common.Hash{},
Time: 10,
L1Origin: l1[0].ID(),
SequenceNumber: 0,
},
progress: Progress{
Origin: l1[0],
Closed: false,
},
}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
......@@ -116,129 +94,44 @@ func TestBatchQueueEager(t *testing.T) {
SeqWindowSize: 30,
}
bq := NewBatchQueue(log, cfg, next)
require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step")
// We start with an open L1 origin as progress in the first step
progress := bq.progress
require.Equal(t, bq.progress.Closed, false)
batches := []*BatchData{b(12, l1[0]), b(14, l1[0]), b(16, l1[0]), b(18, l1[0]), b(20, l1[0]), b(22, l1[0]), b(24, l1[1]), nil}
errors := []error{nil, nil, nil, nil, nil, nil, nil, io.EOF}
// Add batches
batches := []*BatchData{b(12, l1[0]), b(14, l1[0])}
for _, batch := range batches {
bq.AddBatch(batch)
input := &fakeBatchQueueInput{
batches: batches,
errors: errors,
origin: l1[0],
}
// Step
require.NoError(t, RepeatStep(t, bq.Step, progress, 10))
// Verify Output
require.Equal(t, batches, next.batches)
}
func TestBatchQueueFull(t *testing.T) {
log := testlog.Logger(t, log.LvlTrace)
l1 := L1Chain([]uint64{10, 15, 20})
next := &fakeBatchQueueOutput{
safeL2Head: eth.L2BlockRef{
Hash: mockHash(10, 2),
Number: 0,
ParentHash: common.Hash{},
Time: 10,
L1Origin: l1[0].ID(),
SequenceNumber: 0,
},
progress: Progress{
Origin: l1[0],
Closed: false,
},
bq := NewBatchQueue(log, cfg, input)
_ = bq.Reset(context.Background(), l1[0])
// Advance the origin
input.origin = l1[1]
for i := 0; i < len(batches); i++ {
b, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, errors[i])
require.Equal(t, batches[i], b)
if b != nil {
safeHead.Number += 1
safeHead.Time += 2
safeHead.Hash = mockHash(b.Timestamp, 2)
safeHead.L1Origin = b.Epoch()
}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L2Time: 10,
},
BlockTime: 2,
MaxSequencerDrift: 600,
SeqWindowSize: 2,
}
bq := NewBatchQueue(log, cfg, next)
require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step")
// We start with an open L1 origin as progress in the first step
progress := bq.progress
require.Equal(t, bq.progress.Closed, false)
// Add batches
batches := []*BatchData{b(14, l1[0]), b(16, l1[0]), b(18, l1[1])}
for _, batch := range batches {
bq.AddBatch(batch)
}
// Missing first batch
err := bq.Step(context.Background(), progress)
require.Equal(t, err, io.EOF)
// Close previous to close bq
progress.Closed = true
err = bq.Step(context.Background(), progress)
require.Equal(t, err, nil)
require.Equal(t, bq.progress.Closed, true)
// Open previous to open bq with the new inclusion block
progress.Closed = false
progress.Origin = l1[1]
err = bq.Step(context.Background(), progress)
require.Equal(t, err, nil)
require.Equal(t, bq.progress.Closed, false)
// Close previous to close bq (for epoch 2)
progress.Closed = true
err = bq.Step(context.Background(), progress)
require.Equal(t, err, nil)
require.Equal(t, bq.progress.Closed, true)
// Open previous to open bq with the new inclusion block (epoch 2)
progress.Closed = false
progress.Origin = l1[2]
err = bq.Step(context.Background(), progress)
require.Equal(t, err, nil)
require.Equal(t, bq.progress.Closed, false)
// Finally add batch
firstBatch := b(12, l1[0])
bq.AddBatch(firstBatch)
// Close the origin
progress.Closed = true
err = bq.Step(context.Background(), progress)
require.Equal(t, err, nil)
require.Equal(t, bq.progress.Closed, true)
// Step, but should have full epoch now
require.NoError(t, RepeatStep(t, bq.Step, progress, 10))
// Verify Output
var final []*BatchData
final = append(final, firstBatch)
final = append(final, batches...)
require.Equal(t, final, next.batches)
}
func TestBatchQueueMissing(t *testing.T) {
log := testlog.Logger(t, log.LvlTrace)
log := testlog.Logger(t, log.LvlCrit)
l1 := L1Chain([]uint64{10, 15, 20})
next := &fakeBatchQueueOutput{
safeL2Head: eth.L2BlockRef{
safeHead := eth.L2BlockRef{
Hash: mockHash(10, 2),
Number: 0,
ParentHash: common.Hash{},
Time: 10,
L1Origin: l1[0].ID(),
SequenceNumber: 0,
},
progress: Progress{
Origin: l1[0],
Closed: false,
},
}
cfg := &rollup.Config{
Genesis: rollup.Genesis{
......@@ -249,56 +142,68 @@ func TestBatchQueueMissing(t *testing.T) {
SeqWindowSize: 2,
}
bq := NewBatchQueue(log, cfg, next)
require.Equal(t, io.EOF, bq.ResetStep(context.Background(), nil), "reset should complete without l1 fetcher, single step")
// We start with an open L1 origin as progress in the first step
progress := bq.progress
require.Equal(t, bq.progress.Closed, false)
// The batches at 18 and 20 are skipped to stop 22 from being eagerly processed.
// This test checks that batch timestamp 12 & 14 are created, 16 is used, and 18 is advancing the epoch.
// Due to the large sequencer time drift 16 is perfectly valid to have epoch 0 as origin.
batches := []*BatchData{b(16, l1[0]), b(22, l1[1])}
for _, batch := range batches {
bq.AddBatch(batch)
}
// Missing first batches with timestamp 12 and 14, nothing to do yet.
err := bq.Step(context.Background(), progress)
require.Equal(t, err, io.EOF)
errors := []error{nil, nil}
// Close l1[0]
progress.Closed = true
require.NoError(t, RepeatStep(t, bq.Step, progress, 10))
require.Equal(t, bq.progress.Closed, true)
input := &fakeBatchQueueInput{
batches: batches,
errors: errors,
origin: l1[0],
}
// Open l1[1]
progress.Closed = false
progress.Origin = l1[1]
require.NoError(t, RepeatStep(t, bq.Step, progress, 10))
require.Equal(t, bq.progress.Closed, false)
require.Empty(t, next.batches, "no batches yet, sequence window did not expire, waiting for 12 and 14")
bq := NewBatchQueue(log, cfg, input)
_ = bq.Reset(context.Background(), l1[0])
// Close l1[1]
progress.Closed = true
require.NoError(t, RepeatStep(t, bq.Step, progress, 10))
require.Equal(t, bq.progress.Closed, true)
for i := 0; i < len(batches); i++ {
b, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, NotEnoughData)
require.Nil(t, b)
}
// Open l1[2]
progress.Closed = false
progress.Origin = l1[2]
require.NoError(t, RepeatStep(t, bq.Step, progress, 10))
require.Equal(t, bq.progress.Closed, false)
// advance origin. Underlying stage still has no more batches
// This is not enough to auto advance yet
input.origin = l1[1]
b, e := bq.NextBatch(context.Background(), safeHead)
require.ErrorIs(t, e, io.EOF)
require.Nil(t, b)
// Advance the origin. At this point batch timestamps 12 and 14 will be created
input.origin = l1[2]
// Check for a generated batch at t = 12
b, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.Equal(t, b.Timestamp, uint64(12))
require.Empty(t, b.BatchV1.Transactions)
safeHead.Number += 1
safeHead.Time += 2
safeHead.Hash = mockHash(b.Timestamp, 2)
// Check for generated batch at t = 14
b, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.Equal(t, b.Timestamp, uint64(14))
require.Empty(t, b.BatchV1.Transactions)
safeHead.Number += 1
safeHead.Time += 2
safeHead.Hash = mockHash(b.Timestamp, 2)
// Check for the inputted batch at t = 16
b, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.Equal(t, b, batches[0])
safeHead.Number += 1
safeHead.Time += 2
safeHead.Hash = mockHash(b.Timestamp, 2)
// Check for the generated batch at t = 18. This batch advances the epoch
b, e = bq.NextBatch(context.Background(), safeHead)
require.Nil(t, e)
require.Equal(t, b.Timestamp, uint64(18))
require.Empty(t, b.BatchV1.Transactions)
require.Equal(t, rollup.Epoch(1), b.EpochNum)
// Close l1[2], this is the moment that l1[0] expires and empty batches 12 and 14 can be created,
// and batch 16 can then be used.
progress.Closed = true
require.NoError(t, RepeatStep(t, bq.Step, progress, 10))
require.Equal(t, bq.progress.Closed, true)
require.Equal(t, 4, len(next.batches), "expecting empty batches with timestamp 12 and 14 to be created and existing batch 16 to follow")
require.Equal(t, uint64(12), next.batches[0].Timestamp)
require.Equal(t, uint64(14), next.batches[1].Timestamp)
require.Equal(t, batches[0], next.batches[2])
require.Equal(t, uint64(18), next.batches[3].Timestamp)
require.Equal(t, rollup.Epoch(1), next.batches[3].EpochNum)
}
......@@ -2,7 +2,6 @@ package derive
import (
"context"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
......@@ -11,6 +10,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type NextDataProvider interface {
NextData(ctx context.Context) ([]byte, error)
Origin() eth.L1BlockRef
}
// ChannelBank is a stateful stage that does the following:
// 1. Unmarshalls frames from L1 transaction data
// 2. Applies those frames to a channel
......@@ -22,11 +26,6 @@ import (
// Specifically, the channel bank is not allowed to become too large between successive calls
// to `IngestData`. This means that we can do an ingest and then do a read while becoming too large.
type ChannelBankOutput interface {
StageProgress
WriteChannel(data []byte)
}
// ChannelBank buffers channel frames, and emits full channel data
type ChannelBank struct {
log log.Logger
......@@ -35,80 +34,78 @@ type ChannelBank struct {
channels map[ChannelID]*Channel // channels by ID
channelQueue []ChannelID // channels in FIFO order
progress Progress
next ChannelBankOutput
prev NextDataProvider
fetcher L1Fetcher
}
var _ Stage = (*ChannelBank)(nil)
var _ PullStage = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput) *ChannelBank {
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank {
return &ChannelBank{
log: log,
cfg: cfg,
channels: make(map[ChannelID]*Channel),
channelQueue: make([]ChannelID, 0, 10),
next: next,
prev: prev,
fetcher: fetcher,
}
}
func (ib *ChannelBank) Progress() Progress {
return ib.progress
func (cb *ChannelBank) Origin() eth.L1BlockRef {
return cb.prev.Origin()
}
func (ib *ChannelBank) prune() {
func (cb *ChannelBank) prune() {
// check total size
totalSize := uint64(0)
for _, ch := range ib.channels {
for _, ch := range cb.channels {
totalSize += ch.size
}
// prune until it is reasonable again. The high-priority channel failed to be read, so we start pruning there.
for totalSize > MaxChannelBankSize {
id := ib.channelQueue[0]
ch := ib.channels[id]
ib.channelQueue = ib.channelQueue[1:]
delete(ib.channels, id)
id := cb.channelQueue[0]
ch := cb.channels[id]
cb.channelQueue = cb.channelQueue[1:]
delete(cb.channels, id)
totalSize -= ch.size
}
}
// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.\
func (ib *ChannelBank) IngestData(data []byte) {
if ib.progress.Closed {
panic("write data to bank while closed")
}
ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data))
func (cb *ChannelBank) IngestData(data []byte) {
origin := cb.Origin()
cb.log.Debug("channel bank got new data", "origin", origin, "data_len", len(data))
// TODO: Why is the prune here?
ib.prune()
cb.prune()
frames, err := ParseFrames(data)
if err != nil {
ib.log.Warn("malformed frame", "err", err)
cb.log.Warn("malformed frame", "err", err)
return
}
// Process each frame
for _, f := range frames {
currentCh, ok := ib.channels[f.ID]
currentCh, ok := cb.channels[f.ID]
if !ok {
// create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID, ib.progress.Origin)
ib.channels[f.ID] = currentCh
ib.channelQueue = append(ib.channelQueue, f.ID)
currentCh = NewChannel(f.ID, origin)
cb.channels[f.ID] = currentCh
cb.channelQueue = append(cb.channelQueue, f.ID)
}
// check if the channel is not timed out
if currentCh.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number {
ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber)
if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < origin.Number {
cb.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber)
continue
}
ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.AddFrame(f, ib.progress.Origin); err != nil {
ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
cb.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.AddFrame(f, origin); err != nil {
cb.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
continue
}
}
......@@ -116,72 +113,62 @@ func (ib *ChannelBank) IngestData(data []byte) {
// Read the raw data of the first channel, if it's timed-out or closed.
// Read returns io.EOF if there is nothing new to read.
func (ib *ChannelBank) Read() (data []byte, err error) {
if len(ib.channelQueue) == 0 {
func (cb *ChannelBank) Read() (data []byte, err error) {
if len(cb.channelQueue) == 0 {
return nil, io.EOF
}
first := ib.channelQueue[0]
ch := ib.channels[first]
timedOut := ch.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number
first := cb.channelQueue[0]
ch := cb.channels[first]
timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number
if timedOut {
ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:]
cb.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
return nil, io.EOF
}
if !ch.IsReady() {
return nil, io.EOF
}
delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:]
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
r := ch.Reader()
// Suprress error here. io.ReadAll does return nil instead of io.EOF though.
data, _ = io.ReadAll(r)
return data, nil
}
func (ib *ChannelBank) Step(ctx context.Context, outer Progress) error {
if changed, err := ib.progress.Update(outer); err != nil || changed {
return err
}
// If the bank is behind the channel reader, then we are replaying old data to prepare the bank.
// Read if we can, and drop if it gives anything
if ib.next.Progress().Origin.Number > ib.progress.Origin.Number {
_, err := ib.Read()
return err
// NextData pulls the next piece of data from the channel bank.
// Note that it attempts to pull data out of the channel bank prior to
// loading data in (unlike most other stages). This is to ensure maintain
// consistency around channel bank pruning which depends upon the order
// of operations.
func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) {
// Do the read from the channel bank first
data, err := cb.Read()
if err == io.EOF {
// continue - We will attempt to load data into the channel bank
} else if err != nil {
return nil, err
} else {
return data, nil
}
// otherwise, read the next channel data from the bank
data, err := ib.Read()
if err == io.EOF { // need new L1 data in the bank before we can read more channel data
return io.EOF
// Then load data into the channel bank
if data, err := cb.prev.NextData(ctx); err == io.EOF {
return nil, io.EOF
} else if err != nil {
return err
return nil, err
} else {
cb.IngestData(data)
return nil, NotEnoughData
}
ib.next.WriteChannel(data)
return nil
}
// ResetStep walks back the L1 chain, starting at the origin of the next stage,
// to find the origin that the channel bank should be reset to,
// to get consistent reads starting at origin.
// Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin,
// so it is not relevant to replay it into the bank.
func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
ib.progress = ib.next.Progress()
ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin)
// go back in history if we are not distant enough from the next stage
resetBlock := ib.progress.Origin.Number - ib.cfg.ChannelTimeout
if ib.progress.Origin.Number < ib.cfg.ChannelTimeout {
resetBlock = 0 // don't underflow
}
parent, err := l1Fetcher.L1BlockRefByNumber(ctx, resetBlock)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err))
}
ib.progress.Origin = parent
func (cb *ChannelBank) Reset(ctx context.Context, base eth.L1BlockRef) error {
cb.channels = make(map[ChannelID]*Channel)
cb.channelQueue = make([]ChannelID, 0, 10)
return io.EOF
}
......
......@@ -2,78 +2,63 @@ package derive
import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type MockChannelBankOutput struct {
MockOriginStage
}
func (m *MockChannelBankOutput) WriteChannel(data []byte) {
m.MethodCalled("WriteChannel", data)
type fakeChannelBankInput struct {
origin eth.L1BlockRef
data []struct {
data []byte
err error
}
}
func (m *MockChannelBankOutput) ExpectWriteChannel(data []byte) {
m.On("WriteChannel", data).Once().Return()
func (f *fakeChannelBankInput) Origin() eth.L1BlockRef {
return f.origin
}
var _ ChannelBankOutput = (*MockChannelBankOutput)(nil)
type bankTestSetup struct {
origins []eth.L1BlockRef
t *testing.T
rng *rand.Rand
cb *ChannelBank
out *MockChannelBankOutput
l1 *testutils.MockL1Source
func (f *fakeChannelBankInput) NextData(_ context.Context) ([]byte, error) {
out := f.data[0]
f.data = f.data[1:]
return out.data, out.err
}
type channelBankTestCase struct {
name string
originTimes []uint64
nextStartsAt int
channelTimeout uint64
fn func(bt *bankTestSetup)
func (f *fakeChannelBankInput) AddOutput(data []byte, err error) {
f.data = append(f.data, struct {
data []byte
err error
}{data: data, err: err})
}
func (ct *channelBankTestCase) Run(t *testing.T) {
cfg := &rollup.Config{
ChannelTimeout: ct.channelTimeout,
}
bt := &bankTestSetup{
t: t,
rng: rand.New(rand.NewSource(1234)),
l1: &testutils.MockL1Source{},
}
bt.origins = append(bt.origins, testutils.RandomBlockRef(bt.rng))
for i := range ct.originTimes[1:] {
ref := testutils.NextRandomRef(bt.rng, bt.origins[i])
bt.origins = append(bt.origins, ref)
// ExpectNextFrameData takes a set of test frame & turns into the raw data
// for reading into the channel bank via `NextData`
func (f *fakeChannelBankInput) AddFrames(frames ...testFrame) {
data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
for _, frame := range frames {
ff := frame.ToFrame()
if err := ff.MarshalBinary(data); err != nil {
panic(fmt.Errorf("error in making frame during test: %w", err))
}
for i, x := range ct.originTimes {
bt.origins[i].Time = x
}
bt.out = &MockChannelBankOutput{MockOriginStage{progress: Progress{Origin: bt.origins[ct.nextStartsAt], Closed: false}}}
bt.cb = NewChannelBank(testlog.Logger(t, log.LvlError), cfg, bt.out)
ct.fn(bt)
f.AddOutput(data.Bytes(), nil)
}
var _ NextDataProvider = (*fakeChannelBankInput)(nil)
// format: <channelID-data>:<frame-number>:<content><optional-last-frame-marker "!">
// example: "abc:0:helloworld!"
type testFrame string
......@@ -113,153 +98,76 @@ func (tf testFrame) ToFrame() Frame {
}
}
func (bt *bankTestSetup) ingestData(data []byte) {
bt.cb.IngestData(data)
}
func TestChannelBankSimple(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
func (bt *bankTestSetup) ingestFrames(frames ...testFrame) {
data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
for _, fr := range frames {
f := fr.ToFrame()
if err := f.MarshalBinary(data); err != nil {
panic(fmt.Errorf("error in making frame during test: %w", err))
}
}
bt.ingestData(data.Bytes())
}
func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) {
require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max))
}
func (bt *bankTestSetup) repeatResetStep(max int, err error) {
require.Equal(bt.t, err, RepeatResetStep(bt.t, bt.cb.ResetStep, bt.l1, max))
}
input := &fakeChannelBankInput{origin: a}
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:1:second")
input.AddOutput(nil, io.EOF)
func (bt *bankTestSetup) assertOrigin(i int) {
require.Equal(bt.t, bt.cb.progress.Origin, bt.origins[i])
}
func (bt *bankTestSetup) assertOriginTime(x uint64) {
require.Equal(bt.t, x, bt.cb.progress.Origin.Time)
}
func (bt *bankTestSetup) expectChannel(data string) {
bt.out.ExpectWriteChannel([]byte(data))
}
func (bt *bankTestSetup) expectL1BlockRefByNumber(i int) {
bt.l1.ExpectL1BlockRefByNumber(bt.origins[i].Number, bt.origins[i], nil)
}
func (bt *bankTestSetup) assertExpectations() {
bt.l1.AssertExpectations(bt.t)
bt.l1.ExpectedCalls = nil
bt.out.AssertExpectations(bt.t)
bt.out.ExpectedCalls = nil
}
cfg := &rollup.Config{ChannelTimeout: 10}
func TestL1ChannelBank(t *testing.T) {
testCases := []channelBankTestCase{
{
name: "time outs and buffering",
originTimes: []uint64{0, 1, 2, 3, 4, 5},
nextStartsAt: 3, // Start next stage at block #3
channelTimeout: 2, // Start at block #1
fn: func(bt *bankTestSetup) {
bt.expectL1BlockRefByNumber(1)
bt.repeatResetStep(10, nil)
bt.ingestFrames("a:0:first") // will time out b/c not closed
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
bt.repeatStep(10, 1, true, nil)
bt.repeatStep(10, 2, false, nil)
bt.assertOrigin(2)
// Load the first + third frame
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
bt.repeatStep(10, 2, true, nil)
bt.repeatStep(10, 3, false, nil)
bt.assertOrigin(3)
// Load the second frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
bt.repeatStep(10, 3, true, nil)
bt.repeatStep(10, 4, false, nil)
bt.assertOrigin(4)
// Pull out the channel data
out, err = cb.NextData(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// Properly closed channel
bt.expectChannel("foobarclosed")
bt.ingestFrames("b:0:foobar")
bt.ingestFrames("b:1:closed!")
bt.repeatStep(10, 4, true, nil)
bt.assertExpectations()
},
},
{
name: "duplicate frames",
originTimes: []uint64{0, 1, 2, 3, 4, 5},
nextStartsAt: 3, // Start next stage at block #3
channelTimeout: 2, // Start at block #1c
fn: func(bt *bankTestSetup) {
bt.expectL1BlockRefByNumber(1)
bt.repeatResetStep(10, nil)
bt.ingestFrames("a:0:first") // will time out b/c not closed
// No more data
out, err = cb.NextData(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
bt.repeatStep(10, 1, true, nil)
bt.repeatStep(10, 2, false, nil)
bt.assertOrigin(2)
func TestChannelBankDuplicates(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
bt.repeatStep(10, 2, true, nil)
bt.repeatStep(10, 3, false, nil)
bt.assertOrigin(3)
input := &fakeChannelBankInput{origin: a}
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:0:altfirst", "a:2:altthird!")
input.AddFrames("a:1:second")
input.AddOutput(nil, io.EOF)
bt.repeatStep(10, 3, true, nil)
bt.repeatStep(10, 4, false, nil)
bt.assertOrigin(4)
cfg := &rollup.Config{ChannelTimeout: 10}
bt.ingestFrames("a:0:first")
bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:1:second")
bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:0:altfirst") // ignored as duplicate
bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:1:altsecond") // ignored as duplicate
bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("b:0:new")
bt.repeatStep(1, 4, false, nil)
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
// close origin 4
bt.repeatStep(2, 4, true, nil)
// Load the first + third frame
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// open origin 1
bt.repeatStep(2, 5, false, nil)
bt.ingestFrames("b:1:hi!") // close the other one first, but blocked
bt.repeatStep(1, 5, false, nil)
bt.ingestFrames("a:2:!") // empty closing frame
bt.expectChannel("firstsecond")
bt.expectChannel("newhi")
bt.repeatStep(5, 5, false, nil)
bt.assertExpectations()
},
},
{
name: "skip bad frames",
originTimes: []uint64{101, 102},
nextStartsAt: 0,
channelTimeout: 3,
fn: func(bt *bankTestSetup) {
// don't do the whole setup process, just override where the stages are
bt.cb.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.out.progress = Progress{Origin: bt.origins[0], Closed: false}
// Load the duplicate frames
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
bt.assertOriginTime(101)
// Load the second frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
badTx := new(bytes.Buffer)
badTx.WriteByte(DerivationVersion0)
goodFrame := testFrame("a:0:helloworld!").ToFrame()
if err := goodFrame.MarshalBinary(badTx); err != nil {
panic(fmt.Errorf("error in marshalling frame: %w", err))
}
badTx.Write(testutils.RandomData(bt.rng, 30)) // incomplete frame data
bt.ingestData(badTx.Bytes())
// Expect the bad frame to render the entire chunk invalid.
bt.repeatStep(2, 0, false, nil)
bt.assertExpectations()
},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, testCase.Run)
}
// Pull out the channel data. Expect to see the original set & not the duplicates
out, err = cb.NextData(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// No more data
out, err = cb.NextData(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
......@@ -5,6 +5,7 @@ import (
"context"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/log"
)
......@@ -13,41 +14,36 @@ import (
// This is a pure function from the channel, but each channel (or channel fragment)
// must be tagged with an L1 inclusion block to be passed to the the batch queue.
type BatchQueueStage interface {
StageProgress
AddBatch(batch *BatchData)
}
type ChannelInReader struct {
log log.Logger
nextBatchFn func() (BatchWithL1InclusionBlock, error)
progress Progress
next BatchQueueStage
prev *ChannelBank
}
var _ ChannelBankOutput = (*ChannelInReader)(nil)
var _ PullStage = (*ChannelInReader)(nil)
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, next BatchQueueStage) *ChannelInReader {
return &ChannelInReader{log: log, next: next}
func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader {
return &ChannelInReader{
log: log,
prev: prev,
}
}
func (cr *ChannelInReader) Progress() Progress {
return cr.progress
func (cr *ChannelInReader) Origin() eth.L1BlockRef {
return cr.prev.Origin()
}
// TODO: Take full channel for better logging
func (cr *ChannelInReader) WriteChannel(data []byte) {
if cr.progress.Closed {
panic("write channel while closed")
}
if f, err := BatchReader(bytes.NewBuffer(data), cr.progress.Origin); err == nil {
func (cr *ChannelInReader) WriteChannel(data []byte) error {
if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil {
cr.nextBatchFn = f
return nil
} else {
cr.log.Error("Error creating batch reader from channel data", "err", err)
return err
}
}
......@@ -57,32 +53,37 @@ func (cr *ChannelInReader) NextChannel() {
cr.nextBatchFn = nil
}
func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error {
if changed, err := cr.progress.Update(outer); err != nil || changed {
return err
}
// NextBatch pulls out the next batch from the channel if it has it.
// It returns io.EOF when it cannot make any more progress.
// It will return a temporary error if it needs to be called again to advance some internal state.
func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) {
if cr.nextBatchFn == nil {
return io.EOF
if data, err := cr.prev.NextData(ctx); err == io.EOF {
return nil, io.EOF
} else if err != nil {
return nil, err
} else {
if err := cr.WriteChannel(data); err != nil {
return nil, NewTemporaryError(err)
}
}
}
// TODO: can batch be non nil while err == io.EOF
// This depends on the behavior of rlp.Stream
batch, err := cr.nextBatchFn()
if err == io.EOF {
return io.EOF
cr.NextChannel()
return nil, NotEnoughData
} else if err != nil {
cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err)
cr.NextChannel()
return nil
return nil, NotEnoughData
}
cr.next.AddBatch(batch.Batch)
return nil
return batch.Batch, nil
}
func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
func (cr *ChannelInReader) Reset(ctx context.Context, _ eth.L1BlockRef) error {
cr.nextBatchFn = nil
cr.progress = cr.next.Progress()
return io.EOF
}
......@@ -17,6 +17,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type NextAttributesProvider interface {
Origin() eth.L1BlockRef
NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error)
}
type Engine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
......@@ -64,8 +69,6 @@ type EngineQueue struct {
finalizedL1 eth.BlockID
progress Progress
safeAttributes []*eth.PayloadAttributes
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps
......@@ -73,14 +76,15 @@ type EngineQueue struct {
finalityData []FinalityData
engine Engine
prev NextAttributesProvider
progress Progress // only used for pipeline resets
metrics Metrics
}
var _ AttributesQueueOutput = (*EngineQueue)(nil)
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
......@@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
},
prev: prev,
}
}
......@@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 {
return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp)
}
func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := eq.progress.Update(outer); err != nil || changed {
return err
}
func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error {
if len(eq.safeAttributes) > 0 {
return eq.tryNextSafeAttributes(ctx)
}
outOfData := false
if len(eq.safeAttributes) == 0 {
if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF {
outOfData = true
} else if err != nil {
return err
} else {
eq.safeAttributes = append(eq.safeAttributes, next)
return NotEnoughData
}
}
if eq.unsafePayloads.Len() > 0 {
return eq.tryNextUnsafePayload(ctx)
}
if outOfData {
return io.EOF
} else {
return nil
}
}
// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized,
......@@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...)
}
// remember the last L2 block that we fully derived from the given finality data
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number {
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number {
// append entry for new L1 block
eq.finalityData = append(eq.finalityData, FinalityData{
L2Block: eq.safeHead,
L1Block: eq.progress.Origin.ID(),
L1Block: eq.prev.Origin().ID(),
})
} else {
// if it's a now L2 block that was derived from the same latest L1 block, then just update the entry
......@@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead,
"l2_time", eq.unsafeHead.Time,
"l1_derived", eq.progress.Origin,
"l1_derived", eq.prev.Origin(),
)
}
......@@ -398,6 +416,15 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
safe, safe.Time, l1Origin, l1Origin.Time))
}
pipelineNumber := l1Origin.Number - eq.cfg.ChannelTimeout
if l1Origin.Number < eq.cfg.ChannelTimeout {
pipelineNumber = 0
}
pipelineOrigin, err := l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", pipelineNumber, err))
}
eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
eq.unsafeHead = unsafe
eq.safeHead = safe
......@@ -405,8 +432,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
eq.finalityData = eq.finalityData[:0]
// note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.progress = Progress{
Origin: l1Origin,
Closed: false,
Origin: pipelineOrigin,
}
eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe)
......
package derive
import (
"context"
"io"
"math/rand"
"testing"
......@@ -14,6 +16,20 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type fakeAttributesQueue struct {
origin eth.L1BlockRef
}
func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) {
return nil, io.EOF
}
var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)
func TestEngineQueue_Finalize(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
......@@ -209,9 +225,12 @@ func TestEngineQueue_Finalize(t *testing.T) {
// and we fetch the L1 origin of that as starting point for engine queue
l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil)
l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil)
prev := &fakeAttributesQueue{}
eq := NewEngineQueue(logger, cfg, eng, metrics)
require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20))
eq := NewEngineQueue(logger, cfg, eng, metrics, prev)
require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B")
......@@ -219,20 +238,19 @@ func TestEngineQueue_Finalize(t *testing.T) {
// now say C1 was included in D and became the new safe head
eq.progress.Origin = refD
prev.origin = refD
eq.safeHead = refC1
eq.postProcessSafeL2()
// now say D0 was included in E and became the new safe head
eq.progress.Origin = refE
prev.origin = refE
eq.safeHead = refD0
eq.postProcessSafeL2()
// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
eq.Finalize(refD.ID())
// Now a few steps later, without consuming any additional L1 inputs,
// we should be able to resolve that B1 is now finalized, since it was included in finalized L1 block C
require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10))
require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized")
l1F.AssertExpectations(t)
......
package derive
import (
"errors"
"fmt"
)
......@@ -91,3 +92,7 @@ func NewCriticalError(err error) error {
var ErrTemporary = NewTemporaryError(nil)
var ErrReset = NewResetError(nil)
var ErrCritical = NewCriticalError(nil)
// NotEnoughData implies that the function currently does not have enough data to progress
// but if it is retried enough times, it will eventually return a real value or io.EOF
var NotEnoughData = errors.New("not enough data")
......@@ -8,82 +8,69 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type L1SourceOutput interface {
StageProgress
IngestData(data []byte)
}
type DataAvailabilitySource interface {
OpenData(ctx context.Context, id eth.BlockID) DataIter
}
type NextBlockProvider interface {
NextL1Block(context.Context) (eth.L1BlockRef, error)
Origin() eth.L1BlockRef
}
type L1Retrieval struct {
log log.Logger
dataSrc DataAvailabilitySource
next L1SourceOutput
progress Progress
prev NextBlockProvider
data eth.Data
datas DataIter
}
var _ Stage = (*L1Retrieval)(nil)
var _ PullStage = (*L1Retrieval)(nil)
func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput) *L1Retrieval {
func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, prev NextBlockProvider) *L1Retrieval {
return &L1Retrieval{
log: log,
dataSrc: dataSrc,
next: next,
prev: prev,
}
}
func (l1r *L1Retrieval) Progress() Progress {
return l1r.progress
func (l1r *L1Retrieval) Origin() eth.L1BlockRef {
return l1r.prev.Origin()
}
func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error {
if changed, err := l1r.progress.Update(outer); err != nil || changed {
return err
}
// specific to L1 source: if the L1 origin is closed, there is no more data to retrieve.
if l1r.progress.Closed {
return io.EOF
}
// create a source if we have none
// NextData does an action in the L1 Retrieval stage
// If there is data, it pushes it to the next stage.
// If there is no more data open ourselves if we are closed or close ourselves if we are open
func (l1r *L1Retrieval) NextData(ctx context.Context) ([]byte, error) {
if l1r.datas == nil {
l1r.datas = l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID())
return nil
next, err := l1r.prev.NextL1Block(ctx)
if err == io.EOF {
return nil, io.EOF
} else if err != nil {
return nil, err
}
l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID())
}
// buffer data if we have none
if l1r.data == nil {
l1r.log.Debug("fetching next piece of data")
data, err := l1r.datas.Next(ctx)
if err == io.EOF {
l1r.progress.Closed = true
l1r.datas = nil
return io.EOF
return nil, io.EOF
} else if err != nil {
return err
// CalldataSource appropriately wraps the error so avoid double wrapping errors here.
return nil, err
} else {
l1r.data = data
return nil
}
return data, nil
}
// flush the data to next stage
l1r.next.IngestData(l1r.data)
// and nil the data, the next step will retrieve the next data
l1r.data = nil
return nil
}
func (l1r *L1Retrieval) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
l1r.progress = l1r.next.Progress()
l1r.datas = nil
l1r.data = nil
// ResetStep re-initializes the L1 Retrieval stage to block of it's `next` progress.
// Note that we open up the `l1r.datas` here because it is requires to maintain the
// internal invariants that later propagate up the derivation pipeline.
func (l1r *L1Retrieval) Reset(ctx context.Context, base eth.L1BlockRef) error {
l1r.datas = l1r.dataSrc.OpenData(ctx, base.ID())
l1r.log.Info("Reset of L1Retrieval done", "origin", base)
return io.EOF
}
......@@ -12,21 +12,21 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
type fakeDataIter struct {
idx int
data []eth.Data
errs []error
}
func (cs *fakeDataIter) Next(ctx context.Context) (eth.Data, error) {
if len(cs.data) == 0 {
return nil, io.EOF
} else {
data := cs.data[0]
cs.data = cs.data[1:]
return data, nil
}
i := cs.idx
cs.idx += 1
return cs.data[i], cs.errs[i]
}
type MockDataSource struct {
......@@ -38,53 +38,119 @@ func (m *MockDataSource) OpenData(ctx context.Context, id eth.BlockID) DataIter
return out[0].(DataIter)
}
func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, err error) {
m.Mock.On("OpenData", id).Return(iter, &err)
func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter) {
m.Mock.On("OpenData", id).Return(iter)
}
var _ DataAvailabilitySource = (*MockDataSource)(nil)
type MockIngestData struct {
MockOriginStage
type MockL1Traversal struct {
mock.Mock
}
func (im *MockIngestData) IngestData(data []byte) {
im.Mock.MethodCalled("IngestData", data)
func (m *MockL1Traversal) Origin() eth.L1BlockRef {
out := m.Mock.MethodCalled("Origin")
return out[0].(eth.L1BlockRef)
}
func (im *MockIngestData) ExpectIngestData(data []byte) {
im.Mock.On("IngestData", data).Return()
func (m *MockL1Traversal) ExpectOrigin(block eth.L1BlockRef) {
m.Mock.On("Origin").Return(block)
}
var _ L1SourceOutput = (*MockIngestData)(nil)
func (m *MockL1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) {
out := m.Mock.MethodCalled("NextL1Block")
return out[0].(eth.L1BlockRef), *out[1].(*error)
}
func TestL1Retrieval_Step(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
func (m *MockL1Traversal) ExpectNextL1Block(block eth.L1BlockRef, err error) {
m.Mock.On("NextL1Block").Return(block, &err)
}
next := &MockIngestData{MockOriginStage{progress: Progress{Origin: testutils.RandomBlockRef(rng), Closed: true}}}
var _ NextBlockProvider = (*MockL1Traversal)(nil)
// TestL1RetrievalReset tests the reset. The reset just opens up a new
// data for the specified block.
func TestL1RetrievalReset(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
dataSrc := &MockDataSource{}
a := testutils.RandomBlockRef(rng)
dataSrc.ExpectOpenData(a.ID(), &fakeDataIter{})
defer dataSrc.AssertExpectations(t)
a := testutils.RandomData(rng, 10)
b := testutils.RandomData(rng, 15)
iter := &fakeDataIter{data: []eth.Data{a, b}}
l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, nil)
outer := Progress{Origin: testutils.NextRandomRef(rng, next.progress.Origin), Closed: false}
// We assert that it opens up the correct data on a reset
_ = l1r.Reset(context.Background(), a)
}
// mock some L1 data to open for the origin that is opened by the outer stage
dataSrc.ExpectOpenData(outer.Origin.ID(), iter, nil)
// TestL1RetrievalNextData tests that the `NextData` function properly
// handles different error cases and returns the expected data
// if there is no error.
func TestL1RetrievalNextData(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
tests := []struct {
name string
prevBlock eth.L1BlockRef
prevErr error // error returned by prev.NextL1Block
openErr error // error returned by NextData if prev.NextL1Block fails
datas []eth.Data
datasErrs []error
expectedErrs []error
}{
{
name: "simple retrieval",
prevBlock: a,
prevErr: nil,
openErr: nil,
datas: []eth.Data{testutils.RandomData(rng, 10), testutils.RandomData(rng, 10), testutils.RandomData(rng, 10), nil},
datasErrs: []error{nil, nil, nil, io.EOF},
expectedErrs: []error{nil, nil, nil, io.EOF},
},
{
name: "out of data",
prevErr: io.EOF,
openErr: io.EOF,
},
{
name: "fail to open data",
prevBlock: a,
prevErr: nil,
openErr: nil,
datas: []eth.Data{nil},
datasErrs: []error{NewCriticalError(ethereum.NotFound)},
expectedErrs: []error{ErrCritical},
},
}
next.ExpectIngestData(a)
next.ExpectIngestData(b)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
l1t := &MockL1Traversal{}
l1t.ExpectNextL1Block(test.prevBlock, test.prevErr)
dataSrc := &MockDataSource{}
dataSrc.ExpectOpenData(test.prevBlock.ID(), &fakeDataIter{data: test.datas, errs: test.datasErrs})
defer dataSrc.AssertExpectations(t)
defer next.AssertExpectations(t)
ret := NewL1Retrieval(testlog.Logger(t, log.LvlCrit), dataSrc, l1t)
l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, next)
// If prevErr != nil we forced an error while getting data from the previous stage
if test.openErr != nil {
data, err := ret.NextData(context.Background())
require.Nil(t, data)
require.ErrorIs(t, err, test.openErr)
}
// Go through the fake data an assert that data is passed through and the correct
// errors are returned.
for i := range test.expectedErrs {
data, err := ret.NextData(context.Background())
require.Equal(t, test.datas[i], hexutil.Bytes(data))
require.ErrorIs(t, err, test.expectedErrs[i])
}
// first we expect the stage to reset to the origin of the inner stage
require.NoError(t, RepeatResetStep(t, l1r.ResetStep, nil, 1))
require.Equal(t, next.Progress(), l1r.Progress(), "stage needs to adopt the progress of next stage on reset")
l1t.AssertExpectations(t)
})
}
// and then start processing the data of the next stage
require.NoError(t, RepeatStep(t, l1r.Step, outer, 10))
}
......@@ -11,42 +11,46 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// L1 Traversal fetches the next L1 block and exposes it through the progress API
type L1BlockRefByNumberFetcher interface {
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
}
type L1Traversal struct {
log log.Logger
block eth.L1BlockRef
done bool
l1Blocks L1BlockRefByNumberFetcher
next StageProgress
progress Progress
log log.Logger
}
var _ Stage = (*L1Traversal)(nil)
var _ PullStage = (*L1Traversal)(nil)
func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher, next StageProgress) *L1Traversal {
func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Traversal {
return &L1Traversal{
log: log,
l1Blocks: l1Blocks,
next: next,
}
}
func (l1t *L1Traversal) Progress() Progress {
return l1t.progress
func (l1t *L1Traversal) Origin() eth.L1BlockRef {
return l1t.block
}
func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error {
if !l1t.progress.Closed { // close origin and do another pipeline sweep, before we try to move to the next origin
l1t.progress.Closed = true
return nil
// NextL1Block returns the next block. It does not advance, but it can only be
// called once before returning io.EOF
func (l1t *L1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) {
if !l1t.done {
l1t.done = true
return l1t.block, nil
} else {
return eth.L1BlockRef{}, io.EOF
}
}
// If we reorg to a shorter chain, then we'll only derive new L2 data once the L1 reorg
// becomes longer than the previous L1 chain.
// This is fine, assuming the new L1 chain is live, but we may want to reconsider this.
origin := l1t.progress.Origin
// AdvanceL1Block advances the internal state of L1 Traversal
func (l1t *L1Traversal) AdvanceL1Block(ctx context.Context) error {
origin := l1t.block
nextL1Origin, err := l1t.l1Blocks.L1BlockRefByNumber(ctx, origin.Number+1)
if errors.Is(err, ethereum.NotFound) {
l1t.log.Debug("can't find next L1 block info (yet)", "number", origin.Number+1, "origin", origin)
......@@ -54,16 +58,20 @@ func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error {
} else if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find L1 block info by number, at origin %s next %d: %w", origin, origin.Number+1, err))
}
if l1t.progress.Origin.Hash != nextL1Origin.ParentHash {
return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.progress.Origin, nextL1Origin, nextL1Origin.ParentID()))
if l1t.block.Hash != nextL1Origin.ParentHash {
return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.block, nextL1Origin, nextL1Origin.ParentID()))
}
l1t.progress.Origin = nextL1Origin
l1t.progress.Closed = false
l1t.block = nextL1Origin
l1t.done = false
return nil
}
func (l1t *L1Traversal) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
l1t.progress = l1t.next.Progress()
l1t.log.Info("completed reset of derivation pipeline", "origin", l1t.progress.Origin)
// Reset sets the internal L1 block to the supplied base.
// Note that the next call to `NextL1Block` will return the block after `base`
// TODO: Walk one back/figure this out.
func (l1t *L1Traversal) Reset(ctx context.Context, base eth.L1BlockRef) error {
l1t.block = base
l1t.done = false
l1t.log.Info("completed reset of derivation pipeline", "origin", base)
return io.EOF
}
package derive
import (
"context"
"errors"
"io"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
)
func TestL1Traversal_Step(t *testing.T) {
// TestL1TraversalNext tests that the `Next` function only returns
// a block reference once and then properly returns io.EOF afterwards
func TestL1TraversalNext(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
tr := NewL1Traversal(testlog.Logger(t, log.LvlError), nil)
// Load up the initial state with a reset
_ = tr.Reset(context.Background(), a)
// First call should always succeed
ref, err := tr.NextL1Block(context.Background())
require.Nil(t, err)
require.Equal(t, a, ref)
// Subsequent calls should return io.EOF
ref, err = tr.NextL1Block(context.Background())
require.Equal(t, eth.L1BlockRef{}, ref)
require.Equal(t, io.EOF, err)
ref, err = tr.NextL1Block(context.Background())
require.Equal(t, eth.L1BlockRef{}, ref)
require.Equal(t, io.EOF, err)
}
// TestL1TraversalAdvance tests that the `Advance` function properly
// handles different error cases and returns the expected block ref
// if there is no error.
func TestL1TraversalAdvance(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
b := testutils.NextRandomRef(rng, a)
c := testutils.NextRandomRef(rng, b)
d := testutils.NextRandomRef(rng, c)
e := testutils.NextRandomRef(rng, d)
f := testutils.RandomBlockRef(rng) // a fork, doesn't build on d
f.Number = e.Number + 1 // even though it might be the next number
l1Fetcher := &testutils.MockL1Source{}
l1Fetcher.ExpectL1BlockRefByNumber(b.Number, b, nil)
// pretend there's an RPC error
l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, errors.New("rpc error - check back later"))
l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, nil)
// pretend the block is not there yet for a while
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound)
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound)
// it will show up though
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, nil)
l1Fetcher.ExpectL1BlockRefByNumber(e.Number, e, nil)
l1Fetcher.ExpectL1BlockRefByNumber(f.Number, f, nil)
next := &MockOriginStage{progress: Progress{Origin: a, Closed: false}}
tr := NewL1Traversal(testlog.Logger(t, log.LvlError), l1Fetcher, next)
defer l1Fetcher.AssertExpectations(t)
defer next.AssertExpectations(t)
require.NoError(t, RepeatResetStep(t, tr.ResetStep, nil, 1))
require.Equal(t, a, tr.Progress().Origin, "stage needs to adopt the origin of next stage on reset")
require.False(t, tr.Progress().Closed, "stage needs to be open after reset")
require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrTemporary, "expected temporary error because of RPC mock fail")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 10))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck on ethereum.NotFound on d")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 1))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck again, should get the EOF within 1 step")
require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrReset, "completed pipeline, until L1 input f that causes a reorg")
// x is at the same height as b but does not extend `a`
x := testutils.RandomBlockRef(rng)
x.Number = b.Number
tests := []struct {
name string
startBlock eth.L1BlockRef
nextBlock eth.L1BlockRef
fetcherErr error
expectedErr error
}{
{
name: "simple extension",
startBlock: a,
nextBlock: b,
fetcherErr: nil,
expectedErr: nil,
},
{
name: "reorg",
startBlock: a,
nextBlock: x,
fetcherErr: nil,
expectedErr: ErrReset,
},
{
name: "not found",
startBlock: a,
nextBlock: eth.L1BlockRef{},
fetcherErr: ethereum.NotFound,
expectedErr: io.EOF,
},
{
name: "temporary error",
startBlock: a,
nextBlock: eth.L1BlockRef{},
fetcherErr: errors.New("interrupted connection"),
expectedErr: ErrTemporary,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
src := &testutils.MockL1Source{}
src.ExpectL1BlockRefByNumber(test.startBlock.Number+1, test.nextBlock, test.fetcherErr)
tr := NewL1Traversal(testlog.Logger(t, log.LvlError), src)
// Load up the initial state with a reset
_ = tr.Reset(context.Background(), test.startBlock)
// Advance it + assert output
err := tr.AdvanceL1Block(context.Background())
require.ErrorIs(t, err, test.expectedErr)
if test.expectedErr == nil {
ref, err := tr.NextL1Block(context.Background())
require.Nil(t, err)
require.Equal(t, test.nextBlock, ref)
}
src.AssertExpectations(t)
})
}
}
......@@ -28,6 +28,12 @@ type StageProgress interface {
Progress() Progress
}
type PullStage interface {
// Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to.
// TODO: Return L1 Block reference
Reset(ctx context.Context, base eth.L1BlockRef) error
}
type Stage interface {
StageProgress
......@@ -69,6 +75,7 @@ type DerivationPipeline struct {
// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
resetting int
pullResetIdx int
// Index of the stage that is currently being processed.
active int
......@@ -76,6 +83,9 @@ type DerivationPipeline struct {
// stages in execution order. A stage Step that:
stages []Stage
pullStages []PullStage
traversal *L1Traversal
eng EngineQueueStage
metrics Metrics
......@@ -83,15 +93,21 @@ type DerivationPipeline struct {
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline {
eng := NewEngineQueue(log, cfg, engine, metrics)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng)
batchQueue := NewBatchQueue(log, cfg, attributesQueue)
chInReader := NewChannelInReader(log, batchQueue)
bank := NewChannelBank(log, cfg, chInReader)
dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher)
l1Src := NewL1Retrieval(log, dataSrc, bank)
l1Traversal := NewL1Traversal(log, l1Fetcher, l1Src)
stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}
// Pull stages
l1Traversal := NewL1Traversal(log, l1Fetcher)
dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
bank := NewChannelBank(log, cfg, l1Src, l1Fetcher)
chInReader := NewChannelInReader(log, bank)
batchQueue := NewBatchQueue(log, cfg, chInReader)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue)
// Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages)
eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue)
stages := []Stage{eng}
pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}
return &DerivationPipeline{
log: log,
......@@ -100,13 +116,16 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
resetting: 0,
active: 0,
stages: stages,
pullStages: pullStages,
eng: eng,
metrics: metrics,
traversal: l1Traversal,
}
}
func (dp *DerivationPipeline) Reset() {
dp.resetting = 0
dp.pullResetIdx = 0
}
func (dp *DerivationPipeline) Progress() Progress {
......@@ -160,7 +179,24 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error {
return nil
}
}
// Then reset the pull based stages
if dp.pullResetIdx < len(dp.pullStages) {
// Use the last stage's progress as the one to pull from
inner := dp.stages[len(dp.stages)-1].Progress()
// Do the reset
if err := dp.pullStages[dp.pullResetIdx].Reset(ctx, inner.Origin); err == io.EOF {
// dp.log.Debug("reset of stage completed", "stage", dp.pullResetIdx, "origin", dp.pullStages[dp.pullResetIdx].Progress().Origin)
dp.pullResetIdx += 1
return nil
} else if err != nil {
return fmt.Errorf("stage %d failed resetting: %w", dp.pullResetIdx, err)
} else {
return nil
}
}
// Lastly advance the stages
for i, stage := range dp.stages {
var outer Progress
if i+1 < len(dp.stages) {
......@@ -174,5 +210,6 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error {
return nil
}
}
return io.EOF
// If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx)
}
......@@ -440,6 +440,10 @@ func (s *state) eventLoop() {
} else if err != nil && errors.Is(err, derive.ErrCritical) {
s.log.Error("Derivation process critical error", "err", err)
return
} else if err != nil && errors.Is(err, derive.NotEnoughData) {
stepAttempts = 0 // don't do a backoff for this error
reqStep()
continue
} else if err != nil {
s.log.Error("Derivation process error", "attempts", stepAttempts, "err", err)
reqStep()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment