Commit da4c33c5 authored by protolambda's avatar protolambda Committed by GitHub

op-supervisor: Cross-safe updates [rebased] (#12624)

* op-supervisor: cross-safe-updates PR squashed

op-supervisor: experimental cross-safety, with hazard detection

tweak: Add some errors/error returns in backend/cross.

wip: Chain index <> ID mapping.

fix: Check parent instead of re-checking hazardBlock.

Remove Hazard Work

Write missing DB Bindings OpenBlock, LocallyDerivedFrom, CrossDerivedFrom

Configurable WorkFn for Workers

op-supervisor: move chain-index <> chain ID translation into dependency set, fix some interfaces

op-supervisor: update cross-safety worker routine

op-supervisor: update more error handling

op-supervisor: move errors to types package

op-supervisor: check CanExecuteAt and CanInitiateAt

op-supervisor: determine cross-safe candidate and L1 scope, and more fixes

todo L1 scope increment

op-supervisor: cross-safe L1 scope bump

op-supervisor: dependency set getter

op-supervisor: L1 scope increment fix

op-supervisor: fix cross-safe updates typing

op-node: signal L1 traversal of derivation to supervisor

op-supervisor: fromda fixes and tests

op-supervisor: fix OpenBlock, fix/add missing interface methods, hook up cross-safe worker routines

OpenBlock to return map[uint32]ExecutingMessage

Add Frontier Unit Tests

fix WithParent panic

op-node: register L1 traversal with op-supervisor

op-node,op-supervisor: add logging, temp work around for interop local-safe updates

Add safe_start_test, unsafe_start_test

Add safe_update_test and unsafe_update_test

add worker_test

op-supervisor: fix cross-safe L1 scope bumping

op-supervisor: fix logs DB test
Co-authored-by: default avataraxelKingsley <axel.kingsley@gmail.com>
Co-authored-by: default avatarTyler Smith <mail@tcry.pt>

* op-node: fix interop deriver test

* op-e2e: fix interop action test

* op-supervisor: improve map init

* op-node: link interop TODO comment to issue, in engine events emitter

* op-supervisor: cleanup Worker instances of tests

---------
Co-authored-by: default avataraxelKingsley <axel.kingsley@gmail.com>
Co-authored-by: default avatarTyler Smith <mail@tcry.pt>
parent 9f22034f
...@@ -41,6 +41,18 @@ func TestInteropVerifier(gt *testing.T) { ...@@ -41,6 +41,18 @@ func TestInteropVerifier(gt *testing.T) {
l1Miner.L1Client(t, sd.RollupCfg), l1Miner.BlobStore(), &sync.Config{}, l1Miner.L1Client(t, sd.RollupCfg), l1Miner.BlobStore(), &sync.Config{},
helpers.WithInteropBackend(verMockBackend)) helpers.WithInteropBackend(verMockBackend))
// Genesis block will be registered as local-safe when we first trigger the derivation pipeline
seqMockBackend.UpdateLocalSafeFn = func(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.L2BlockRef) error {
require.Equal(t, sd.RollupCfg.Genesis.L1, derivedFrom.ID())
require.Equal(t, sd.RollupCfg.Genesis.L2, lastDerived.ID())
return nil
}
verMockBackend.UpdateLocalSafeFn = func(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.L2BlockRef) error {
require.Equal(t, sd.RollupCfg.Genesis.L1, derivedFrom.ID())
require.Equal(t, sd.RollupCfg.Genesis.L2, lastDerived.ID())
return nil
}
seq.ActL2PipelineFull(t) seq.ActL2PipelineFull(t)
ver.ActL2PipelineFull(t) ver.ActL2PipelineFull(t)
...@@ -93,8 +105,10 @@ func TestInteropVerifier(gt *testing.T) { ...@@ -93,8 +105,10 @@ func TestInteropVerifier(gt *testing.T) {
// Sync the L1 block, to verify the L2 block as local-safe. // Sync the L1 block, to verify the L2 block as local-safe.
seqMockBackend.UpdateLocalUnsafeFn = nil seqMockBackend.UpdateLocalUnsafeFn = nil
nextL2 := uint64(0)
seqMockBackend.UpdateLocalSafeFn = func(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.L2BlockRef) error { seqMockBackend.UpdateLocalSafeFn = func(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.L2BlockRef) error {
require.Equal(t, uint64(1), lastDerived.Number) require.Equal(t, nextL2, lastDerived.Number)
nextL2 += 1
return nil return nil
} }
seqMockBackend.SafeViewFn = func(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) { seqMockBackend.SafeViewFn = func(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
...@@ -106,6 +120,7 @@ func TestInteropVerifier(gt *testing.T) { ...@@ -106,6 +120,7 @@ func TestInteropVerifier(gt *testing.T) {
seq.ActL1HeadSignal(t) seq.ActL1HeadSignal(t)
l1Head := seq.SyncStatus().HeadL1 l1Head := seq.SyncStatus().HeadL1
seq.ActL2PipelineFull(t) seq.ActL2PipelineFull(t)
require.Equal(t, uint64(2), nextL2)
status = seq.SyncStatus() status = seq.SyncStatus()
require.Equal(t, uint64(1), status.UnsafeL2.Number) require.Equal(t, uint64(1), status.UnsafeL2.Number)
...@@ -143,8 +158,10 @@ func TestInteropVerifier(gt *testing.T) { ...@@ -143,8 +158,10 @@ func TestInteropVerifier(gt *testing.T) {
require.Equal(t, uint64(1), head.Number) require.Equal(t, uint64(1), head.Number)
return nil return nil
} }
nextL2 = 0
verMockBackend.UpdateLocalSafeFn = func(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.L2BlockRef) error { verMockBackend.UpdateLocalSafeFn = func(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.L2BlockRef) error {
require.Equal(t, uint64(1), lastDerived.Number) require.Equal(t, nextL2, lastDerived.Number)
nextL2 += 1
require.Equal(t, l1Head.ID(), derivedFrom.ID()) require.Equal(t, l1Head.ID(), derivedFrom.ID())
return nil return nil
} }
...@@ -163,6 +180,7 @@ func TestInteropVerifier(gt *testing.T) { ...@@ -163,6 +180,7 @@ func TestInteropVerifier(gt *testing.T) {
} }
ver.ActL1HeadSignal(t) ver.ActL1HeadSignal(t)
ver.ActL2PipelineFull(t) ver.ActL2PipelineFull(t)
require.Equal(t, uint64(2), nextL2)
status = ver.SyncStatus() status = ver.SyncStatus()
require.Equal(t, uint64(1), status.UnsafeL2.Number, "synced the block") require.Equal(t, uint64(1), status.UnsafeL2.Number, "synced the block")
require.Equal(t, uint64(0), status.CrossUnsafeL2.Number, "not cross-verified yet") require.Equal(t, uint64(0), status.CrossUnsafeL2.Number, "not cross-verified yet")
......
...@@ -20,6 +20,7 @@ func (d DeriverIdleEvent) String() string { ...@@ -20,6 +20,7 @@ func (d DeriverIdleEvent) String() string {
type DeriverL1StatusEvent struct { type DeriverL1StatusEvent struct {
Origin eth.L1BlockRef Origin eth.L1BlockRef
LastL2 eth.L2BlockRef
} }
func (d DeriverL1StatusEvent) String() string { func (d DeriverL1StatusEvent) String() string {
...@@ -99,7 +100,7 @@ func (d *PipelineDeriver) OnEvent(ev event.Event) bool { ...@@ -99,7 +100,7 @@ func (d *PipelineDeriver) OnEvent(ev event.Event) bool {
attrib, err := d.pipeline.Step(d.ctx, x.PendingSafe) attrib, err := d.pipeline.Step(d.ctx, x.PendingSafe)
postOrigin := d.pipeline.Origin() postOrigin := d.pipeline.Origin()
if preOrigin != postOrigin { if preOrigin != postOrigin {
d.emitter.Emit(DeriverL1StatusEvent{Origin: postOrigin}) d.emitter.Emit(DeriverL1StatusEvent{Origin: postOrigin, LastL2: x.PendingSafe})
} }
if err == io.EOF { if err == io.EOF {
d.pipeline.log.Debug("Derivation process went idle", "progress", d.pipeline.Origin(), "err", err) d.pipeline.log.Debug("Derivation process went idle", "progress", d.pipeline.Origin(), "err", err)
......
...@@ -98,6 +98,15 @@ func (ev PendingSafeUpdateEvent) String() string { ...@@ -98,6 +98,15 @@ func (ev PendingSafeUpdateEvent) String() string {
return "pending-safe-update" return "pending-safe-update"
} }
type InteropPendingSafeChangedEvent struct {
Ref eth.L2BlockRef
DerivedFrom eth.L1BlockRef
}
func (ev InteropPendingSafeChangedEvent) String() string {
return "interop-pending-safe-changed"
}
// PromotePendingSafeEvent signals that a block can be marked as pending-safe, and/or safe. // PromotePendingSafeEvent signals that a block can be marked as pending-safe, and/or safe.
type PromotePendingSafeEvent struct { type PromotePendingSafeEvent struct {
Ref eth.L2BlockRef Ref eth.L2BlockRef
...@@ -407,6 +416,11 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool { ...@@ -407,6 +416,11 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool {
DerivedFrom: x.DerivedFrom, DerivedFrom: x.DerivedFrom,
}) })
} }
// TODO(#12646): temporary interop work-around, assumes Holocene local-safe progression behavior.
d.emitter.Emit(InteropPendingSafeChangedEvent{
Ref: x.Ref,
DerivedFrom: x.DerivedFrom,
})
case PromoteLocalSafeEvent: case PromoteLocalSafeEvent:
d.ec.SetLocalSafeHead(x.Ref) d.ec.SetLocalSafeHead(x.Ref)
d.emitter.Emit(LocalSafeUpdateEvent(x)) d.emitter.Emit(LocalSafeUpdateEvent(x))
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event" "github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality" "github.com/ethereum-optimism/optimism/op-node/rollup/finality"
...@@ -83,10 +84,17 @@ func (d *InteropDeriver) OnEvent(ev event.Event) bool { ...@@ -83,10 +84,17 @@ func (d *InteropDeriver) OnEvent(ev event.Event) bool {
switch x := ev.(type) { switch x := ev.(type) {
case engine.UnsafeUpdateEvent: case engine.UnsafeUpdateEvent:
d.onLocalUnsafeUpdate(x) d.onLocalUnsafeUpdate(x)
case engine.LocalSafeUpdateEvent: case engine.InteropPendingSafeChangedEvent:
d.onLocalSafeUpdate(x) d.onInteropPendingSafeChangedEvent(x)
case finality.FinalizeL1Event: case finality.FinalizeL1Event:
d.onFinalizedL1(x) d.onFinalizedL1(x)
case derive.DeriverL1StatusEvent:
d.log.Debug("deriver L1 traversal event", "l1", x.Origin, "l2", x.LastL2)
// Register traversal of L1, repeat the last local-safe L2
d.onInteropPendingSafeChangedEvent(engine.InteropPendingSafeChangedEvent{
Ref: x.LastL2,
DerivedFrom: x.Origin,
})
case engine.CrossUnsafeUpdateEvent: case engine.CrossUnsafeUpdateEvent:
if err := d.onCrossUnsafe(x); err != nil { if err := d.onCrossUnsafe(x); err != nil {
d.log.Error("Failed to process cross-unsafe update", "err", err) d.log.Error("Failed to process cross-unsafe update", "err", err)
...@@ -117,7 +125,7 @@ func (d *InteropDeriver) onLocalUnsafeUpdate(x engine.UnsafeUpdateEvent) { ...@@ -117,7 +125,7 @@ func (d *InteropDeriver) onLocalUnsafeUpdate(x engine.UnsafeUpdateEvent) {
d.emitter.Emit(engine.RequestCrossUnsafeEvent{}) d.emitter.Emit(engine.RequestCrossUnsafeEvent{})
} }
func (d *InteropDeriver) onLocalSafeUpdate(x engine.LocalSafeUpdateEvent) { func (d *InteropDeriver) onInteropPendingSafeChangedEvent(x engine.InteropPendingSafeChangedEvent) {
d.log.Debug("Signaling derived-from update to interop backend", "derivedFrom", x.DerivedFrom, "block", x.Ref) d.log.Debug("Signaling derived-from update to interop backend", "derivedFrom", x.DerivedFrom, "block", x.Ref)
ctx, cancel := context.WithTimeout(d.driverCtx, rpcTimeout) ctx, cancel := context.WithTimeout(d.driverCtx, rpcTimeout)
defer cancel() defer cancel()
......
...@@ -93,7 +93,7 @@ func TestInteropDeriver(t *testing.T) { ...@@ -93,7 +93,7 @@ func TestInteropDeriver(t *testing.T) {
derivedFrom := testutils.RandomBlockRef(rng) derivedFrom := testutils.RandomBlockRef(rng)
localSafe := testutils.RandomL2BlockRef(rng) localSafe := testutils.RandomL2BlockRef(rng)
interopBackend.ExpectUpdateLocalSafe(chainID, derivedFrom, localSafe, nil) interopBackend.ExpectUpdateLocalSafe(chainID, derivedFrom, localSafe, nil)
interopDeriver.OnEvent(engine.LocalSafeUpdateEvent{ interopDeriver.OnEvent(engine.InteropPendingSafeChangedEvent{
Ref: localSafe, Ref: localSafe,
DerivedFrom: derivedFrom, DerivedFrom: derivedFrom,
}) })
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
...@@ -43,6 +44,12 @@ type SupervisorBackend struct { ...@@ -43,6 +44,12 @@ type SupervisorBackend struct {
// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB // chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
chainProcessors map[types.ChainID]*processors.ChainProcessor chainProcessors map[types.ChainID]*processors.ChainProcessor
// crossSafeProcessors take local-safe data and promote it to cross-safe when verified
crossSafeProcessors map[types.ChainID]*cross.Worker
// crossUnsafeProcessors take local-unsafe data and promote it to cross-unsafe when verified
crossUnsafeProcessors map[types.ChainID]*cross.Worker
// synchronousProcessors disables background-workers, // synchronousProcessors disables background-workers,
// requiring manual triggers for the backend to process anything. // requiring manual triggers for the backend to process anything.
synchronousProcessors bool synchronousProcessors bool
...@@ -71,18 +78,18 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg ...@@ -71,18 +78,18 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
// create initial per-chain resources // create initial per-chain resources
chainsDBs := db.NewChainsDB(logger, depSet) chainsDBs := db.NewChainsDB(logger, depSet)
chainProcessors := make(map[types.ChainID]*processors.ChainProcessor, len(chains))
chainMetrics := make(map[types.ChainID]*chainMetrics, len(chains))
// create the supervisor backend // create the supervisor backend
super := &SupervisorBackend{ super := &SupervisorBackend{
logger: logger, logger: logger,
m: m, m: m,
dataDir: cfg.Datadir, dataDir: cfg.Datadir,
depSet: depSet, depSet: depSet,
chainDBs: chainsDBs, chainDBs: chainsDBs,
chainProcessors: chainProcessors, chainProcessors: make(map[types.ChainID]*processors.ChainProcessor, len(chains)),
chainMetrics: chainMetrics, chainMetrics: make(map[types.ChainID]*chainMetrics, len(chains)),
crossUnsafeProcessors: make(map[types.ChainID]*cross.Worker, len(chains)),
crossSafeProcessors: make(map[types.ChainID]*cross.Worker, len(chains)),
// For testing we can avoid running the processors. // For testing we can avoid running the processors.
synchronousProcessors: cfg.SynchronousProcessors, synchronousProcessors: cfg.SynchronousProcessors,
} }
...@@ -117,6 +124,17 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf ...@@ -117,6 +124,17 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
su.chainProcessors[chainID] = chainProcessor su.chainProcessors[chainID] = chainProcessor
} }
// initialize all cross-unsafe processors
for _, chainID := range chains {
worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
su.crossUnsafeProcessors[chainID] = worker
}
// initialize all cross-safe processors
for _, chainID := range chains {
worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
su.crossSafeProcessors[chainID] = worker
}
// the config has some RPC connections to attach to the chain-processors // the config has some RPC connections to attach to the chain-processors
for _, rpc := range cfg.L2RPCs { for _, rpc := range cfg.L2RPCs {
err := su.attachRPC(ctx, rpc) err := su.attachRPC(ctx, rpc)
...@@ -231,6 +249,12 @@ func (su *SupervisorBackend) Start(ctx context.Context) error { ...@@ -231,6 +249,12 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
for _, processor := range su.chainProcessors { for _, processor := range su.chainProcessors {
processor.StartBackground() processor.StartBackground()
} }
for _, worker := range su.crossUnsafeProcessors {
worker.StartBackground()
}
for _, worker := range su.crossSafeProcessors {
worker.StartBackground()
}
} }
return nil return nil
...@@ -249,6 +273,19 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error { ...@@ -249,6 +273,19 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
processor.Close() processor.Close()
} }
clear(su.chainProcessors) clear(su.chainProcessors)
for id, worker := range su.crossUnsafeProcessors {
su.logger.Info("stopping cross-unsafe processor", "chainID", id)
worker.Close()
}
clear(su.crossUnsafeProcessors)
for id, worker := range su.crossSafeProcessors {
su.logger.Info("stopping cross-safe processor", "chainID", id)
worker.Close()
}
clear(su.crossSafeProcessors)
// close the databases // close the databases
return su.chainDBs.Close() return su.chainDBs.Close()
} }
......
package cross
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type SafeFrontierCheckDeps interface {
CandidateCrossSafe(chain types.ChainID) (derivedFromScope, crossSafe eth.BlockRef, err error)
CrossDerivedFrom(chainID types.ChainID, derived eth.BlockID) (derivedFrom types.BlockSeal, err error)
DependencySet() depset.DependencySet
}
// HazardSafeFrontierChecks verifies all the hazard blocks are either:
// - already cross-safe.
// - the first (if not first: local blocks to verify before proceeding)
// local-safe block, after the cross-safe block.
func HazardSafeFrontierChecks(d SafeFrontierCheckDeps, inL1DerivedFrom eth.BlockID, hazards map[types.ChainIndex]types.BlockSeal) error {
depSet := d.DependencySet()
for hazardChainIndex, hazardBlock := range hazards {
hazardChainID, err := depSet.ChainIDFromIndex(hazardChainIndex)
if err != nil {
if errors.Is(err, types.ErrUnknownChain) {
err = fmt.Errorf("cannot cross-safe verify block %s of unknown chain index %s: %w", hazardBlock, hazardChainIndex, types.ErrConflict)
}
return err
}
initDerivedFrom, err := d.CrossDerivedFrom(hazardChainID, hazardBlock.ID())
if err != nil {
if errors.Is(err, types.ErrFuture) {
// If not in cross-safe scope, then check if it's the candidate cross-safe block.
initDerivedFrom, initSelf, err := d.CandidateCrossSafe(hazardChainID)
if err != nil {
return fmt.Errorf("failed to determine cross-safe candidate block of hazard dependency %s (chain %s): %w", hazardBlock, hazardChainID, err)
}
if initSelf.Number == hazardBlock.Number && initSelf.ID() != hazardBlock.ID() {
return fmt.Errorf("expected block %s (chain %d) does not match candidate local-safe block %s: %w",
hazardBlock, hazardChainID, initSelf, types.ErrConflict)
}
if initDerivedFrom.Number > inL1DerivedFrom.Number {
return fmt.Errorf("local-safe hazard block %s derived from L1 block %s is after scope %s: %w",
hazardBlock.ID(), initDerivedFrom, inL1DerivedFrom, types.ErrOutOfScope)
}
} else {
return fmt.Errorf("failed to determine cross-derived of hazard block %s (chain %s): %w", hazardBlock, hazardChainID, err)
}
} else if initDerivedFrom.Number > inL1DerivedFrom.Number {
return fmt.Errorf("cross-safe hazard block %s derived from L1 block %s is after scope %s: %w",
hazardBlock.ID(), initDerivedFrom, inL1DerivedFrom, types.ErrOutOfScope)
}
}
return nil
}
package cross
import (
"errors"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func TestHazardSafeFrontierChecks(t *testing.T) {
t.Run("empty hazards", func(t *testing.T) {
sfcd := &mockSafeFrontierCheckDeps{}
l1DerivedFrom := eth.BlockID{}
hazards := map[types.ChainIndex]types.BlockSeal{}
// when there are no hazards,
// no work is done, and no error is returned
err := HazardSafeFrontierChecks(sfcd, l1DerivedFrom, hazards)
require.NoError(t, err)
})
t.Run("unknown chain", func(t *testing.T) {
sfcd := &mockSafeFrontierCheckDeps{
deps: mockDependencySet{
chainIDFromIndexfn: func() (types.ChainID, error) {
return types.ChainID{}, types.ErrUnknownChain
},
},
}
l1DerivedFrom := eth.BlockID{}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {}}
// when there is one hazard, and ChainIDFromIndex returns ErrUnknownChain,
// an error is returned as a ErrConflict
err := HazardSafeFrontierChecks(sfcd, l1DerivedFrom, hazards)
require.ErrorIs(t, err, types.ErrConflict)
})
t.Run("initDerivedFrom in scope", func(t *testing.T) {
sfcd := &mockSafeFrontierCheckDeps{}
sfcd.crossDerivedFromFn = func() (types.BlockSeal, error) {
return types.BlockSeal{Number: 1}, nil
}
l1DerivedFrom := eth.BlockID{Number: 2}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {}}
// when there is one hazard, and CrossDerivedFrom returns a BlockSeal within scope
// (ie the hazard's block number is less than or equal to the derivedFrom block number),
// no error is returned
err := HazardSafeFrontierChecks(sfcd, l1DerivedFrom, hazards)
require.NoError(t, err)
})
t.Run("initDerivedFrom out of scope", func(t *testing.T) {
sfcd := &mockSafeFrontierCheckDeps{}
sfcd.crossDerivedFromFn = func() (types.BlockSeal, error) {
return types.BlockSeal{Number: 3}, nil
}
l1DerivedFrom := eth.BlockID{Number: 2}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {}}
// when there is one hazard, and CrossDerivedFrom returns a BlockSeal out of scope
// (ie the hazard's block number is greater than the derivedFrom block number),
// an error is returned as a ErrOutOfScope
err := HazardSafeFrontierChecks(sfcd, l1DerivedFrom, hazards)
require.ErrorIs(t, err, types.ErrOutOfScope)
})
t.Run("errFuture: candidate cross safe failure", func(t *testing.T) {
sfcd := &mockSafeFrontierCheckDeps{}
sfcd.crossDerivedFromFn = func() (types.BlockSeal, error) {
return types.BlockSeal{Number: 3}, types.ErrFuture
}
sfcd.candidateCrossSafeFn = func() (derivedFromScope, crossSafe eth.BlockRef, err error) {
return eth.BlockRef{},
eth.BlockRef{Number: 3, Hash: common.BytesToHash([]byte{0x01})},
errors.New("some error")
}
l1DerivedFrom := eth.BlockID{}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {}}
// when there is one hazard, and CrossDerivedFrom returns an ErrFuture,
// and CandidateCrossSafe returns an error,
// the error from CandidateCrossSafe is returned
err := HazardSafeFrontierChecks(sfcd, l1DerivedFrom, hazards)
require.ErrorContains(t, err, "some error")
})
t.Run("errFuture: expected block does not match candidate", func(t *testing.T) {
sfcd := &mockSafeFrontierCheckDeps{}
sfcd.crossDerivedFromFn = func() (types.BlockSeal, error) {
return types.BlockSeal{}, types.ErrFuture
}
sfcd.candidateCrossSafeFn = func() (derivedFromScope, crossSafe eth.BlockRef, err error) {
return eth.BlockRef{},
eth.BlockRef{Number: 3, Hash: common.BytesToHash([]byte{0x01})},
nil
}
l1DerivedFrom := eth.BlockID{}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {Number: 3, Hash: common.BytesToHash([]byte{0x02})}}
// when there is one hazard, and CrossDerivedFrom returns an ErrFuture,
// and CandidateCrossSafe returns a candidate that does not match the hazard,
// (ie the candidate's block number is the same as the hazard's block number, but the hashes are different),
// an error is returned as a ErrConflict
err := HazardSafeFrontierChecks(sfcd, l1DerivedFrom, hazards)
require.ErrorIs(t, err, types.ErrConflict)
})
t.Run("errFuture: local-safe hazard out of scope", func(t *testing.T) {
sfcd := &mockSafeFrontierCheckDeps{}
sfcd.crossDerivedFromFn = func() (types.BlockSeal, error) {
return types.BlockSeal{}, types.ErrFuture
}
sfcd.candidateCrossSafeFn = func() (derivedFromScope, crossSafe eth.BlockRef, err error) {
return eth.BlockRef{Number: 9},
eth.BlockRef{},
nil
}
l1DerivedFrom := eth.BlockID{Number: 8}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {Number: 3, Hash: common.BytesToHash([]byte{0x02})}}
// when there is one hazard, and CrossDerivedFrom returns an ErrFuture,
// and the initDerivedFrom is out of scope,
// an error is returned as a ErrOutOfScope
err := HazardSafeFrontierChecks(sfcd, l1DerivedFrom, hazards)
require.ErrorIs(t, err, types.ErrOutOfScope)
})
t.Run("CrossDerivedFrom Error", func(t *testing.T) {
sfcd := &mockSafeFrontierCheckDeps{}
sfcd.crossDerivedFromFn = func() (types.BlockSeal, error) {
return types.BlockSeal{}, errors.New("some error")
}
sfcd.candidateCrossSafeFn = func() (derivedFromScope, crossSafe eth.BlockRef, err error) {
return eth.BlockRef{Number: 9},
eth.BlockRef{},
nil
}
l1DerivedFrom := eth.BlockID{Number: 8}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {Number: 3, Hash: common.BytesToHash([]byte{0x02})}}
// when there is one hazard, and CrossDerivedFrom returns an ErrFuture,
// and the initDerivedFrom is out of scope,
// an error is returned as a ErrOutOfScope
err := HazardSafeFrontierChecks(sfcd, l1DerivedFrom, hazards)
require.ErrorContains(t, err, "some error")
})
}
type mockSafeFrontierCheckDeps struct {
deps mockDependencySet
candidateCrossSafeFn func() (derivedFromScope, crossSafe eth.BlockRef, err error)
crossDerivedFromFn func() (derivedFrom types.BlockSeal, err error)
}
func (m *mockSafeFrontierCheckDeps) CandidateCrossSafe(chain types.ChainID) (derivedFromScope, crossSafe eth.BlockRef, err error) {
if m.candidateCrossSafeFn != nil {
return m.candidateCrossSafeFn()
}
return eth.BlockRef{}, eth.BlockRef{}, nil
}
func (m *mockSafeFrontierCheckDeps) CrossDerivedFrom(chainID types.ChainID, derived eth.BlockID) (derivedFrom types.BlockSeal, err error) {
if m.crossDerivedFromFn != nil {
return m.crossDerivedFromFn()
}
return types.BlockSeal{}, nil
}
func (m *mockSafeFrontierCheckDeps) DependencySet() depset.DependencySet {
return m.deps
}
type mockDependencySet struct {
chainIDFromIndexfn func() (types.ChainID, error)
canExecuteAtfn func() (bool, error)
canInitiateAtfn func() (bool, error)
}
func (m mockDependencySet) CanExecuteAt(chain types.ChainID, timestamp uint64) (bool, error) {
if m.canExecuteAtfn != nil {
return m.canExecuteAtfn()
}
return true, nil
}
func (m mockDependencySet) CanInitiateAt(chain types.ChainID, timestamp uint64) (bool, error) {
if m.canInitiateAtfn != nil {
return m.canInitiateAtfn()
}
return true, nil
}
func (m mockDependencySet) ChainIDFromIndex(index types.ChainIndex) (types.ChainID, error) {
if m.chainIDFromIndexfn != nil {
return m.chainIDFromIndexfn()
}
return types.ChainID{}, nil
}
func (m mockDependencySet) ChainIndexFromID(chain types.ChainID) (types.ChainIndex, error) {
return types.ChainIndex(0), nil
}
func (m mockDependencySet) Chains() []types.ChainID {
return nil
}
func (m mockDependencySet) HasChain(chain types.ChainID) bool {
return true
}
package cross
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type SafeStartDeps interface {
Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) (includedIn types.BlockSeal, err error)
CrossDerivedFrom(chainID types.ChainID, derived eth.BlockID) (derivedFrom types.BlockSeal, err error)
DependencySet() depset.DependencySet
}
// CrossSafeHazards checks if the given messages all exist and pass invariants.
// It returns a hazard-set: if any intra-block messaging happened,
// these hazard blocks have to be verified.
func CrossSafeHazards(d SafeStartDeps, chainID types.ChainID, inL1DerivedFrom eth.BlockID,
candidate types.BlockSeal, execMsgs []*types.ExecutingMessage) (hazards map[types.ChainIndex]types.BlockSeal, err error) {
hazards = make(map[types.ChainIndex]types.BlockSeal)
// Warning for future: If we have sub-second distinct blocks (different block number),
// we need to increase precision on the above timestamp invariant.
// Otherwise a local block can depend on a future local block of the same chain,
// simply by pulling in a block of another chain,
// which then depends on a block of the original chain,
// all with the same timestamp, without message cycles.
depSet := d.DependencySet()
if len(execMsgs) > 0 {
if ok, err := depSet.CanExecuteAt(chainID, candidate.Timestamp); err != nil {
return nil, fmt.Errorf("cannot check message execution of block %s (chain %s): %w", candidate, chainID, err)
} else if !ok {
return nil, fmt.Errorf("cannot execute messages in block %s (chain %s): %w", candidate, chainID, types.ErrConflict)
}
}
// check all executing messages
for _, msg := range execMsgs {
initChainID, err := depSet.ChainIDFromIndex(msg.Chain)
if err != nil {
if errors.Is(err, types.ErrUnknownChain) {
err = fmt.Errorf("msg %s may not execute from unknown chain %s: %w", msg, msg.Chain, types.ErrConflict)
}
return nil, err
}
if ok, err := depSet.CanInitiateAt(initChainID, msg.Timestamp); err != nil {
return nil, fmt.Errorf("cannot check message initiation of msg %s (chain %s): %w", msg, chainID, err)
} else if !ok {
return nil, fmt.Errorf("cannot allow initiating message %s (chain %s): %w", msg, chainID, types.ErrConflict)
}
if msg.Timestamp < candidate.Timestamp {
// If timestamp is older: invariant ensures non-cyclic ordering relative to other messages.
// Check that the block that they are included in is cross-safe already.
includedIn, err := d.Check(initChainID, msg.BlockNum, msg.LogIdx, msg.Hash)
if err != nil {
return nil, fmt.Errorf("executing msg %s failed check: %w", msg, err)
}
initDerivedFrom, err := d.CrossDerivedFrom(initChainID, includedIn.ID())
if err != nil {
return nil, fmt.Errorf("msg %s included in non-cross-safe block %s: %w", msg, includedIn, err)
}
if initDerivedFrom.Number > inL1DerivedFrom.Number {
return nil, fmt.Errorf("msg %s was included in block %s derived from %s which is not in cross-safe scope %s: %w",
msg, includedIn, initDerivedFrom, inL1DerivedFrom, types.ErrOutOfScope)
}
} else if msg.Timestamp == candidate.Timestamp {
// If timestamp is equal: we have to inspect ordering of individual
// log events to ensure non-cyclic cross-chain message ordering.
// And since we may have back-and-forth messaging, we cannot wait till the initiating side is cross-safe.
// Thus check that it was included in a local-safe block,
// and then proceed with transitive block checks,
// to ensure the local block we depend on is becoming cross-safe also.
includedIn, err := d.Check(initChainID, msg.BlockNum, msg.LogIdx, msg.Hash)
if err != nil {
return nil, fmt.Errorf("executing msg %s failed check: %w", msg, err)
}
// As a hazard block, it will be checked to be included in a cross-safe block,
// or right after a cross-safe block in a local-safe block, in HazardSafeFrontierChecks.
if existing, ok := hazards[msg.Chain]; ok {
if existing != includedIn {
return nil, fmt.Errorf("found dependency on %s (chain %d), but already depend on %s", includedIn, initChainID, chainID)
}
} else {
// Mark it as hazard block
hazards[msg.Chain] = includedIn
}
} else {
// Timestamp invariant is broken: executing message tries to execute future block.
// The predeploy inbox contract should not have allowed this executing message through.
return nil, fmt.Errorf("executing message %s in %s breaks timestamp invariant", msg, candidate)
}
}
return hazards, nil
}
This diff is collapsed.
package cross
import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type CrossSafeDeps interface {
CrossSafe(chainID types.ChainID) (derivedFrom types.BlockSeal, derived types.BlockSeal, err error)
SafeFrontierCheckDeps
SafeStartDeps
CandidateCrossSafe(chain types.ChainID) (derivedFromScope, crossSafe eth.BlockRef, err error)
NextDerivedFrom(chain types.ChainID, derivedFrom eth.BlockID) (after eth.BlockRef, err error)
PreviousDerived(chain types.ChainID, derived eth.BlockID) (prevDerived types.BlockSeal, err error)
OpenBlock(chainID types.ChainID, blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error)
UpdateCrossSafe(chain types.ChainID, l1View eth.BlockRef, lastCrossDerived eth.BlockRef) error
}
func CrossSafeUpdate(ctx context.Context, logger log.Logger, chainID types.ChainID, d CrossSafeDeps) error {
logger.Debug("Cross-safe update call")
// TODO(#11693): establish L1 reorg-lock of scopeDerivedFrom
// defer unlock once we are done checking the chain
candidateScope, err := scopedCrossSafeUpdate(logger, chainID, d)
if err == nil {
// if we made progress, and no errors, then there is no need to bump the L1 scope yet.
return nil
}
if !errors.Is(err, types.ErrOutOfScope) {
return err
}
// candidateScope is expected to be set if ErrOutOfScope is returned.
if candidateScope == (eth.BlockRef{}) {
return fmt.Errorf("expected L1 scope to be defined with ErrOutOfScope: %w", err)
}
logger.Debug("Cross-safe updating ran out of L1 scope", "scope", candidateScope, "err", err)
// bump the L1 scope up, and repeat the prev L2 block, not the candidate
newScope, err := d.NextDerivedFrom(chainID, candidateScope.ID())
if err != nil {
return fmt.Errorf("failed to identify new L1 scope to expand to after %s: %w", candidateScope, err)
}
_, currentCrossSafe, err := d.CrossSafe(chainID)
if err != nil {
// TODO: if genesis isn't cross-safe by default, then we can't register something as cross-safe here
return fmt.Errorf("failed to identify cross-safe scope to repeat: %w", err)
}
parent, err := d.PreviousDerived(chainID, currentCrossSafe.ID())
if err != nil {
return fmt.Errorf("cannot find parent-block of cross-safe: %w", err)
}
crossSafeRef := currentCrossSafe.WithParent(parent.ID())
logger.Debug("Bumping cross-safe scope", "scope", newScope, "crossSafe", crossSafeRef)
if err := d.UpdateCrossSafe(chainID, newScope, crossSafeRef); err != nil {
return fmt.Errorf("failed to update cross-safe head with L1 scope increment to %s and repeat of L2 block %s: %w", candidateScope, crossSafeRef, err)
}
return nil
}
// scopedCrossSafeUpdate runs through the cross-safe update checks.
// If no L2 cross-safe progress can be made without additional L1 input data,
// then a types.ErrOutOfScope error is returned,
// with the current scope that will need to be expanded for further progress.
func scopedCrossSafeUpdate(logger log.Logger, chainID types.ChainID, d CrossSafeDeps) (scope eth.BlockRef, err error) {
candidateScope, candidate, err := d.CandidateCrossSafe(chainID)
if err != nil {
return candidateScope, fmt.Errorf("failed to determine candidate block for cross-safe: %w", err)
}
logger.Debug("Candidate cross-safe", "scope", candidateScope, "candidate", candidate)
opened, _, execMsgs, err := d.OpenBlock(chainID, candidate.Number)
if err != nil {
return candidateScope, fmt.Errorf("failed to open block %s: %w", candidate, err)
}
if opened.ID() != candidate.ID() {
return candidateScope, fmt.Errorf("unsafe L2 DB has %s, but candidate cross-safe was %s: %w", opened, candidate, types.ErrConflict)
}
hazards, err := CrossSafeHazards(d, chainID, candidateScope.ID(), types.BlockSealFromRef(opened), sliceOfExecMsgs(execMsgs))
if err != nil {
return candidateScope, fmt.Errorf("failed to determine dependencies of cross-safe candidate %s: %w", candidate, err)
}
if err := HazardSafeFrontierChecks(d, candidateScope.ID(), hazards); err != nil {
return candidateScope, fmt.Errorf("failed to verify block %s in cross-safe frontier: %w", candidate, err)
}
//if err := HazardCycleChecks(d, candidate.Timestamp, hazards); err != nil {
// TODO
//}
// promote the candidate block to cross-safe
if err := d.UpdateCrossSafe(chainID, candidateScope, candidate); err != nil {
return candidateScope, fmt.Errorf("failed to update cross-safe head to %s, derived from scope %s: %w", candidate, candidateScope, err)
}
return candidateScope, nil
}
func NewCrossSafeWorker(logger log.Logger, chainID types.ChainID, d CrossSafeDeps) *Worker {
logger = logger.New("chain", chainID)
return NewWorker(logger, func(ctx context.Context) error {
return CrossSafeUpdate(ctx, logger, chainID, d)
})
}
func sliceOfExecMsgs(execMsgs map[uint32]*types.ExecutingMessage) []*types.ExecutingMessage {
msgs := make([]*types.ExecutingMessage, 0, len(execMsgs))
for _, msg := range execMsgs {
msgs = append(msgs, msg)
}
return msgs
}
This diff is collapsed.
package cross
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type UnsafeFrontierCheckDeps interface {
ParentBlock(chainID types.ChainID, parentOf eth.BlockID) (parent eth.BlockID, err error)
IsCrossUnsafe(chainID types.ChainID, block eth.BlockID) error
IsLocalUnsafe(chainID types.ChainID, block eth.BlockID) error
DependencySet() depset.DependencySet
}
// HazardUnsafeFrontierChecks verifies all the hazard blocks are either:
// - already cross-unsafe.
// - the first (if not first: local blocks to verify before proceeding)
// local-unsafe block, after the cross-unsafe block.
func HazardUnsafeFrontierChecks(d UnsafeFrontierCheckDeps, hazards map[types.ChainIndex]types.BlockSeal) error {
depSet := d.DependencySet()
for hazardChainIndex, hazardBlock := range hazards {
hazardChainID, err := depSet.ChainIDFromIndex(hazardChainIndex)
if err != nil {
if errors.Is(err, types.ErrUnknownChain) {
err = fmt.Errorf("cannot cross-unsafe verify block %s of unknown chain index %s: %w", hazardBlock, hazardChainIndex, types.ErrConflict)
}
return err
}
// Anything we depend on in this timestamp must be cross-unsafe already, or the first block after.
err = d.IsCrossUnsafe(hazardChainID, hazardBlock.ID())
if err != nil {
if errors.Is(err, types.ErrFuture) {
// Not already cross-unsafe, so we check if the block is local-unsafe
// (a sanity check if part of the canonical chain).
err = d.IsLocalUnsafe(hazardChainID, hazardBlock.ID())
if err != nil {
// can be ErrFuture (missing data) or ErrConflict (non-canonical)
return fmt.Errorf("hazard block %s (chain %d) is not local-unsafe: %w", hazardBlock, hazardChainID, err)
}
// If it doesn't have a parent block, then there is no prior block required to be cross-safe
if hazardBlock.Number > 0 {
// Check that parent of hazardBlockID is cross-safe within view
parent, err := d.ParentBlock(hazardChainID, hazardBlock.ID())
if err != nil {
return fmt.Errorf("failed to retrieve parent-block of hazard block %s (chain %s): %w", hazardBlock, hazardChainID, err)
}
if err := d.IsCrossUnsafe(hazardChainID, parent); err != nil {
return fmt.Errorf("cannot rely on hazard-block %s (chain %s), parent block %s is not cross-unsafe: %w", hazardBlock, hazardChainID, parent, err)
}
}
} else {
return fmt.Errorf("failed to determine cross-derived of hazard block %s (chain %s): %w", hazardBlock, hazardChainID, err)
}
}
}
return nil
}
package cross
import (
"errors"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func TestHazardUnsafeFrontierChecks(t *testing.T) {
t.Run("empty hazards", func(t *testing.T) {
ufcd := &mockUnsafeFrontierCheckDeps{}
hazards := map[types.ChainIndex]types.BlockSeal{}
// when there are no hazards,
// no work is done, and no error is returned
err := HazardUnsafeFrontierChecks(ufcd, hazards)
require.NoError(t, err)
})
t.Run("unknown chain", func(t *testing.T) {
ufcd := &mockUnsafeFrontierCheckDeps{
deps: mockDependencySet{
chainIDFromIndexfn: func() (types.ChainID, error) {
return types.ChainID{}, types.ErrUnknownChain
},
},
}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {}}
// when there is one hazard, and ChainIDFromIndex returns ErrUnknownChain,
// an error is returned as a ErrConflict
err := HazardUnsafeFrontierChecks(ufcd, hazards)
require.ErrorIs(t, err, types.ErrConflict)
})
t.Run("is cross unsafe", func(t *testing.T) {
ufcd := &mockUnsafeFrontierCheckDeps{}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {}}
ufcd.isCrossUnsafe = nil
// when there is one hazard, and IsCrossUnsafe returns nil (no error)
// no error is returned
err := HazardUnsafeFrontierChecks(ufcd, hazards)
require.NoError(t, err)
})
t.Run("errFuture: is not local unsafe", func(t *testing.T) {
ufcd := &mockUnsafeFrontierCheckDeps{}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {}}
ufcd.isCrossUnsafe = types.ErrFuture
ufcd.isLocalUnsafe = errors.New("some error")
// when there is one hazard, and IsCrossUnsafe returns an ErrFuture,
// and IsLocalUnsafe returns an error,
// the error from IsLocalUnsafe is (wrapped and) returned
err := HazardUnsafeFrontierChecks(ufcd, hazards)
require.ErrorContains(t, err, "some error")
})
t.Run("errFuture: genesis block", func(t *testing.T) {
ufcd := &mockUnsafeFrontierCheckDeps{}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {}}
ufcd.isCrossUnsafe = types.ErrFuture
// when there is one hazard, and IsCrossUnsafe returns an ErrFuture,
// BUT the hazard's block number is 0,
// no error is returned
err := HazardUnsafeFrontierChecks(ufcd, hazards)
require.NoError(t, err)
})
t.Run("errFuture: error getting parent block", func(t *testing.T) {
ufcd := &mockUnsafeFrontierCheckDeps{}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {Number: 3}}
ufcd.isCrossUnsafe = types.ErrFuture
ufcd.parentBlockFn = func() (parent eth.BlockID, err error) {
return eth.BlockID{}, errors.New("some error")
}
// when there is one hazard, and IsCrossUnsafe returns an ErrFuture,
// and there is an error getting the parent block,
// the error from ParentBlock is (wrapped and) returned
err := HazardUnsafeFrontierChecks(ufcd, hazards)
require.ErrorContains(t, err, "some error")
})
t.Run("errFuture: parent block is not cross unsafe", func(t *testing.T) {
ufcd := &mockUnsafeFrontierCheckDeps{}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {Number: 3}}
ufcd.isCrossUnsafe = types.ErrFuture
ufcd.parentBlockFn = func() (parent eth.BlockID, err error) {
// when getting the parent block, prep isCrossSafe to be err
ufcd.isCrossUnsafe = errors.New("not cross unsafe!")
return eth.BlockID{}, nil
}
// when there is one hazard, and IsCrossUnsafe returns an ErrFuture,
// and the parent block is not cross unsafe,
// the error from IsCrossUnsafe is (wrapped and) returned
err := HazardUnsafeFrontierChecks(ufcd, hazards)
require.ErrorContains(t, err, "not cross unsafe!")
})
t.Run("IsCrossUnsafe Error", func(t *testing.T) {
ufcd := &mockUnsafeFrontierCheckDeps{}
hazards := map[types.ChainIndex]types.BlockSeal{types.ChainIndex(0): {Number: 3, Hash: common.BytesToHash([]byte{0x02})}}
ufcd.isCrossUnsafe = errors.New("some error")
// when there is one hazard, and IsCrossUnsafe returns an error,
// the error from IsCrossUnsafe is (wrapped and) returned
err := HazardUnsafeFrontierChecks(ufcd, hazards)
require.ErrorContains(t, err, "some error")
})
}
type mockUnsafeFrontierCheckDeps struct {
deps mockDependencySet
parentBlockFn func() (parent eth.BlockID, err error)
isCrossUnsafe error
isLocalUnsafe error
}
func (m *mockUnsafeFrontierCheckDeps) DependencySet() depset.DependencySet {
return m.deps
}
func (m *mockUnsafeFrontierCheckDeps) ParentBlock(chainID types.ChainID, block eth.BlockID) (parent eth.BlockID, err error) {
if m.parentBlockFn != nil {
return m.parentBlockFn()
}
return eth.BlockID{}, nil
}
func (m *mockUnsafeFrontierCheckDeps) IsCrossUnsafe(chainID types.ChainID, block eth.BlockID) error {
return m.isCrossUnsafe
}
func (m *mockUnsafeFrontierCheckDeps) IsLocalUnsafe(chainID types.ChainID, block eth.BlockID) error {
return m.isLocalUnsafe
}
package cross
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type UnsafeStartDeps interface {
Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) (includedIn types.BlockSeal, err error)
IsCrossUnsafe(chainID types.ChainID, block eth.BlockID) error
DependencySet() depset.DependencySet
}
// CrossUnsafeHazards checks if the given messages all exist and pass invariants.
// It returns a hazard-set: if any intra-block messaging happened,
// these hazard blocks have to be verified.
func CrossUnsafeHazards(d UnsafeStartDeps, chainID types.ChainID,
candidate types.BlockSeal, execMsgs []*types.ExecutingMessage) (hazards map[types.ChainIndex]types.BlockSeal, err error) {
hazards = make(map[types.ChainIndex]types.BlockSeal)
// Warning for future: If we have sub-second distinct blocks (different block number),
// we need to increase precision on the above timestamp invariant.
// Otherwise a local block can depend on a future local block of the same chain,
// simply by pulling in a block of another chain,
// which then depends on a block of the original chain,
// all with the same timestamp, without message cycles.
depSet := d.DependencySet()
if len(execMsgs) > 0 {
if ok, err := depSet.CanExecuteAt(chainID, candidate.Timestamp); err != nil {
return nil, fmt.Errorf("cannot check message execution of block %s (chain %s): %w", candidate, chainID, err)
} else if !ok {
return nil, fmt.Errorf("cannot execute messages in block %s (chain %s): %w", candidate, chainID, types.ErrConflict)
}
}
// check all executing messages
for _, msg := range execMsgs {
initChainID, err := depSet.ChainIDFromIndex(msg.Chain)
if err != nil {
if errors.Is(err, types.ErrUnknownChain) {
err = fmt.Errorf("msg %s may not execute from unknown chain %s: %w", msg, msg.Chain, types.ErrConflict)
}
return nil, err
}
if ok, err := depSet.CanInitiateAt(initChainID, msg.Timestamp); err != nil {
return nil, fmt.Errorf("cannot check message initiation of msg %s (chain %s): %w", msg, chainID, err)
} else if !ok {
return nil, fmt.Errorf("cannot allow initiating message %s (chain %s): %w", msg, chainID, types.ErrConflict)
}
if msg.Timestamp < candidate.Timestamp {
// If timestamp is older: invariant ensures non-cyclic ordering relative to other messages.
// Check that the block that they are included in is cross-safe already.
includedIn, err := d.Check(initChainID, msg.BlockNum, msg.LogIdx, msg.Hash)
if err != nil {
return nil, fmt.Errorf("executing msg %s failed check: %w", msg, err)
}
if err := d.IsCrossUnsafe(initChainID, includedIn.ID()); err != nil {
return nil, fmt.Errorf("msg %s included in non-cross-unsafe block %s: %w", msg, includedIn, err)
}
} else if msg.Timestamp == candidate.Timestamp {
// If timestamp is equal: we have to inspect ordering of individual
// log events to ensure non-cyclic cross-chain message ordering.
// And since we may have back-and-forth messaging, we cannot wait till the initiating side is cross-unsafe.
// Thus check that it was included in a local-unsafe block,
// and then proceed with transitive block checks,
// to ensure the local block we depend on is becoming cross-unsafe also.
includedIn, err := d.Check(initChainID, msg.BlockNum, msg.LogIdx, msg.Hash)
if err != nil {
return nil, fmt.Errorf("executing msg %s failed check: %w", msg, err)
}
// As a hazard block, it will be checked to be included in a cross-unsafe block,
// or right after a cross-unsafe block, in HazardUnsafeFrontierChecks.
if existing, ok := hazards[msg.Chain]; ok {
if existing != includedIn {
return nil, fmt.Errorf("found dependency on %s (chain %d), but already depend on %s", includedIn, initChainID, chainID)
}
} else {
// Mark it as hazard block
hazards[msg.Chain] = includedIn
}
} else {
// Timestamp invariant is broken: executing message tries to execute future block.
// The predeploy inbox contract should not have allowed this executing message through.
return nil, fmt.Errorf("executing message %s in %s breaks timestamp invariant", msg, candidate)
}
}
return hazards, nil
}
This diff is collapsed.
package cross
import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type CrossUnsafeDeps interface {
CrossUnsafe(chainID types.ChainID) (types.BlockSeal, error)
UnsafeStartDeps
UnsafeFrontierCheckDeps
OpenBlock(chainID types.ChainID, blockNum uint64) (block eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error)
UpdateCrossUnsafe(chain types.ChainID, crossUnsafe types.BlockSeal) error
}
func CrossUnsafeUpdate(ctx context.Context, logger log.Logger, chainID types.ChainID, d CrossUnsafeDeps) error {
var candidate types.BlockSeal
var execMsgs []*types.ExecutingMessage
// fetch cross-head to determine next cross-unsafe candidate
if crossUnsafe, err := d.CrossUnsafe(chainID); err != nil {
if errors.Is(err, types.ErrFuture) {
// If genesis / no cross-safe block yet, then defer update
logger.Debug("No cross-unsafe starting point yet")
return nil
} else {
return err
}
} else {
// Open block N+1: this is a local-unsafe block,
// just after cross-safe, that can be promoted if it passes the dependency checks.
bl, _, msgs, err := d.OpenBlock(chainID, crossUnsafe.Number+1)
if err != nil {
return fmt.Errorf("failed to open block %d: %w", crossUnsafe.Number+1, err)
}
if bl.ParentHash != crossUnsafe.Hash {
return fmt.Errorf("cannot use block %s, it does not build on cross-unsafe block %s: %w", bl, crossUnsafe, types.ErrConflict)
}
candidate = types.BlockSealFromRef(bl)
execMsgs = sliceOfExecMsgs(msgs)
}
hazards, err := CrossUnsafeHazards(d, chainID, candidate, execMsgs)
if err != nil {
// TODO(#11693): reorgs can be detected by checking if the error is ErrConflict,
// missing data is identified by ErrFuture,
// and other errors (e.g. DB issues) are identifier by remaining error kinds.
return fmt.Errorf("failed to check for cross-chain hazards: %w", err)
}
if err := HazardUnsafeFrontierChecks(d, hazards); err != nil {
return fmt.Errorf("failed to verify block %s in cross-unsafe frontier: %w", candidate, err)
}
//if err := HazardCycleChecks(d, candidate.Timestamp, hazards); err != nil {
//// TODO
//}
// promote the candidate block to cross-unsafe
if err := d.UpdateCrossUnsafe(chainID, candidate); err != nil {
return fmt.Errorf("failed to update cross-unsafe head to %s: %w", candidate, err)
}
return nil
}
func NewCrossUnsafeWorker(logger log.Logger, chainID types.ChainID, d CrossUnsafeDeps) *Worker {
logger = logger.New("chain", chainID)
return NewWorker(logger, func(ctx context.Context) error {
return CrossUnsafeUpdate(ctx, logger, chainID, d)
})
}
package cross
import (
"context"
"errors"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestCrossUnsafeUpdate(t *testing.T) {
t.Run("CrossUnsafe returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
usd.crossUnsafeFn = func(chainID types.ChainID) (types.BlockSeal, error) {
return types.BlockSeal{}, errors.New("some error")
}
usd.deps = mockDependencySet{}
// when an error is returned by CrossUnsafe,
// the error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
require.ErrorContains(t, err, "some error")
})
t.Run("CrossUnsafe returns ErrFuture", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
usd.crossUnsafeFn = func(chainID types.ChainID) (types.BlockSeal, error) {
return types.BlockSeal{}, types.ErrFuture
}
usd.deps = mockDependencySet{}
// when a ErrFuture is returned by CrossUnsafe,
// no error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
require.NoError(t, err)
})
t.Run("OpenBlock returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
usd.openBlockFn = func(chainID types.ChainID, blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error) {
return eth.BlockRef{}, 0, nil, errors.New("some error")
}
usd.deps = mockDependencySet{}
// when an error is returned by OpenBlock,
// the error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
require.ErrorContains(t, err, "some error")
})
t.Run("opened block parent hash does not match", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
crossUnsafe := types.BlockSeal{Hash: common.Hash{0x11}}
usd.crossUnsafeFn = func(chainID types.ChainID) (types.BlockSeal, error) {
return crossUnsafe, nil
}
bl := eth.BlockRef{ParentHash: common.Hash{0x01}}
usd.openBlockFn = func(chainID types.ChainID, blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error) {
return bl, 0, nil, nil
}
usd.deps = mockDependencySet{}
// when the parent hash of the opened block does not match the cross-unsafe block,
// an ErrConflict is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
require.ErrorIs(t, err, types.ErrConflict)
})
t.Run("CrossSafeHazards returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
crossUnsafe := types.BlockSeal{Hash: common.Hash{0x01}}
usd.crossUnsafeFn = func(chainID types.ChainID) (types.BlockSeal, error) {
return crossUnsafe, nil
}
bl := eth.BlockRef{ParentHash: common.Hash{0x01}}
usd.openBlockFn = func(chainID types.ChainID, blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error) {
// include one executing message to trigger the CanExecuteAt check
return bl, 0, map[uint32]*types.ExecutingMessage{1: {}}, nil
}
usd.deps = mockDependencySet{}
// make CrossSafeHazards return an error by setting CanExecuteAtfn to return an error
usd.deps.canExecuteAtfn = func() (bool, error) {
return false, errors.New("some error")
}
// when CrossSafeHazards returns an error,
// the error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
require.ErrorContains(t, err, "some error")
})
t.Run("HazardUnsafeFrontierChecks returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
crossUnsafe := types.BlockSeal{Hash: common.Hash{0x01}}
usd.crossUnsafeFn = func(chainID types.ChainID) (types.BlockSeal, error) {
return crossUnsafe, nil
}
bl := eth.BlockRef{ParentHash: common.Hash{0x01}, Time: 1}
em1 := &types.ExecutingMessage{Timestamp: 1}
usd.openBlockFn = func(chainID types.ChainID, blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error) {
// include one executing message to ensure one hazard is returned
return bl, 0, map[uint32]*types.ExecutingMessage{1: em1}, nil
}
usd.deps = mockDependencySet{}
count := 0
// make HazardUnsafeFrontierChecks return an error by failing the second ChainIDFromIndex call
// (the first one is in CrossSafeHazards)
usd.deps.chainIDFromIndexfn = func() (types.ChainID, error) {
defer func() { count++ }()
if count == 1 {
return types.ChainID{}, errors.New("some error")
}
return types.ChainID{}, nil
}
// when HazardUnsafeFrontierChecks returns an error,
// the error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
require.ErrorContains(t, err, "some error")
})
t.Run("successful update", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
crossUnsafe := types.BlockSeal{Hash: common.Hash{0x01}}
usd.crossUnsafeFn = func(chainID types.ChainID) (types.BlockSeal, error) {
return crossUnsafe, nil
}
bl := eth.BlockRef{ParentHash: common.Hash{0x01}, Time: 1}
em1 := &types.ExecutingMessage{Timestamp: 1}
usd.openBlockFn = func(chainID types.ChainID, blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error) {
// include one executing message to ensure one hazard is returned
return bl, 0, map[uint32]*types.ExecutingMessage{1: em1}, nil
}
usd.deps = mockDependencySet{}
var updatingChainID types.ChainID
var updatingBlock types.BlockSeal
usd.updateCrossUnsafeFn = func(chain types.ChainID, crossUnsafe types.BlockSeal) error {
updatingChainID = chain
updatingBlock = crossUnsafe
return nil
}
// when there are no errors, the cross-unsafe block is updated
// the updated block is the block opened in OpenBlock
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
require.NoError(t, err)
require.Equal(t, chainID, updatingChainID)
require.Equal(t, types.BlockSealFromRef(bl), updatingBlock)
})
}
type mockCrossUnsafeDeps struct {
deps mockDependencySet
crossUnsafeFn func(chainID types.ChainID) (types.BlockSeal, error)
openBlockFn func(chainID types.ChainID, blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error)
updateCrossUnsafeFn func(chain types.ChainID, crossUnsafe types.BlockSeal) error
}
func (m *mockCrossUnsafeDeps) CrossUnsafe(chainID types.ChainID) (derived types.BlockSeal, err error) {
if m.crossUnsafeFn != nil {
return m.crossUnsafeFn(chainID)
}
return types.BlockSeal{}, nil
}
func (m *mockCrossUnsafeDeps) DependencySet() depset.DependencySet {
return m.deps
}
func (m *mockCrossUnsafeDeps) Check(chainID types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) (types.BlockSeal, error) {
return types.BlockSeal{}, nil
}
func (m *mockCrossUnsafeDeps) OpenBlock(chainID types.ChainID, blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error) {
if m.openBlockFn != nil {
return m.openBlockFn(chainID, blockNum)
}
return eth.BlockRef{}, 0, nil, nil
}
func (m *mockCrossUnsafeDeps) UpdateCrossUnsafe(chain types.ChainID, block types.BlockSeal) error {
if m.updateCrossUnsafeFn != nil {
return m.updateCrossUnsafeFn(chain, block)
}
return nil
}
func (m *mockCrossUnsafeDeps) IsCrossUnsafe(chainID types.ChainID, blockNum eth.BlockID) error {
return nil
}
func (m *mockCrossUnsafeDeps) IsLocalUnsafe(chainID types.ChainID, blockNum eth.BlockID) error {
return nil
}
func (m *mockCrossUnsafeDeps) ParentBlock(chainID types.ChainID, blockNum eth.BlockID) (eth.BlockID, error) {
return eth.BlockID{}, nil
}
package cross
import (
"context"
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
)
// Worker iterates work
type Worker struct {
log log.Logger
// workFn is the function to call to process the scope
workFn workFn
// channel with capacity of 1, full if there is work to do
poke chan struct{}
pollDuration time.Duration
// lifetime management of the chain processor
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// workFn is a function used by the worker
// it is opaque to the worker, and is set by the constructor
type workFn func(ctx context.Context) error
// NewWorker creates a new worker to process updates
func NewWorker(log log.Logger, workFn workFn) *Worker {
ctx, cancel := context.WithCancel(context.Background())
out := &Worker{
log: log,
poke: make(chan struct{}, 1),
// The data may have changed, and we may have missed a poke, so re-attempt regularly.
pollDuration: time.Second * 4,
ctx: ctx,
cancel: cancel,
}
out.workFn = workFn
return out
}
func (s *Worker) StartBackground() {
s.wg.Add(1)
go s.worker()
}
func (s *Worker) ProcessWork() error {
return s.workFn(s.ctx)
}
func (s *Worker) worker() {
defer s.wg.Done()
delay := time.NewTicker(s.pollDuration)
for {
if s.ctx.Err() != nil { // check if we are closing down
return
}
// do the work
err := s.workFn(s.ctx)
if err != nil {
if errors.Is(err, s.ctx.Err()) {
return
}
s.log.Error("Failed to process work", "err", err)
}
// await next time we process, or detect shutdown
select {
case <-s.ctx.Done():
delay.Stop()
return
case <-s.poke:
s.log.Debug("Continuing cross-safe verification after hint of new data")
continue
case <-delay.C:
s.log.Debug("Checking for cross-safe updates")
continue
}
}
}
func (s *Worker) OnNewData() error {
// signal that we have something to process
select {
case s.poke <- struct{}{}:
default:
// already requested an update
}
return nil
}
func (s *Worker) Close() {
s.cancel()
s.wg.Wait()
}
package cross
import (
"context"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestWorker(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
t.Run("do work", func(t *testing.T) {
count := 0
w := NewWorker(logger, func(ctx context.Context) error {
count++
return nil
})
t.Cleanup(w.Close)
// when ProcessWork is called, the workFn is called once
require.NoError(t, w.ProcessWork())
require.Equal(t, 1, count)
})
t.Run("background worker", func(t *testing.T) {
count := 0
w := NewWorker(logger, func(ctx context.Context) error {
count++
return nil
})
t.Cleanup(w.Close)
// set a long poll duration so the worker does not auto-run
w.pollDuration = 100 * time.Second
// when StartBackground is called, the worker runs in the background
// the count should increment once
w.StartBackground()
require.Eventually(t, func() bool {
return count == 1
}, 2*time.Second, 100*time.Millisecond)
})
t.Run("background worker OnNewData", func(t *testing.T) {
count := 0
w := NewWorker(logger, func(ctx context.Context) error {
count++
return nil
})
t.Cleanup(w.Close)
// set a long poll duration so the worker does not auto-run
w.pollDuration = 100 * time.Second
// when StartBackground is called, the worker runs in the background
// the count should increment once
w.StartBackground()
require.Eventually(t, func() bool {
return count == 1
}, 2*time.Second, 100*time.Millisecond)
// when OnNewData is called, the worker runs again
require.NoError(t, w.OnNewData())
require.Eventually(t, func() bool {
return count == 2
}, 2*time.Second, 100*time.Millisecond)
// and due to the long poll duration, the worker does not run again
require.Never(t, func() bool {
return count > 2
}, 10*time.Second, 100*time.Millisecond)
})
t.Run("background fast poll", func(t *testing.T) {
count := 0
w := NewWorker(logger, func(ctx context.Context) error {
count++
return nil
})
t.Cleanup(w.Close)
// set a long poll duration so the worker does not auto-run
w.pollDuration = 100 * time.Millisecond
// when StartBackground is called, the worker runs in the background
// the count should increment rapidly and reach 10 in 1 second
w.StartBackground()
require.Eventually(t, func() bool {
return count == 10
}, 2*time.Second, 100*time.Millisecond)
})
t.Run("close", func(t *testing.T) {
count := 0
w := NewWorker(logger, func(ctx context.Context) error {
count++
return nil
})
t.Cleanup(w.Close) // close on cleanup in case of early error
// set a long poll duration so the worker does not auto-run
w.pollDuration = 100 * time.Millisecond
// when StartBackground is called, the worker runs in the background
// the count should increment rapidly and reach 10 in 1 second
w.StartBackground()
require.Eventually(t, func() bool {
return count == 10
}, 2*time.Second, 100*time.Millisecond)
// once the worker is closed, it stops running
// and the count does not increment
w.Close()
stopCount := count
require.Never(t, func() bool {
return count != stopCount
}, 3*time.Second, 100*time.Millisecond)
})
}
...@@ -17,8 +17,6 @@ import ( ...@@ -17,8 +17,6 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
var ErrNoRPCSource = errors.New("no RPC client configured")
type Source interface { type Source interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, gethtypes.Receipts, error) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, gethtypes.Receipts, error)
...@@ -137,7 +135,7 @@ func (s *ChainProcessor) work() { ...@@ -137,7 +135,7 @@ func (s *ChainProcessor) work() {
if err := s.update(target); err != nil { if err := s.update(target); err != nil {
if errors.Is(err, ethereum.NotFound) { if errors.Is(err, ethereum.NotFound) {
s.log.Info("Cannot find next block yet", "target", target) s.log.Info("Cannot find next block yet", "target", target)
} else if errors.Is(err, ErrNoRPCSource) { } else if errors.Is(err, types.ErrNoRPCSource) {
s.log.Warn("No RPC source configured, cannot process new blocks") s.log.Warn("No RPC source configured, cannot process new blocks")
} else { } else {
s.log.Error("Failed to process new block", "err", err) s.log.Error("Failed to process new block", "err", err)
...@@ -158,7 +156,7 @@ func (s *ChainProcessor) update(nextNum uint64) error { ...@@ -158,7 +156,7 @@ func (s *ChainProcessor) update(nextNum uint64) error {
defer s.clientLock.Unlock() defer s.clientLock.Unlock()
if s.client == nil { if s.client == nil {
return ErrNoRPCSource return types.ErrNoRPCSource
} }
ctx, cancel := context.WithTimeout(s.ctx, time.Second*10) ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
......
...@@ -108,7 +108,7 @@ func TestLogProcessor(t *testing.T) { ...@@ -108,7 +108,7 @@ func TestLogProcessor(t *testing.T) {
}, },
} }
execMsg := types.ExecutingMessage{ execMsg := types.ExecutingMessage{
Chain: 4, Chain: 4, // TODO(#11105): translate chain ID to chain index
BlockNum: 6, BlockNum: 6,
LogIdx: 8, LogIdx: 8,
Timestamp: 10, Timestamp: 10,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment