Commit 24edceb6 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #4930 from ethereum-optimism/fix-sequencer-err-handling

op-node: handle sequencer errors with derivation-levels, reset when L1 origins are inconsistent
parents aa766f26 36dca770
...@@ -2,11 +2,13 @@ package actions ...@@ -2,11 +2,13 @@ package actions
import ( import (
"context" "context"
"errors"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
...@@ -46,7 +48,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, c ...@@ -46,7 +48,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, c
} }
return &L2Sequencer{ return &L2Sequencer{
L2Verifier: *ver, L2Verifier: *ver,
sequencer: driver.NewSequencer(log, cfg, ver.derivation, attrBuilder, l1OriginSelector), sequencer: driver.NewSequencer(log, cfg, ver.derivation, attrBuilder, l1OriginSelector, metrics.NoopMetrics),
mockL1OriginSelector: l1OriginSelector, mockL1OriginSelector: l1OriginSelector,
failL2GossipUnsafeBlock: nil, failL2GossipUnsafeBlock: nil,
} }
...@@ -54,6 +56,10 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, c ...@@ -54,6 +56,10 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, c
// ActL2StartBlock starts building of a new L2 block on top of the head // ActL2StartBlock starts building of a new L2 block on top of the head
func (s *L2Sequencer) ActL2StartBlock(t Testing) { func (s *L2Sequencer) ActL2StartBlock(t Testing) {
s.ActL2StartBlockCheckErr(t, nil)
}
func (s *L2Sequencer) ActL2StartBlockCheckErr(t Testing, checkErr error) {
if !s.l2PipelineIdle { if !s.l2PipelineIdle {
t.InvalidAction("cannot start L2 build when derivation is not idle") t.InvalidAction("cannot start L2 build when derivation is not idle")
return return
...@@ -64,9 +70,19 @@ func (s *L2Sequencer) ActL2StartBlock(t Testing) { ...@@ -64,9 +70,19 @@ func (s *L2Sequencer) ActL2StartBlock(t Testing) {
} }
err := s.sequencer.StartBuildingBlock(t.Ctx()) err := s.sequencer.StartBuildingBlock(t.Ctx())
require.NoError(t, err, "failed to start block building") if checkErr == nil {
require.NoError(t, err, "failed to start block building")
} else {
require.ErrorIs(t, err, checkErr, "expected typed error")
}
s.l2Building = true if errors.Is(err, derive.ErrReset) {
s.derivation.Reset()
}
if err == nil {
s.l2Building = true
}
} }
// ActL2EndBlock completes a new L2 block and applies it to the L2 chain as new canonical unsafe head // ActL2EndBlock completes a new L2 block and applies it to the L2 chain as new canonical unsafe head
...@@ -103,7 +119,16 @@ func (s *L2Sequencer) ActBuildToL1Head(t Testing) { ...@@ -103,7 +119,16 @@ func (s *L2Sequencer) ActBuildToL1Head(t Testing) {
} }
} }
// ActBuildToL1HeadExcl builds empty blocks until (excl.) the L1 head becomes the L2 origin // ActBuildToL1HeadUnsafe builds empty blocks until (incl.) the L1 head becomes the L1 origin of the L2 head
func (s *L2Sequencer) ActBuildToL1HeadUnsafe(t Testing) {
for s.derivation.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
// Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain.
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
}
}
// ActBuildToL1HeadExcl builds empty blocks until (excl.) the L1 head becomes the L1 origin of the L2 head
func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) { func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) {
for { for {
s.ActL2PipelineFull(t) s.ActL2PipelineFull(t)
...@@ -116,3 +141,17 @@ func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) { ...@@ -116,3 +141,17 @@ func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) {
s.ActL2EndBlock(t) s.ActL2EndBlock(t)
} }
} }
// ActBuildToL1HeadExclUnsafe builds empty blocks until (excl.) the L1 head becomes the L1 origin of the L2 head, without safe-head progression.
func (s *L2Sequencer) ActBuildToL1HeadExclUnsafe(t Testing) {
for {
// Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain.
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.derivation.UnsafeL2Head())
require.NoError(t, err)
if nextOrigin.Number >= s.l1State.L1Head().Number {
break
}
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
}
}
...@@ -4,6 +4,8 @@ import ( ...@@ -4,6 +4,8 @@ import (
"math/big" "math/big"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
...@@ -100,3 +102,67 @@ func TestL2Sequencer_SequencerDrift(gt *testing.T) { ...@@ -100,3 +102,67 @@ func TestL2Sequencer_SequencerDrift(gt *testing.T) {
sequencer.ActL2StartBlock(t) sequencer.ActL2StartBlock(t)
require.True(t, engine.l2ForceEmpty, "engine should not be allowed to include anything after sequencer drift is surpassed") require.True(t, engine.l2ForceEmpty, "engine should not be allowed to include anything after sequencer drift is surpassed")
} }
// TestL2Sequencer_SequencerOnlyReorg regression-tests a Goerli halt where the sequencer
// would build an unsafe L2 block with a L1 origin that then gets reorged out,
// while the verifier-codepath only ever sees the valid post-reorg L1 chain.
func TestL2Sequencer_SequencerOnlyReorg(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlDebug)
miner, _, sequencer := setupSequencerTest(t, sd, log)
// Sequencer at first only recognizes the genesis as safe.
// The rest of the L1 chain will be incorporated as L1 origins into unsafe L2 blocks.
sequencer.ActL2PipelineFull(t)
// build L1 block with coinbase A
miner.ActL1SetFeeRecipient(common.Address{'A'})
miner.ActEmptyBlock(t)
// sequencer builds L2 blocks, until (incl.) it creates a L2 block with a L1 origin that has A as coinbase address
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t)
status := sequencer.SyncStatus()
require.Zero(t, status.SafeL2.L1Origin.Number, "no safe head progress")
require.Equal(t, status.HeadL1.Hash, status.UnsafeL2.L1Origin.Hash, "have head L1 origin")
// reorg out block with coinbase A, and make a block with coinbase B
miner.ActL1RewindToParent(t)
miner.ActL1SetFeeRecipient(common.Address{'B'})
miner.ActEmptyBlock(t)
// and a second block, for derivation to pick up on the new L1 chain
// (height is used as heuristic to not flip-flop between chains too frequently)
miner.ActEmptyBlock(t)
// Make the sequencer aware of the new head, and try to sync it.
// Since the safe chain never incorporated the now reorged L1 block with coinbase A,
// it will sync the new L1 chain fine.
// No batches are submitted yet however,
// so it'll keep the L2 block with the old L1 origin, since no conflict is detected.
sequencer.ActL1HeadSignal(t)
sequencer.ActL2PipelineFull(t)
// TODO: CLI-3405 we can detect the inconsistency of the L1 origin of the unsafe L2 head:
// as verifier, there is no need to wait for sequencer to recognize it.
newStatus := sequencer.SyncStatus()
require.Equal(t, status.HeadL1.Hash, newStatus.UnsafeL2.L1Origin.Hash, "still have old bad L1 origin")
require.NotEqual(t, status.HeadL1.Hash, newStatus.HeadL1.Hash, "did see the new L1 head change")
require.Equal(t, newStatus.HeadL1.Hash, newStatus.CurrentL1.Hash, "did sync the new L1 head as verifier")
// the block N+1 cannot build on the old N which still refers to the now orphaned L1 origin
require.Equal(t, status.UnsafeL2.L1Origin.Number, newStatus.HeadL1.Number-1, "seeing N+1 to attempt to build on N")
require.NotEqual(t, status.UnsafeL2.L1Origin.Hash, newStatus.HeadL1.ParentHash, "but N+1 cannot fit on N")
sequencer.ActL1HeadSignal(t)
// sequence more L2 blocks, until we actually need the next L1 origin
sequencer.ActBuildToL1HeadExclUnsafe(t)
// We expect block building to fail when the next L1 block is not consistent with the existing L1 origin
sequencer.ActL2StartBlockCheckErr(t, derive.ErrReset)
// After hitting a reset error, it reset derivation, and drops the old L1 chain
sequencer.ActL2PipelineFull(t)
require.Zero(t, sequencer.SyncStatus().UnsafeL2.L1Origin.Number, "back to genesis block with good L1 origin, drop old unsafe L2 chain with bad L1 origins")
// Can build new L2 blocks with good L1 origin
sequencer.ActBuildToL1HeadUnsafe(t)
require.Equal(t, newStatus.HeadL1.Hash, sequencer.SyncStatus().UnsafeL2.L1Origin.Hash, "build L2 chain with new correct L1 origins")
}
...@@ -53,6 +53,8 @@ type Metricer interface { ...@@ -53,6 +53,8 @@ type Metricer interface {
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
CountSequencedTxs(count int) CountSequencedTxs(count int)
RecordL1ReorgDepth(d uint64) RecordL1ReorgDepth(d uint64)
RecordSequencerInconsistentL1Origin(from eth.BlockID, to eth.BlockID)
RecordSequencerReset()
RecordGossipEvent(evType int32) RecordGossipEvent(evType int32)
IncPeerCount() IncPeerCount()
DecPeerCount() DecPeerCount()
...@@ -86,6 +88,9 @@ type Metrics struct { ...@@ -86,6 +88,9 @@ type Metrics struct {
SequencingErrors *EventMetrics SequencingErrors *EventMetrics
PublishingErrors *EventMetrics PublishingErrors *EventMetrics
SequencerInconsistentL1Origin *EventMetrics
SequencerResets *EventMetrics
SequencerBuildingDiffDurationSeconds prometheus.Histogram SequencerBuildingDiffDurationSeconds prometheus.Histogram
SequencerBuildingDiffTotal prometheus.Counter SequencerBuildingDiffTotal prometheus.Counter
...@@ -204,6 +209,9 @@ func NewMetrics(procName string) *Metrics { ...@@ -204,6 +209,9 @@ func NewMetrics(procName string) *Metrics {
SequencingErrors: NewEventMetrics(factory, ns, "sequencing_errors", "sequencing errors"), SequencingErrors: NewEventMetrics(factory, ns, "sequencing_errors", "sequencing errors"),
PublishingErrors: NewEventMetrics(factory, ns, "publishing_errors", "p2p publishing errors"), PublishingErrors: NewEventMetrics(factory, ns, "publishing_errors", "p2p publishing errors"),
SequencerInconsistentL1Origin: NewEventMetrics(factory, ns, "sequencer_inconsistent_l1_origin", "events when the sequencer selects an inconsistent L1 origin"),
SequencerResets: NewEventMetrics(factory, ns, "sequencer_resets", "sequencer resets"),
UnsafePayloadsBufferLen: factory.NewGauge(prometheus.GaugeOpts{ UnsafePayloadsBufferLen: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Name: "unsafe_payloads_buffer_len", Name: "unsafe_payloads_buffer_len",
...@@ -455,6 +463,16 @@ func (m *Metrics) RecordL1ReorgDepth(d uint64) { ...@@ -455,6 +463,16 @@ func (m *Metrics) RecordL1ReorgDepth(d uint64) {
m.L1ReorgDepth.Observe(float64(d)) m.L1ReorgDepth.Observe(float64(d))
} }
func (m *Metrics) RecordSequencerInconsistentL1Origin(from eth.BlockID, to eth.BlockID) {
m.SequencerInconsistentL1Origin.RecordEvent()
m.recordRef("l1_origin", "inconsistent_from", from.Number, 0, from.Hash)
m.recordRef("l1_origin", "inconsistent_to", to.Number, 0, to.Hash)
}
func (m *Metrics) RecordSequencerReset() {
m.SequencerResets.RecordEvent()
}
func (m *Metrics) RecordGossipEvent(evType int32) { func (m *Metrics) RecordGossipEvent(evType int32) {
m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc() m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc()
} }
...@@ -584,6 +602,12 @@ func (n *noopMetricer) CountSequencedTxs(count int) { ...@@ -584,6 +602,12 @@ func (n *noopMetricer) CountSequencedTxs(count int) {
func (n *noopMetricer) RecordL1ReorgDepth(d uint64) { func (n *noopMetricer) RecordL1ReorgDepth(d uint64) {
} }
func (n *noopMetricer) RecordSequencerInconsistentL1Origin(from eth.BlockID, to eth.BlockID) {
}
func (n *noopMetricer) RecordSequencerReset() {
}
func (n *noopMetricer) RecordGossipEvent(evType int32) { func (n *noopMetricer) RecordGossipEvent(evType int32) {
} }
......
...@@ -546,6 +546,9 @@ func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPa ...@@ -546,6 +546,9 @@ func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPa
} }
func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error { func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error {
if eq.buildingID == (eth.PayloadID{}) { // only cancel if there is something to cancel.
return nil
}
// the building job gets wrapped up as soon as the payload is retrieved, there's no explicit cancel in the Engine API // the building job gets wrapped up as soon as the payload is retrieved, there's no explicit cancel in the Engine API
eq.log.Error("cancelling old block sealing job", "payload", eq.buildingID) eq.log.Error("cancelling old block sealing job", "payload", eq.buildingID)
_, err := eq.engine.GetPayload(ctx, eq.buildingID) _, err := eq.engine.GetPayload(ctx, eq.buildingID)
......
...@@ -25,6 +25,14 @@ type L1Fetcher interface { ...@@ -25,6 +25,14 @@ type L1Fetcher interface {
L1TransactionFetcher L1TransactionFetcher
} }
// ResettableEngineControl wraps EngineControl with reset-functionality,
// which handles reorgs like the derivation pipeline:
// by determining the last valid block references to continue from.
type ResettableEngineControl interface {
EngineControl
Reset()
}
type ResetableStage interface { type ResetableStage interface {
// Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to, with corresponding configuration. // Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to, with corresponding configuration.
Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error
......
...@@ -29,6 +29,7 @@ type Metrics interface { ...@@ -29,6 +29,7 @@ type Metrics interface {
RecordL1ReorgDepth(d uint64) RecordL1ReorgDepth(d uint64)
EngineMetrics EngineMetrics
SequencerMetrics
} }
type L1Chain interface { type L1Chain interface {
...@@ -69,7 +70,7 @@ type SequencerIface interface { ...@@ -69,7 +70,7 @@ type SequencerIface interface {
StartBuildingBlock(ctx context.Context) error StartBuildingBlock(ctx context.Context) error
CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPayload, error) CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPayload, error)
PlanNextSequencerAction() time.Duration PlanNextSequencerAction() time.Duration
RunNextSequencerAction(ctx context.Context) *eth.ExecutionPayload RunNextSequencerAction(ctx context.Context) (*eth.ExecutionPayload, error)
BuildingOnto() eth.L2BlockRef BuildingOnto() eth.L2BlockRef
} }
...@@ -88,12 +89,11 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne ...@@ -88,12 +89,11 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
engine := derivationPipeline engine := derivationPipeline
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) meteredEngine := NewMeteredEngine(cfg, engine, metrics, log)
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin) sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
return &Driver{ return &Driver{
l1State: l1State, l1State: l1State,
derivation: derivationPipeline, derivation: derivationPipeline,
idleDerivation: false,
stateReq: make(chan chan struct{}), stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10), forceReset: make(chan chan struct{}, 10),
startSequencer: make(chan hashAndErrorChannel, 10), startSequencer: make(chan hashAndErrorChannel, 10),
......
...@@ -21,7 +21,7 @@ type EngineMetrics interface { ...@@ -21,7 +21,7 @@ type EngineMetrics interface {
// MeteredEngine wraps an EngineControl and adds metrics such as block building time diff and sealing time // MeteredEngine wraps an EngineControl and adds metrics such as block building time diff and sealing time
type MeteredEngine struct { type MeteredEngine struct {
inner derive.EngineControl inner derive.ResettableEngineControl
cfg *rollup.Config cfg *rollup.Config
metrics EngineMetrics metrics EngineMetrics
...@@ -30,10 +30,10 @@ type MeteredEngine struct { ...@@ -30,10 +30,10 @@ type MeteredEngine struct {
buildingStartTime time.Time buildingStartTime time.Time
} }
// MeteredEngine implements derive.EngineControl // MeteredEngine implements derive.ResettableEngineControl
var _ derive.EngineControl = (*MeteredEngine)(nil) var _ derive.ResettableEngineControl = (*MeteredEngine)(nil)
func NewMeteredEngine(cfg *rollup.Config, inner derive.EngineControl, metrics EngineMetrics, log log.Logger) *MeteredEngine { func NewMeteredEngine(cfg *rollup.Config, inner derive.ResettableEngineControl, metrics EngineMetrics, log log.Logger) *MeteredEngine {
return &MeteredEngine{ return &MeteredEngine{
inner: inner, inner: inner,
cfg: cfg, cfg: cfg,
...@@ -93,3 +93,7 @@ func (m *MeteredEngine) CancelPayload(ctx context.Context, force bool) error { ...@@ -93,3 +93,7 @@ func (m *MeteredEngine) CancelPayload(ctx context.Context, force bool) error {
func (m *MeteredEngine) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) { func (m *MeteredEngine) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) {
return m.inner.BuildingPayload() return m.inner.BuildingPayload()
} }
func (m *MeteredEngine) Reset() {
m.inner.Reset()
}
...@@ -2,6 +2,7 @@ package driver ...@@ -2,6 +2,7 @@ package driver
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time" "time"
...@@ -23,23 +24,30 @@ type L1OriginSelectorIface interface { ...@@ -23,23 +24,30 @@ type L1OriginSelectorIface interface {
FindL1Origin(ctx context.Context, l2Head eth.L2BlockRef) (eth.L1BlockRef, error) FindL1Origin(ctx context.Context, l2Head eth.L2BlockRef) (eth.L1BlockRef, error)
} }
type SequencerMetrics interface {
RecordSequencerInconsistentL1Origin(from eth.BlockID, to eth.BlockID)
RecordSequencerReset()
}
// Sequencer implements the sequencing interface of the driver: it starts and completes block building jobs. // Sequencer implements the sequencing interface of the driver: it starts and completes block building jobs.
type Sequencer struct { type Sequencer struct {
log log.Logger log log.Logger
config *rollup.Config config *rollup.Config
engine derive.EngineControl engine derive.ResettableEngineControl
attrBuilder derive.AttributesBuilder attrBuilder derive.AttributesBuilder
l1OriginSelector L1OriginSelectorIface l1OriginSelector L1OriginSelectorIface
metrics SequencerMetrics
// timeNow enables sequencer testing to mock the time // timeNow enables sequencer testing to mock the time
timeNow func() time.Time timeNow func() time.Time
nextAction time.Time nextAction time.Time
} }
func NewSequencer(log log.Logger, cfg *rollup.Config, engine derive.EngineControl, attributesBuilder derive.AttributesBuilder, l1OriginSelector L1OriginSelectorIface) *Sequencer { func NewSequencer(log log.Logger, cfg *rollup.Config, engine derive.ResettableEngineControl, attributesBuilder derive.AttributesBuilder, l1OriginSelector L1OriginSelectorIface, metrics SequencerMetrics) *Sequencer {
return &Sequencer{ return &Sequencer{
log: log, log: log,
config: cfg, config: cfg,
...@@ -47,6 +55,7 @@ func NewSequencer(log log.Logger, cfg *rollup.Config, engine derive.EngineContro ...@@ -47,6 +55,7 @@ func NewSequencer(log log.Logger, cfg *rollup.Config, engine derive.EngineContro
timeNow: time.Now, timeNow: time.Now,
attrBuilder: attributesBuilder, attrBuilder: attributesBuilder,
l1OriginSelector: l1OriginSelector, l1OriginSelector: l1OriginSelector,
metrics: metrics,
} }
} }
...@@ -62,7 +71,8 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error { ...@@ -62,7 +71,8 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error {
} }
if !(l2Head.L1Origin.Hash == l1Origin.ParentHash || l2Head.L1Origin.Hash == l1Origin.Hash) { if !(l2Head.L1Origin.Hash == l1Origin.ParentHash || l2Head.L1Origin.Hash == l1Origin.Hash) {
return fmt.Errorf("cannot build new L2 block with L1 origin %s (parent L1 %s) on current L2 head %s with L1 origin %s", l1Origin, l1Origin.ParentHash, l2Head, l2Head.L1Origin) d.metrics.RecordSequencerInconsistentL1Origin(l2Head.L1Origin, l1Origin.ID())
return derive.NewResetError(fmt.Errorf("cannot build new L2 block with L1 origin %s (parent L1 %s) on current L2 head %s with L1 origin %s", l1Origin, l1Origin.ParentHash, l2Head, l2Head.L1Origin))
} }
d.log.Info("creating new block", "parent", l2Head, "l1Origin", l1Origin) d.log.Info("creating new block", "parent", l2Head, "l1Origin", l1Origin)
...@@ -159,29 +169,72 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef { ...@@ -159,29 +169,72 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef {
// RunNextSequencerAction starts new block building work, or seals existing work, // RunNextSequencerAction starts new block building work, or seals existing work,
// and is best timed by first awaiting the delay returned by PlanNextSequencerAction. // and is best timed by first awaiting the delay returned by PlanNextSequencerAction.
// If a new block is successfully sealed, it will be returned for publishing, nil otherwise. // If a new block is successfully sealed, it will be returned for publishing, nil otherwise.
func (d *Sequencer) RunNextSequencerAction(ctx context.Context) *eth.ExecutionPayload { //
// Only critical errors are bubbled up, other errors are handled internally.
// Internally starting or sealing of a block may fail with a derivation-like error:
// - If it is a critical error, the error is bubbled up to the caller.
// - If it is a reset error, the ResettableEngineControl used to build blocks is requested to reset, and a backoff aplies.
// No attempt is made at completing the block building.
// - If it is a temporary error, a backoff is applied to reattempt building later.
// - If it is any other error, a backoff is applied and building is cancelled.
//
// Upon L1 reorgs that are deep enough to affect the L1 origin selection, a reset-error may occur,
// to direct the engine to follow the new L1 chain before continuing to sequence blocks.
// It is up to the EngineControl implementation to handle conflicting build jobs of the derivation
// process (as verifier) and sequencing process.
// Generally it is expected that the latest call interrupts any ongoing work,
// and the derivation process does not interrupt in the happy case,
// since it can consolidate previously sequenced blocks by comparing sequenced inputs with derived inputs.
// If the derivation pipeline does force a conflicting block, then an ongoing sequencer task might still finish,
// but the derivation can continue to reset until the chain is correct.
func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionPayload, error) {
if _, buildingID, _ := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) { if _, buildingID, _ := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) {
payload, err := d.CompleteBuildingBlock(ctx) payload, err := d.CompleteBuildingBlock(ctx)
if err != nil { if err != nil {
d.log.Error("sequencer failed to seal new block", "err", err) if errors.Is(err, derive.ErrCritical) {
d.nextAction = d.timeNow().Add(time.Second) return nil, err // bubble up critical errors.
if buildingID != (eth.PayloadID{}) { // don't keep stale block building jobs around, try to cancel them } else if errors.Is(err, derive.ErrReset) {
d.log.Error("sequencer failed to seal new block, requiring derivation reset", "err", err)
d.metrics.RecordSequencerReset()
d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.config.BlockTime)) // hold off from sequencing for a full block
d.CancelBuildingBlock(ctx)
d.engine.Reset()
} else if errors.Is(err, derive.ErrTemporary) {
d.log.Error("sequencer failed temporarily to seal new block", "err", err)
d.nextAction = d.timeNow().Add(time.Second)
// We don't explicitly cancel block building jobs upon temporary errors: we may still finish the block.
// Any unfinished block building work eventually times out, and will be cleaned up that way.
} else {
d.log.Error("sequencer failed to seal block with unclassified error", "err", err)
d.nextAction = d.timeNow().Add(time.Second)
d.CancelBuildingBlock(ctx) d.CancelBuildingBlock(ctx)
} }
return nil return nil, nil
} else { } else {
d.log.Info("sequencer successfully built a new block", "block", payload.ID(), "time", uint64(payload.Timestamp), "txs", len(payload.Transactions)) d.log.Info("sequencer successfully built a new block", "block", payload.ID(), "time", uint64(payload.Timestamp), "txs", len(payload.Transactions))
return payload return payload, nil
} }
} else { } else {
err := d.StartBuildingBlock(ctx) err := d.StartBuildingBlock(ctx)
if err != nil { if err != nil {
d.log.Error("sequencer failed to start building new block", "err", err) if errors.Is(err, derive.ErrCritical) {
d.nextAction = d.timeNow().Add(time.Second) return nil, err
} else if errors.Is(err, derive.ErrReset) {
d.log.Error("sequencer failed to seal new block, requiring derivation reset", "err", err)
d.metrics.RecordSequencerReset()
d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.config.BlockTime)) // hold off from sequencing for a full block
d.engine.Reset()
} else if errors.Is(err, derive.ErrTemporary) {
d.log.Error("sequencer temporarily failed to start building new block", "err", err)
d.nextAction = d.timeNow().Add(time.Second)
} else {
d.log.Error("sequencer failed to start building new block with unclassified error", "err", err)
d.nextAction = d.timeNow().Add(time.Second)
}
} else { } else {
parent, buildingID, _ := d.engine.BuildingPayload() // we should have a new payload ID now that we're building a block parent, buildingID, _ := d.engine.BuildingPayload() // we should have a new payload ID now that we're building a block
d.log.Info("sequencer started building new block", "payload_id", buildingID, "l2_parent_block", parent, "l2_parent_block_time", parent.Time) d.log.Info("sequencer started building new block", "payload_id", buildingID, "l2_parent_block", parent, "l2_parent_block_time", parent.Time)
} }
return nil return nil, nil
} }
} }
...@@ -17,12 +17,15 @@ import ( ...@@ -17,12 +17,15 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum-optimism/optimism/op-node/testutils"
) )
var mockResetErr = fmt.Errorf("mock reset err: %w", derive.ErrReset)
type FakeEngineControl struct { type FakeEngineControl struct {
finalized eth.L2BlockRef finalized eth.L2BlockRef
safe eth.L2BlockRef safe eth.L2BlockRef
...@@ -122,7 +125,11 @@ func (m *FakeEngineControl) resetBuildingState() { ...@@ -122,7 +125,11 @@ func (m *FakeEngineControl) resetBuildingState() {
m.buildingAttrs = nil m.buildingAttrs = nil
} }
var _ derive.EngineControl = (*FakeEngineControl)(nil) func (m *FakeEngineControl) Reset() {
m.err = nil
}
var _ derive.ResettableEngineControl = (*FakeEngineControl)(nil)
type testAttrBuilderFn func(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error) type testAttrBuilderFn func(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error)
...@@ -295,7 +302,7 @@ func TestSequencerChaosMonkey(t *testing.T) { ...@@ -295,7 +302,7 @@ func TestSequencerChaosMonkey(t *testing.T) {
} }
}) })
seq := NewSequencer(log, cfg, engControl, attrBuilder, originSelector) seq := NewSequencer(log, cfg, engControl, attrBuilder, originSelector, metrics.NoopMetrics)
seq.timeNow = clockFn seq.timeNow = clockFn
// try to build 1000 blocks, with 5x as many planning attempts, to handle errors and clock problems // try to build 1000 blocks, with 5x as many planning attempts, to handle errors and clock problems
...@@ -319,25 +326,30 @@ func TestSequencerChaosMonkey(t *testing.T) { ...@@ -319,25 +326,30 @@ func TestSequencerChaosMonkey(t *testing.T) {
// reset errors // reset errors
originErr = nil originErr = nil
attrsErr = nil attrsErr = nil
engControl.err = nil if engControl.err != mockResetErr { // the mockResetErr requires the sequencer to Reset() to recover.
engControl.err = nil
}
engControl.errTyp = derive.BlockInsertOK engControl.errTyp = derive.BlockInsertOK
// maybe make something maybe fail, or try a new L1 origin // maybe make something maybe fail, or try a new L1 origin
switch rng.Intn(10) { // 40% chance to fail sequencer action (!!!) switch rng.Intn(20) { // 9/20 = 45% chance to fail sequencer action (!!!)
case 0: case 0, 1:
originErr = errors.New("mock origin error") originErr = errors.New("mock origin error")
case 1: case 2, 3:
attrsErr = errors.New("mock attributes error") attrsErr = errors.New("mock attributes error")
case 2: case 4, 5:
engControl.err = errors.New("mock temporary engine error") engControl.err = errors.New("mock temporary engine error")
engControl.errTyp = derive.BlockInsertTemporaryErr engControl.errTyp = derive.BlockInsertTemporaryErr
case 3: case 6, 7:
engControl.err = errors.New("mock prestate engine error") engControl.err = errors.New("mock prestate engine error")
engControl.errTyp = derive.BlockInsertPrestateErr engControl.errTyp = derive.BlockInsertPrestateErr
case 8:
engControl.err = mockResetErr
default: default:
// no error // no error
} }
payload := seq.RunNextSequencerAction(context.Background()) payload, err := seq.RunNextSequencerAction(context.Background())
require.NoError(t, err)
if payload != nil { if payload != nil {
require.Equal(t, engControl.UnsafeL2Head().ID(), payload.ID(), "head must stay in sync with emitted payloads") require.Equal(t, engControl.UnsafeL2Head().ID(), payload.ID(), "head must stay in sync with emitted payloads")
var tx types.Transaction var tx types.Transaction
......
...@@ -32,9 +32,6 @@ type Driver struct { ...@@ -32,9 +32,6 @@ type Driver struct {
// The derivation pipeline determines the new l2Safe. // The derivation pipeline determines the new l2Safe.
derivation DerivationPipeline derivation DerivationPipeline
// When the derivation pipeline is waiting for new data to do anything
idleDerivation bool
// Requests to block the event loop for synchronous execution to avoid reading an inconsistent state // Requests to block the event loop for synchronous execution to avoid reading an inconsistent state
stateReq chan chan struct{} stateReq chan chan struct{}
...@@ -212,7 +209,11 @@ func (s *Driver) eventLoop() { ...@@ -212,7 +209,11 @@ func (s *Driver) eventLoop() {
select { select {
case <-sequencerCh: case <-sequencerCh:
payload := s.sequencer.RunNextSequencerAction(ctx) payload, err := s.sequencer.RunNextSequencerAction(ctx)
if err != nil {
s.log.Error("Sequencer critical error", "err", err)
return
}
if s.network != nil && payload != nil { if s.network != nil && payload != nil {
// Publishing of unsafe data via p2p is optional. // Publishing of unsafe data via p2p is optional.
// Errors are not severe enough to change/halt sequencing but should be logged and metered. // Errors are not severe enough to change/halt sequencing but should be logged and metered.
...@@ -244,13 +245,11 @@ func (s *Driver) eventLoop() { ...@@ -244,13 +245,11 @@ func (s *Driver) eventLoop() {
step() step()
case <-stepReqCh: case <-stepReqCh:
s.metrics.SetDerivationIdle(false) s.metrics.SetDerivationIdle(false)
s.idleDerivation = false
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
err := s.derivation.Step(context.Background()) err := s.derivation.Step(context.Background())
stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress. stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
if err == io.EOF { if err == io.EOF {
s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin()) s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin())
s.idleDerivation = true
stepAttempts = 0 stepAttempts = 0
s.metrics.SetDerivationIdle(true) s.metrics.SetDerivationIdle(true)
continue continue
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment