Commit c77bf2b0 authored by Matthew Slipper, committed by GitHub

Merge pull request #3596 from ethereum-optimism/jg/channel_in_reader

op-node: Switch channel bank and channel in reader to a pull based stage
parents 5aa961dd 32f09d19
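This change replaces the push-based `Step`/`Progress` machinery with pull-based stages that each wrap their predecessor and expose a `Next*` method. Below is a minimal, self-contained sketch of that calling convention; the type and function names are illustrative stand-ins, not the actual op-node types, and only the `io.EOF` / `NotEnoughData` semantics are taken from this diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// NotEnoughData mirrors the sentinel introduced in this PR: the stage made
// internal progress but has nothing to hand out yet, so the caller should
// simply pull again without backing off.
var NotEnoughData = errors.New("not enough data")

// pullStage is the shape the converted stages share: instead of a push-based
// Step/Progress pair, each stage wraps its predecessor and exposes a Next*
// method that returns data, io.EOF (drained), or NotEnoughData (retry).
type pullStage interface {
	NextData(ctx context.Context) ([]byte, error)
}

// source is a toy innermost stage feeding fixed chunks.
type source struct{ chunks [][]byte }

func (s *source) NextData(_ context.Context) ([]byte, error) {
	if len(s.chunks) == 0 {
		return nil, io.EOF // out of data until more L1 input arrives
	}
	d := s.chunks[0]
	s.chunks = s.chunks[1:]
	return d, nil
}

func main() {
	var stage pullStage = &source{chunks: [][]byte{[]byte("frame-a"), []byte("frame-b")}}
	for {
		d, err := stage.NextData(context.Background())
		if errors.Is(err, io.EOF) {
			fmt.Println("drained")
			return
		} else if errors.Is(err, NotEnoughData) {
			continue // stage advanced internally; pull again immediately
		} else if err != nil {
			panic(err)
		}
		fmt.Printf("pulled %q\n", d)
	}
}
```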
......@@ -22,58 +22,60 @@ import (
// This stage can be reset by clearing its batch buffer.
// This stage does not need to retain any references to L1 blocks.
type AttributesQueueOutput interface {
AddSafeAttributes(attributes *eth.PayloadAttributes)
SafeL2Head() eth.L2BlockRef
StageProgress
}
type AttributesQueue struct {
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
next AttributesQueueOutput
progress Progress
batches []*BatchData
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
prev *BatchQueue
batch *BatchData
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput) *AttributesQueue {
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue {
return &AttributesQueue{
log: log,
config: cfg,
dl: l1Fetcher,
next: next,
prev: prev,
}
}
func (aq *AttributesQueue) AddBatch(batch *BatchData) {
aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions))
aq.batches = append(aq.batches, batch)
func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}
func (aq *AttributesQueue) Progress() Progress {
return aq.progress
}
func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := aq.progress.Update(outer); err != nil || changed {
return err
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// Get a batch if we need it
if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
if err != nil {
return nil, err
}
aq.batch = batch
}
if len(aq.batches) == 0 {
return io.EOF
// Actually generate the next attributes
if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil {
return nil, err
} else {
// Clear out the local state once we know we will succeed
aq.batch = nil
return attrs, nil
}
batch := aq.batches[0]
safeL2Head := aq.next.SafeL2Head()
}
// createNextAttributes transforms a batch into payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != safeL2Head.Hash {
return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash))
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
}
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch())
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch())
if err != nil {
return err
return nil, err
}
// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
......@@ -83,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp)
// Slice off the batch once we are guaranteed to succeed
aq.batches = aq.batches[1:]
aq.next.AddSafeAttributes(attrs)
return nil
return attrs, nil
}
func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
aq.batches = aq.batches[:0]
aq.progress = aq.next.Progress()
func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error {
return io.EOF
}
func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef {
return aq.next.SafeL2Head()
}
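For illustration, here is a hedged sketch of the batch-caching pattern `NextAttributes` uses above: hold the pulled batch in local state and clear it only after attribute construction succeeds, so a transient failure retries the same batch. All names (`attributesQueue`, `batch`, `failOnce`) are hypothetical simplifications of the real types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

type batch struct{ timestamp uint64 }

// attributesQueue holds at most one pending batch so that a transient
// failure while building attributes retries the same batch instead of
// silently dropping it.
type attributesQueue struct {
	pending  *batch
	input    []*batch
	failOnce bool // simulates one transient attribute-building failure
}

func (aq *attributesQueue) nextBatch(_ context.Context) (*batch, error) {
	if len(aq.input) == 0 {
		return nil, io.EOF
	}
	b := aq.input[0]
	aq.input = aq.input[1:]
	return b, nil
}

func (aq *attributesQueue) NextAttributes(ctx context.Context) (string, error) {
	// Only pull a new batch if we are not already holding one from a
	// previous, failed attempt.
	if aq.pending == nil {
		b, err := aq.nextBatch(ctx)
		if err != nil {
			return "", err
		}
		aq.pending = b
	}
	if aq.failOnce {
		aq.failOnce = false
		return "", errors.New("temporary failure: retry with the same batch")
	}
	attrs := fmt.Sprintf("attrs@%d", aq.pending.timestamp)
	aq.pending = nil // clear local state only once we know we succeeded
	return attrs, nil
}

func main() {
	aq := &attributesQueue{input: []*batch{{timestamp: 100}}, failOnce: true}
	for {
		attrs, err := aq.NextAttributes(context.Background())
		if err == io.EOF {
			return
		} else if err != nil {
			fmt.Println("retrying:", err)
			continue
		}
		fmt.Println("derived", attrs)
	}
}
```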
......@@ -2,7 +2,6 @@ package derive
import (
"context"
"io"
"math/big"
"math/rand"
"testing"
......@@ -17,29 +16,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type MockAttributesQueueOutput struct {
MockOriginStage
}
func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.MethodCalled("AddSafeAttributes", attributes)
}
func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.On("AddSafeAttributes", attributes).Once().Return()
}
func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef {
return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef)
}
func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) {
m.Mock.On("SafeL2Head").Once().Return(head)
}
var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil)
func TestAttributesQueue_Step(t *testing.T) {
// TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function
// (which is well tested) and that it properly sets NoTxPool and adds in the candidate
// transactions.
func TestAttributesQueue(t *testing.T) {
// test config, only init the necessary fields
cfg := &rollup.Config{
BlockTime: 2,
......@@ -56,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) {
l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil)
out := &MockAttributesQueueOutput{}
out.progress = Progress{
Origin: l1Info.BlockRef(),
Closed: false,
}
defer out.AssertExpectations(t)
safeHead := testutils.RandomL2BlockRef(rng)
safeHead.L1Origin = l1Info.ID()
out.ExpectSafeL2Head(safeHead)
batch := &BatchData{BatchV1{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
......@@ -85,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) {
Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")},
NoTxPool: true,
}
out.ExpectAddSafeAttributes(&attrs)
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out)
require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1))
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil)
aq.AddBatch(batch)
actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)
require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet")
require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches")
require.Nil(t, err)
require.Equal(t, attrs, *actual)
}
......@@ -32,13 +32,18 @@ type BatchQueueOutput interface {
SafeL2Head() eth.L2BlockRef
}
type NextBatchProvider interface {
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (*BatchData, error)
}
// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
log log.Logger
config *rollup.Config
next BatchQueueOutput
progress Progress
log log.Logger
config *rollup.Config
prev NextBatchProvider
origin eth.L1BlockRef
l1Blocks []eth.L1BlockRef
......@@ -47,62 +52,91 @@ type BatchQueue struct {
}
// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, next BatchQueueOutput) *BatchQueue {
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider) *BatchQueue {
return &BatchQueue{
log: log,
config: cfg,
next: next,
prev: prev,
}
}
func (bq *BatchQueue) Progress() Progress {
return bq.progress
func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.prev.Origin()
}
func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := bq.progress.Update(outer); err != nil {
return err
} else if changed {
if !bq.progress.Closed { // init inputs if we moved to a new open origin
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
func (bq *BatchQueue) NextBatch(ctx context.Context, safeL2Head eth.L2BlockRef) (*BatchData, error) {
originBehind := bq.origin.Number < safeL2Head.L1Origin.Number
// Advance origin if needed
// Note: The entire pipeline has the same origin
// We just don't accept batches prior to the L1 origin of the L2 safe head
if bq.origin != bq.prev.Origin() {
bq.origin = bq.prev.Origin()
if !originBehind {
bq.l1Blocks = append(bq.l1Blocks, bq.origin)
} else {
// This is to handle the special case of startup. At startup we call Reset & include
// the L1 origin. That is the only time where immediately after `Reset` is called
// originBehind is false.
bq.l1Blocks = bq.l1Blocks[:0]
}
return nil
bq.log.Info("Advancing bq origin", "origin", bq.origin)
}
batch, err := bq.deriveNextBatch(ctx)
if err == io.EOF {
// very noisy, commented for now, or we should bump log level from trace to debug
// bq.log.Trace("need more L1 data before deriving next batch", "progress", bq.progress.Origin)
return io.EOF
// Load more data into the batch queue
outOfData := false
if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
outOfData = true
} else if err != nil {
return err
return nil, err
} else if !originBehind {
bq.AddBatch(batch, safeL2Head)
}
bq.next.AddBatch(batch)
return nil
// Skip adding data unless we are up to date with the origin, but do fully
// empty the previous stages
if originBehind {
if outOfData {
return nil, io.EOF
} else {
return nil, NotEnoughData
}
}
// Finally attempt to derive more batches
batch, err := bq.deriveNextBatch(ctx, outOfData, safeL2Head)
if err == io.EOF && outOfData {
return nil, io.EOF
} else if err == io.EOF {
return nil, NotEnoughData
} else if err != nil {
return nil, err
}
return batch, nil
}
func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef) error {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.progress = bq.next.Progress()
bq.origin = base
bq.batches = make(map[uint64][]*BatchWithL1InclusionBlock)
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, bq.progress.Origin)
bq.l1Blocks = append(bq.l1Blocks, base)
return io.EOF
}
func (bq *BatchQueue) AddBatch(batch *BatchData) {
if bq.progress.Closed {
panic("write batch while closed")
}
func (bq *BatchQueue) AddBatch(batch *BatchData, l2SafeHead eth.L2BlockRef) {
if len(bq.l1Blocks) == 0 {
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
}
data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.progress.Origin,
L1InclusionBlock: bq.origin,
Batch: batch,
}
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, bq.next.SafeL2Head(), &data)
validity := CheckBatch(bq.config, bq.log, bq.l1Blocks, l2SafeHead, &data)
if validity == BatchDrop {
return // if we do drop the batch, CheckBatch will log the drop reason with WARN level.
}
......@@ -113,12 +147,11 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) {
// following the validity rules imposed on consecutive batches,
// based on currently available buffered batch and L1 origin information.
// If no batch can be derived yet, then (nil, io.EOF) is returned.
func (bq *BatchQueue) deriveNextBatch(ctx context.Context) (*BatchData, error) {
func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, l2SafeHead eth.L2BlockRef) (*BatchData, error) {
if len(bq.l1Blocks) == 0 {
return nil, NewCriticalError(errors.New("cannot derive next batch, no origin was prepared"))
}
epoch := bq.l1Blocks[0]
l2SafeHead := bq.next.SafeL2Head()
if l2SafeHead.L1Origin != epoch.ID() {
return nil, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head %s", epoch, l2SafeHead))
......@@ -183,8 +216,8 @@ batchLoop:
// i.e. if the sequence window expired, we create empty batches
expiryEpoch := epoch.Number + bq.config.SeqWindowSize
forceNextEpoch :=
(expiryEpoch == bq.progress.Origin.Number && bq.progress.Closed) ||
expiryEpoch < bq.progress.Origin.Number
(expiryEpoch == bq.origin.Number && outOfData) ||
expiryEpoch < bq.origin.Number
if !forceNextEpoch {
// sequence window did not expire yet, still room to receive batches for the current epoch,
......
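A toy version of the two-phase control flow in `NextBatch` above: phase one loads from the previous stage, phase two attempts to derive, and the failure mode distinguishes "retry immediately" (`NotEnoughData`) from "wait for more L1 data" (`io.EOF`). The `batchQueue` type and the two-item "derivation" rule are invented for the sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// NotEnoughData mirrors the sentinel added in this PR.
var NotEnoughData = errors.New("not enough data")

// batchQueue buffers items from a previous stage and only emits one when a
// simple "derivation" condition is met (here: two buffered items).
type batchQueue struct {
	prev   []int
	buffer []int
}

func (bq *batchQueue) nextFromPrev(_ context.Context) (int, error) {
	if len(bq.prev) == 0 {
		return 0, io.EOF
	}
	v := bq.prev[0]
	bq.prev = bq.prev[1:]
	return v, nil
}

func (bq *batchQueue) NextBatch(ctx context.Context) (int, error) {
	// Phase 1: load more data from the previous stage.
	outOfData := false
	if v, err := bq.nextFromPrev(ctx); err == io.EOF {
		outOfData = true
	} else if err != nil {
		return 0, err
	} else {
		bq.buffer = append(bq.buffer, v)
	}
	// Phase 2: attempt to derive. If we cannot, the error depends on whether
	// the previous stage still has data: retry now, or wait for new L1 input.
	if len(bq.buffer) < 2 {
		if outOfData {
			return 0, io.EOF
		}
		return 0, NotEnoughData
	}
	sum := bq.buffer[0] + bq.buffer[1]
	bq.buffer = bq.buffer[2:]
	return sum, nil
}

func main() {
	bq := &batchQueue{prev: []int{1, 2, 3}}
	for {
		b, err := bq.NextBatch(context.Background())
		if err == io.EOF {
			return
		} else if errors.Is(err, NotEnoughData) {
			continue
		} else if err != nil {
			panic(err)
		}
		fmt.Println("derived batch:", b)
	}
}
```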
......@@ -2,7 +2,6 @@ package derive
import (
"context"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
......@@ -11,6 +10,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type NextDataProvider interface {
NextData(ctx context.Context) ([]byte, error)
Origin() eth.L1BlockRef
}
// ChannelBank is a stateful stage that does the following:
// 1. Unmarshalls frames from L1 transaction data
// 2. Applies those frames to a channel
......@@ -22,11 +26,6 @@ import (
// Specifically, the channel bank is not allowed to become too large between successive calls
// to `IngestData`. This means that we can do an ingest and then do a read while becoming too large.
type ChannelBankOutput interface {
StageProgress
WriteChannel(data []byte)
}
// ChannelBank buffers channel frames, and emits full channel data
type ChannelBank struct {
log log.Logger
......@@ -35,82 +34,78 @@ type ChannelBank struct {
channels map[ChannelID]*Channel // channels by ID
channelQueue []ChannelID // channels in FIFO order
progress Progress
next ChannelBankOutput
prev *L1Retrieval
prev NextDataProvider
fetcher L1Fetcher
}
var _ Stage = (*ChannelBank)(nil)
var _ PullStage = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput, prev *L1Retrieval) *ChannelBank {
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank {
return &ChannelBank{
log: log,
cfg: cfg,
channels: make(map[ChannelID]*Channel),
channelQueue: make([]ChannelID, 0, 10),
next: next,
prev: prev,
fetcher: fetcher,
}
}
func (ib *ChannelBank) Progress() Progress {
return ib.progress
func (cb *ChannelBank) Origin() eth.L1BlockRef {
return cb.prev.Origin()
}
func (ib *ChannelBank) prune() {
func (cb *ChannelBank) prune() {
// check total size
totalSize := uint64(0)
for _, ch := range ib.channels {
for _, ch := range cb.channels {
totalSize += ch.size
}
// prune until it is reasonable again. The high-priority channel failed to be read, so we start pruning there.
for totalSize > MaxChannelBankSize {
id := ib.channelQueue[0]
ch := ib.channels[id]
ib.channelQueue = ib.channelQueue[1:]
delete(ib.channels, id)
id := cb.channelQueue[0]
ch := cb.channels[id]
cb.channelQueue = cb.channelQueue[1:]
delete(cb.channels, id)
totalSize -= ch.size
}
}
// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.
func (ib *ChannelBank) IngestData(data []byte) {
if ib.progress.Closed {
panic("write data to bank while closed")
}
ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data))
func (cb *ChannelBank) IngestData(data []byte) {
origin := cb.Origin()
cb.log.Debug("channel bank got new data", "origin", origin, "data_len", len(data))
// TODO: Why is the prune here?
ib.prune()
cb.prune()
frames, err := ParseFrames(data)
if err != nil {
ib.log.Warn("malformed frame", "err", err)
cb.log.Warn("malformed frame", "err", err)
return
}
// Process each frame
for _, f := range frames {
currentCh, ok := ib.channels[f.ID]
currentCh, ok := cb.channels[f.ID]
if !ok {
// create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID, ib.progress.Origin)
ib.channels[f.ID] = currentCh
ib.channelQueue = append(ib.channelQueue, f.ID)
currentCh = NewChannel(f.ID, origin)
cb.channels[f.ID] = currentCh
cb.channelQueue = append(cb.channelQueue, f.ID)
}
// check if the channel is not timed out
if currentCh.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number {
ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber)
if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < origin.Number {
cb.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber)
continue
}
ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.AddFrame(f, ib.progress.Origin); err != nil {
ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
cb.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.AddFrame(f, origin); err != nil {
cb.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
continue
}
}
......@@ -118,98 +113,62 @@ func (ib *ChannelBank) IngestData(data []byte) {
// Read the raw data of the first channel, if it's timed-out or closed.
// Read returns io.EOF if there is nothing new to read.
func (ib *ChannelBank) Read() (data []byte, err error) {
if len(ib.channelQueue) == 0 {
func (cb *ChannelBank) Read() (data []byte, err error) {
if len(cb.channelQueue) == 0 {
return nil, io.EOF
}
first := ib.channelQueue[0]
ch := ib.channels[first]
timedOut := ch.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number
first := cb.channelQueue[0]
ch := cb.channels[first]
timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number
if timedOut {
ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:]
cb.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
return nil, io.EOF
}
if !ch.IsReady() {
return nil, io.EOF
}
delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:]
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
r := ch.Reader()
// Suppress the error here. io.ReadAll returns a nil error at EOF instead of io.EOF.
data, _ = io.ReadAll(r)
return data, nil
}
// Step does the advancement for the channel bank.
// Channel bank, as the first non-pull stage, does its own progress maintenance.
// When closed, it checks against the previous origin to determine whether to open itself
func (ib *ChannelBank) Step(ctx context.Context, _ Progress) error {
// Open ourselves
// This is ok to do b/c we would not have yielded control to the lower stages
// of the pipeline without being completely done reading from L1.
if ib.progress.Closed {
if ib.progress.Origin != ib.prev.Origin() {
ib.progress.Closed = false
ib.progress.Origin = ib.prev.Origin()
return nil
}
}
skipIngest := ib.next.Progress().Origin.Number > ib.progress.Origin.Number
outOfData := false
if data, err := ib.prev.NextData(ctx); err == io.EOF {
outOfData = true
// NextData pulls the next piece of data from the channel bank.
// Note that it attempts to pull data out of the channel bank prior to
// loading data in (unlike most other stages). This is to maintain
// consistency around channel bank pruning which depends upon the order
// of operations.
func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) {
// Do the read from the channel bank first
data, err := cb.Read()
if err == io.EOF {
// continue - We will attempt to load data into the channel bank
} else if err != nil {
return err
return nil, err
} else {
ib.IngestData(data)
return data, nil
}
// otherwise, read the next channel data from the bank
data, err := ib.Read()
if err == io.EOF { // need new L1 data in the bank before we can read more channel data
if outOfData {
if !ib.progress.Closed {
ib.progress.Closed = true
return nil
}
return io.EOF
} else {
return nil
}
// Then load data into the channel bank
if data, err := cb.prev.NextData(ctx); err == io.EOF {
return nil, io.EOF
} else if err != nil {
return err
return nil, err
} else {
if !skipIngest {
ib.next.WriteChannel(data)
return nil
}
cb.IngestData(data)
return nil, NotEnoughData
}
return nil
}
// ResetStep walks back the L1 chain, starting at the origin of the next stage,
// to find the origin that the channel bank should be reset to,
// to get consistent reads starting at origin.
// Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin,
// so it is not relevant to replay it into the bank.
func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
ib.progress = ib.next.Progress()
ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin)
// go back in history if we are not distant enough from the next stage
resetBlock := ib.progress.Origin.Number - ib.cfg.ChannelTimeout
if ib.progress.Origin.Number < ib.cfg.ChannelTimeout {
resetBlock = 0 // don't underflow
}
parent, err := l1Fetcher.L1BlockRefByNumber(ctx, resetBlock)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err))
}
ib.progress.Origin = parent
func (cb *ChannelBank) Reset(ctx context.Context, base eth.L1BlockRef) error {
cb.channels = make(map[ChannelID]*Channel)
cb.channelQueue = make([]ChannelID, 0, 10)
return io.EOF
}
......
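The comment on `NextData` stresses that reads happen before ingests so pruning sees a fully-drained bank. A simplified sketch of that ordering, with hypothetical `channelBank`/`ready` fields standing in for the real frame and channel bookkeeping:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// NotEnoughData mirrors the sentinel added in this PR.
var NotEnoughData = errors.New("not enough data")

// channelBank sketches NextData's read-before-ingest ordering: always try to
// emit a completed channel first, and only then pull one more piece of data
// from the previous stage.
type channelBank struct {
	prev  [][]byte // pending L1 data
	ready [][]byte // channels that are complete and readable
}

func (cb *channelBank) read() ([]byte, error) {
	if len(cb.ready) == 0 {
		return nil, io.EOF
	}
	d := cb.ready[0]
	cb.ready = cb.ready[1:]
	return d, nil
}

func (cb *channelBank) ingest(data []byte) {
	// The real code parses frames and may prune here; we just mark data readable.
	cb.ready = append(cb.ready, data)
}

func (cb *channelBank) NextData(_ context.Context) ([]byte, error) {
	// 1. Drain the bank first.
	if d, err := cb.read(); err == nil {
		return d, nil
	} else if err != io.EOF {
		return nil, err
	}
	// 2. Only then load one more piece of data, and ask the caller to retry.
	if len(cb.prev) == 0 {
		return nil, io.EOF
	}
	cb.ingest(cb.prev[0])
	cb.prev = cb.prev[1:]
	return nil, NotEnoughData
}

func main() {
	cb := &channelBank{prev: [][]byte{[]byte("ch-1"), []byte("ch-2")}}
	for {
		d, err := cb.NextData(context.Background())
		if err == io.EOF {
			return
		} else if errors.Is(err, NotEnoughData) {
			continue
		} else if err != nil {
			panic(err)
		}
		fmt.Printf("channel data: %q\n", d)
	}
}
```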
......@@ -5,6 +5,7 @@ import (
"context"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/log"
)
......@@ -13,41 +14,36 @@ import (
// This is a pure function from the channel, but each channel (or channel fragment)
// must be tagged with an L1 inclusion block to be passed to the batch queue.
type BatchQueueStage interface {
StageProgress
AddBatch(batch *BatchData)
}
type ChannelInReader struct {
log log.Logger
nextBatchFn func() (BatchWithL1InclusionBlock, error)
progress Progress
next BatchQueueStage
prev *ChannelBank
}
var _ ChannelBankOutput = (*ChannelInReader)(nil)
var _ PullStage = (*ChannelInReader)(nil)
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, next BatchQueueStage) *ChannelInReader {
return &ChannelInReader{log: log, next: next}
func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader {
return &ChannelInReader{
log: log,
prev: prev,
}
}
func (cr *ChannelInReader) Progress() Progress {
return cr.progress
func (cr *ChannelInReader) Origin() eth.L1BlockRef {
return cr.prev.Origin()
}
// TODO: Take full channel for better logging
func (cr *ChannelInReader) WriteChannel(data []byte) {
if cr.progress.Closed {
panic("write channel while closed")
}
if f, err := BatchReader(bytes.NewBuffer(data), cr.progress.Origin); err == nil {
func (cr *ChannelInReader) WriteChannel(data []byte) error {
if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil {
cr.nextBatchFn = f
return nil
} else {
cr.log.Error("Error creating batch reader from channel data", "err", err)
return err
}
}
......@@ -57,32 +53,37 @@ func (cr *ChannelInReader) NextChannel() {
cr.nextBatchFn = nil
}
func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error {
if changed, err := cr.progress.Update(outer); err != nil || changed {
return err
}
// NextBatch pulls out the next batch from the channel if it has it.
// It returns io.EOF when it cannot make any more progress.
// It will return NotEnoughData if it needs to be called again to advance some internal state.
func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) {
if cr.nextBatchFn == nil {
return io.EOF
if data, err := cr.prev.NextData(ctx); err == io.EOF {
return nil, io.EOF
} else if err != nil {
return nil, err
} else {
if err := cr.WriteChannel(data); err != nil {
return nil, NewTemporaryError(err)
}
}
}
// TODO: can batch be non-nil while err == io.EOF?
// This depends on the behavior of rlp.Stream
batch, err := cr.nextBatchFn()
if err == io.EOF {
return io.EOF
cr.NextChannel()
return nil, NotEnoughData
} else if err != nil {
cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err)
cr.NextChannel()
return nil
return nil, NotEnoughData
}
cr.next.AddBatch(batch.Batch)
return nil
return batch.Batch, nil
}
func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
func (cr *ChannelInReader) Reset(ctx context.Context, _ eth.L1BlockRef) error {
cr.nextBatchFn = nil
cr.progress = cr.next.Progress()
return io.EOF
}
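`ChannelInReader.NextBatch` flattens a stream of channels into individual batches, keeping a per-channel cursor (`nextBatchFn`) and returning `NotEnoughData` when it rolls over to the next channel. A minimal stand-in, with string batches instead of RLP-decoded `BatchData`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// NotEnoughData mirrors the sentinel added in this PR.
var NotEnoughData = errors.New("not enough data")

// channelInReader flattens channels (each a list of batches) into single
// batches: when the current channel is exhausted, drop it and report
// NotEnoughData so the caller pulls again for the next channel.
type channelInReader struct {
	prev    [][]string // channels from the previous stage
	current []string   // stand-in for the nextBatchFn cursor
	open    bool
}

func (cr *channelInReader) NextBatch(_ context.Context) (string, error) {
	if !cr.open {
		if len(cr.prev) == 0 {
			return "", io.EOF
		}
		cr.current, cr.prev = cr.prev[0], cr.prev[1:]
		cr.open = true
	}
	if len(cr.current) == 0 {
		cr.open = false // end of channel: reset cursor, retry for next channel
		return "", NotEnoughData
	}
	b := cr.current[0]
	cr.current = cr.current[1:]
	return b, nil
}

func main() {
	cr := &channelInReader{prev: [][]string{{"b1", "b2"}, {"b3"}}}
	for {
		b, err := cr.NextBatch(context.Background())
		if err == io.EOF {
			return
		} else if errors.Is(err, NotEnoughData) {
			continue
		} else if err != nil {
			panic(err)
		}
		fmt.Println("batch:", b)
	}
}
```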
......@@ -17,6 +17,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type NextAttributesProvider interface {
Origin() eth.L1BlockRef
NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error)
}
type Engine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
......@@ -64,8 +69,6 @@ type EngineQueue struct {
finalizedL1 eth.BlockID
progress Progress
safeAttributes []*eth.PayloadAttributes
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps
......@@ -73,14 +76,15 @@ type EngineQueue struct {
finalityData []FinalityData
engine Engine
prev NextAttributesProvider
progress Progress // only used for pipeline resets
metrics Metrics
}
var _ AttributesQueueOutput = (*EngineQueue)(nil)
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
......@@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
},
prev: prev,
}
}
......@@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 {
return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp)
}
func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := eq.progress.Update(outer); err != nil || changed {
return err
}
func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error {
if len(eq.safeAttributes) > 0 {
return eq.tryNextSafeAttributes(ctx)
}
outOfData := false
if len(eq.safeAttributes) == 0 {
if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF {
outOfData = true
} else if err != nil {
return err
} else {
eq.safeAttributes = append(eq.safeAttributes, next)
return NotEnoughData
}
}
if eq.unsafePayloads.Len() > 0 {
return eq.tryNextUnsafePayload(ctx)
}
return io.EOF
if outOfData {
return io.EOF
} else {
return nil
}
}
// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized,
......@@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...)
}
// remember the last L2 block that we fully derived from the given finality data
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number {
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number {
// append entry for new L1 block
eq.finalityData = append(eq.finalityData, FinalityData{
L2Block: eq.safeHead,
L1Block: eq.progress.Origin.ID(),
L1Block: eq.prev.Origin().ID(),
})
} else {
// if it's a new L2 block that was derived from the same latest L1 block, then just update the entry
......@@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead,
"l2_time", eq.unsafeHead.Time,
"l1_derived", eq.progress.Origin,
"l1_derived", eq.prev.Origin(),
)
}
......@@ -398,6 +416,15 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
safe, safe.Time, l1Origin, l1Origin.Time))
}
pipelineNumber := l1Origin.Number - eq.cfg.ChannelTimeout
if l1Origin.Number < eq.cfg.ChannelTimeout {
pipelineNumber = 0
}
pipelineOrigin, err := l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", pipelineNumber, err))
}
eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
eq.unsafeHead = unsafe
eq.safeHead = safe
......@@ -405,8 +432,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
eq.finalityData = eq.finalityData[:0]
// note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.progress = Progress{
Origin: l1Origin,
Closed: false,
Origin: pipelineOrigin,
}
eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe)
......
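The reset logic above walks the pipeline origin back `ChannelTimeout` blocks behind the safe head's L1 origin, clamping at zero because block numbers are unsigned. A tiny sketch of just that guard (function name is illustrative):

```go
package main

import "fmt"

// walkBack shows the underflow guard used when the engine queue resets the
// pipeline origin ChannelTimeout blocks behind the safe head's L1 origin:
// with unsigned block numbers, plain subtraction would wrap around near
// genesis, so it is clamped to zero first.
func walkBack(originNumber, channelTimeout uint64) uint64 {
	if originNumber < channelTimeout {
		return 0 // don't underflow
	}
	return originNumber - channelTimeout
}

func main() {
	fmt.Println(walkBack(500, 120)) // 380
	fmt.Println(walkBack(50, 120))  // 0, not a huge wrapped-around number
}
```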
package derive
import (
"context"
"io"
"math/rand"
"testing"
......@@ -14,6 +16,20 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type fakeAttributesQueue struct {
origin eth.L1BlockRef
}
func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) {
return nil, io.EOF
}
var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)
func TestEngineQueue_Finalize(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
......@@ -209,9 +225,12 @@ func TestEngineQueue_Finalize(t *testing.T) {
// and we fetch the L1 origin of that as starting point for engine queue
l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil)
l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil)
prev := &fakeAttributesQueue{}
eq := NewEngineQueue(logger, cfg, eng, metrics)
require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20))
eq := NewEngineQueue(logger, cfg, eng, metrics, prev)
require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B")
......@@ -219,20 +238,19 @@ func TestEngineQueue_Finalize(t *testing.T) {
// now say C1 was included in D and became the new safe head
eq.progress.Origin = refD
prev.origin = refD
eq.safeHead = refC1
eq.postProcessSafeL2()
// now say D0 was included in E and became the new safe head
eq.progress.Origin = refE
prev.origin = refE
eq.safeHead = refD0
eq.postProcessSafeL2()
// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
eq.Finalize(refD.ID())
// Now a few steps later, without consuming any additional L1 inputs,
// we should be able to resolve that B1 is now finalized, since it was included in finalized L1 block C
require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10))
require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized")
l1F.AssertExpectations(t)
......
package derive
import (
"errors"
"fmt"
)
......@@ -91,3 +92,7 @@ func NewCriticalError(err error) error {
var ErrTemporary = NewTemporaryError(nil)
var ErrReset = NewResetError(nil)
var ErrCritical = NewCriticalError(nil)
// NotEnoughData implies that the function currently does not have enough data to progress
// but if it is retried enough times, it will eventually return a real value or io.EOF
var NotEnoughData = errors.New("not enough data")
......@@ -98,16 +98,16 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
l1Traversal := NewL1Traversal(log, l1Fetcher)
dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
bank := NewChannelBank(log, cfg, l1Src, l1Fetcher)
chInReader := NewChannelInReader(log, bank)
batchQueue := NewBatchQueue(log, cfg, chInReader)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue)
// Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages)
eng := NewEngineQueue(log, cfg, engine, metrics)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng)
batchQueue := NewBatchQueue(log, cfg, attributesQueue)
chInReader := NewChannelInReader(log, batchQueue)
bank := NewChannelBank(log, cfg, chInReader, l1Src)
stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank}
pullStages := []PullStage{l1Src, l1Traversal}
eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue)
stages := []Stage{eng}
pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}
return &DerivationPipeline{
log: log,
......
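Note how the wiring order flips: stages are now constructed innermost-first, each taking its predecessor, and only the engine queue remains a push-style `Stage` that the driver steps. A toy illustration of that decorator-style chaining (names are invented, not the op-node constructors):

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// stage is a toy version of the pull interface each pipeline stage exposes.
type stage interface {
	Next(ctx context.Context) (string, error)
}

// traversal is the innermost stage, feeding L1 blocks.
type traversal struct{ blocks []string }

func (t *traversal) Next(_ context.Context) (string, error) {
	if len(t.blocks) == 0 {
		return "", io.EOF
	}
	b := t.blocks[0]
	t.blocks = t.blocks[1:]
	return b, nil
}

// wrap decorates a previous stage, mirroring how NewChannelBank,
// NewChannelInReader, NewBatchQueue, and NewAttributesQueue each take prev.
type wrap struct {
	name string
	prev stage
}

func (w *wrap) Next(ctx context.Context) (string, error) {
	v, err := w.prev.Next(ctx)
	if err != nil {
		return "", err
	}
	return w.name + "(" + v + ")", nil
}

func main() {
	var p stage = &traversal{blocks: []string{"L1#1", "L1#2"}}
	// Build from the innermost stage outward, exactly the new wiring order.
	for _, name := range []string{"bank", "chInReader", "batchQueue", "attributesQueue"} {
		p = &wrap{name: name, prev: p}
	}
	for {
		v, err := p.Next(context.Background())
		if err == io.EOF {
			return
		}
		fmt.Println(v)
	}
}
```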
......@@ -440,6 +440,10 @@ func (s *state) eventLoop() {
} else if err != nil && errors.Is(err, derive.ErrCritical) {
s.log.Error("Derivation process critical error", "err", err)
return
} else if err != nil && errors.Is(err, derive.NotEnoughData) {
stepAttempts = 0 // don't do a backoff for this error
reqStep()
continue
} else if err != nil {
s.log.Error("Derivation process error", "attempts", stepAttempts, "err", err)
reqStep()
......