Commit 89f75545 authored by protolambda, committed by GitHub

op-node: sequencing better encapsulated, now with events (#10991)

* op-node: sequencer / engine events refactor

incl sequencer events fixes

* op-node: distinguish block sealing error kinds

* op-node: review fixes, stashed tweaks

* op-node: events based sequencer chaos test

* op-node: fix missing DerivedFrom data in attributes test

* op-node: drop old wip debugging work log

* op-node: sequencer move OnEvent function

* op-node: update stale todo comment

* op-node: detect derivation block-building as sequencer, and avoid conflict

* op-node: clarify comments and rename PayloadSealTemporaryErrorEvent to PayloadSealExpiredErrorEvent to describe applicability better

* op-node: prevent temporary engine error from influencing inactive sequencer
parent 6d48bac3
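
Context for the diff below: the driver no longer calls the sequencer synchronously (StartBuildingBlock / CompleteBuildingBlock); instead the action tests emit a sequencing.SequencerActionEvent and then drain the event system until the expected engine event (e.g. engine.BuildStartedEvent or engine.PayloadSuccessEvent) has been processed. The following is a minimal, self-contained sketch of that emit-then-drain pattern; the Event, queue, and DrainUntil names are illustrative stand-ins, not the op-node event-system API.

// Illustrative only: a hypothetical miniature of the emit-then-drain pattern the
// action tests use after this change (emit SequencerActionEvent, then drain
// events until engine.BuildStartedEvent / engine.PayloadSuccessEvent shows up).
package main

import (
	"errors"
	"fmt"
)

type Event interface{ String() string }

type SequencerAction struct{}
type BuildStarted struct{}

func (SequencerAction) String() string { return "sequencer-action" }
func (BuildStarted) String() string    { return "build-started" }

// queue is a stand-in for the synchronous event executor used by the tests.
type queue struct{ events []Event }

func (q *queue) Emit(ev Event) { q.events = append(q.events, ev) }

// DrainUntil pops events until fn matches one, mirroring drainer.DrainUntil in the tests.
func (q *queue) DrainUntil(fn func(Event) bool) error {
	for len(q.events) > 0 {
		ev := q.events[0]
		q.events = q.events[1:]
		if fn(ev) {
			return nil
		}
	}
	return errors.New("drained all events without a match")
}

func main() {
	q := &queue{}
	q.Emit(SequencerAction{})
	// In the real system, processing SequencerActionEvent makes the sequencer
	// start a block-building job, which in turn emits BuildStartedEvent.
	q.Emit(BuildStarted{})
	err := q.DrainUntil(func(ev Event) bool { _, ok := ev.(BuildStarted); return ok })
	fmt.Println("drained until build-started:", err)
}
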
......@@ -39,6 +39,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.7.0
github.com/prometheus/client_golang v1.19.1
github.com/protolambda/ctxlock v0.1.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.1
golang.org/x/crypto v0.25.0
......
......@@ -652,6 +652,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/protolambda/ctxlock v0.1.0 h1:rCUY3+vRdcdZXqT07iXgyr744J2DU2LCBIXowYAjBCE=
github.com/protolambda/ctxlock v0.1.0/go.mod h1:vefhX6rIZH8rsg5ZpOJfEDYQOppZi19SfPiGOFrNnwM=
github.com/prysmaticlabs/gohashtree v0.0.1-alpha.0.20220714111606-acbb2962fb48 h1:cSo6/vk8YpvkLbk9v3FO97cakNmUoxwi2KMP8hd5WIw=
github.com/prysmaticlabs/gohashtree v0.0.1-alpha.0.20220714111606-acbb2962fb48/go.mod h1:4pWaT30XoEx1j8KNJf3TV+E3mQkaufn7mf+jRNb/Fuk=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
......
......@@ -3,6 +3,8 @@ package actions
import (
"errors"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/types"
......@@ -14,7 +16,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/client"
......@@ -43,7 +45,7 @@ type L1Replica struct {
l1Cfg *core.Genesis
l1Signer types.Signer
failL1RPC func() error // mock error
failL1RPC func(call []rpc.BatchElem) error // mock error
}
// NewL1Replica constructs a L1Replica starting at the given genesis.
......@@ -152,18 +154,16 @@ func (s *L1Replica) CanonL1Chain() func(num uint64) *types.Block {
// ActL1RPCFail makes the next L1 RPC request to this node fail
func (s *L1Replica) ActL1RPCFail(t Testing) {
failed := false
s.failL1RPC = func() error {
if failed {
return nil
}
failed = true
s.failL1RPC = func(call []rpc.BatchElem) error {
s.failL1RPC = nil
return errors.New("mock L1 RPC error")
}
}
func (s *L1Replica) MockL1RPCErrors(fn func() error) {
s.failL1RPC = fn
s.failL1RPC = func(call []rpc.BatchElem) error {
return fn()
}
}
func (s *L1Replica) EthClient() *ethclient.Client {
......@@ -175,12 +175,11 @@ func (s *L1Replica) RPCClient() client.RPC {
cl := s.node.Attach()
return testutils.RPCErrFaker{
RPC: client.NewBaseRPCClient(cl),
ErrFn: func() error {
if s.failL1RPC != nil {
return s.failL1RPC()
} else {
ErrFn: func(call []rpc.BatchElem) error {
if s.failL1RPC == nil {
return nil
}
return s.failL1RPC(call)
},
}
}
......
......@@ -44,7 +44,7 @@ type L2Engine struct {
engineApi *engineapi.L2EngineAPI
failL2RPC error // mock error
failL2RPC func(call []rpc.BatchElem) error // mock error
}
type EngineOption func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error
......@@ -160,10 +160,11 @@ func (e *L2Engine) RPCClient() client.RPC {
cl := e.node.Attach()
return testutils.RPCErrFaker{
RPC: client.NewBaseRPCClient(cl),
ErrFn: func() error {
err := e.failL2RPC
e.failL2RPC = nil // reset back, only error once.
return err
ErrFn: func(call []rpc.BatchElem) error {
if e.failL2RPC == nil {
return nil
}
return e.failL2RPC(call)
},
}
}
......@@ -180,7 +181,10 @@ func (e *L2Engine) ActL2RPCFail(t Testing, err error) {
t.InvalidAction("already set a mock L2 rpc error")
return
}
e.failL2RPC = err
e.failL2RPC = func(call []rpc.BatchElem) error {
e.failL2RPC = nil
return err
}
}
// ActL2IncludeTx includes the next transaction from the given address in the block that is being built
......
......@@ -2,28 +2,31 @@ package actions
import (
"context"
"errors"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/confdepth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// MockL1OriginSelector is a shim to override the origin as sequencer, so we can force it to stay on an older origin.
type MockL1OriginSelector struct {
actual *driver.L1OriginSelector
actual *sequencing.L1OriginSelector
originOverride eth.L1BlockRef // override which origin gets picked
}
......@@ -39,7 +42,7 @@ func (m *MockL1OriginSelector) FindL1Origin(ctx context.Context, l2Head eth.L2Bl
type L2Sequencer struct {
*L2Verifier
sequencer *driver.Sequencer
sequencer *sequencing.Sequencer
failL2GossipUnsafeBlock error // mock error
......@@ -50,13 +53,33 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc deri
plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer {
ver := NewL2Verifier(t, log, l1, blobSrc, plasmaSrc, eng, cfg, &sync.Config{}, safedb.Disabled)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng)
seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.syncStatus.L1Head, l1)
seqConfDepthL1 := confdepth.NewConfDepth(seqConfDepth, ver.syncStatus.L1Head, l1)
l1OriginSelector := &MockL1OriginSelector{
actual: driver.NewL1OriginSelector(log, cfg, seqConfDepthL1),
}
actual: sequencing.NewL1OriginSelector(log, cfg, seqConfDepthL1),
}
metr := metrics.NoopMetrics
seqStateListener := node.DisabledConfigPersistence{}
conduc := &conductor.NoOpConductor{}
asyncGossip := async.NoOpGossiper{}
seq := sequencing.NewSequencer(t.Ctx(), log, cfg, attrBuilder, l1OriginSelector,
seqStateListener, conduc, asyncGossip, metr)
opts := event.DefaultRegisterOpts()
opts.Emitter = event.EmitterOpts{
Limiting: true,
// TestSyncBatchType/DerivationWithFlakyL1RPC does *a lot* of quick retries
// TestL2BatcherBatchType/ExtendedTimeWithoutL1Batches as well.
Rate: rate.Limit(100_000),
Burst: 100_000,
OnLimited: func() {
log.Warn("Hitting events rate-limit. An events code-path may be hot-looping.")
t.Fatal("Tests must not hot-loop events")
},
}
ver.eventSys.Register("sequencer", seq, opts)
require.NoError(t, seq.Init(t.Ctx(), true))
return &L2Sequencer{
L2Verifier: ver,
sequencer: driver.NewSequencer(log, cfg, ver.engine, attrBuilder, l1OriginSelector, metrics.NoopMetrics),
sequencer: seq,
mockL1OriginSelector: l1OriginSelector,
failL2GossipUnsafeBlock: nil,
}
......@@ -64,10 +87,6 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc deri
// ActL2StartBlock starts building of a new L2 block on top of the head
func (s *L2Sequencer) ActL2StartBlock(t Testing) {
s.ActL2StartBlockCheckErr(t, nil)
}
func (s *L2Sequencer) ActL2StartBlockCheckErr(t Testing, checkErr error) {
if !s.l2PipelineIdle {
t.InvalidAction("cannot start L2 build when derivation is not idle")
return
......@@ -76,21 +95,11 @@ func (s *L2Sequencer) ActL2StartBlockCheckErr(t Testing, checkErr error) {
t.InvalidAction("already started building L2 block")
return
}
s.synchronousEvents.Emit(sequencing.SequencerActionEvent{})
require.NoError(t, s.drainer.DrainUntil(event.Is[engine.BuildStartedEvent], false),
"failed to start block building")
err := s.sequencer.StartBuildingBlock(t.Ctx())
if checkErr == nil {
require.NoError(t, err, "failed to start block building")
} else {
require.ErrorIs(t, err, checkErr, "expected typed error")
}
if errors.Is(err, derive.ErrReset) {
s.derivation.Reset()
}
if err == nil {
s.l2Building = true
}
}
// ActL2EndBlock completes a new L2 block and applies it to the L2 chain as new canonical unsafe head
......@@ -101,10 +110,9 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) {
}
s.l2Building = false
_, err := s.sequencer.CompleteBuildingBlock(t.Ctx(), async.NoOpGossiper{}, &conductor.NoOpConductor{})
// TODO: there may be legitimate temporary errors here, if we mock engine API RPC-failure.
// For advanced tests we can catch those and print a warning instead.
require.NoError(t, err)
s.synchronousEvents.Emit(sequencing.SequencerActionEvent{})
require.NoError(t, s.drainer.DrainUntil(event.Is[engine.PayloadSuccessEvent], false),
"failed to complete block building")
// After having built a L2 block, make sure to get an engine update processed.
// This will ensure the sync-status and such reflect the latest changes.
......
......@@ -40,15 +40,15 @@ func EngineWithP2P() EngineOption {
func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Sequencer) {
jwtPath := e2eutils.WriteDefaultJWT(t)
miner := NewL1Miner(t, log, sd.L1Cfg)
miner := NewL1Miner(t, log.New("role", "l1-miner"), sd.L1Cfg)
l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindStandard))
require.NoError(t, err)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
engine := NewL2Engine(t, log.New("role", "sequencer-engine"), sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)
sequencer := NewL2Sequencer(t, log, l1F, miner.BlobStore(), plasma.Disabled, l2Cl, sd.RollupCfg, 0)
sequencer := NewL2Sequencer(t, log.New("role", "sequencer"), l1F, miner.BlobStore(), plasma.Disabled, l2Cl, sd.RollupCfg, 0)
return miner, engine, sequencer
}
......
......@@ -63,7 +63,7 @@ type L2Verifier struct {
rpc *rpc.Server
failRPC error // mock error
failRPC func(call []rpc.BatchElem) error // mock error
// The L2Verifier actor is embedded in the L2Sequencer actor,
// but must not be copied for the deriver-functionality to modify the same state.
......@@ -147,7 +147,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
Drain: executor.Drain,
}, opts)
sys.Register("engine", engine.NewEngDeriver(log, ctx, cfg, ec), opts)
sys.Register("engine", engine.NewEngDeriver(log, ctx, cfg, metrics, ec), opts)
rollupNode := &L2Verifier{
eventSys: sys,
......@@ -262,10 +262,11 @@ func (s *L2Verifier) RPCClient() client.RPC {
cl := rpc.DialInProc(s.rpc)
return testutils.RPCErrFaker{
RPC: client.NewBaseRPCClient(cl),
ErrFn: func() error {
err := s.failRPC
s.failRPC = nil // reset back, only error once.
return err
ErrFn: func(call []rpc.BatchElem) error {
if s.failRPC == nil {
return nil
}
return s.failRPC(call)
},
}
}
......@@ -276,7 +277,10 @@ func (s *L2Verifier) ActRPCFail(t Testing) {
t.InvalidAction("already set a mock rpc error")
return
}
s.failRPC = errors.New("mock RPC error")
s.failRPC = func(call []rpc.BatchElem) error {
s.failRPC = nil
return errors.New("mock RPC error")
}
}
func (s *L2Verifier) ActL1HeadSignal(t Testing) {
......@@ -327,6 +331,10 @@ func (s *L2Verifier) OnEvent(ev event.Event) bool {
panic(fmt.Errorf("derivation failed critically: %w", x.Err))
case derive.DeriverIdleEvent:
s.l2PipelineIdle = true
case derive.PipelineStepEvent:
s.l2PipelineIdle = false
case driver.StepReqEvent:
s.synchronousEvents.Emit(driver.StepEvent{})
default:
return false
}
......@@ -359,23 +367,8 @@ func (s *L2Verifier) ActL2EventsUntil(t Testing, fn func(ev event.Event) bool, m
}
func (s *L2Verifier) ActL2PipelineFull(t Testing) {
s.l2PipelineIdle = false
i := 0
for !s.l2PipelineIdle {
i += 1
// Some tests do generate a lot of derivation steps
// (e.g. thousand blocks span-batch, or deep reorgs).
// Hence we set the sanity limit to something really high.
if i > 10_000 {
t.Fatalf("ActL2PipelineFull running for too long. Is a deriver looping?")
}
if s.l2Building {
t.InvalidAction("cannot derive new data while building L2 block")
return
}
s.synchronousEvents.Emit(driver.StepEvent{})
require.NoError(t, s.drainer.Drain(), "complete all event processing triggered by deriver step")
}
}
// ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub
......
......@@ -39,9 +39,9 @@ func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive
opt(cfg)
}
jwtPath := e2eutils.WriteDefaultJWT(t)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
engine := NewL2Engine(t, log.New("role", "verifier-engine"), sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
engCl := engine.EngineClient(t, sd.RollupCfg)
verifier := NewL2Verifier(t, log, l1F, blobSrc, plasma.Disabled, engCl, sd.RollupCfg, syncCfg, cfg.safeHeadListener)
verifier := NewL2Verifier(t, log.New("role", "verifier"), l1F, blobSrc, plasma.Disabled, engCl, sd.RollupCfg, syncCfg, cfg.safeHeadListener)
return engine, verifier
}
......
......@@ -4,6 +4,7 @@ import (
"errors"
"math/big"
"math/rand"
"strings"
"testing"
"time"
......@@ -16,9 +17,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
engine2 "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
......@@ -448,7 +449,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
// B3 is invalid block
// NextAttributes is called
sequencer.ActL2EventsUntil(t, event.Is[engine2.ProcessAttributesEvent], 100, true)
sequencer.ActL2EventsUntil(t, event.Is[engine2.BuildStartEvent], 100, true)
// mock forkChoiceUpdate error while restoring previous unsafe chain using backupUnsafe.
seqEng.ActL2RPCFail(t, eth.InputError{Inner: errors.New("mock L2 RPC error"), Code: eth.InvalidForkchoiceState})
......@@ -581,17 +582,28 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
// B3 is invalid block
// wait till attributes processing (excl.) before mocking errors
sequencer.ActL2EventsUntil(t, event.Is[engine2.ProcessAttributesEvent], 100, true)
sequencer.ActL2EventsUntil(t, event.Is[engine2.BuildStartEvent], 100, true)
serverErrCnt := 2
for i := 0; i < serverErrCnt; i++ {
// mock forkChoiceUpdate failure while restoring previous unsafe chain using backupUnsafe.
seqEng.ActL2RPCFail(t, gethengine.GenericServerError)
// TryBackupUnsafeReorg is called - forkChoiceUpdate returns GenericServerError so retry
sequencer.ActL2EventsUntil(t, event.Is[rollup.EngineTemporaryErrorEvent], 100, false)
// backupUnsafeHead not emptied yet
require.Equal(t, targetUnsafeHeadHash, sequencer.L2BackupUnsafe().Hash)
seqEng.failL2RPC = func(call []rpc.BatchElem) error {
for _, e := range call {
// There may be other calls, like payload-processing-cancellation
// based on previous invalid block, and processing of block attributes.
if strings.HasPrefix(e.Method, "engine_forkchoiceUpdated") && e.Args[1].(*eth.PayloadAttributes) == nil {
if serverErrCnt > 0 {
serverErrCnt -= 1
return gethengine.GenericServerError
} else {
return nil
}
}
}
return nil
}
// cannot drain events until specific engine error, since SyncDeriver calls Drain internally still.
sequencer.ActL2PipelineFull(t)
// now forkchoice succeeds
// try to process invalid leftovers: B4, B5
sequencer.ActL2PipelineFull(t)
......
......@@ -211,7 +211,7 @@ func TestSequencerFailover_DisasterRecovery_OverrideLeader(t *testing.T) {
// Start sequencer without the overrideLeader flag set to true, should fail
err = sys.RollupClient(Sequencer3Name).StartSequencer(ctx, common.Hash{1, 2, 3})
require.ErrorContains(t, err, "sequencer is not the leader, aborting.", "Expected sequencer to fail to start")
require.ErrorContains(t, err, "sequencer is not the leader, aborting", "Expected sequencer to fail to start")
// Start sequencer with the overrideLeader flag set to true, should succeed
err = sys.RollupClient(Sequencer3Name).OverrideLeader(ctx)
......
......@@ -35,6 +35,7 @@ type AttributesHandler struct {
emitter event.Emitter
attributes *derive.AttributesWithParent
sentAttributes bool
}
func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ctx context.Context, l2 L2) *AttributesHandler {
......@@ -64,13 +65,39 @@ func (eq *AttributesHandler) OnEvent(ev event.Event) bool {
eq.emitter.Emit(derive.ConfirmReceivedAttributesEvent{})
// to make sure we have a pre-state signal to process the attributes from
eq.emitter.Emit(engine.PendingSafeRequestEvent{})
case rollup.ResetEvent:
eq.sentAttributes = false
eq.attributes = nil
case rollup.EngineTemporaryErrorEvent:
eq.sentAttributes = false
case engine.InvalidPayloadAttributesEvent:
if x.Attributes.DerivedFrom == (eth.L1BlockRef{}) {
return true // from sequencing
}
eq.sentAttributes = false
// If the engine signals that attributes are invalid,
// that should match our last applied attributes, which we should thus drop.
eq.attributes = nil
// Time to re-evaluate without attributes.
// (the pending-safe state will then be forwarded to our source of attributes).
eq.emitter.Emit(engine.PendingSafeRequestEvent{})
case engine.PayloadSealExpiredErrorEvent:
if x.DerivedFrom == (eth.L1BlockRef{}) {
return true // from sequencing
}
eq.log.Warn("Block sealing job of derived attributes expired, job will be re-attempted.",
"build_id", x.Info.ID, "timestamp", x.Info.Timestamp, "err", x.Err)
// If the engine failed to seal temporarily, just allow to resubmit (triggered on next safe-head poke)
eq.sentAttributes = false
case engine.PayloadSealInvalidEvent:
if x.DerivedFrom == (eth.L1BlockRef{}) {
return true // from sequencing
}
eq.log.Warn("Cannot seal derived block attributes, input is invalid",
"build_id", x.Info.ID, "timestamp", x.Info.Timestamp, "err", x.Err)
eq.sentAttributes = false
eq.attributes = nil
eq.emitter.Emit(engine.PendingSafeRequestEvent{})
default:
return false
}
......@@ -88,6 +115,7 @@ func (eq *AttributesHandler) onPendingSafeUpdate(x engine.PendingSafeUpdateEvent
}
if eq.attributes == nil {
eq.sentAttributes = false
// Request new attributes to be generated, only if we don't currently have attributes that have yet to be processed.
// It is safe to request the pipeline, the attributes-handler is the only user of it,
// and the pipeline will not generate another set of attributes until the last set is recognized.
......@@ -95,11 +123,19 @@ func (eq *AttributesHandler) onPendingSafeUpdate(x engine.PendingSafeUpdateEvent
return
}
// Drop attributes if they don't apply on top of the pending safe head
// Drop attributes if they don't apply on top of the pending safe head.
// This is expected after successful processing of these attributes.
if eq.attributes.Parent.Number != x.PendingSafe.Number {
eq.log.Warn("dropping stale attributes",
eq.log.Debug("dropping stale attributes, requesting new ones",
"pending", x.PendingSafe, "attributes_parent", eq.attributes.Parent)
eq.attributes = nil
eq.sentAttributes = false
eq.emitter.Emit(derive.PipelineStepEvent{PendingSafe: x.PendingSafe})
return
}
if eq.sentAttributes {
eq.log.Warn("already sent the existing attributes")
return
}
......@@ -118,7 +154,8 @@ func (eq *AttributesHandler) onPendingSafeUpdate(x engine.PendingSafeUpdateEvent
eq.consolidateNextSafeAttributes(eq.attributes, x.PendingSafe)
} else {
// append to tip otherwise
eq.emitter.Emit(engine.ProcessAttributesEvent{Attributes: eq.attributes})
eq.sentAttributes = true
eq.emitter.Emit(engine.BuildStartEvent{Attributes: eq.attributes})
}
}
}
......@@ -144,8 +181,9 @@ func (eq *AttributesHandler) consolidateNextSafeAttributes(attributes *derive.At
eq.log.Warn("L2 reorg: existing unsafe block does not match derived attributes from L1",
"err", err, "unsafe", envelope.ExecutionPayload.ID(), "pending_safe", onto)
eq.sentAttributes = true
// geth cannot wind back a chain without reorging to a new, previously non-canonical, block
eq.emitter.Emit(engine.ProcessAttributesEvent{Attributes: attributes})
eq.emitter.Emit(engine.BuildStartEvent{Attributes: attributes})
return
} else {
ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
......
......@@ -31,6 +31,9 @@ func TestAttributesHandler(t *testing.T) {
ParentHash: refA.Hash,
Time: refA.Time + 12,
}
// Copy with different hash, as alternative where the alt-L2 block may come from
refBAlt := refB
refBAlt.Hash = testutils.RandomHash(rng)
aL1Info := &testutils.MockBlockInfo{
InfoParentHash: refA.ParentHash,
......@@ -116,6 +119,7 @@ func TestAttributesHandler(t *testing.T) {
},
Parent: refA0,
IsLastInSpan: true,
DerivedFrom: refB,
}
refA1, err := derive.PayloadToBlockRef(cfg, payloadA1.ExecutionPayload)
require.NoError(t, err)
......@@ -152,6 +156,7 @@ func TestAttributesHandler(t *testing.T) {
},
Parent: refA0,
IsLastInSpan: true,
DerivedFrom: refBAlt,
}
refA1Alt, err := derive.PayloadToBlockRef(cfg, payloadA1Alt.ExecutionPayload)
......@@ -193,6 +198,8 @@ func TestAttributesHandler(t *testing.T) {
})
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes)
// New attributes will have to get generated after processing the last ones
emitter.ExpectOnce(derive.PipelineStepEvent{PendingSafe: refA1Alt})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1Alt,
Unsafe: refA1Alt,
......@@ -246,7 +253,7 @@ func TestAttributesHandler(t *testing.T) {
// The payloadA1 is going to get reorged out in favor of attrA1Alt (turns into payloadA1Alt)
l2.ExpectPayloadByNumber(refA1.Number, payloadA1, nil)
// fail consolidation, perform force reorg
emitter.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrA1Alt})
emitter.ExpectOnce(engine.BuildStartEvent{Attributes: attrA1Alt})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA0,
Unsafe: refA1,
......@@ -255,6 +262,7 @@ func TestAttributesHandler(t *testing.T) {
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "still have attributes, processing still unconfirmed")
emitter.ExpectOnce(derive.PipelineStepEvent{PendingSafe: refA1Alt})
// recognize reorg as complete
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1Alt,
......@@ -299,6 +307,7 @@ func TestAttributesHandler(t *testing.T) {
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes, "still have attributes, processing still unconfirmed")
emitter.ExpectOnce(derive.PipelineStepEvent{PendingSafe: refA1})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1,
Unsafe: refA1,
......@@ -334,7 +343,7 @@ func TestAttributesHandler(t *testing.T) {
require.True(t, attrA1Alt.IsLastInSpan, "must be last in span for attributes to become safe")
// attrA1Alt will fit right on top of A0
emitter.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrA1Alt})
emitter.ExpectOnce(engine.BuildStartEvent{Attributes: attrA1Alt})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA0,
Unsafe: refA0,
......@@ -343,6 +352,7 @@ func TestAttributesHandler(t *testing.T) {
emitter.AssertExpectations(t)
require.NotNil(t, ah.attributes)
emitter.ExpectOnce(derive.PipelineStepEvent{PendingSafe: refA1Alt})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA1Alt,
Unsafe: refA1Alt,
......
......@@ -73,7 +73,7 @@ func (eq *CLSync) OnEvent(ev event.Event) bool {
defer eq.mu.Unlock()
switch x := ev.(type) {
case engine.InvalidPayloadEvent:
case engine.PayloadInvalidEvent:
eq.onInvalidPayload(x)
case engine.ForkchoiceUpdateEvent:
eq.onForkchoiceUpdate(x)
......@@ -87,7 +87,7 @@ func (eq *CLSync) OnEvent(ev event.Event) bool {
// onInvalidPayload checks if the first next-up payload matches the invalid payload.
// If so, the payload is dropped, to give the next payloads a try.
func (eq *CLSync) onInvalidPayload(x engine.InvalidPayloadEvent) {
func (eq *CLSync) onInvalidPayload(x engine.PayloadInvalidEvent) {
eq.log.Debug("CL sync received invalid-payload report", x.Envelope.ExecutionPayload.ID())
block := x.Envelope.ExecutionPayload
......
package clsync
import (
"errors"
"math/big"
"math/rand" // nosemgrep
"testing"
......@@ -377,7 +378,7 @@ func TestCLSync(t *testing.T) {
emitter.AssertExpectations(t)
// Pretend the payload is bad. It should not be retried after this.
cl.OnEvent(engine.InvalidPayloadEvent{Envelope: payloadA1})
cl.OnEvent(engine.PayloadInvalidEvent{Envelope: payloadA1, Err: errors.New("test err")})
emitter.AssertExpectations(t)
require.Nil(t, cl.unsafePayloads.Peek(), "pop because invalid")
})
......
......@@ -2,7 +2,6 @@ package driver
import (
"context"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -12,16 +11,24 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
"github.com/ethereum-optimism/optimism/op-node/rollup/clsync"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/confdepth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"
"github.com/ethereum-optimism/optimism/op-node/rollup/status"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// aliases to not disrupt op-conductor code
var (
ErrSequencerAlreadyStarted = sequencing.ErrSequencerAlreadyStarted
ErrSequencerAlreadyStopped = sequencing.ErrSequencerAlreadyStopped
)
type Metrics interface {
RecordPipelineReset()
RecordPublishingError()
......@@ -44,10 +51,10 @@ type Metrics interface {
RecordL1ReorgDepth(d uint64)
EngineMetrics
engine.Metrics
L1FetcherMetrics
SequencerMetrics
event.Metrics
sequencing.Metrics
}
type L1Chain interface {
......@@ -113,15 +120,6 @@ type SyncStatusTracker interface {
L1Head() eth.L1BlockRef
}
type SequencerIface interface {
StartBuildingBlock(ctx context.Context) error
CompleteBuildingBlock(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (*eth.ExecutionPayloadEnvelope, error)
PlanNextSequencerAction() time.Duration
RunNextSequencerAction(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (*eth.ExecutionPayloadEnvelope, error)
BuildingOnto() eth.L2BlockRef
CancelBuildingBlock(ctx context.Context)
}
type Network interface {
// PublishL2Payload is called by the driver whenever there is a new payload to publish, synchronously with the driver main loop.
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
......@@ -162,7 +160,7 @@ func NewDriver(
network Network,
log log.Logger,
metrics Metrics,
sequencerStateListener SequencerStateListener,
sequencerStateListener sequencing.SequencerStateListener,
safeHeadListener rollup.SafeHeadListener,
syncCfg *sync.Config,
sequencerConductor conductor.SequencerConductor,
......@@ -187,9 +185,7 @@ func NewDriver(
sys.Register("status", statusTracker, opts)
l1 = NewMeteredL1Fetcher(l1, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, statusTracker.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, statusTracker.L1Head, l1)
verifConfDepth := confdepth.NewConfDepth(driverCfg.VerifierConfDepth, statusTracker.L1Head, l1)
ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg,
sys.Register("engine-controller", nil, opts))
......@@ -216,11 +212,6 @@ func NewDriver(
sys.Register("pipeline",
derive.NewPipelineDeriver(driverCtx, derivationPipeline), opts)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
meteredEngine := NewMeteredEngine(cfg, ec, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
syncDeriver := &SyncDeriver{
Derivation: derivationPipeline,
SafeHeadNotifs: safeHeadListener,
......@@ -236,11 +227,24 @@ func NewDriver(
}
sys.Register("sync", syncDeriver, opts)
sys.Register("engine", engine.NewEngDeriver(log, driverCtx, cfg, ec), opts)
sys.Register("engine", engine.NewEngDeriver(log, driverCtx, cfg, metrics, ec), opts)
schedDeriv := NewStepSchedulingDeriver(log)
sys.Register("step-scheduler", schedDeriv, opts)
var sequencer sequencing.SequencerIface
if driverCfg.SequencerEnabled {
asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
sequencerConfDepth := confdepth.NewConfDepth(driverCfg.SequencerConfDepth, statusTracker.L1Head, l1)
findL1Origin := sequencing.NewL1OriginSelector(log, cfg, sequencerConfDepth)
sequencer = sequencing.NewSequencer(driverCtx, log, cfg, attrBuilder, findL1Origin,
sequencerStateListener, sequencerConductor, asyncGossiper, metrics)
sys.Register("sequencer", sequencer, opts)
} else {
sequencer = sequencing.DisabledSequencer{}
}
driverEmitter := sys.Register("driver", nil, opts)
driver := &Driver{
eventSys: sys,
......@@ -251,10 +255,6 @@ func NewDriver(
drain: drain,
stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10),
startSequencer: make(chan hashAndErrorChannel, 10),
stopSequencer: make(chan chan hashAndError, 10),
sequencerActive: make(chan chan bool, 10),
sequencerNotifs: sequencerStateListener,
driverConfig: driverCfg,
driverCtx: driverCtx,
driverCancel: driverCancel,
......@@ -267,8 +267,6 @@ func NewDriver(
l1FinalizedSig: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayloadEnvelope, 10),
altSync: altSync,
asyncGossiper: asyncGossiper,
sequencerConductor: sequencerConductor,
}
return driver
......
package driver
import (
"context"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type EngineMetrics interface {
RecordSequencingError()
CountSequencedTxs(count int)
RecordSequencerBuildingDiffTime(duration time.Duration)
RecordSequencerSealingTime(duration time.Duration)
}
// MeteredEngine wraps an EngineControl and adds metrics such as block building time diff and sealing time
type MeteredEngine struct {
inner engine.EngineControl
cfg *rollup.Config
metrics EngineMetrics
log log.Logger
buildingStartTime time.Time
}
func NewMeteredEngine(cfg *rollup.Config, inner engine.EngineControl, metrics EngineMetrics, log log.Logger) *MeteredEngine {
return &MeteredEngine{
inner: inner,
cfg: cfg,
metrics: metrics,
log: log,
}
}
func (m *MeteredEngine) Finalized() eth.L2BlockRef {
return m.inner.Finalized()
}
func (m *MeteredEngine) UnsafeL2Head() eth.L2BlockRef {
return m.inner.UnsafeL2Head()
}
func (m *MeteredEngine) SafeL2Head() eth.L2BlockRef {
return m.inner.SafeL2Head()
}
func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType engine.BlockInsertionErrType, err error) {
m.buildingStartTime = time.Now()
errType, err = m.inner.StartPayload(ctx, parent, attrs, updateSafe)
if err != nil {
m.metrics.RecordSequencingError()
}
return errType, err
}
func (m *MeteredEngine) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp engine.BlockInsertionErrType, err error) {
sealingStart := time.Now()
// Actually execute the block and add it to the head of the chain.
payload, errType, err := m.inner.ConfirmPayload(ctx, agossip, sequencerConductor)
if err != nil {
m.metrics.RecordSequencingError()
return payload, errType, err
}
now := time.Now()
sealTime := now.Sub(sealingStart)
buildTime := now.Sub(m.buildingStartTime)
m.metrics.RecordSequencerSealingTime(sealTime)
m.metrics.RecordSequencerBuildingDiffTime(buildTime - time.Duration(m.cfg.BlockTime)*time.Second)
txnCount := len(payload.ExecutionPayload.Transactions)
m.metrics.CountSequencedTxs(txnCount)
ref := m.inner.UnsafeL2Head()
m.log.Debug("Processed new L2 block", "l2_unsafe", ref, "l1_origin", ref.L1Origin,
"txs", txnCount, "time", ref.Time, "seal_time", sealTime, "build_time", buildTime)
return payload, errType, err
}
func (m *MeteredEngine) CancelPayload(ctx context.Context, force bool) error {
return m.inner.CancelPayload(ctx, force)
}
func (m *MeteredEngine) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) {
return m.inner.BuildingPayload()
}
Three file diffs are collapsed and not shown.
package engine
import (
"context"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type BuildCancelEvent struct {
Info eth.PayloadInfo
Force bool
}
func (ev BuildCancelEvent) String() string {
return "build-cancel"
}
func (eq *EngDeriver) onBuildCancel(ev BuildCancelEvent) {
ctx, cancel := context.WithTimeout(eq.ctx, buildCancelTimeout)
defer cancel()
// the building job gets wrapped up as soon as the payload is retrieved, there's no explicit cancel in the Engine API
eq.log.Warn("cancelling old block building job", "info", ev.Info)
_, err := eq.ec.engine.GetPayload(ctx, ev.Info)
if err != nil {
if x, ok := err.(eth.InputError); ok && x.Code == eth.UnknownPayload { //nolint:all
return // if unknown, then it did not need to be cancelled anymore.
}
eq.log.Error("failed to cancel block building job", "info", ev.Info, "err", err)
if !ev.Force {
eq.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
}
}
}
package engine
import (
"fmt"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
// BuildInvalidEvent is an internal engine event, to post-process upon invalid attributes.
// Not for temporary processing problems.
type BuildInvalidEvent struct {
Attributes *derive.AttributesWithParent
Err error
}
func (ev BuildInvalidEvent) String() string {
return "build-invalid"
}
// InvalidPayloadAttributesEvent is a signal to external derivers that the attributes were invalid.
type InvalidPayloadAttributesEvent struct {
Attributes *derive.AttributesWithParent
Err error
}
func (ev InvalidPayloadAttributesEvent) String() string {
return "invalid-payload-attributes"
}
func (eq *EngDeriver) onBuildInvalid(ev BuildInvalidEvent) {
eq.log.Warn("could not process payload attributes", "err", ev.Err)
// Count the number of deposits to see if the tx list is deposit only.
depositCount := 0
for _, tx := range ev.Attributes.Attributes.Transactions {
if len(tx) > 0 && tx[0] == types.DepositTxType {
depositCount += 1
}
}
// Deposit transaction execution errors are suppressed in the execution engine, but if the
// block is somehow invalid, there is nothing we can do to recover & we should exit.
if len(ev.Attributes.Attributes.Transactions) == depositCount {
eq.log.Error("deposit only block was invalid", "parent", ev.Attributes.Parent, "err", ev.Err)
eq.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("failed to process block with only deposit transactions: %w", ev.Err)})
return
}
// Revert the pending safe head to the safe head.
eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head())
// suppress the error b/c we want to retry with the next batch from the batch queue
// If there is no valid batch the node will eventually force a deposit only block. If
// the deposit only block fails, this will return the critical error above.
// Try to restore to previous known unsafe chain.
eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true)
// drop the payload without inserting it into the engine
// Signal that we deemed the attributes as unfit
eq.emitter.Emit(InvalidPayloadAttributesEvent(ev))
}
package engine
import (
"context"
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// PayloadSealInvalidEvent identifies a permanent in-consensus problem with the payload sealing.
type PayloadSealInvalidEvent struct {
Info eth.PayloadInfo
Err error
IsLastInSpan bool
DerivedFrom eth.L1BlockRef
}
func (ev PayloadSealInvalidEvent) String() string {
return "payload-seal-invalid"
}
// PayloadSealExpiredErrorEvent identifies a form of failed payload-sealing that is not coupled
// to the attributes themselves, but rather the build-job process.
// The user should re-attempt by starting a new build process. The payload-sealing job should not be re-attempted,
// as it most likely expired, timed out, or referenced an otherwise invalidated block-building job identifier.
type PayloadSealExpiredErrorEvent struct {
Info eth.PayloadInfo
Err error
IsLastInSpan bool
DerivedFrom eth.L1BlockRef
}
func (ev PayloadSealExpiredErrorEvent) String() string {
return "payload-seal-expired-error"
}
type BuildSealEvent struct {
Info eth.PayloadInfo
BuildStarted time.Time
// if payload should be promoted to safe (must also be pending safe, see DerivedFrom)
IsLastInSpan bool
// payload is promoted to pending-safe if non-zero
DerivedFrom eth.L1BlockRef
}
func (ev BuildSealEvent) String() string {
return "build-seal"
}
func (eq *EngDeriver) onBuildSeal(ev BuildSealEvent) {
ctx, cancel := context.WithTimeout(eq.ctx, buildSealTimeout)
defer cancel()
sealingStart := time.Now()
envelope, err := eq.ec.engine.GetPayload(ctx, ev.Info)
if err != nil {
if x, ok := err.(eth.InputError); ok && x.Code == eth.UnknownPayload { //nolint:all
eq.log.Warn("Cannot seal block, payload ID is unknown",
"payloadID", ev.Info.ID, "payload_time", ev.Info.Timestamp,
"started_time", ev.BuildStarted)
}
// Although the engine will very likely not be able to continue from here with the same building job,
// we still call it "temporary", since the exact same payload-attributes have not been invalidated in-consensus.
// So the user (attributes-handler or sequencer) should be able to re-attempt the exact
// same attributes with a new block-building job from here to recover from this error.
// We name it "expired", as this generally identifies a timeout, unknown job, or otherwise invalidated work.
eq.emitter.Emit(PayloadSealExpiredErrorEvent{
Info: ev.Info,
Err: fmt.Errorf("failed to seal execution payload (ID: %s): %w", ev.Info.ID, err),
IsLastInSpan: ev.IsLastInSpan,
DerivedFrom: ev.DerivedFrom,
})
return
}
if err := sanityCheckPayload(envelope.ExecutionPayload); err != nil {
eq.emitter.Emit(PayloadSealInvalidEvent{
Info: ev.Info,
Err: fmt.Errorf("failed sanity-check of execution payload contents (ID: %s, blockhash: %s): %w",
ev.Info.ID, envelope.ExecutionPayload.BlockHash, err),
IsLastInSpan: ev.IsLastInSpan,
DerivedFrom: ev.DerivedFrom,
})
return
}
ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
if err != nil {
eq.emitter.Emit(PayloadSealInvalidEvent{
Info: ev.Info,
Err: fmt.Errorf("failed to decode L2 block ref from payload: %w", err),
IsLastInSpan: ev.IsLastInSpan,
DerivedFrom: ev.DerivedFrom,
})
return
}
now := time.Now()
sealTime := now.Sub(sealingStart)
buildTime := now.Sub(ev.BuildStarted)
eq.metrics.RecordSequencerSealingTime(sealTime)
eq.metrics.RecordSequencerBuildingDiffTime(buildTime - time.Duration(eq.cfg.BlockTime)*time.Second)
txnCount := len(envelope.ExecutionPayload.Transactions)
eq.metrics.CountSequencedTxs(txnCount)
eq.log.Debug("Processed new L2 block", "l2_unsafe", ref, "l1_origin", ref.L1Origin,
"txs", txnCount, "time", ref.Time, "seal_time", sealTime, "build_time", buildTime)
eq.emitter.Emit(BuildSealedEvent{
IsLastInSpan: ev.IsLastInSpan,
DerivedFrom: ev.DerivedFrom,
Info: ev.Info,
Envelope: envelope,
Ref: ref,
})
}
package engine
import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// BuildSealedEvent is emitted by the engine when a payload finished building,
// but is not locally inserted as canonical block yet
type BuildSealedEvent struct {
// if payload should be promoted to safe (must also be pending safe, see DerivedFrom)
IsLastInSpan bool
// payload is promoted to pending-safe if non-zero
DerivedFrom eth.L1BlockRef
Info eth.PayloadInfo
Envelope *eth.ExecutionPayloadEnvelope
Ref eth.L2BlockRef
}
func (ev BuildSealedEvent) String() string {
return "build-sealed"
}
func (eq *EngDeriver) onBuildSealed(ev BuildSealedEvent) {
// If a (pending) safe block, immediately process the block
if ev.DerivedFrom != (eth.L1BlockRef{}) {
eq.emitter.Emit(PayloadProcessEvent{
IsLastInSpan: ev.IsLastInSpan,
DerivedFrom: ev.DerivedFrom,
Envelope: ev.Envelope,
Ref: ev.Ref,
})
}
}
package engine
import (
"context"
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type BuildStartEvent struct {
Attributes *derive.AttributesWithParent
}
func (ev BuildStartEvent) String() string {
return "build-start"
}
func (eq *EngDeriver) onBuildStart(ev BuildStartEvent) {
ctx, cancel := context.WithTimeout(eq.ctx, buildStartTimeout)
defer cancel()
if ev.Attributes.DerivedFrom != (eth.L1BlockRef{}) &&
eq.ec.PendingSafeL2Head().Hash != ev.Attributes.Parent.Hash {
// Warn about small reorgs, happens when pending safe head is getting rolled back
eq.log.Warn("block-attributes derived from L1 do not build on pending safe head, likely reorg",
"pending_safe", eq.ec.PendingSafeL2Head(), "attributes_parent", ev.Attributes.Parent)
}
fcEvent := ForkchoiceUpdateEvent{
UnsafeL2Head: ev.Attributes.Parent,
SafeL2Head: eq.ec.safeHead,
FinalizedL2Head: eq.ec.finalizedHead,
}
fc := eth.ForkchoiceState{
HeadBlockHash: fcEvent.UnsafeL2Head.Hash,
SafeBlockHash: fcEvent.SafeL2Head.Hash,
FinalizedBlockHash: fcEvent.FinalizedL2Head.Hash,
}
buildStartTime := time.Now()
id, errTyp, err := startPayload(ctx, eq.ec.engine, fc, ev.Attributes.Attributes)
if err != nil {
switch errTyp {
case BlockInsertTemporaryErr:
// RPC errors are recoverable, we can retry the buffered payload attributes later.
eq.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: fmt.Errorf("temporarily cannot insert new safe block: %w", err)})
return
case BlockInsertPrestateErr:
eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("need reset to resolve pre-state problem: %w", err)})
return
case BlockInsertPayloadErr:
eq.emitter.Emit(BuildInvalidEvent{Attributes: ev.Attributes, Err: err})
return
default:
eq.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unknown error type %d: %w", errTyp, err)})
return
}
}
eq.emitter.Emit(fcEvent)
eq.emitter.Emit(BuildStartedEvent{
Info: eth.PayloadInfo{ID: id, Timestamp: uint64(ev.Attributes.Attributes.Timestamp)},
BuildStarted: buildStartTime,
IsLastInSpan: ev.Attributes.IsLastInSpan,
DerivedFrom: ev.Attributes.DerivedFrom,
Parent: ev.Attributes.Parent,
})
}
package engine
import (
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type BuildStartedEvent struct {
Info eth.PayloadInfo
BuildStarted time.Time
Parent eth.L2BlockRef
// if payload should be promoted to safe (must also be pending safe, see DerivedFrom)
IsLastInSpan bool
// payload is promoted to pending-safe if non-zero
DerivedFrom eth.L1BlockRef
}
func (ev BuildStartedEvent) String() string {
return "build-started"
}
func (eq *EngDeriver) onBuildStarted(ev BuildStartedEvent) {
// If a (pending) safe block, immediately seal the block
if ev.DerivedFrom != (eth.L1BlockRef{}) {
eq.emitter.Emit(BuildSealEvent{
Info: ev.Info,
BuildStarted: ev.BuildStarted,
IsLastInSpan: ev.IsLastInSpan,
DerivedFrom: ev.DerivedFrom,
})
}
}
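
Taken together, the new files above split the old StartPayload/ConfirmPayload flow into a chain of engine events: BuildStartEvent (forkchoiceUpdated with attributes) leads to BuildStartedEvent, then BuildSealEvent (getPayload), BuildSealedEvent and, for derived attributes, PayloadProcessEvent, with BuildInvalidEvent, PayloadSealExpiredErrorEvent and PayloadSealInvalidEvent as the failure branches. The sketch below only traces the happy path for derivation (non-zero DerivedFrom) and uses local placeholder types rather than the real op-node/rollup/engine package.

// Illustrative only: stand-in types tracing the derived-attributes happy path
// through the new build events; the names mirror the events defined above.
package main

import "fmt"

type buildStart struct{}     // engine.BuildStartEvent: forkchoiceUpdated with payload attributes
type buildStarted struct{}   // engine.BuildStartedEvent: payload ID returned by the engine
type buildSeal struct{}      // engine.BuildSealEvent: getPayload to seal the block
type buildSealed struct{}    // engine.BuildSealedEvent: sealed execution payload envelope
type payloadProcess struct{} // engine.PayloadProcessEvent: insert the sealed payload into the engine

func main() {
	// For attributes with DerivedFrom != (eth.L1BlockRef{}) each step immediately
	// triggers the next; the sequencer path instead advances on SequencerActionEvent.
	chain := []interface{}{buildStart{}, buildStarted{}, buildSeal{}, buildSealed{}, payloadProcess{}}
	for i, ev := range chain {
		fmt.Printf("step %d: %T\n", i+1, ev)
	}
}
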
......@@ -11,8 +11,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
......@@ -70,12 +68,6 @@ type EngineController struct {
// because the engine may have forgotten backupUnsafeHead, or backupUnsafeHead is not part
// of the chain.
needFCUCallForBackupUnsafeReorg bool
// Building State
buildingOnto eth.L2BlockRef
buildingInfo eth.PayloadInfo
buildingSafe bool
safeAttrs *derive.AttributesWithParent
}
func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metrics,
......@@ -120,10 +112,6 @@ func (e *EngineController) BackupUnsafeL2Head() eth.L2BlockRef {
return e.backupUnsafeHead
}
func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, bool) {
return e.buildingOnto, e.buildingInfo.ID, e.buildingSafe
}
func (e *EngineController) IsEngineSyncing() bool {
return e.syncStatus == syncStatusWillStartEL || e.syncStatus == syncStatusStartedEL || e.syncStatus == syncStatusFinishedELButNotFinalized
}
......@@ -209,121 +197,6 @@ func (e *EngineController) logSyncProgressMaybe() func() {
}
}
// Engine Methods
func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
if e.IsEngineSyncing() {
return BlockInsertTemporaryErr, fmt.Errorf("engine is in progess of p2p sync")
}
if e.buildingInfo != (eth.PayloadInfo{}) {
e.log.Warn("did not finish previous block building, starting new building now", "prev_onto", e.buildingOnto, "prev_payload_id", e.buildingInfo.ID, "new_onto", parent)
// TODO(8841): maybe worth it to force-cancel the old payload ID here.
}
fc := eth.ForkchoiceState{
HeadBlockHash: parent.Hash,
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
id, errTyp, err := startPayload(ctx, e.engine, fc, attrs.Attributes)
if err != nil {
return errTyp, err
}
e.emitter.Emit(ForkchoiceUpdateEvent{
UnsafeL2Head: parent,
SafeL2Head: e.safeHead,
FinalizedL2Head: e.finalizedHead,
})
e.buildingInfo = eth.PayloadInfo{ID: id, Timestamp: uint64(attrs.Attributes.Timestamp)}
e.buildingSafe = updateSafe
e.buildingOnto = parent
if updateSafe {
e.safeAttrs = attrs
}
return BlockInsertOK, nil
}
func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
// don't create a BlockInsertPrestateErr if we have a cached gossip payload
if e.buildingInfo == (eth.PayloadInfo{}) && agossip.Get() == nil {
return nil, BlockInsertPrestateErr, fmt.Errorf("cannot complete payload building: not currently building a payload")
}
if p := agossip.Get(); p != nil && e.buildingOnto == (eth.L2BlockRef{}) {
e.log.Warn("Found reusable payload from async gossiper, and no block was being built. Reusing payload.",
"hash", p.ExecutionPayload.BlockHash,
"number", uint64(p.ExecutionPayload.BlockNumber),
"parent", p.ExecutionPayload.ParentHash)
} else if e.buildingOnto.Hash != e.unsafeHead.Hash { // E.g. when safe-attributes consolidation fails, it will drop the existing work.
e.log.Warn("engine is building block that reorgs previous unsafe head", "onto", e.buildingOnto, "unsafe", e.unsafeHead)
}
fc := eth.ForkchoiceState{
HeadBlockHash: common.Hash{}, // gets overridden
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
// Update the safe head if the payload is built with the last attributes in the batch.
updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.IsLastInSpan
envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor)
if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err)
}
ref, err := derive.PayloadToBlockRef(e.rollupCfg, envelope.ExecutionPayload)
if err != nil {
return nil, BlockInsertPayloadErr, derive.NewResetError(fmt.Errorf("failed to decode L2 block ref from payload: %w", err))
}
// Backup unsafeHead when new block is not built on original unsafe head.
if e.unsafeHead.Number >= ref.Number {
e.SetBackupUnsafeL2Head(e.unsafeHead, false)
}
e.unsafeHead = ref
e.metrics.RecordL2Ref("l2_unsafe", ref)
if e.buildingSafe {
e.metrics.RecordL2Ref("l2_pending_safe", ref)
e.pendingSafeHead = ref
if updateSafe {
e.safeHead = ref
e.metrics.RecordL2Ref("l2_safe", ref)
// Remove backupUnsafeHead because this backup will never be used after consolidation.
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
}
}
e.emitter.Emit(ForkchoiceUpdateEvent{
UnsafeL2Head: e.unsafeHead,
SafeL2Head: e.safeHead,
FinalizedL2Head: e.finalizedHead,
})
e.resetBuildingState()
return envelope, BlockInsertOK, nil
}
func (e *EngineController) CancelPayload(ctx context.Context, force bool) error {
if e.buildingInfo == (eth.PayloadInfo{}) { // only cancel if there is something to cancel.
return nil
}
// the building job gets wrapped up as soon as the payload is retrieved, there's no explicit cancel in the Engine API
e.log.Error("cancelling old block sealing job", "payload", e.buildingInfo.ID)
_, err := e.engine.GetPayload(ctx, e.buildingInfo)
if err != nil {
e.log.Error("failed to cancel block building job", "payload", e.buildingInfo.ID, "err", err)
if !force {
return err
}
}
e.resetBuildingState()
return nil
}
func (e *EngineController) resetBuildingState() {
e.buildingInfo = eth.PayloadInfo{}
e.buildingOnto = eth.L2BlockRef{}
e.buildingSafe = false
e.safeAttrs = nil
}
// Misc Setters only used by the engine queue
// checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload.
......@@ -389,6 +262,10 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
FinalizedL2Head: e.finalizedHead,
})
}
if e.unsafeHead == e.safeHead && e.safeHead == e.pendingSafeHead {
// Remove backupUnsafeHead because this backup will never be used after consolidation.
e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
}
e.needFCUCall = false
return nil
}
......@@ -416,7 +293,7 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et
return derive.NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
}
if status.Status == eth.ExecutionInvalid {
e.emitter.Emit(InvalidPayloadEvent{Envelope: envelope})
e.emitter.Emit(PayloadInvalidEvent{Envelope: envelope, Err: eth.NewPayloadErr(envelope.ExecutionPayload, status)})
}
if !e.checkNewPayloadStatus(status.Status) {
payload := envelope.ExecutionPayload
......@@ -550,8 +427,3 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro
return true, derive.NewTemporaryError(fmt.Errorf("cannot restore unsafe chain using backupUnsafe: err: %w",
eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
// ResetBuildingState implements LocalEngineControl.
func (e *EngineController) ResetBuildingState() {
e.resetBuildingState()
}
......@@ -5,12 +5,8 @@ import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types"
)
// isDepositTx checks an opaqueTx to determine if it is a Deposit Transaction
......@@ -68,6 +64,8 @@ func sanityCheckPayload(payload *eth.ExecutionPayload) error {
return nil
}
var ErrEngineSyncing = errors.New("engine is syncing")
type BlockInsertionErrType uint
const (
......@@ -94,7 +92,11 @@ func startPayload(ctx context.Context, eng ExecEngine, fc eth.ForkchoiceState, a
case eth.InvalidPayloadAttributes:
return eth.PayloadID{}, BlockInsertPayloadErr, fmt.Errorf("payload attributes are not valid, cannot build block: %w", inputErr.Unwrap())
default:
return eth.PayloadID{}, BlockInsertPrestateErr, fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err)
if inputErr.Code.IsEngineError() {
return eth.PayloadID{}, BlockInsertPrestateErr, fmt.Errorf("unexpected engine error code in forkchoice-updated response: %w", err)
} else {
return eth.PayloadID{}, BlockInsertTemporaryErr, fmt.Errorf("unexpected generic error code in forkchoice-updated response: %w", err)
}
}
} else {
return eth.PayloadID{}, BlockInsertTemporaryErr, fmt.Errorf("failed to create new block via forkchoice: %w", err)
......@@ -111,92 +113,9 @@ func startPayload(ctx context.Context, eng ExecEngine, fc eth.ForkchoiceState, a
return eth.PayloadID{}, BlockInsertTemporaryErr, errors.New("nil id in forkchoice result when expecting a valid ID")
}
return *id, BlockInsertOK, nil
case eth.ExecutionSyncing:
return eth.PayloadID{}, BlockInsertTemporaryErr, ErrEngineSyncing
default:
return eth.PayloadID{}, BlockInsertTemporaryErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
}
}
// confirmPayload ends an execution payload building process in the provided Engine, and persists the payload as the canonical head.
// If updateSafe is true, then the payload will also be recognized as safe-head at the same time.
// The severity of the error is distinguished to determine whether the payload was valid and can become canonical.
func confirmPayload(
ctx context.Context,
log log.Logger,
eng ExecEngine,
fc eth.ForkchoiceState,
payloadInfo eth.PayloadInfo,
updateSafe bool,
agossip async.AsyncGossiper,
sequencerConductor conductor.SequencerConductor,
) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
var envelope *eth.ExecutionPayloadEnvelope
// if the payload is available from the async gossiper, it means it was not yet imported, so we reuse it
if cached := agossip.Get(); cached != nil {
envelope = cached
// log a limited amount of information about the reused payload, more detailed logging happens later down
log.Debug("found uninserted payload from async gossiper, reusing it and bypassing engine",
"hash", envelope.ExecutionPayload.BlockHash,
"number", uint64(envelope.ExecutionPayload.BlockNumber),
"parent", envelope.ExecutionPayload.ParentHash,
"txs", len(envelope.ExecutionPayload.Transactions))
} else {
envelope, err = eng.GetPayload(ctx, payloadInfo)
}
if err != nil {
// even if it is an input-error (unknown payload ID), it is temporary, since we will re-attempt the full payload building, not just the retrieval of the payload.
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to get execution payload: %w", err)
}
payload := envelope.ExecutionPayload
if err := sanityCheckPayload(payload); err != nil {
return nil, BlockInsertPayloadErr, err
}
if err := sequencerConductor.CommitUnsafePayload(ctx, envelope); err != nil {
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to commit unsafe payload to conductor: %w", err)
}
// begin gossiping as soon as possible
// agossip.Clear() will be called later if a non-temporary error is found, or if the payload is successfully inserted
agossip.Gossip(envelope)
status, err := eng.NewPayload(ctx, payload, envelope.ParentBeaconBlockRoot)
if err != nil {
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to insert execution payload: %w", err)
}
if status.Status == eth.ExecutionInvalid || status.Status == eth.ExecutionInvalidBlockHash {
agossip.Clear()
return nil, BlockInsertPayloadErr, eth.NewPayloadErr(payload, status)
}
if status.Status != eth.ExecutionValid {
return nil, BlockInsertTemporaryErr, eth.NewPayloadErr(payload, status)
}
fc.HeadBlockHash = payload.BlockHash
if updateSafe {
fc.SafeBlockHash = payload.BlockHash
}
fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
			// if we succeed in updating the forkchoice pre-payload, but fail post-payload, then it is a payload error
agossip.Clear()
return nil, BlockInsertPayloadErr, fmt.Errorf("post-block-creation forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap())
default:
agossip.Clear()
return nil, BlockInsertPrestateErr, fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err)
}
} else {
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to make the new L2 block canonical via forkchoice: %w", err)
}
}
agossip.Clear()
if fcRes.PayloadStatus.Status != eth.ExecutionValid {
return nil, BlockInsertPayloadErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
}
log.Info("inserted block", "hash", payload.BlockHash, "number", uint64(payload.BlockNumber),
"state_root", payload.StateRoot, "timestamp", uint64(payload.Timestamp), "parent", payload.ParentHash,
"prev_randao", payload.PrevRandao, "fee_recipient", payload.FeeRecipient,
"txs", len(payload.Transactions), "update_safe", updateSafe)
return envelope, BlockInsertOK, nil
}
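A note on how the severity distinction above is consumed: the three BlockInsertionErrType values map to different recovery paths. A minimal sketch of a caller, with the recovery actions left as comments (the helper is illustrative, not part of op-node):
// Sketch only: how a caller of startPayload/confirmPayload is expected to react
// to the returned BlockInsertionErrType.
func handleBlockInsertResult(errTyp BlockInsertionErrType, err error) {
	if err == nil {
		return // block built and inserted successfully
	}
	switch errTyp {
	case BlockInsertTemporaryErr:
		// transient engine/RPC failure: keep the attributes and retry later
	case BlockInsertPrestateErr:
		// the engine disagrees about the pre-state: trigger an engine reset
	case BlockInsertPayloadErr:
		// the payload/attributes themselves are invalid: drop them, they can never become canonical
	}
}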
......@@ -6,31 +6,19 @@ import (
"fmt"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type InvalidPayloadEvent struct {
Envelope *eth.ExecutionPayloadEnvelope
}
func (ev InvalidPayloadEvent) String() string {
return "invalid-payload"
}
type InvalidPayloadAttributesEvent struct {
	Attributes *derive.AttributesWithParent
}
func (ev InvalidPayloadAttributesEvent) String() string {
	return "invalid-payload-attributes"
}
type Metrics interface {
	CountSequencedTxs(count int)
	RecordSequencerBuildingDiffTime(duration time.Duration)
	RecordSequencerSealingTime(duration time.Duration)
}
// ForkchoiceRequestEvent signals to the engine that it should emit an artificial
......@@ -82,6 +70,7 @@ func (ev SafeDerivedEvent) String() string {
return "safe-derived"
}
// ProcessAttributesEvent signals to immediately process the attributes.
type ProcessAttributesEvent struct {
Attributes *derive.AttributesWithParent
}
......@@ -145,6 +134,8 @@ func (ev PromoteFinalizedEvent) String() string {
}
type EngDeriver struct {
metrics Metrics
log log.Logger
cfg *rollup.Config
ec *EngineController
......@@ -155,12 +146,13 @@ type EngDeriver struct {
var _ event.Deriver = (*EngDeriver)(nil)
func NewEngDeriver(log log.Logger, ctx context.Context, cfg *rollup.Config,
ec *EngineController) *EngDeriver {
metrics Metrics, ec *EngineController) *EngDeriver {
return &EngDeriver{
log: log,
cfg: cfg,
ec: ec,
ctx: ctx,
metrics: metrics,
}
}
......@@ -242,8 +234,6 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool {
"safeHead", x.Safe, "unsafe", x.Unsafe, "safe_timestamp", x.Safe.Time,
"unsafe_timestamp", x.Unsafe.Time)
d.emitter.Emit(EngineResetConfirmedEvent(x))
case ProcessAttributesEvent:
d.onForceNextSafeAttributes(x.Attributes)
case PendingSafeRequestEvent:
d.emitter.Emit(PendingSafeUpdateEvent{
PendingSafe: d.ec.PendingSafeL2Head(),
......@@ -254,10 +244,16 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool {
// Resets/overwrites happen through engine-resets, not through promotion.
if x.Ref.Number > d.ec.PendingSafeL2Head().Number {
d.ec.SetPendingSafeL2Head(x.Ref)
d.emitter.Emit(PendingSafeUpdateEvent{
PendingSafe: d.ec.PendingSafeL2Head(),
Unsafe: d.ec.UnsafeL2Head(),
})
}
if x.Safe && x.Ref.Number > d.ec.SafeL2Head().Number {
d.ec.SetSafeHead(x.Ref)
d.emitter.Emit(SafeDerivedEvent{Safe: x.Ref, DerivedFrom: x.DerivedFrom})
// Try to apply the forkchoice changes
d.emitter.Emit(TryUpdateEngineEvent{})
}
case PromoteFinalizedEvent:
if x.Ref.Number < d.ec.Finalized().Number {
......@@ -271,91 +267,36 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool {
d.ec.SetFinalizedHead(x.Ref)
// Try to apply the forkchoice changes
d.emitter.Emit(TryUpdateEngineEvent{})
case BuildStartEvent:
d.onBuildStart(x)
case BuildStartedEvent:
d.onBuildStarted(x)
case BuildSealedEvent:
d.onBuildSealed(x)
case BuildSealEvent:
d.onBuildSeal(x)
case BuildInvalidEvent:
d.onBuildInvalid(x)
case BuildCancelEvent:
d.onBuildCancel(x)
case PayloadProcessEvent:
d.onPayloadProcess(x)
case PayloadSuccessEvent:
d.onPayloadSuccess(x)
case PayloadInvalidEvent:
d.onPayloadInvalid(x)
default:
return false
}
return true
}
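The dispatch above is where the new build events replace the old ProcessAttributesEvent path (see onForceNextSafeAttributes below, which this change removes). A deriver now requests block building by emitting BuildStartEvent, as the op-program deriver does further down. A minimal sketch, assuming the emitter is the event.Emitter attached to the same deriver set as EngDeriver:
// Sketch: kicking off block building through the event system rather than by
// calling EngineControl.StartPayload/ConfirmPayload directly.
func requestBlockBuild(emitter event.Emitter, attrs *derive.AttributesWithParent) {
	// EngDeriver.onBuildStart handles this and walks the build through
	// BuildStartedEvent, BuildSealEvent/BuildSealedEvent and PayloadProcessEvent.
	emitter.Emit(BuildStartEvent{Attributes: attrs})
}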
// onForceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain.
func (eq *EngDeriver) onForceNextSafeAttributes(attributes *derive.AttributesWithParent) {
ctx, cancel := context.WithTimeout(eq.ctx, time.Second*10)
defer cancel()
attrs := attributes.Attributes
errType, err := eq.ec.StartPayload(ctx, eq.ec.PendingSafeL2Head(), attributes, true)
var envelope *eth.ExecutionPayloadEnvelope
if err == nil {
envelope, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{})
}
if err != nil {
switch errType {
case BlockInsertTemporaryErr:
// RPC errors are recoverable, we can retry the buffered payload attributes later.
eq.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: fmt.Errorf("temporarily cannot insert new safe block: %w", err)})
return
case BlockInsertPrestateErr:
_ = eq.ec.CancelPayload(ctx, true)
eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("need reset to resolve pre-state problem: %w", err)})
return
case BlockInsertPayloadErr:
if !errors.Is(err, derive.ErrTemporary) {
eq.emitter.Emit(InvalidPayloadAttributesEvent{Attributes: attributes})
}
_ = eq.ec.CancelPayload(ctx, true)
eq.log.Warn("could not process payload derived from L1 data, dropping attributes", "err", err)
// Count the number of deposits to see if the tx list is deposit only.
depositCount := 0
for _, tx := range attrs.Transactions {
if len(tx) > 0 && tx[0] == types.DepositTxType {
depositCount += 1
}
}
// Deposit transaction execution errors are suppressed in the execution engine, but if the
// block is somehow invalid, there is nothing we can do to recover & we should exit.
if len(attrs.Transactions) == depositCount {
eq.log.Error("deposit only block was invalid", "parent", attributes.Parent, "err", err)
eq.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("failed to process block with only deposit transactions: %w", err)})
return
}
// Revert the pending safe head to the safe head.
eq.ec.SetPendingSafeL2Head(eq.ec.SafeL2Head())
// suppress the error b/c we want to retry with the next batch from the batch queue
// If there is no valid batch the node will eventually force a deposit only block. If
// the deposit only block fails, this will return the critical error above.
// Try to restore to previous known unsafe chain.
eq.ec.SetBackupUnsafeL2Head(eq.ec.BackupUnsafeL2Head(), true)
// drop the payload without inserting it into the engine
return
default:
eq.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err)})
}
}
ref, err := derive.PayloadToBlockRef(eq.cfg, envelope.ExecutionPayload)
if err != nil {
eq.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("failed to decode L2 block ref from payload: %w", err)})
return
}
eq.ec.SetPendingSafeL2Head(ref)
if attributes.IsLastInSpan {
eq.ec.SetSafeHead(ref)
eq.emitter.Emit(SafeDerivedEvent{Safe: ref, DerivedFrom: attributes.DerivedFrom})
}
eq.emitter.Emit(PendingSafeUpdateEvent{
PendingSafe: eq.ec.PendingSafeL2Head(),
Unsafe: eq.ec.UnsafeL2Head(),
})
}
type ResetEngineControl interface {
SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
SetFinalizedHead(eth.L2BlockRef)
SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
SetPendingSafeL2Head(eth.L2BlockRef)
ResetBuildingState()
}
// ForceEngineReset is not to be used. The op-program needs it for now, until event processing is adopted there.
......@@ -365,5 +306,4 @@ func ForceEngineReset(ec ResetEngineControl, x ForceEngineResetEvent) {
ec.SetPendingSafeL2Head(x.Safe)
ec.SetFinalizedHead(x.Finalized)
ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
ec.ResetBuildingState()
}
package engine
import (
"context"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -21,24 +17,6 @@ type Engine interface {
derive.L2Source
}
// EngineControl enables other components to build blocks with the Engine,
// while keeping the forkchoice state and payload-id management internal to
// avoid state inconsistencies between different users of the EngineControl.
type EngineControl interface {
EngineState
// StartPayload requests the engine to start building a block with the given attributes.
// If updateSafe, the resulting block will be marked as a safe block.
StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *derive.AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error)
// ConfirmPayload requests the engine to complete the current block. If no block is being built, or if it fails, an error is returned.
ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error)
// CancelPayload requests the engine to stop building the current block without making it canonical.
// This is optional, as the engine expires building jobs that are left uncompleted, but can still save resources.
CancelPayload(ctx context.Context, force bool) error
// BuildingPayload indicates if a payload is being built, and onto which block it is being built, and whether or not it is a safe payload.
BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool)
}
type LocalEngineState interface {
EngineState
......@@ -48,19 +26,7 @@ type LocalEngineState interface {
type LocalEngineControl interface {
LocalEngineState
EngineControl
ResetEngineControl
}
type FinalizerHooks interface {
// OnDerivationL1End remembers the given L1 block,
// and finalizes any prior data with the latest finality signal based on block height.
OnDerivationL1End(ctx context.Context, derivedFrom eth.L1BlockRef) error
	// PostProcessSafeL2 remembers that the L2 block is derived from the given L1 block, for later finalization.
PostProcessSafeL2(l2Safe eth.L2BlockRef, derivedFrom eth.L1BlockRef)
	// Reset clears recent state, to adapt to reorgs.
Reset()
}
var _ EngineControl = (*EngineController)(nil)
var _ LocalEngineControl = (*EngineController)(nil)
package engine
import "time"
const (
buildSealTimeout = time.Second * 10
buildStartTimeout = time.Second * 10
buildCancelTimeout = time.Second * 10
payloadProcessTimeout = time.Second * 10
)
package engine
import "github.com/ethereum-optimism/optimism/op-service/eth"
type PayloadInvalidEvent struct {
Envelope *eth.ExecutionPayloadEnvelope
Err error
}
func (ev PayloadInvalidEvent) String() string {
return "payload-invalid"
}
func (eq *EngDeriver) onPayloadInvalid(ev PayloadInvalidEvent) {
eq.log.Warn("Payload was invalid", "block", ev.Envelope.ExecutionPayload.ID(),
"err", ev.Err, "timestamp", uint64(ev.Envelope.ExecutionPayload.Timestamp))
}
package engine
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type PayloadProcessEvent struct {
// if payload should be promoted to safe (must also be pending safe, see DerivedFrom)
IsLastInSpan bool
// payload is promoted to pending-safe if non-zero
DerivedFrom eth.L1BlockRef
Envelope *eth.ExecutionPayloadEnvelope
Ref eth.L2BlockRef
}
func (ev PayloadProcessEvent) String() string {
return "payload-process"
}
func (eq *EngDeriver) onPayloadProcess(ev PayloadProcessEvent) {
ctx, cancel := context.WithTimeout(eq.ctx, payloadProcessTimeout)
defer cancel()
status, err := eq.ec.engine.NewPayload(ctx,
ev.Envelope.ExecutionPayload, ev.Envelope.ParentBeaconBlockRoot)
if err != nil {
eq.emitter.Emit(rollup.EngineTemporaryErrorEvent{
Err: fmt.Errorf("failed to insert execution payload: %w", err)})
return
}
switch status.Status {
case eth.ExecutionInvalid, eth.ExecutionInvalidBlockHash:
eq.emitter.Emit(PayloadInvalidEvent{
Envelope: ev.Envelope,
Err: eth.NewPayloadErr(ev.Envelope.ExecutionPayload, status)})
return
case eth.ExecutionValid:
eq.emitter.Emit(PayloadSuccessEvent(ev))
return
default:
eq.emitter.Emit(rollup.EngineTemporaryErrorEvent{
Err: eth.NewPayloadErr(ev.Envelope.ExecutionPayload, status)})
return
}
}
package engine
import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type PayloadSuccessEvent struct {
// if payload should be promoted to safe (must also be pending safe, see DerivedFrom)
IsLastInSpan bool
// payload is promoted to pending-safe if non-zero
DerivedFrom eth.L1BlockRef
Envelope *eth.ExecutionPayloadEnvelope
Ref eth.L2BlockRef
}
func (ev PayloadSuccessEvent) String() string {
return "payload-success"
}
func (eq *EngDeriver) onPayloadSuccess(ev PayloadSuccessEvent) {
	// Back up the unsafe head when the new block is not built on the original unsafe head.
if eq.ec.unsafeHead.Number >= ev.Ref.Number {
eq.ec.SetBackupUnsafeL2Head(eq.ec.unsafeHead, false)
}
eq.ec.SetUnsafeHead(ev.Ref)
// If derived from L1, then it can be considered (pending) safe
if ev.DerivedFrom != (eth.L1BlockRef{}) {
if ev.IsLastInSpan {
eq.ec.SetSafeHead(ev.Ref)
eq.emitter.Emit(SafeDerivedEvent{Safe: ev.Ref, DerivedFrom: ev.DerivedFrom})
}
eq.ec.SetPendingSafeL2Head(ev.Ref)
eq.emitter.Emit(PendingSafeUpdateEvent{
PendingSafe: eq.ec.PendingSafeL2Head(),
Unsafe: eq.ec.UnsafeL2Head(),
})
}
payload := ev.Envelope.ExecutionPayload
eq.log.Info("Inserted block", "hash", payload.BlockHash, "number", uint64(payload.BlockNumber),
"state_root", payload.StateRoot, "timestamp", uint64(payload.Timestamp), "parent", payload.ParentHash,
"prev_randao", payload.PrevRandao, "fee_recipient", payload.FeeRecipient,
"txs", len(payload.Transactions), "last_in_span", ev.IsLastInSpan, "derived_from", ev.DerivedFrom)
eq.emitter.Emit(TryUpdateEngineEvent{})
}
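Whether a successful payload only advances the unsafe head or is also promoted to pending-safe/safe is driven entirely by the DerivedFrom and IsLastInSpan fields on the event, as the handler above shows. A sketch of the two shapes of event (the wrapper function is illustrative only):
// Sketch: the two typical shapes of PayloadSuccessEvent seen by onPayloadSuccess.
func examplePayloadSuccessEvents(envelope *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef, l1Origin eth.L1BlockRef) (unsafeOnly, derived PayloadSuccessEvent) {
	// Zero DerivedFrom: only the unsafe head advances (e.g. a freshly sequenced or gossiped block).
	unsafeOnly = PayloadSuccessEvent{Envelope: envelope, Ref: ref}
	// Non-zero DerivedFrom: the block is also promoted to pending-safe,
	// and to safe when it is the last payload of its span batch.
	derived = PayloadSuccessEvent{Envelope: envelope, Ref: ref, DerivedFrom: l1Origin, IsLastInSpan: true}
	return
}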
package sequencing
import (
"context"
"errors"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
)
var ErrSequencerNotEnabled = errors.New("sequencer is not enabled")
type DisabledSequencer struct{}
var _ SequencerIface = DisabledSequencer{}
func (ds DisabledSequencer) OnEvent(ev event.Event) bool {
return false
}
func (ds DisabledSequencer) NextAction() (t time.Time, ok bool) {
return time.Time{}, false
}
func (ds DisabledSequencer) Active() bool {
return false
}
func (ds DisabledSequencer) Init(ctx context.Context, active bool) error {
return ErrSequencerNotEnabled
}
func (ds DisabledSequencer) Start(ctx context.Context, head common.Hash) error {
return ErrSequencerNotEnabled
}
func (ds DisabledSequencer) Stop(ctx context.Context) (hash common.Hash, err error) {
return common.Hash{}, ErrSequencerNotEnabled
}
func (ds DisabledSequencer) SetMaxSafeLag(ctx context.Context, v uint64) error {
return ErrSequencerNotEnabled
}
func (ds DisabledSequencer) OverrideLeader(ctx context.Context) error {
return ErrSequencerNotEnabled
}
func (ds DisabledSequencer) Close() {}
package sequencing
import (
"context"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
)
type SequencerIface interface {
event.Deriver
	// NextAction returns the time at which the sequencer should take its next action, and whether there is any action to take at all.
NextAction() (t time.Time, ok bool)
Active() bool
Init(ctx context.Context, active bool) error
Start(ctx context.Context, head common.Hash) error
Stop(ctx context.Context) (hash common.Hash, err error)
SetMaxSafeLag(ctx context.Context, v uint64) error
OverrideLeader(ctx context.Context) error
Close()
}
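NextAction is what lets the driver schedule sequencer work without the sequencer owning its own loop: the driver asks when the next action is due and whether there is one at all. A rough sketch of such a loop; everything other than the interface itself is a placeholder, and the real driver multiplexes this with other events instead of sleeping:
// Sketch: a simplified scheduling loop around SequencerIface.NextAction.
func scheduleSequencer(ctx context.Context, seq SequencerIface) {
	for ctx.Err() == nil {
		t, ok := seq.NextAction()
		if !ok {
			// Nothing scheduled (e.g. the sequencer is inactive); check again shortly.
			time.Sleep(50 * time.Millisecond)
			continue
		}
		if d := time.Until(t); d > 0 {
			time.Sleep(d) // wait until the sequencer wants to act
		}
		// Here the driver would deliver the sequencer's action event via OnEvent;
		// the concrete event type is omitted from this sketch.
	}
}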
package driver
package sequencing
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/confdepth"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
......@@ -127,7 +128,7 @@ func TestOriginSelectorRespectsConfDepth(t *testing.T) {
}
l1.ExpectL1BlockRefByHash(a.Hash, a, nil)
confDepthL1 := NewConfDepth(10, func() eth.L1BlockRef { return b }, l1)
confDepthL1 := confdepth.NewConfDepth(10, func() eth.L1BlockRef { return b }, l1)
s := NewL1OriginSelector(log, cfg, confDepthL1)
next, err := s.FindL1Origin(context.Background(), l2Head)
......@@ -170,7 +171,7 @@ func TestOriginSelectorStrictConfDepth(t *testing.T) {
}
l1.ExpectL1BlockRefByHash(a.Hash, a, nil)
confDepthL1 := NewConfDepth(10, func() eth.L1BlockRef { return b }, l1)
confDepthL1 := confdepth.NewConfDepth(10, func() eth.L1BlockRef { return b }, l1)
s := NewL1OriginSelector(log, cfg, confDepthL1)
_, err := s.FindL1Origin(context.Background(), l2Head)
......@@ -304,7 +305,7 @@ func TestOriginSelectorHandlesLateL1Blocks(t *testing.T) {
l1.ExpectL1BlockRefByNumber(b.Number, b, nil)
l1Head := b
confDepthL1 := NewConfDepth(2, func() eth.L1BlockRef { return l1Head }, l1)
confDepthL1 := confdepth.NewConfDepth(2, func() eth.L1BlockRef { return l1Head }, l1)
s := NewL1OriginSelector(log, cfg, confDepthL1)
_, err := s.FindL1Origin(context.Background(), l2Head)
......
......@@ -41,7 +41,7 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
pipelineDeriver.AttachEmitter(d)
ec := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, &sync.Config{SyncMode: sync.CLSync}, d)
engineDeriv := engine.NewEngDeriver(logger, context.Background(), cfg, ec)
engineDeriv := engine.NewEngDeriver(logger, context.Background(), cfg, metrics.NoopMetrics, ec)
engineDeriv.AttachEmitter(d)
syncCfg := &sync.Config{SyncMode: sync.CLSync}
engResetDeriv := engine.NewEngineResetDeriver(context.Background(), logger, cfg, l1Source, l2Source, syncCfg)
......
......@@ -52,7 +52,7 @@ func (d *ProgramDeriver) OnEvent(ev event.Event) bool {
d.Emitter.Emit(derive.ConfirmReceivedAttributesEvent{})
// No need to queue the attributes, since there is no unsafe chain to consolidate against,
// and no temporary-error retry to perform on block processing.
d.Emitter.Emit(engine.ProcessAttributesEvent{Attributes: x.Attributes})
d.Emitter.Emit(engine.BuildStartEvent{Attributes: x.Attributes})
case engine.InvalidPayloadAttributesEvent:
// If a set of attributes was invalid, then we drop the attributes,
// and continue with the next.
......
......@@ -64,7 +64,7 @@ func TestProgramDeriver(t *testing.T) {
p, m := newProgram(t, 1000)
attrib := &derive.AttributesWithParent{Parent: eth.L2BlockRef{Number: 123}}
m.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
m.ExpectOnce(engine.ProcessAttributesEvent{Attributes: attrib})
m.ExpectOnce(engine.BuildStartEvent{Attributes: attrib})
p.OnEvent(derive.DerivedAttributesEvent{Attributes: attrib})
m.AssertExpectations(t)
require.False(t, p.closing)
......
......@@ -6,7 +6,7 @@ import (
)
type SimpleClock struct {
unix atomic.Uint64
v atomic.Pointer[time.Time]
}
func NewSimpleClock() *SimpleClock {
......@@ -14,9 +14,18 @@ func NewSimpleClock() *SimpleClock {
}
func (c *SimpleClock) SetTime(u uint64) {
c.unix.Store(u)
t := time.Unix(int64(u), 0)
c.v.Store(&t)
}
func (c *SimpleClock) Set(v time.Time) {
c.v.Store(&v)
}
func (c *SimpleClock) Now() time.Time {
return time.Unix(int64(c.unix.Load()), 0)
v := c.v.Load()
if v == nil {
return time.Unix(0, 0)
}
return *v
}
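The clock now stores a full time.Time behind an atomic pointer rather than whole unix seconds, so Set can carry sub-second precision while SetTime keeps the old seconds-based API. A short usage sketch:
// Sketch: using the reworked SimpleClock.
func exampleSimpleClock() time.Time {
	c := NewSimpleClock()
	c.SetTime(1700000000)                   // seconds-based setter, same behaviour as before
	c.Set(time.Unix(1700000000, 123456789)) // new: stores a full time.Time, sub-second precision kept
	return c.Now()                          // last stored value; time.Unix(0, 0) if nothing was set
}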
package clock
import (
"sync/atomic"
"testing"
"time"
......@@ -11,10 +10,9 @@ import (
func TestSimpleClock_Now(t *testing.T) {
c := NewSimpleClock()
require.Equal(t, time.Unix(0, 0), c.Now())
expectedTime := uint64(time.Now().Unix())
c.unix = atomic.Uint64{}
c.unix.Store(expectedTime)
require.Equal(t, time.Unix(int64(expectedTime), 0), c.Now())
expectedTime := time.Now()
c.v.Store(&expectedTime)
require.Equal(t, expectedTime, c.Now())
}
func TestSimpleClock_SetTime(t *testing.T) {
......
......@@ -20,10 +20,18 @@ import (
type ErrorCode int
func (c ErrorCode) IsEngineError() bool {
return -38100 < c && c <= -38000
}
// Engine error codes used to be -3200x, but were rebased to -3800x:
// https://github.com/ethereum/execution-apis/pull/214
const (
UnknownPayload ErrorCode = -32001 // Payload does not exist / is not available.
UnknownPayload ErrorCode = -38001 // Payload does not exist / is not available.
InvalidForkchoiceState ErrorCode = -38002 // Forkchoice state is invalid / inconsistent.
InvalidPayloadAttributes ErrorCode = -38003 // Payload attributes are invalid / inconsistent.
TooLargeEngineRequest ErrorCode = -38004 // Unused, here for completeness, only used by engine_getPayloadBodiesByHashV1
UnsupportedFork ErrorCode = -38005 // Unused, see issue #11130.
)
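IsEngineError keys off the rebased -380xx range, which is what the forkchoice-update error handling earlier in this change uses to distinguish a pre-state problem from a generic temporary RPC failure. A small sketch of the classification (the helper is illustrative):
// Sketch: classifying an error code with the rebased engine error range.
func classifyErrorCode(code ErrorCode) string {
	if code.IsEngineError() { // true for -380xx codes such as InvalidForkchoiceState (-38002)
		return "engine error"
	}
	return "generic RPC error" // e.g. the legacy -32001 UnknownPayload code
}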
var ErrBedrockScalarPaddingNotEmpty = errors.New("version 0 scalar value has non-empty padding")
......
package testutils
import (
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -14,6 +16,15 @@ type TestDerivationMetrics struct {
FnRecordChannelInputBytes func(inputCompressedBytes int)
}
func (t *TestDerivationMetrics) CountSequencedTxs(count int) {
}
func (t *TestDerivationMetrics) RecordSequencerBuildingDiffTime(duration time.Duration) {
}
func (t *TestDerivationMetrics) RecordSequencerSealingTime(duration time.Duration) {
}
func (t *TestDerivationMetrics) RecordL1ReorgDepth(d uint64) {
if t.FnRecordL1ReorgDepth != nil {
t.FnRecordL1ReorgDepth(d)
......
......@@ -13,9 +13,9 @@ import (
type RPCErrFaker struct {
// RPC to call when no ErrFn is set, or the ErrFn does not return an error
RPC client.RPC
	// ErrFn returns an error when the RPC needs to return an error upon a call, batch call or subscription.
	// ErrFn returns an error when the RPC needs to return an error upon a call, batch call or subscription (the call input is nil for subscriptions).
// The RPC operates without fake errors if the ErrFn is nil, or returns nil.
ErrFn func() error
ErrFn func(call []rpc.BatchElem) error
}
func (r RPCErrFaker) Close() {
......@@ -24,7 +24,11 @@ func (r RPCErrFaker) Close() {
func (r RPCErrFaker) CallContext(ctx context.Context, result any, method string, args ...any) error {
if r.ErrFn != nil {
if err := r.ErrFn(); err != nil {
if err := r.ErrFn([]rpc.BatchElem{{
Method: method,
Args: args,
Result: result,
}}); err != nil {
return err
}
}
......@@ -33,7 +37,7 @@ func (r RPCErrFaker) CallContext(ctx context.Context, result any, method string,
func (r RPCErrFaker) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
if r.ErrFn != nil {
if err := r.ErrFn(); err != nil {
if err := r.ErrFn(b); err != nil {
return err
}
}
......@@ -42,7 +46,7 @@ func (r RPCErrFaker) BatchCallContext(ctx context.Context, b []rpc.BatchElem) er
func (r RPCErrFaker) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
if r.ErrFn != nil {
if err := r.ErrFn(); err != nil {
if err := r.ErrFn(nil); err != nil {
return nil, err
}
}
......
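Passing the batch elements into ErrFn means a test can now inspect what is being called and fail only specific methods, rather than unconditionally failing the next request. A hedged usage sketch (the wrapper function and the method it targets are illustrative):
// Sketch: an ErrFn that only fails eth_getBlockByNumber requests and lets everything else through.
func failBlockByNumber(realRPC client.RPC) RPCErrFaker {
	return RPCErrFaker{
		RPC: realRPC,
		ErrFn: func(call []rpc.BatchElem) error {
			// call is nil for subscriptions, so the loop is simply skipped there
			for _, el := range call {
				if el.Method == "eth_getBlockByNumber" {
					return errors.New("mock failure for eth_getBlockByNumber")
				}
			}
			return nil // no fake error: the wrapped RPC handles the request
		},
	}
}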