Commit faf2e493 authored by Javed Khan's avatar Javed Khan Committed by GitHub

pkg driver: use derive error types to optionally retry with backoff (#3130)

* pkg derive: wrap errors by severity

* pkg derive: wrap errors by severity

* pkg derive: fix call with 2 return values

* ci: fix golangci-lint warnings

* ci: fix attributes test

* pkg derive: update tests, restore err types

* driver: update to use err severity from derive

* op-node: fix retry callers

* pkg derive: reimplement the custom error type

* pkg derive: error - nits

* pkg driver: update Step in event loop to backoff

* pkg derive: match epoch hash with l1 block hash

* pkg driver: handle backoff error

* pkg driver: handle temporary errs with backoff

also include additional if / else statements to log errors

* op-node: req next step, with backoff, without blocking other events

* pkg derive: update error types

* pkg derive: typo
Co-authored-by: default avatarprotolambda <proto@protolambda.com>
parent dcd715a6
...@@ -22,7 +22,7 @@ type L1ReceiptsFetcher interface { ...@@ -22,7 +22,7 @@ type L1ReceiptsFetcher interface {
// by setting NoTxPool=false as sequencer, or by appending batch transactions as verifier. // by setting NoTxPool=false as sequencer, or by appending batch transactions as verifier.
// The severity of the error is returned; a crit=false error means there was a temporary issue, like a failed RPC or time-out. // The severity of the error is returned; a crit=false error means there was a temporary issue, like a failed RPC or time-out.
// A crit=true error means the input arguments are inconsistent or invalid. // A crit=true error means the input arguments are inconsistent or invalid.
func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1ReceiptsFetcher, l2Parent eth.L2BlockRef, timestamp uint64, epoch eth.BlockID) (attrs *eth.PayloadAttributes, crit bool, err error) { func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1ReceiptsFetcher, l2Parent eth.L2BlockRef, timestamp uint64, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error) {
var l1Info eth.L1Info var l1Info eth.L1Info
var depositTxs []hexutil.Bytes var depositTxs []hexutil.Bytes
var seqNumber uint64 var seqNumber uint64
...@@ -33,25 +33,42 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece ...@@ -33,25 +33,42 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece
if l2Parent.L1Origin.Number != epoch.Number { if l2Parent.L1Origin.Number != epoch.Number {
info, _, receipts, err := dl.Fetch(ctx, epoch.Hash) info, _, receipts, err := dl.Fetch(ctx, epoch.Hash)
if err != nil { if err != nil {
return nil, false, fmt.Errorf("failed to fetch L1 block info and receipts: %w", err) return nil, NewTemporaryError(
err,
"failed to fetch L1 block info and receipts",
)
} }
if l2Parent.L1Origin.Hash != info.ParentHash() { if l2Parent.L1Origin.Hash != info.ParentHash() {
return nil, true, fmt.Errorf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s", epoch, info.ParentHash(), l2Parent.L1Origin) return nil, NewResetError(
nil,
fmt.Sprintf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s",
epoch, info.ParentHash(), l2Parent.L1Origin),
)
} }
deposits, err := DeriveDeposits(receipts, cfg.DepositContractAddress) deposits, err := DeriveDeposits(receipts, cfg.DepositContractAddress)
if err != nil { if err != nil {
return nil, true, fmt.Errorf("failed to derive some deposits: %w", err) return nil, NewResetError(
err,
"failed to derive some deposits",
)
} }
l1Info = info l1Info = info
depositTxs = deposits depositTxs = deposits
seqNumber = 0 seqNumber = 0
} else { } else {
if l2Parent.L1Origin.Hash != epoch.Hash { if l2Parent.L1Origin.Hash != epoch.Hash {
return nil, true, fmt.Errorf("cannot create new block with L1 origin %s in conflict with L1 origin %s", epoch, l2Parent.L1Origin) return nil, NewResetError(
nil,
fmt.Sprintf("cannot create new block with L1 origin %s in conflict with L1 origin %s",
epoch, l2Parent.L1Origin),
)
} }
info, err := dl.InfoByHash(ctx, epoch.Hash) info, err := dl.InfoByHash(ctx, epoch.Hash)
if err != nil { if err != nil {
return nil, false, fmt.Errorf("failed to fetch L1 block info: %w", err) return nil, NewTemporaryError(
err,
"failed to fetch L1 block info",
)
} }
l1Info = info l1Info = info
depositTxs = nil depositTxs = nil
...@@ -60,7 +77,10 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece ...@@ -60,7 +77,10 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece
l1InfoTx, err := L1InfoDepositBytes(seqNumber, l1Info) l1InfoTx, err := L1InfoDepositBytes(seqNumber, l1Info)
if err != nil { if err != nil {
return nil, true, fmt.Errorf("failed to create l1InfoTx: %w", err) return nil, NewResetError(
err,
"failed to create l1InfoTx",
)
} }
txs := make([]hexutil.Bytes, 0, 1+len(depositTxs)) txs := make([]hexutil.Bytes, 0, 1+len(depositTxs))
...@@ -73,5 +93,5 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece ...@@ -73,5 +93,5 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece
SuggestedFeeRecipient: cfg.FeeRecipientAddress, SuggestedFeeRecipient: cfg.FeeRecipientAddress,
Transactions: txs, Transactions: txs,
NoTxPool: true, NoTxPool: true,
}, false, nil }, nil
} }
...@@ -2,7 +2,6 @@ package derive ...@@ -2,7 +2,6 @@ package derive
import ( import (
"context" "context"
"fmt"
"io" "io"
"time" "time"
...@@ -55,14 +54,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error { ...@@ -55,14 +54,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second) fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel() defer cancel()
attrs, crit, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, aq.next.SafeL2Head(), batch.Timestamp, batch.Epoch()) attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, aq.next.SafeL2Head(), batch.Timestamp, batch.Epoch())
if err != nil { if err != nil {
if crit { return err
return fmt.Errorf("failed to prepare payload attributes for batch: %v", err)
} else {
aq.log.Error("temporarily failing to prepare payload attributes for batch", "err", err)
return nil
}
} }
// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool // we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
......
...@@ -36,9 +36,9 @@ func TestPreparePayloadAttributes(t *testing.T) { ...@@ -36,9 +36,9 @@ func TestPreparePayloadAttributes(t *testing.T) {
l1Info.InfoNum = l2Parent.L1Origin.Number + 1 l1Info.InfoNum = l2Parent.L1Origin.Number + 1
epoch := l1Info.ID() epoch := l1Info.ID()
l1Fetcher.ExpectFetch(epoch.Hash, l1Info, nil, nil, nil) l1Fetcher.ExpectFetch(epoch.Hash, l1Info, nil, nil, nil)
_, crit, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch) _, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch)
require.NotNil(t, err, "inconsistent L1 origin error expected") require.NotNil(t, err, "inconsistent L1 origin error expected")
require.True(t, crit, "inconsistent L1 origin transition must be handled like a critical error with reorg") require.ErrorIs(t, err, ErrReset, "inconsistent L1 origin transition must be handled like a critical error with reorg")
}) })
t.Run("inconsistent equal height origin", func(t *testing.T) { t.Run("inconsistent equal height origin", func(t *testing.T) {
rng := rand.New(rand.NewSource(1234)) rng := rand.New(rand.NewSource(1234))
...@@ -49,9 +49,9 @@ func TestPreparePayloadAttributes(t *testing.T) { ...@@ -49,9 +49,9 @@ func TestPreparePayloadAttributes(t *testing.T) {
l1Info := testutils.RandomL1Info(rng) l1Info := testutils.RandomL1Info(rng)
l1Info.InfoNum = l2Parent.L1Origin.Number l1Info.InfoNum = l2Parent.L1Origin.Number
epoch := l1Info.ID() epoch := l1Info.ID()
_, crit, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch) _, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch)
require.NotNil(t, err, "inconsistent L1 origin error expected") require.NotNil(t, err, "inconsistent L1 origin error expected")
require.True(t, crit, "inconsistent L1 origin transition must be handled like a critical error with reorg") require.ErrorIs(t, err, ErrReset, "inconsistent L1 origin transition must be handled like a critical error with reorg")
}) })
t.Run("rpc fail Fetch", func(t *testing.T) { t.Run("rpc fail Fetch", func(t *testing.T) {
rng := rand.New(rand.NewSource(1234)) rng := rand.New(rand.NewSource(1234))
...@@ -63,9 +63,9 @@ func TestPreparePayloadAttributes(t *testing.T) { ...@@ -63,9 +63,9 @@ func TestPreparePayloadAttributes(t *testing.T) {
epoch.Number += 1 epoch.Number += 1
mockRPCErr := errors.New("mock rpc error") mockRPCErr := errors.New("mock rpc error")
l1Fetcher.ExpectFetch(epoch.Hash, nil, nil, nil, mockRPCErr) l1Fetcher.ExpectFetch(epoch.Hash, nil, nil, nil, mockRPCErr)
_, crit, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch) _, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch)
require.ErrorIs(t, err, mockRPCErr, "mock rpc error expected") require.ErrorIs(t, err, mockRPCErr, "mock rpc error expected")
require.False(t, crit, "rpc errors should not be critical, it is not necessary to reorg") require.ErrorIs(t, err, ErrTemporary, "rpc errors should not be critical, it is not necessary to reorg")
}) })
t.Run("rpc fail InfoByHash", func(t *testing.T) { t.Run("rpc fail InfoByHash", func(t *testing.T) {
rng := rand.New(rand.NewSource(1234)) rng := rand.New(rand.NewSource(1234))
...@@ -76,9 +76,9 @@ func TestPreparePayloadAttributes(t *testing.T) { ...@@ -76,9 +76,9 @@ func TestPreparePayloadAttributes(t *testing.T) {
epoch := l2Parent.L1Origin epoch := l2Parent.L1Origin
mockRPCErr := errors.New("mock rpc error") mockRPCErr := errors.New("mock rpc error")
l1Fetcher.ExpectInfoByHash(epoch.Hash, nil, mockRPCErr) l1Fetcher.ExpectInfoByHash(epoch.Hash, nil, mockRPCErr)
_, crit, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch) _, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch)
require.ErrorIs(t, err, mockRPCErr, "mock rpc error expected") require.ErrorIs(t, err, mockRPCErr, "mock rpc error expected")
require.False(t, crit, "rpc errors should not be critical, it is not necessary to reorg") require.ErrorIs(t, err, ErrTemporary, "rpc errors should not be critical, it is not necessary to reorg")
}) })
t.Run("next origin without deposits", func(t *testing.T) { t.Run("next origin without deposits", func(t *testing.T) {
rng := rand.New(rand.NewSource(1234)) rng := rand.New(rand.NewSource(1234))
...@@ -93,9 +93,8 @@ func TestPreparePayloadAttributes(t *testing.T) { ...@@ -93,9 +93,8 @@ func TestPreparePayloadAttributes(t *testing.T) {
l1InfoTx, err := L1InfoDepositBytes(0, l1Info) l1InfoTx, err := L1InfoDepositBytes(0, l1Info)
require.NoError(t, err) require.NoError(t, err)
l1Fetcher.ExpectFetch(epoch.Hash, l1Info, nil, nil, nil) l1Fetcher.ExpectFetch(epoch.Hash, l1Info, nil, nil, nil)
attrs, crit, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch) attrs, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch)
require.NoError(t, err) require.NoError(t, err)
require.False(t, crit)
require.NotNil(t, attrs) require.NotNil(t, attrs)
require.Equal(t, l2Parent.Time+cfg.BlockTime, uint64(attrs.Timestamp)) require.Equal(t, l2Parent.Time+cfg.BlockTime, uint64(attrs.Timestamp))
require.Equal(t, eth.Bytes32(l1Info.InfoMixDigest), attrs.PrevRandao) require.Equal(t, eth.Bytes32(l1Info.InfoMixDigest), attrs.PrevRandao)
...@@ -132,9 +131,8 @@ func TestPreparePayloadAttributes(t *testing.T) { ...@@ -132,9 +131,8 @@ func TestPreparePayloadAttributes(t *testing.T) {
// txs are ignored, API is a bit bloated to previous approach. Only l1Info and receipts matter. // txs are ignored, API is a bit bloated to previous approach. Only l1Info and receipts matter.
l1Txs := make(types.Transactions, len(receipts)) l1Txs := make(types.Transactions, len(receipts))
l1Fetcher.ExpectFetch(epoch.Hash, l1Info, l1Txs, receipts, nil) l1Fetcher.ExpectFetch(epoch.Hash, l1Info, l1Txs, receipts, nil)
attrs, crit, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch) attrs, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch)
require.NoError(t, err) require.NoError(t, err)
require.False(t, crit)
require.NotNil(t, attrs) require.NotNil(t, attrs)
require.Equal(t, l2Parent.Time+cfg.BlockTime, uint64(attrs.Timestamp)) require.Equal(t, l2Parent.Time+cfg.BlockTime, uint64(attrs.Timestamp))
require.Equal(t, eth.Bytes32(l1Info.InfoMixDigest), attrs.PrevRandao) require.Equal(t, eth.Bytes32(l1Info.InfoMixDigest), attrs.PrevRandao)
...@@ -158,9 +156,8 @@ func TestPreparePayloadAttributes(t *testing.T) { ...@@ -158,9 +156,8 @@ func TestPreparePayloadAttributes(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
l1Fetcher.ExpectInfoByHash(epoch.Hash, l1Info, nil) l1Fetcher.ExpectInfoByHash(epoch.Hash, l1Info, nil)
attrs, crit, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch) attrs, err := PreparePayloadAttributes(context.Background(), cfg, l1Fetcher, l2Parent, l2Time, epoch)
require.NoError(t, err) require.NoError(t, err)
require.False(t, crit)
require.NotNil(t, attrs) require.NotNil(t, attrs)
require.Equal(t, l2Parent.Time+cfg.BlockTime, uint64(attrs.Timestamp)) require.Equal(t, l2Parent.Time+cfg.BlockTime, uint64(attrs.Timestamp))
require.Equal(t, eth.Bytes32(l1Info.InfoMixDigest), attrs.PrevRandao) require.Equal(t, eth.Bytes32(l1Info.InfoMixDigest), attrs.PrevRandao)
......
...@@ -2,9 +2,11 @@ package derive ...@@ -2,9 +2,11 @@ package derive
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"sort" "sort"
"time"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -144,7 +146,23 @@ func (bq *BatchQueue) validExtension(batch *BatchWithL1InclusionBlock, prevTime, ...@@ -144,7 +146,23 @@ func (bq *BatchQueue) validExtension(batch *BatchWithL1InclusionBlock, prevTime,
return false return false
} }
// TODO: Also check EpochHash (hard b/c maybe extension) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
l1BlockRef, err := bq.dl.L1BlockRefByNumber(ctx, batch.Batch.Epoch().Number)
cancel()
if err != nil {
bq.log.Warn("err fetching l1 block", "err", err)
if errors.Is(err, ErrTemporary) {
// Skipping validation in case of temporary RPC error
bq.log.Warn("temporary err - skipping epoch hash validation", "err", err)
return true
} else {
return false
}
}
if l1BlockRef.Hash != batch.Batch.EpochHash {
return false
}
// Note: `Batch.EpochNum` is an external input, but it is constrained to be a reasonable size by the // Note: `Batch.EpochNum` is an external input, but it is constrained to be a reasonable size by the
// above equality checks. // above equality checks.
......
...@@ -74,8 +74,10 @@ func (ib *ChannelBank) IngestData(data []byte) error { ...@@ -74,8 +74,10 @@ func (ib *ChannelBank) IngestData(data []byte) error {
} }
ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data)) ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data))
if len(data) < 1 { if len(data) < 1 {
ib.log.Error("data must be at least have a version byte, but got empty string") return NewTemporaryError(
return nil nil,
"data must be at least have a version byte, but got empty string",
)
} }
if data[0] != DerivationVersion0 { if data[0] != DerivationVersion0 {
...@@ -219,8 +221,10 @@ func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error ...@@ -219,8 +221,10 @@ func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
// go back in history if we are not distant enough from the next stage // go back in history if we are not distant enough from the next stage
parent, err := l1Fetcher.L1BlockRefByHash(ctx, ib.progress.Origin.ParentHash) parent, err := l1Fetcher.L1BlockRefByHash(ctx, ib.progress.Origin.ParentHash)
if err != nil { if err != nil {
ib.log.Error("failed to find channel bank block, failed to retrieve L1 reference", "err", err) return NewTemporaryError(
return nil err,
fmt.Sprintf("failed to find channel bank block, failed to retrieve L1 reference: %v", err),
)
} }
ib.progress.Origin = parent ib.progress.Origin = parent
return nil return nil
......
...@@ -163,23 +163,33 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -163,23 +163,33 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
} }
fcRes, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil) fcRes, err := eq.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil { if err != nil {
eq.log.Error("failed to update forkchoice to prepare for new unsafe payload", "err", err) return NewTemporaryError(
return nil // we can try again later err,
fmt.Sprintf("failed to update forkchoice to prepare for new unsafe payload: %v", err),
)
} }
if fcRes.PayloadStatus.Status != eth.ExecutionValid { if fcRes.PayloadStatus.Status != eth.ExecutionValid {
eq.log.Error("cannot prepare unsafe chain for new payload", "new", first.ID(), "parent", first.ParentID(), "err", eth.ForkchoiceUpdateErr(fcRes.PayloadStatus))
eq.unsafePayloads = eq.unsafePayloads[1:] eq.unsafePayloads = eq.unsafePayloads[1:]
return nil return NewTemporaryError(
nil,
fmt.Sprintf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %v",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)),
)
} }
status, err := eq.engine.NewPayload(ctx, first) status, err := eq.engine.NewPayload(ctx, first)
if err != nil { if err != nil {
eq.log.Error("failed to update insert payload", "err", err) return NewTemporaryError(
return nil // we can try again later err,
fmt.Sprintf("failed to update insert payload: %v", err),
)
} }
if status.Status != eth.ExecutionValid { if status.Status != eth.ExecutionValid {
eq.log.Error("cannot process unsafe payload", "new", first.ID(), "parent", first.ParentID(), "err", eth.ForkchoiceUpdateErr(fcRes.PayloadStatus))
eq.unsafePayloads = eq.unsafePayloads[1:] eq.unsafePayloads = eq.unsafePayloads[1:]
return nil return NewTemporaryError(
nil,
fmt.Sprintf("cannot process unsafe payload: new - %v; parent: %v; err: %v",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)),
)
} }
eq.unsafeHead = ref eq.unsafeHead = ref
eq.unsafePayloads = eq.unsafePayloads[1:] eq.unsafePayloads = eq.unsafePayloads[1:]
...@@ -210,8 +220,10 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error ...@@ -210,8 +220,10 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
payload, err := eq.engine.PayloadByNumber(ctx, eq.safeHead.Number+1) payload, err := eq.engine.PayloadByNumber(ctx, eq.safeHead.Number+1)
if err != nil { if err != nil {
eq.log.Error("failed to get existing unsafe payload to compare against derived attributes from L1", "err", err) return NewTemporaryError(
return nil err,
fmt.Sprintf("failed to get existing unsafe payload to compare against derived attributes from L1: %v", err),
)
} }
if err := AttributesMatchBlock(eq.safeAttributes[0], eq.safeHead.Hash, payload); err != nil { if err := AttributesMatchBlock(eq.safeAttributes[0], eq.safeHead.Hash, payload); err != nil {
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err) eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1", "err", err)
...@@ -220,8 +232,10 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error ...@@ -220,8 +232,10 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
} }
ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis) ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
if err != nil { if err != nil {
eq.log.Error("failed to decode L2 block ref from payload", "err", err) return NewTemporaryError(
return nil err,
fmt.Sprintf("failed to decode L2 block ref from payload: %v", err),
)
} }
eq.safeHead = ref eq.safeHead = ref
// unsafe head stays the same, we did not reorg the chain. // unsafe head stays the same, we did not reorg the chain.
...@@ -245,8 +259,10 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -245,8 +259,10 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
payload, rpcErr, payloadErr := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true) payload, rpcErr, payloadErr := InsertHeadBlock(ctx, eq.log, eq.engine, fc, attrs, true)
if rpcErr != nil { if rpcErr != nil {
// RPC errors are recoverable, we can retry the buffered payload attributes later. // RPC errors are recoverable, we can retry the buffered payload attributes later.
eq.log.Error("failed to insert new block", "err", rpcErr) return NewTemporaryError(
return nil rpcErr,
fmt.Sprintf("failed to insert new block: %v", rpcErr),
)
} }
if payloadErr != nil { if payloadErr != nil {
eq.log.Warn("could not process payload derived from L1 data", "err", payloadErr) eq.log.Warn("could not process payload derived from L1 data", "err", payloadErr)
...@@ -267,8 +283,10 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -267,8 +283,10 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
} }
ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis) ref, err := PayloadToBlockRef(payload, &eq.cfg.Genesis)
if err != nil { if err != nil {
eq.log.Error("failed to decode L2 block ref from payload", "err", err) return NewTemporaryError(
return nil err,
fmt.Sprintf("failed to decode L2 block ref from payload: %v", err),
)
} }
eq.safeHead = ref eq.safeHead = ref
eq.unsafeHead = ref eq.unsafeHead = ref
...@@ -284,18 +302,24 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error ...@@ -284,18 +302,24 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
l2Head, err := eq.engine.L2BlockRefHead(ctx) l2Head, err := eq.engine.L2BlockRefHead(ctx)
if err != nil { if err != nil {
eq.log.Error("failed to find the L2 Head block", "err", err) return NewTemporaryError(
return nil err,
fmt.Sprintf("failed to find the L2 Head block: %v", err),
)
} }
unsafe, safe, err := sync.FindL2Heads(ctx, l2Head, eq.cfg.SeqWindowSize, l1Fetcher, eq.engine, &eq.cfg.Genesis) unsafe, safe, err := sync.FindL2Heads(ctx, l2Head, eq.cfg.SeqWindowSize, l1Fetcher, eq.engine, &eq.cfg.Genesis)
if err != nil { if err != nil {
eq.log.Error("failed to find the L2 Heads to start from", "err", err) return NewTemporaryError(
return nil err,
fmt.Sprintf("failed to find the L2 Heads to start from: %v", err),
)
} }
l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash) l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
if err != nil { if err != nil {
eq.log.Error("failed to fetch the new L1 progress", "err", err, "origin", safe.L1Origin) return NewTemporaryError(
return nil err,
fmt.Sprintf("failed to fetch the new L1 progress: origin: %v; err: %v", safe.L1Origin, err),
)
} }
if safe.Time < l1Origin.Time { if safe.Time < l1Origin.Time {
return fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken", return fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
......
package derive
import (
"fmt"
)
// Level is the severity level of the error.
type Level uint
// There are three levels currently, out of which only 2 are being used
// to classify error by severity. LevelTemporary
const (
// LevelTemporary is a temporary error for example due to an RPC or
// connection issue, and can be safely ignored and retried by the caller
LevelTemporary Level = iota
// LevelReset is a pipeline reset error. It must be treated like a reorg.
LevelReset
// LevelCritical is a critical error.
LevelCritical
)
// Error is a wrapper for error, description and a severity level.
type Error struct {
err error
desc string
level Level
}
// Error satisfies the error interface.
func (e Error) Error() string {
if e.err != nil {
return fmt.Errorf("%w: %s", e.err, e.desc).Error()
}
return e.desc
}
// Unwrap satisfies the Is/As interface.
func (e Error) Unwrap() error {
return e.err
}
// Is satisfies the error Unwrap interface.
func (e Error) Is(target error) bool {
if target == nil {
return e == target
}
err, ok := target.(Error)
if !ok {
return false
}
return e.level == err.level
}
// NewError constructs an Error carrying the given inner error, description,
// and severity level.
func NewError(err error, desc string, level Level) error {
	return Error{err: err, desc: desc, level: level}
}

// NewTemporaryError returns an error of LevelTemporary severity.
func NewTemporaryError(err error, desc string) error {
	return NewError(err, desc, LevelTemporary)
}

// NewResetError returns an error of LevelReset severity, i.e. a pipeline reset.
func NewResetError(err error, desc string) error {
	return NewError(err, desc, LevelReset)
}

// NewCriticalError returns an error of LevelCritical severity.
func NewCriticalError(err error, desc string) error {
	return NewError(err, desc, LevelCritical)
}

// Sentinel errors; determine the severity of an error by calling, for
// example, errors.Is(err, ErrTemporary).
var (
	ErrTemporary = NewTemporaryError(nil, "temporary error")
	ErrReset     = NewResetError(nil, "pipeline reset error")
	ErrCritical  = NewCriticalError(nil, "critical error")
)
...@@ -2,6 +2,7 @@ package derive ...@@ -2,6 +2,7 @@ package derive
import ( import (
"context" "context"
"fmt"
"io" "io"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
...@@ -65,8 +66,10 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error { ...@@ -65,8 +66,10 @@ func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error {
if l1r.datas == nil { if l1r.datas == nil {
datas, err := l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID()) datas, err := l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID())
if err != nil { if err != nil {
l1r.log.Error("can't fetch L1 data", "origin", l1r.progress.Origin, "err", err) return NewTemporaryError(
return nil err,
fmt.Sprintf("can't fetch L1 data: %v, %v", l1r.progress.Origin, err),
)
} }
l1r.datas = datas l1r.datas = datas
return nil return nil
......
...@@ -3,14 +3,17 @@ package driver ...@@ -3,14 +3,17 @@ package driver
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
gosync "sync" gosync "sync"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/backoff"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -290,8 +293,15 @@ func (s *state) eventLoop() { ...@@ -290,8 +293,15 @@ func (s *state) eventLoop() {
} }
} }
// reqStep requests a derivation step to be taken. Won't deadlock if the channel is full. // channel, nil by default (not firing), but used to schedule re-attempts with delay
reqStep := func() { var delayedStepReq <-chan time.Time
// keep track of consecutive failed attempts, to adjust the backoff time accordingly
bOffStrategy := backoff.Exponential()
stepAttempts := 0
// step requests a derivation step to be taken. Won't deadlock if the channel is full.
step := func() {
select { select {
case stepReqCh <- struct{}{}: case stepReqCh <- struct{}{}:
// Don't deadlock if the channel is already full // Don't deadlock if the channel is already full
...@@ -299,6 +309,22 @@ func (s *state) eventLoop() { ...@@ -299,6 +309,22 @@ func (s *state) eventLoop() {
} }
} }
// reqStep requests a derivation step nicely, with a delay if this is a reattempt, or not at all if we already scheduled a reattempt.
reqStep := func() {
if stepAttempts > 0 {
// if this is not the first attempt, we re-schedule with a backoff, *without blocking other events*
if delayedStepReq == nil {
delay := bOffStrategy.Duration(stepAttempts)
s.log.Debug("scheduling re-attempt with delay", "attempts", stepAttempts, "delay", delay)
delayedStepReq = time.After(delay)
} else {
s.log.Debug("ignoring step request, already scheduled re-attempt after previous failure", "attempts", stepAttempts)
}
} else {
step()
}
}
// We call reqStep right away to finish syncing to the tip of the chain if we're behind. // We call reqStep right away to finish syncing to the tip of the chain if we're behind.
// reqStep will also be triggered when the L1 head moves forward or if there was a reorg on the // reqStep will also be triggered when the L1 head moves forward or if there was a reorg on the
// L1 chain that we need to handle. // L1 chain that we need to handle.
...@@ -323,6 +349,7 @@ func (s *state) eventLoop() { ...@@ -323,6 +349,7 @@ func (s *state) eventLoop() {
if err != nil { if err != nil {
s.log.Error("Error creating new L2 block", "err", err) s.log.Error("Error creating new L2 block", "err", err)
s.metrics.DerivationErrorsTotal.Inc() s.metrics.DerivationErrorsTotal.Inc()
break // if we fail, we wait for the next block creation trigger.
} }
// We need to catch up to the next origin as quickly as possible. We can do this by // We need to catch up to the next origin as quickly as possible. We can do this by
...@@ -346,24 +373,42 @@ func (s *state) eventLoop() { ...@@ -346,24 +373,42 @@ func (s *state) eventLoop() {
s.snapshot("New L1 Head") s.snapshot("New L1 Head")
s.handleNewL1Block(newL1Head) s.handleNewL1Block(newL1Head)
reqStep() // a new L1 head may mean we have the data to not get an EOF again. reqStep() // a new L1 head may mean we have the data to not get an EOF again.
case <-delayedStepReq:
delayedStepReq = nil
step()
case <-stepReqCh: case <-stepReqCh:
s.metrics.SetDerivationIdle(false) s.metrics.SetDerivationIdle(false)
s.idleDerivation = false s.idleDerivation = false
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Progress().Origin, "onto_closed", s.derivation.Progress().Closed) s.log.Debug("Derivation process step", "onto_origin", s.derivation.Progress().Origin, "onto_closed", s.derivation.Progress().Closed, "attempts", stepAttempts)
stepCtx, cancel := context.WithTimeout(ctx, time.Second*10) // TODO pick a timeout for executing a single step stepCtx, cancel := context.WithTimeout(ctx, time.Second*10) // TODO pick a timeout for executing a single step
err := s.derivation.Step(stepCtx) err := s.derivation.Step(stepCtx)
cancel() cancel()
stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
if err == io.EOF { if err == io.EOF {
s.log.Debug("Derivation process went idle", "progress", s.derivation.Progress().Origin) s.log.Debug("Derivation process went idle", "progress", s.derivation.Progress().Origin)
s.idleDerivation = true s.idleDerivation = true
stepAttempts = 0
s.metrics.SetDerivationIdle(true) s.metrics.SetDerivationIdle(true)
continue continue
} else if err != nil { } else if err != nil && errors.Is(err, derive.ErrReset) {
// If the pipeline corrupts, e.g. due to a reorg, simply reset it // If the pipeline corrupts, e.g. due to a reorg, simply reset it
s.log.Warn("Derivation pipeline is reset", "err", err) s.log.Warn("Derivation pipeline is reset", "err", err)
s.derivation.Reset() s.derivation.Reset()
s.metrics.RecordPipelineReset() s.metrics.RecordPipelineReset()
continue
} else if err != nil && errors.Is(err, derive.ErrTemporary) {
s.log.Warn("Derivation process temporary error", "attempts", stepAttempts, "err", err)
reqStep()
continue
} else if err != nil && errors.Is(err, derive.ErrCritical) {
s.log.Error("Derivation process critical error", "err", err)
return
} else if err != nil {
s.log.Error("Derivation process error", "attempts", stepAttempts, "err", err)
reqStep()
continue
} else { } else {
stepAttempts = 0
finalized, safe, unsafe := s.derivation.Finalized(), s.derivation.SafeL2Head(), s.derivation.UnsafeL2Head() finalized, safe, unsafe := s.derivation.Finalized(), s.derivation.SafeL2Head(), s.derivation.UnsafeL2Head()
// log sync progress when it changes // log sync progress when it changes
if s.l2Finalized != finalized || s.l2SafeHead != safe || s.l2Head != unsafe { if s.l2Finalized != finalized || s.l2SafeHead != safe || s.l2Head != unsafe {
......
...@@ -25,7 +25,7 @@ func (d *outputImpl) createNewBlock(ctx context.Context, l2Head eth.L2BlockRef, ...@@ -25,7 +25,7 @@ func (d *outputImpl) createNewBlock(ctx context.Context, l2Head eth.L2BlockRef,
fetchCtx, cancel := context.WithTimeout(ctx, time.Second*20) fetchCtx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel() defer cancel()
attrs, _, err := derive.PreparePayloadAttributes(fetchCtx, d.Config, d.dl, l2Head, l2Head.Time+d.Config.BlockTime, l1Origin.ID()) attrs, err := derive.PreparePayloadAttributes(fetchCtx, d.Config, d.dl, l2Head, l2Head.Time+d.Config.BlockTime, l1Origin.ID())
if err != nil { if err != nil {
return l2Head, nil, err return l2Head, nil, err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment