Commit e937a7cc authored by Diederik Loerakker, committed by GitHub

op-node: improve derivation pipeline error handling (#3212)

parent b1d5fd2c
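
The recurring change across the hunks below: the severity-error constructors in the derive package drop their separate `desc string` argument, and callers wrap the context into the error value itself with `fmt.Errorf` and `%w`. In sketch form:

```go
// Old style: constructors took (err, desc) and stored both separately.
return nil, NewTemporaryError(
	err,
	"failed to fetch L1 block info and receipts",
)

// New style: a single wrapped error; errors.Is/errors.As still reach
// the underlying cause through the %w wrapping.
return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info and receipts: %w", err))
```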
@@ -33,42 +33,28 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece
if l2Parent.L1Origin.Number != epoch.Number {
info, _, receipts, err := dl.Fetch(ctx, epoch.Hash)
if err != nil {
return nil, NewTemporaryError(
err,
"failed to fetch L1 block info and receipts",
)
return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info and receipts: %w", err))
}
if l2Parent.L1Origin.Hash != info.ParentHash() {
return nil, NewResetError(
nil,
fmt.Sprintf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s",
epoch, info.ParentHash(), l2Parent.L1Origin),
)
fmt.Errorf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s",
epoch, info.ParentHash(), l2Parent.L1Origin))
}
deposits, err := DeriveDeposits(receipts, cfg.DepositContractAddress)
if err != nil {
return nil, NewResetError(
err,
"failed to derive some deposits",
)
// deposits may never be ignored. Failing to process them is a critical error.
return nil, NewCriticalError(fmt.Errorf("failed to derive some deposits: %w", err))
}
l1Info = info
depositTxs = deposits
seqNumber = 0
} else {
if l2Parent.L1Origin.Hash != epoch.Hash {
return nil, NewResetError(
nil,
fmt.Sprintf("cannot create new block with L1 origin %s in conflict with L1 origin %s",
epoch, l2Parent.L1Origin),
)
return nil, NewResetError(fmt.Errorf("cannot create new block with L1 origin %s in conflict with L1 origin %s", epoch, l2Parent.L1Origin))
}
info, err := dl.InfoByHash(ctx, epoch.Hash)
if err != nil {
return nil, NewTemporaryError(
err,
"failed to fetch L1 block info",
)
return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info: %w", err))
}
l1Info = info
depositTxs = nil
@@ -77,10 +63,7 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece
l1InfoTx, err := L1InfoDepositBytes(seqNumber, l1Info)
if err != nil {
return nil, NewResetError(
err,
"failed to create l1InfoTx",
)
return nil, NewCriticalError(fmt.Errorf("failed to create l1InfoTx: %w", err))
}
txs := make([]hexutil.Bytes, 0, 1+len(depositTxs))
@@ -2,7 +2,6 @@ package derive
import (
"context"
"errors"
"fmt"
"io"
"sort"
@@ -74,9 +73,7 @@ func (bq *BatchQueue) Step(ctx context.Context, outer Progress) error {
bq.log.Trace("Out of batches")
return io.EOF
} else if err != nil {
bq.log.Error("Error deriving batches", "err", err)
// Suppress transient errors for when reporting back to the pipeline
return nil
return err
}
for _, batch := range batches {
@@ -103,14 +100,14 @@ func (bq *BatchQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
return io.EOF
}
func (bq *BatchQueue) AddBatch(batch *BatchData) error {
func (bq *BatchQueue) AddBatch(batch *BatchData) {
if bq.progress.Closed {
panic("write batch while closed")
}
bq.log.Trace("queued batch", "origin", bq.progress.Origin, "tx_count", len(batch.Transactions), "timestamp", batch.Timestamp)
if len(bq.l1Blocks) == 0 {
return fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp)
panic(fmt.Errorf("cannot add batch with timestamp %d, no origin was prepared", batch.Timestamp))
}
bq.log.Trace("queuing batch", "origin", bq.progress.Origin, "tx_count", len(batch.Transactions), "timestamp", batch.Timestamp)
data := BatchWithL1InclusionBlock{
L1InclusionBlock: bq.progress.Origin,
@@ -122,55 +119,45 @@ func (bq *BatchQueue) AddBatch(batch *BatchData) error {
for _, b := range batches {
if b.Batch.Timestamp == batch.Timestamp && b.Batch.Epoch() == batch.Epoch() {
bq.log.Warn("duplicate batch", "epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions))
return nil
return
}
}
} else {
bq.log.Debug("First seen batch", "epoch", batch.Epoch(), "timestamp", batch.Timestamp, "txs", len(batch.Transactions))
}
// Batches may have duplicate block numbers or individual fields, but complete duplicates are limited by the check above
bq.batchesByTimestamp[batch.Timestamp] = append(batches, &data)
return nil
}
// validExtension determines if a batch follows the previous attributes
func (bq *BatchQueue) validExtension(batch *BatchWithL1InclusionBlock, prevTime, prevEpoch uint64) bool {
func (bq *BatchQueue) validExtension(batch *BatchWithL1InclusionBlock, prevTime, prevEpoch uint64) (valid bool, err error) {
if batch.Batch.Timestamp != prevTime+bq.config.BlockTime {
bq.log.Debug("Batch does not extend the block time properly", "time", batch.Batch.Timestamp, "prev_time", prevTime)
return false
return false, nil
}
if batch.Batch.EpochNum != rollup.Epoch(prevEpoch) && batch.Batch.EpochNum != rollup.Epoch(prevEpoch+1) {
bq.log.Debug("Batch does not extend the epoch properly", "epoch", batch.Batch.EpochNum, "prev_epoch", prevEpoch)
return false
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
l1BlockRef, err := bq.dl.L1BlockRefByNumber(ctx, batch.Batch.Epoch().Number)
cancel()
if err != nil {
bq.log.Warn("err fetching l1 block", "err", err)
if errors.Is(err, ErrTemporary) {
// Skipping validation in case of temporary RPC error
bq.log.Warn("temporary err - skipping epoch hash validation", "err", err)
return true
} else {
return false
}
return false, err
}
if l1BlockRef.Hash != batch.Batch.EpochHash {
return false
bq.log.Debug("Batch epoch hash does not match expected L1 block hash", "batch_epoch", batch.Batch.Epoch(), "expected", l1BlockRef.ID())
return false, nil
}
// Note: `Batch.EpochNum` is an external input, but it is constrained to be a reasonable size by the
// above equality checks.
if uint64(batch.Batch.EpochNum)+bq.config.SeqWindowSize < batch.L1InclusionBlock.Number {
bq.log.Debug("Batch submitted outside sequence window", "epoch", batch.Batch.EpochNum, "inclusion_block", batch.L1InclusionBlock.Number)
return false
return false, nil
}
return true
return true, nil
}
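
`validExtension` previously collapsed every outcome into a plain `bool`, and even treated a temporary RPC error as a valid extension to avoid stalling. The new `(valid bool, err error)` signature separates "definitively not a valid extension" (`false, nil`) from "could not determine validity" (`false, err`), letting the caller surface fetch failures to the pipeline. A sketch of the new calling convention, mirroring `tryPopNextBatch` further down (`applyIfValid` is a hypothetical wrapper, assumed to live in package derive):

```go
// Sketch: err means "could not determine validity, retry later";
// (false, nil) means "definitively not an extension of the safe head".
func applyIfValid(bq *BatchQueue, batch *BatchWithL1InclusionBlock, l2SafeHead eth.L2BlockRef) (*BatchWithL1InclusionBlock, error) {
	valid, err := bq.validExtension(batch, l2SafeHead.Time, l2SafeHead.L1Origin.Number)
	if err != nil {
		return nil, err // e.g. a temporary L1 RPC failure bubbles up the pipeline
	}
	if !valid {
		return nil, nil // reject: the batch does not build on the safe head
	}
	return batch, nil // valid extension, safe to process
}
```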
// deriveBatches pulls a single batch eagerly or a collection of batches if it is the end of
@@ -230,15 +217,14 @@ func (bq *BatchQueue) deriveBatches(ctx context.Context, l2SafeHead eth.L2BlockR
} else {
bq.log.Trace("Trying to eagerly find batch")
var ret []*BatchData
next, err := bq.tryPopNextBatch(ctx, l2SafeHead)
if next != nil {
if err != nil {
return nil, err
} else {
bq.log.Info("found eager batch", "batch", next.Batch)
ret = append(ret, next.Batch)
return []*BatchData{next.Batch}, nil
}
return ret, err
}
}
// tryPopNextBatch tries to get the next batch from the batch queue using an eager approach.
@@ -285,8 +271,7 @@ func (bq *BatchQueue) tryPopNextBatch(ctx context.Context, l2SafeHead eth.L2Bloc
// Note: Don't check epoch change here, check it in `validExtension`
epoch, err := bq.dl.L1BlockRefByNumber(ctx, uint64(batch.Batch.EpochNum))
if err != nil {
bq.log.Warn("error fetching origin", "err", err)
return nil, err
return nil, NewTemporaryError(fmt.Errorf("error fetching origin: %w", err))
}
if err := ValidBatch(batch.Batch, bq.config, epoch.ID(), minL2Time, maxL2Time); err != nil {
bq.log.Warn("Invalid batch", "err", err)
@@ -294,7 +279,9 @@ func (bq *BatchQueue) tryPopNextBatch(ctx context.Context, l2SafeHead eth.L2Bloc
}
// We have a valid batch, now make sure that it builds off the previous L2 block
if bq.validExtension(batch, l2SafeHead.Time, l2SafeHead.L1Origin.Number) {
if valid, err := bq.validExtension(batch, l2SafeHead.Time, l2SafeHead.L1Origin.Number); err != nil {
return nil, err
} else if valid {
// Advance the epoch if needed
if l2SafeHead.L1Origin.Number != uint64(batch.Batch.EpochNum) {
bq.l1Blocks = bq.l1Blocks[1:]
@@ -302,12 +289,12 @@ func (bq *BatchQueue) tryPopNextBatch(ctx context.Context, l2SafeHead eth.L2Bloc
// Don't leak data in the map
delete(bq.batchesByTimestamp, batch.Batch.Timestamp)
bq.log.Info("Batch was valid extension")
bq.log.Debug("Batch was valid extension")
// We have found the first valid batch.
return batch, nil
} else {
bq.log.Info("batch was not valid extension")
bq.log.Warn("batch was not valid extension", "inclusion", batch.L1InclusionBlock, "safe_origin", l2SafeHead.L1Origin, "l2_time", l2SafeHead.Time)
}
}
@@ -116,8 +116,7 @@ func TestBatchQueueEager(t *testing.T) {
// Add batches
batches := []*BatchData{b(12, l1[0]), b(14, l1[0])}
for _, batch := range batches {
err := bq.AddBatch(batch)
require.Nil(t, err)
bq.AddBatch(batch)
}
// Step
for {
@@ -170,8 +169,7 @@ func TestBatchQueueFull(t *testing.T) {
// Add batches
batches := []*BatchData{b(14, l1[0]), b(16, l1[0]), b(18, l1[1])}
for _, batch := range batches {
err := bq.AddBatch(batch)
require.Nil(t, err)
bq.AddBatch(batch)
}
// Missing first batch
err = bq.Step(context.Background(), prevProgress)
@@ -205,8 +203,7 @@ func TestBatchQueueFull(t *testing.T) {
// Finally add batch
firstBatch := b(12, l1[0])
err = bq.AddBatch(firstBatch)
require.Equal(t, err, nil)
bq.AddBatch(firstBatch)
// Close the origin
prevProgress.Closed = true
@@ -271,8 +268,7 @@ func TestBatchQueueMissing(t *testing.T) {
// that batch timestamp 12 & 14 is created & 16 is used.
batches := []*BatchData{b(16, l1[0]), b(20, l1[1])}
for _, batch := range batches {
err := bq.AddBatch(batch)
require.Nil(t, err)
bq.AddBatch(batch)
}
// Missing first batch
err = bq.Step(context.Background(), prevProgress)
@@ -66,29 +66,28 @@ func (ib *ChannelBank) prune() {
}
// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.
// Then NextL1(ref) should be called to move forward to the next L1 input
func (ib *ChannelBank) IngestData(data []byte) error {
// Read() should be called repeatedly first, until everything has been read, before adding new data.
func (ib *ChannelBank) IngestData(data []byte) {
if ib.progress.Closed {
panic("write data to bank while closed")
}
ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data))
if len(data) < 1 {
return NewTemporaryError(
nil,
"data must be at least have a version byte, but got empty string",
)
ib.log.Warn("data must be at least have a version byte, but got empty string")
return
}
if data[0] != DerivationVersion0 {
return fmt.Errorf("unrecognized derivation version: %d", data)
ib.log.Warn("unrecognized derivation version", "version", data)
return
}
buf := bytes.NewBuffer(data[1:])
ib.prune()
if buf.Len() < minimumFrameSize {
return fmt.Errorf("data must be at least have one frame")
ib.log.Warn("data must be at least have one frame", "length", buf.Len())
return
}
// Iterate over all frames. They may have different channel IDs to indicate that the stream consumer should reset.
@@ -96,36 +95,37 @@ func (ib *ChannelBank) IngestData(data []byte) error {
// Don't try to unmarshal from an empty buffer.
// The if done checks should catch most/all of this case though.
if buf.Len() < ChannelIDDataSize+1 {
return nil
return
}
done := false
var f Frame
if err := (&f).UnmarshalBinary(buf); err == io.EOF {
done = true
} else if err != nil {
return fmt.Errorf("failed to unmarshal a frame: %w", err)
ib.log.Warn("malformed frame: %w", err)
return
}
// stop reading and ignore remaining data if we encounter a zeroed ID
// stop reading and ignore remaining data if we encounter a zeroed ID,
// this happens when there is zero padding at the end.
if f.ID == (ChannelID{}) {
ib.log.Info("empty channel ID")
return nil
ib.log.Trace("empty channel ID")
return
}
// check if the channel is not timed out
if f.ID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time {
ib.log.Info("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
if done {
return nil
return
}
continue
}
// check if the channel is not included too soon (otherwise timeouts wouldn't be effective)
if f.ID.Time > ib.progress.Origin.Time {
ib.log.Info("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
ib.log.Warn("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
if done {
return nil
return
}
continue
}
@@ -137,17 +137,17 @@ func (ib *ChannelBank) IngestData(data []byte) error {
ib.channelQueue = append(ib.channelQueue, f.ID)
}
ib.log.Debug("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.IngestData(uint64(f.FrameNumber), f.IsLast, f.Data); err != nil {
ib.log.Debug("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
if done {
return nil
return
}
continue
}
if done {
return nil
return
}
}
}
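
`IngestData` changes contract here: instead of returning an error, malformed input (missing version byte, short data, unparseable frame) is logged and dropped, while writing to a closed bank panics as a programming error. A hypothetical driver honoring the "call Read() repeatedly first" rule from the comment above; the `Read` signature is an assumption, not shown in this diff:

```go
// Hypothetical sketch: drain the channel bank before feeding it new L1 data.
// Assumes Read() returns (data []byte, err error) with io.EOF once drained.
func feed(ib *ChannelBank, newL1Data []byte) error {
	for {
		if _, err := ib.Read(); err == io.EOF {
			break // everything has been read; safe to add new data
		} else if err != nil {
			return err
		}
	}
	ib.IngestData(newL1Data) // no error to check: bad data is logged and skipped
	return nil
}
```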
@@ -221,10 +221,7 @@ func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
// go back in history if we are not distant enough from the next stage
parent, err := l1Fetcher.L1BlockRefByHash(ctx, ib.progress.Origin.ParentHash)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to find channel bank block, failed to retrieve L1 reference: %v", err),
)
return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err))
}
ib.progress.Origin = parent
return nil
@@ -118,8 +118,9 @@ func (tf testFrame) ToFrame() Frame {
}
func (bt *bankTestSetup) ingestData(data []byte) {
require.NoError(bt.t, bt.cb.IngestData(data))
bt.cb.IngestData(data)
}
func (bt *bankTestSetup) ingestFrames(frames ...testFrame) {
data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
@@ -19,7 +19,7 @@ type zlibReader interface {
type BatchQueueStage interface {
StageProgress
AddBatch(batch *BatchData) error
AddBatch(batch *BatchData)
}
type ChannelInReader struct {
@@ -115,7 +115,8 @@ func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error {
cr.NextChannel()
return nil
}
return cr.next.AddBatch(&batch)
cr.next.AddBatch(&batch)
return nil
}
func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
@@ -163,33 +163,21 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
}
fcRes, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to update forkchoice to prepare for new unsafe payload: %v", err),
)
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %v", err))
}
if fcRes.PayloadStatus.Status != eth.ExecutionValid {
eq.unsafePayloads = eq.unsafePayloads[1:]
return NewTemporaryError(
nil,
fmt.Sprintf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %v",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)),
)
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %v",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
status, err := eq.engine.NewPayload(ctx, first)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to update insert payload: %v", err),
)
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %v", err))
}
if status.Status != eth.ExecutionValid {
eq.unsafePayloads = eq.unsafePayloads[1:]
return NewTemporaryError(
nil,
fmt.Sprintf("cannot process unsafe payload: new - %v; parent: %v; err: %v",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)),
)
return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %v",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
eq.unsafeHead = ref
eq.unsafePayloads = eq.unsafePayloads[1:]
@@ -220,10 +208,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
payload, err := eq.engine.PayloadByNumber(ctx, eq.safeHead.Number+1)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to get existing unsafe payload to compare against derived attributes from L1: %v", err),
)
return NewTemporaryError(fmt.Errorf("failed to get existing unsafe payload to compare against derived attributes from L1: %v", err))
}
if err := AttributesMatchBlock(eq.safeAttributes[0], eq.safeHead.Hash, payload); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err)
@@ -232,10 +217,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
}
ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to decode L2 block ref from payload: %v", err),
)
return NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %v", err))
}
eq.safeHead = ref
// unsafe head stays the same, we did not reorg the chain.
@@ -259,10 +241,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
payload, rpcErr, payloadErr := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true)
if rpcErr != nil {
// RPC errors are recoverable, we can retry the buffered payload attributes later.
return NewTemporaryError(
rpcErr,
fmt.Sprintf("failed to insert new block: %v", rpcErr),
)
return NewTemporaryError(fmt.Errorf("failed to insert new block: %v", rpcErr))
}
if payloadErr != nil {
eq.log.Warn("could not process payload derived from L1 data", "err", payloadErr)
@@ -279,14 +258,11 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
eq.safeAttributes[0].Transactions = deposits
return nil
}
return NewCriticalError(payloadErr, "failed to process block with only deposit transactions")
return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", payloadErr))
}
ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to decode L2 block ref from payload: %v", err),
)
return NewTemporaryError(fmt.Errorf("failed to decode L2 block ref from payload: %v", err))
}
eq.safeHead = ref
eq.unsafeHead = ref
@@ -299,31 +275,21 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
// ResetStep walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical.
func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
l2Head, err := eq.engine.L2BlockRefHead(ctx)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to find the L2 Head block: %v", err),
)
return NewTemporaryError(fmt.Errorf("failed to find the L2 Head block: %w", err))
}
unsafe, safe, err := sync.FindL2Heads(ctx, l2Head, eq.cfg.SeqWindowSize, l1Fetcher, eq.engine, &eq.cfg.Genesis)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to find the L2 Heads to start from: %v", err),
)
return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
}
l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("failed to fetch the new L1 progress: origin: %v; err: %v", safe.L1Origin, err),
)
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %v", safe.L1Origin, err))
}
if safe.Time < l1Origin.Time {
return fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
safe, safe.Time, l1Origin, l1Origin.Time)
return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
safe, safe.Time, l1Origin, l1Origin.Time))
}
eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
eq.unsafeHead = unsafe
@@ -333,5 +299,4 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
Closed: false,
}
return io.EOF
}
@@ -7,6 +7,19 @@ import (
// Level is the severity level of the error.
type Level uint
func (lvl Level) String() string {
switch lvl {
case LevelTemporary:
return "temp"
case LevelReset:
return "reset"
case LevelCritical:
return "crit"
default:
return fmt.Sprintf("unknown(%d)", lvl)
}
}
// There are three levels currently, out of which only two are being used
// to classify errors by severity. LevelTemporary
const (
@@ -22,16 +35,15 @@ const (
// Error is a wrapper for error, description and a severity level.
type Error struct {
err error
desc string
level Level
}
// Error satisfies the error interface.
func (e Error) Error() string {
if e.err != nil {
return fmt.Errorf("%w: %s", e.err, e.desc).Error()
return fmt.Sprintf("%s: %v", e.level, e.err)
}
return e.desc
return e.level.String()
}
// Unwrap satisfies the Is/As interface.
@@ -52,43 +64,30 @@ func (e Error) Is(target error) bool {
}
// NewError returns a custom Error.
func NewError(err error, desc string, level Level) error {
func NewError(err error, level Level) error {
return Error{
err: err,
desc: desc,
level: level,
}
}
// NewTemporaryError returns a temporary error.
func NewTemporaryError(err error, desc string) error {
return NewError(
err,
desc,
LevelTemporary,
)
func NewTemporaryError(err error) error {
return NewError(err, LevelTemporary)
}
// NewResetError returns a pipeline reset error.
func NewResetError(err error, desc string) error {
return NewError(
err,
desc,
LevelReset,
)
func NewResetError(err error) error {
return NewError(err, LevelReset)
}
// NewCriticalError returns a critical error.
func NewCriticalError(err error, desc string) error {
return NewError(
err,
desc,
LevelCritical,
)
func NewCriticalError(err error) error {
return NewError(err, LevelCritical)
}
// Sentinel errors, use these to get the severity of errors by calling
// errors.Is(err, ErrTemporary) for example.
var ErrTemporary = NewTemporaryError(nil, "temporary error")
var ErrReset = NewResetError(nil, "pipeline reset error")
var ErrCritical = NewCriticalError(nil, "critical error")
var ErrTemporary = NewTemporaryError(nil)
var ErrReset = NewResetError(nil)
var ErrCritical = NewCriticalError(nil)
@@ -24,7 +24,7 @@ type DataAvailabilitySource interface {
type L1SourceOutput interface {
StageProgress
IngestData(data []byte) error
IngestData(data []byte)
}
type L1Retrieval struct {
@@ -66,10 +66,7 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error {
if l1r.datas == nil {
datas, err := l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID())
if err != nil {
return NewTemporaryError(
err,
fmt.Sprintf("can't fetch L1 data: %v, %v", l1r.progress.Origin, err),
)
return NewTemporaryError(fmt.Errorf("can't fetch L1 data: %v: %w", l1r.progress.Origin, err))
}
l1r.datas = datas
return nil
@@ -79,25 +76,21 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error {
if l1r.data == nil {
l1r.log.Debug("fetching next piece of data")
data, err := l1r.datas.Next(ctx)
if err != nil && err == ctx.Err() {
l1r.log.Warn("context to retrieve next L1 data failed", "err", err)
return nil
} else if err == io.EOF {
if err == io.EOF {
l1r.progress.Closed = true
l1r.datas = nil
return io.EOF
} else if err != nil {
return err
return NewTemporaryError(fmt.Errorf("context to retrieve next L1 data failed: %w", err))
} else {
l1r.data = data
return nil
}
}
// try to flush the data to next stage
if err := l1r.next.IngestData(l1r.data); err != nil {
return err
}
// flush the data to next stage
l1r.next.IngestData(l1r.data)
// and nil the data, the next step will retrieve the next data
l1r.data = nil
return nil
}
@@ -32,13 +32,12 @@ type MockIngestData struct {
MockOriginStage
}
func (im *MockIngestData) IngestData(data []byte) error {
out := im.Mock.MethodCalled("IngestData", data)
return *out[0].(*error)
func (im *MockIngestData) IngestData(data []byte) {
im.Mock.MethodCalled("IngestData", data)
}
func (im *MockIngestData) ExpectIngestData(data []byte, err error) {
im.Mock.On("IngestData", data).Return(&err)
func (im *MockIngestData) ExpectIngestData(data []byte) {
im.Mock.On("IngestData", data).Return()
}
var _ L1SourceOutput = (*MockIngestData)(nil)
@@ -58,8 +57,8 @@ func TestL1Retrieval_Step(t *testing.T) {
// mock some L1 data to open for the origin that is opened by the outer stage
dataSrc.ExpectOpenData(outer.Origin.ID(), iter, nil)
next.ExpectIngestData(a, nil)
next.ExpectIngestData(b, nil)
next.ExpectIngestData(a)
next.ExpectIngestData(b)
defer dataSrc.AssertExpectations(t)
defer next.AssertExpectations(t)
@@ -52,11 +52,10 @@ func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error {
l1t.log.Debug("can't find next L1 block info (yet)", "number", origin.Number+1, "origin", origin)
return io.EOF
} else if err != nil {
l1t.log.Warn("failed to find L1 block info by number", "number", origin.Number+1, "origin", origin, "err", err)
return nil // nil, don't make the pipeline restart if the RPC fails
return NewTemporaryError(fmt.Errorf("failed to find L1 block info by number, at origin %s next %d: %w", origin, origin.Number+1, err))
}
if l1t.progress.Origin.Hash != nextL1Origin.ParentHash {
return NewResetError(ReorgErr, fmt.Sprintf("detected L1 reorg from %s to %s", l1t.progress.Origin, nextL1Origin))
return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.progress.Origin, nextL1Origin, nextL1Origin.ParentID()))
}
l1t.progress.Origin = nextL1Origin
l1t.progress.Closed = false
@@ -47,9 +47,10 @@ func TestL1Traversal_Step(t *testing.T) {
require.Equal(t, a, tr.Progress().Origin, "stage needs to adopt the origin of next stage on reset")
require.False(t, tr.Progress().Closed, "stage needs to be open after reset")
require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrTemporary, "expected temporary error because of RPC mock fail")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 10))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck on ethereum.NotFound on d")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 1))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck again, should get the EOF within 1 step")
require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ReorgErr, "completed pipeline, until L1 input f that causes a reorg")
require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrReset, "completed pipeline, until L1 input f that causes a reorg")
}
package derive
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
)
var ReorgErr = errors.New("reorg")
// Progress represents the progress of a derivation stage:
// the input L1 block that is being processed, and whether it's fully processed yet.
type Progress struct {
@@ -24,12 +21,12 @@ func (pr *Progress) Update(outer Progress) (changed bool, err error) {
if pr.Closed {
if outer.Closed {
if pr.Origin.ID() != outer.Origin.ID() {
return true, NewResetError(ReorgErr, fmt.Sprintf("outer stage changed origin from %s to %s without opening it", pr.Origin, outer.Origin))
return true, NewResetError(fmt.Errorf("outer stage changed origin from %s to %s without opening it", pr.Origin, outer.Origin))
}
return false, nil
} else {
if pr.Origin.Hash != outer.Origin.ParentHash {
return true, NewResetError(ReorgErr, fmt.Sprintf("detected internal pipeline reorg of L1 origin data from %s to %s", pr.Origin, outer.Origin))
return true, NewResetError(fmt.Errorf("detected internal pipeline reorg of L1 origin data from %s to %s", pr.Origin, outer.Origin))
}
pr.Origin = outer.Origin
pr.Closed = false
@@ -37,7 +34,7 @@ func (pr *Progress) Update(outer Progress) (changed bool, err error) {
}
} else {
if pr.Origin.ID() != outer.Origin.ID() {
return true, NewResetError(ReorgErr, fmt.Sprintf("outer stage changed origin from %s to %s before closing it", pr.Origin, outer.Origin))
return true, NewResetError(fmt.Errorf("outer stage changed origin from %s to %s before closing it", pr.Origin, outer.Origin))
}
if outer.Closed {
pr.Closed = true
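
`Progress.Update` is the adoption step each stage runs before doing work; with the `ReorgErr` sentinel removed, origin mismatches now surface directly as reset errors. A sketch of the usual pattern at the top of a stage's `Step` (the stage type is hypothetical, package derive context assumed):

```go
// Hypothetical stage sketch: adopt the outer stage's progress first.
// A non-nil error here is a NewResetError(...) from Update and resets the
// pipeline; changed==true means the origin advanced and this step is done.
func (st *SomeStage) Step(ctx context.Context, outer Progress) error {
	if changed, err := st.progress.Update(outer); err != nil || changed {
		return err
	}
	// ... per-step work against st.progress.Origin ...
	return io.EOF
}
```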