Commit 55165087 authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

supervisor: L1 Finality Processor (#13274)

* Add Finality Handling to L1 Processor

* Add E2E Test for Finality Tracking

* Completely remove UpdateFinalizedL1 API

* use BlockRef in Supervisor Client return
parent 0d99a87a
......@@ -21,6 +21,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/interop"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
......@@ -142,6 +143,11 @@ func (sa *SupervisorActor) SyncCrossSafe(t helpers.Testing, chainID types.ChainI
require.NoError(t, sa.backend.SyncCrossSafe(chainID))
}
func (sa *SupervisorActor) SyncFinalizedL1(t helpers.Testing, ref eth.BlockRef) {
sa.backend.SyncFinalizedL1(ref)
require.Equal(t, ref, sa.backend.FinalizedL1())
}
// worldToDepSet converts a set of chain configs into a dependency-set for the supervisor.
func worldToDepSet(t helpers.Testing, worldOutput *interopgen.WorldOutput) *depset.StaticConfigDependencySet {
depSetCfg := make(map[types.ChainID]*depset.StaticConfigDependency)
......
......@@ -90,6 +90,7 @@ func TestFullInterop(gt *testing.T) {
actors.L1Miner.ActL1FinalizeNext(t)
actors.ChainA.Sequencer.ActL1SafeSignal(t)
actors.ChainA.Sequencer.ActL1FinalizedSignal(t)
actors.Supervisor.SyncFinalizedL1(t, status.HeadL1)
actors.ChainA.Sequencer.ActL2PipelineFull(t)
finalizedL2BlockID, err := actors.Supervisor.Finalized(t.Ctx(), actors.ChainA.ChainID)
require.NoError(t, err)
......
......@@ -105,6 +105,24 @@ func TestInterop_IsolatedChains(t *testing.T) {
setupAndRun(t, config, test)
}
// TestInterop_SupervisorFinality tests that the supervisor updates its finality
// It waits for the finalized block to advance past the genesis block.
func TestInterop_SupervisorFinality(t *testing.T) {
test := func(t *testing.T, s2 SuperSystem) {
supervisor := s2.SupervisorClient()
require.Eventually(t, func() bool {
final, err := supervisor.FinalizedL1(context.Background())
require.NoError(t, err)
return final.Number > 0
// this test takes about 30 seconds, with a longer Eventually timeout for CI
}, time.Second*60, time.Second, "wait for finalized block to be greater than 0")
}
config := SuperSystemConfig{
mempoolFiltering: false,
}
setupAndRun(t, config, test)
}
// TestInterop_EmitLogs tests a simple interop scenario
// Chains A and B exist, but no messages are sent between them.
// A contract is deployed on each chain, and logs are emitted repeatedly.
......
......@@ -32,7 +32,6 @@ type InteropBackend interface {
UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error
UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.BlockRef) error
UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.L1BlockRef) error
}
// For testing usage, the backend of the supervisor implements the interface, no need for RPC.
......@@ -153,12 +152,10 @@ func (d *InteropDeriver) onFinalizedL1(x finality.FinalizeL1Event) {
if !d.cfg.IsInterop(x.FinalizedL1.Time) {
return
}
d.log.Debug("Signaling finalized L1 update to interop backend", "finalized", x.FinalizedL1)
ctx, cancel := context.WithTimeout(d.driverCtx, rpcTimeout)
defer cancel()
if err := d.backend.UpdateFinalizedL1(ctx, d.chainID, x.FinalizedL1); err != nil {
d.log.Warn("Failed to signal finalized L1 block to interop backend", "finalized", x.FinalizedL1, "err", err)
}
// there used to be code here which sent the finalized L1 block to the supervisor
// but the supervisor manages its own finality now
// so we don't need to do anything here besides emit the event.
// New L2 blocks may be ready to finalize now that the backend knows of new L1 finalized info.
d.emitter.Emit(engine.RequestFinalizedUpdateEvent{})
}
......
......@@ -158,7 +158,6 @@ func TestInteropDeriver(t *testing.T) {
t.Run("finalized L1 trigger cross-L2 finality check", func(t *testing.T) {
emitter.ExpectOnce(engine.RequestFinalizedUpdateEvent{})
finalizedL1 := testutils.RandomBlockRef(rng)
interopBackend.ExpectUpdateFinalizedL1(chainID, finalizedL1, nil)
interopDeriver.OnEvent(finality.FinalizeL1Event{
FinalizedL1: finalizedL1,
})
......
......@@ -114,6 +114,15 @@ func (cl *SupervisorClient) Finalized(ctx context.Context, chainID types.ChainID
return result, err
}
func (cl *SupervisorClient) FinalizedL1(ctx context.Context) (eth.BlockRef, error) {
var result eth.BlockRef
err := cl.client.CallContext(
ctx,
&result,
"supervisor_finalizedL1")
return result, err
}
func (cl *SupervisorClient) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (eth.BlockRef, error) {
var result eth.BlockRef
err := cl.client.CallContext(
......@@ -144,15 +153,6 @@ func (cl *SupervisorClient) UpdateLocalSafe(ctx context.Context, chainID types.C
lastDerived)
}
func (cl *SupervisorClient) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalizedL1 eth.L1BlockRef) error {
return cl.client.CallContext(
ctx,
nil,
"supervisor_updateFinalizedL1",
chainID,
finalizedL1)
}
func (cl *SupervisorClient) Close() {
cl.client.Close()
}
......@@ -94,10 +94,6 @@ func (m *MockInteropBackend) UpdateFinalizedL1(ctx context.Context, chainID type
return *result.Get(0).(*error)
}
func (m *MockInteropBackend) ExpectUpdateFinalizedL1(chainID types.ChainID, finalized eth.L1BlockRef, err error) {
m.Mock.On("UpdateFinalizedL1", chainID, finalized).Once().Return(&err)
}
func (m *MockInteropBackend) AssertExpectations(t mock.TestingT) {
m.Mock.AssertExpectations(t)
}
......@@ -450,6 +450,10 @@ func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainI
return v.ID(), nil
}
func (su *SupervisorBackend) FinalizedL1() eth.BlockRef {
return su.chainDBs.FinalizedL1()
}
func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
if err != nil {
......@@ -478,10 +482,6 @@ func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.
return nil
}
func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error {
return su.chainDBs.UpdateFinalizedL1(finalized)
}
// Access to synchronous processing for tests
// ----------------------------
......@@ -509,3 +509,7 @@ func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
}
return ch.ProcessWork()
}
func (su *SupervisorBackend) SyncFinalizedL1(ref eth.BlockRef) {
processors.MaybeUpdateFinalizedL1(context.Background(), su.logger, su.chainDBs, ref)
}
......@@ -148,6 +148,10 @@ func (db *ChainsDB) CrossSafe(chainID types.ChainID) (derivedFrom types.BlockSea
return crossDB.Latest()
}
func (db *ChainsDB) FinalizedL1() eth.BlockRef {
return db.finalizedL1.Get()
}
func (db *ChainsDB) Finalized(chainID types.ChainID) (types.BlockSeal, error) {
finalizedL1 := db.finalizedL1.Get()
if finalizedL1 == (eth.L1BlockRef{}) {
......
......@@ -62,6 +62,10 @@ func (m *MockBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth
return eth.BlockID{}, nil
}
func (m *MockBackend) FinalizedL1() eth.BlockRef {
return eth.BlockRef{}
}
func (m *MockBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
return eth.BlockRef{}, nil
}
......
......@@ -8,23 +8,28 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
)
type chainsDB interface {
RecordNewL1(ref eth.BlockRef) error
LastCommonL1() (types.BlockSeal, error)
FinalizedL1() eth.BlockRef
UpdateFinalizedL1(finalized eth.BlockRef) error
}
type L1Source interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error)
}
type L1Processor struct {
log log.Logger
client L1Source
clientMu sync.Mutex
clientMu sync.RWMutex
running atomic.Bool
finalitySub ethereum.Subscription
currentNumber uint64
tickDuration time.Duration
......@@ -38,11 +43,12 @@ type L1Processor struct {
func NewL1Processor(log log.Logger, cdb chainsDB, client L1Source) *L1Processor {
ctx, cancel := context.WithCancel(context.Background())
tickDuration := 6 * time.Second
return &L1Processor{
client: client,
db: cdb,
log: log.New("service", "l1-processor"),
tickDuration: 6 * time.Second,
tickDuration: tickDuration,
ctx: ctx,
cancel: cancel,
}
......@@ -51,7 +57,20 @@ func NewL1Processor(log log.Logger, cdb chainsDB, client L1Source) *L1Processor
func (p *L1Processor) AttachClient(client L1Source) {
p.clientMu.Lock()
defer p.clientMu.Unlock()
// unsubscribe from the old client
if p.finalitySub != nil {
p.finalitySub.Unsubscribe()
}
// make the new client the active one
p.client = client
// resubscribe to the new client
p.finalitySub = eth.PollBlockChanges(
p.log,
p.client,
p.handleFinalized,
eth.Finalized,
3*time.Second,
10*time.Second)
}
func (p *L1Processor) Start() {
......@@ -68,6 +87,13 @@ func (p *L1Processor) Start() {
}
p.wg.Add(1)
go p.worker()
p.finalitySub = eth.PollBlockChanges(
p.log,
p.client,
p.handleFinalized,
eth.Finalized,
p.tickDuration,
p.tickDuration)
}
func (p *L1Processor) Stop() {
......@@ -103,8 +129,8 @@ func (p *L1Processor) worker() {
// if a new block is found, it is recorded in the database and the target number is updated
// in the future it will also kick of derivation management for the sync nodes
func (p *L1Processor) work() error {
p.clientMu.Lock()
defer p.clientMu.Unlock()
p.clientMu.RLock()
defer p.clientMu.RUnlock()
nextNumber := p.currentNumber + 1
ref, err := p.client.L1BlockRefByNumber(p.ctx, nextNumber)
if err != nil {
......@@ -125,3 +151,30 @@ func (p *L1Processor) work() error {
p.currentNumber = nextNumber
return nil
}
// handleFinalized is called when a new finalized block is received from the L1 chain subscription
// it updates the database with the new finalized block if it is newer than the current one
func (p *L1Processor) handleFinalized(ctx context.Context, sig eth.L1BlockRef) {
MaybeUpdateFinalizedL1(ctx, p.log, p.db, sig)
}
// MaybeUpdateFinalizedL1 updates the database with the new finalized block if it is newer than the current one
// it is defined outside of the L1Processor so tests can call it directly without having a processor
func MaybeUpdateFinalizedL1(ctx context.Context, logger log.Logger, db chainsDB, ref eth.L1BlockRef) {
// do something with the new block
logger.Debug("Received new Finalized L1 block", "block", ref)
currentFinalized := db.FinalizedL1()
if currentFinalized.Number > ref.Number {
logger.Warn("Finalized block in database is newer than subscribed finalized block", "current", currentFinalized, "new", ref)
return
}
if ref.Number > currentFinalized.Number || currentFinalized == (eth.BlockRef{}) {
// update the database with the new finalized block
if err := db.UpdateFinalizedL1(ref); err != nil {
logger.Warn("Failed to update finalized L1", "err", err)
return
}
logger.Debug("Updated finalized L1 block", "block", ref)
}
}
......@@ -16,6 +16,8 @@ import (
type mockChainsDB struct {
recordNewL1Fn func(ref eth.BlockRef) error
lastCommonL1Fn func() (types.BlockSeal, error)
finalizedL1Fn func() eth.BlockRef
updateFinalizedL1Fn func(finalized eth.BlockRef) error
}
func (m *mockChainsDB) RecordNewL1(ref eth.BlockRef) error {
......@@ -32,10 +34,28 @@ func (m *mockChainsDB) LastCommonL1() (types.BlockSeal, error) {
return types.BlockSeal{}, nil
}
func (m *mockChainsDB) FinalizedL1() eth.BlockRef {
if m.finalizedL1Fn != nil {
return m.finalizedL1Fn()
}
return eth.BlockRef{}
}
func (m *mockChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
if m.updateFinalizedL1Fn != nil {
return m.updateFinalizedL1Fn(finalized)
}
return nil
}
type mockL1BlockRefByNumberFetcher struct {
l1BlockByNumberFn func() (eth.L1BlockRef, error)
}
func (m *mockL1BlockRefByNumberFetcher) L1BlockRefByLabel(context.Context, eth.BlockLabel) (eth.L1BlockRef, error) {
return eth.L1BlockRef{}, nil
}
func (m *mockL1BlockRefByNumberFetcher) L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) {
if m.l1BlockByNumberFn != nil {
return m.l1BlockByNumberFn()
......@@ -103,5 +123,37 @@ func TestL1Processor(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond)
})
t.Run("Updates L1 Finalized", func(t *testing.T) {
proc := processorForTesting()
proc.db.(*mockChainsDB).finalizedL1Fn = func() eth.BlockRef {
return eth.BlockRef{Number: 0}
}
proc.db.(*mockChainsDB).updateFinalizedL1Fn = func(finalized eth.BlockRef) error {
require.Equal(t, uint64(10), finalized.Number)
return nil
}
proc.handleFinalized(context.Background(), eth.BlockRef{Number: 10})
})
t.Run("No L1 Finalized Update for Same Number", func(t *testing.T) {
proc := processorForTesting()
proc.db.(*mockChainsDB).finalizedL1Fn = func() eth.BlockRef {
return eth.BlockRef{Number: 10}
}
proc.db.(*mockChainsDB).updateFinalizedL1Fn = func(finalized eth.BlockRef) error {
require.Fail(t, "should not be called")
return nil
}
proc.handleFinalized(context.Background(), eth.BlockRef{Number: 10})
})
t.Run("No L1 Finalized Update When Behind", func(t *testing.T) {
proc := processorForTesting()
proc.db.(*mockChainsDB).finalizedL1Fn = func() eth.BlockRef {
return eth.BlockRef{Number: 20}
}
proc.db.(*mockChainsDB).updateFinalizedL1Fn = func(finalized eth.BlockRef) error {
require.Fail(t, "should not be called")
return nil
}
proc.handleFinalized(context.Background(), eth.BlockRef{Number: 10})
})
}
......@@ -21,12 +21,12 @@ type QueryBackend interface {
UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error)
SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error)
Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error)
FinalizedL1() eth.BlockRef
}
type UpdatesBackend interface {
UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error
UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error
UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error
}
type Backend interface {
......@@ -67,6 +67,10 @@ func (q *QueryFrontend) Finalized(ctx context.Context, chainID types.ChainID) (e
return q.Supervisor.Finalized(ctx, chainID)
}
func (q *QueryFrontend) FinalizedL1() eth.BlockRef {
return q.Supervisor.FinalizedL1()
}
func (q *QueryFrontend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
return q.Supervisor.CrossDerivedFrom(ctx, chainID, derived)
}
......@@ -105,7 +109,3 @@ func (u *UpdatesFrontend) UpdateLocalUnsafe(ctx context.Context, chainID types.C
func (u *UpdatesFrontend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
return u.Supervisor.UpdateLocalSafe(ctx, chainID, derivedFrom, lastDerived)
}
func (u *UpdatesFrontend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error {
return u.Supervisor.UpdateFinalizedL1(ctx, chainID, finalized)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment