Commit 56d336e9 authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

supervisor: Record new L1 on ProvideL1 and Notify L2 Finality (#13678)

* Record new L1 on ProvideL1 and Notify L2 Finality

* lint ; expand unit tests

* Add comment for safe use of RecordNewL1
parent 33330b1c
...@@ -517,6 +517,10 @@ func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types. ...@@ -517,6 +517,10 @@ func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.
return nil return nil
} }
func (su *SupervisorBackend) RecordNewL1(ctx context.Context, chain types.ChainID, ref eth.BlockRef) error {
return su.chainDBs.RecordNewL1(chain, ref)
}
// Access to synchronous processing for tests // Access to synchronous processing for tests
// ---------------------------- // ----------------------------
......
...@@ -114,60 +114,65 @@ func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error { ...@@ -114,60 +114,65 @@ func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
db.logger.Info("Updated finalized L1", "finalizedL1", finalized) db.logger.Info("Updated finalized L1", "finalizedL1", finalized)
db.finalizedL1.Unlock() db.finalizedL1.Unlock()
// whenever the L1 Finalized changes, the L2 Finalized may change, notify subscribers // whenver the L1 Finalized changes, the L2 Finalized may change, notify subscribers
db.NotifyL2Finalized() for _, chain := range db.depSet.Chains() {
db.NotifyL2Finalized(chain)
}
return nil return nil
} }
// NotifyL2Finalized notifies all L2 finality subscribers of the latest L2 finalized block, per chain. // NotifyL2Finalized notifies all L2 finality subscribers of the latest L2 finalized block for the given chain.
func (db *ChainsDB) NotifyL2Finalized() { func (db *ChainsDB) NotifyL2Finalized(chain types.ChainID) {
for _, chain := range db.depSet.Chains() { f, err := db.Finalized(chain)
f, err := db.Finalized(chain) if err != nil {
if err != nil { db.logger.Error("Failed to get finalized L1 block", "chain", chain, "err", err)
db.logger.Error("Failed to get finalized L2 block", "chain", chain, "err", err) return
continue }
} sub, ok := db.l2FinalityFeeds.Get(chain)
sub, ok := db.l2FinalityFeeds.Get(chain) if ok {
if ok { sub.Send(f)
sub.Send(f)
}
} }
} }
// RecordNewL1 records a new L1 block in the database. // RecordNewL1 records a new L1 block in the database for a given chain.
// it uses the latest derived L2 block as the derived block for the new L1 block. // It uses the latest derived L2 block as the derived block for the new L1 block.
func (db *ChainsDB) RecordNewL1(ref eth.BlockRef) error { // It also triggers L2 Finality Notifications, as a new L1 may change L2 finality.
for _, chain := range db.depSet.Chains() { // NOTE: callers to this function are responsible for ensuring that advancing the L1 block is correct
// get local derivation database // (ie that no further L2 blocks need to be recorded) because if the L1 block is recorded with a gap in derived blocks,
ldb, ok := db.localDBs.Get(chain) // the database is considered corrupted and the supervisor will not be able to proceed without pruning the database.
if !ok { // The database cannot protect against this because it is does not know how many L2 blocks to expect for a given L1 block.
return fmt.Errorf("cannot RecordNewL1 to chain %s: %w", chain, types.ErrUnknownChain) func (db *ChainsDB) RecordNewL1(chain types.ChainID, ref eth.BlockRef) error {
} // get local derivation database
// get the latest derived and derivedFrom blocks ldb, ok := db.localDBs.Get(chain)
derivedFrom, derived, err := ldb.Latest() if !ok {
if err != nil { return fmt.Errorf("cannot RecordNewL1 to chain %s: %w", chain, types.ErrUnknownChain)
return fmt.Errorf("failed to get latest derivedFrom for chain %s: %w", chain, err) }
} // get the latest derived and derivedFrom blocks
// make a ref from the latest derived block derivedFrom, derived, err := ldb.Latest()
derivedParent, err := ldb.PreviousDerived(derived.ID()) if err != nil {
if errors.Is(err, types.ErrFuture) { return fmt.Errorf("failed to get latest derivedFrom for chain %s: %w", chain, err)
db.logger.Warn("Empty DB, Recording first L1 block", "chain", chain, "err", err) }
} else if err != nil { // make a ref from the latest derived block
db.logger.Warn("Failed to get latest derivedfrom to insert new L1 block", "chain", chain, "err", err) derivedParent, err := ldb.PreviousDerived(derived.ID())
return err if errors.Is(err, types.ErrFuture) {
} db.logger.Warn("Empty DB, Recording first L1 block", "chain", chain, "err", err)
derivedRef := derived.MustWithParent(derivedParent.ID()) } else if err != nil {
// don't push the new L1 block if it's not newer than the latest derived block db.logger.Warn("Failed to get latest derivedfrom to insert new L1 block", "chain", chain, "err", err)
if derivedFrom.Number >= ref.Number { return err
db.logger.Warn("L1 block has already been processed for this height", "chain", chain, "block", ref, "latest", derivedFrom) }
continue derivedRef := derived.MustWithParent(derivedParent.ID())
} // don't push the new L1 block if it's not newer than the latest derived block
// the database is extended with the new L1 and the existing L2 if derivedFrom.Number >= ref.Number {
if err = db.UpdateLocalSafe(chain, ref, derivedRef); err != nil { db.logger.Warn("L1 block has already been processed for this height", "chain", chain, "block", ref, "latest", derivedFrom)
db.logger.Error("Failed to update local safe", "chain", chain, "block", ref, "derived", derived, "err", err) return nil
return err }
} // the database is extended with the new L1 and the existing L2
if err = db.UpdateLocalSafe(chain, ref, derivedRef); err != nil {
db.logger.Error("Failed to update local safe", "chain", chain, "block", ref, "derived", derived, "err", err)
return err
} }
// now tht the db has the new L1, we can attempt to to notify the L2 finality subscribers
db.NotifyL2Finalized(chain)
return nil return nil
} }
...@@ -137,6 +137,7 @@ var _ SyncControl = (*mockSyncControl)(nil) ...@@ -137,6 +137,7 @@ var _ SyncControl = (*mockSyncControl)(nil)
type mockBackend struct { type mockBackend struct {
updateLocalUnsafeFn func(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error updateLocalUnsafeFn func(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error
updateLocalSafeFn func(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error updateLocalSafeFn func(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error
recordL1Fn func(ctx context.Context, chain types.ChainID, ref eth.BlockRef) error
} }
func (m *mockBackend) LocalSafe(ctx context.Context, chainID types.ChainID) (pair types.DerivedIDPair, err error) { func (m *mockBackend) LocalSafe(ctx context.Context, chainID types.ChainID) (pair types.DerivedIDPair, err error) {
...@@ -177,6 +178,13 @@ func (m *mockBackend) L1BlockRefByNumber(ctx context.Context, number uint64) (et ...@@ -177,6 +178,13 @@ func (m *mockBackend) L1BlockRefByNumber(ctx context.Context, number uint64) (et
return eth.L1BlockRef{}, nil return eth.L1BlockRef{}, nil
} }
func (m *mockBackend) RecordNewL1(ctx context.Context, chain types.ChainID, ref eth.BlockRef) error {
if m.recordL1Fn != nil {
return m.recordL1Fn(ctx, chain, ref)
}
return nil
}
var _ backend = (*mockBackend)(nil) var _ backend = (*mockBackend)(nil)
func sampleDepSet(t *testing.T) depset.DependencySet { func sampleDepSet(t *testing.T) depset.DependencySet {
......
...@@ -38,6 +38,7 @@ type backend interface { ...@@ -38,6 +38,7 @@ type backend interface {
SafeDerivedAt(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error) SafeDerivedAt(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error)
Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error)
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
RecordNewL1(ctx context.Context, chainID types.ChainID, l1 eth.BlockRef) error
} }
const ( const (
...@@ -357,6 +358,15 @@ func (m *ManagedNode) onExhaustL1Event(completed types.DerivedBlockRefPair) { ...@@ -357,6 +358,15 @@ func (m *ManagedNode) onExhaustL1Event(completed types.DerivedBlockRefPair) {
// but does not fit on the derivation state. // but does not fit on the derivation state.
return return
} }
// now that the node has the next L1 block, we can add it to the database
// this ensures that only the L1 *or* the L2 ever increments in the derivation database,
// as RecordNewL1 will insert the new L1 block with the latest L2 block
ctx, cancel := context.WithTimeout(m.ctx, internalTimeout)
defer cancel()
err = m.backend.RecordNewL1(ctx, m.chainID, nextL1)
if err != nil {
m.log.Warn("Failed to record new L1 block", "l1Block", nextL1, "err", err)
}
} }
func (m *ManagedNode) AwaitSentCrossUnsafeUpdate(ctx context.Context, minNum uint64) error { func (m *ManagedNode) AwaitSentCrossUnsafeUpdate(ctx context.Context, minNum uint64) error {
......
...@@ -28,6 +28,8 @@ func TestEventResponse(t *testing.T) { ...@@ -28,6 +28,8 @@ func TestEventResponse(t *testing.T) {
nodeUnsafe := 0 nodeUnsafe := 0
nodeDerivation := 0 nodeDerivation := 0
nodeExhausted := 0 nodeExhausted := 0
// recordL1 is called along with nodeExhausted
recordL1 := 0
// the node will call UpdateCrossUnsafe when a cross-unsafe event is received from the database // the node will call UpdateCrossUnsafe when a cross-unsafe event is received from the database
syncCtrl.updateCrossUnsafeFn = func(ctx context.Context, id eth.BlockID) error { syncCtrl.updateCrossUnsafeFn = func(ctx context.Context, id eth.BlockID) error {
...@@ -61,6 +63,10 @@ func TestEventResponse(t *testing.T) { ...@@ -61,6 +63,10 @@ func TestEventResponse(t *testing.T) {
nodeExhausted++ nodeExhausted++
return nil return nil
} }
backend.recordL1Fn = func(ctx context.Context, chainID types.ChainID, ref eth.L1BlockRef) error {
recordL1++
return nil
}
// TODO(#13595): rework node-reset, and include testing for it here // TODO(#13595): rework node-reset, and include testing for it here
node.Start() node.Start()
...@@ -85,4 +91,7 @@ func TestEventResponse(t *testing.T) { ...@@ -85,4 +91,7 @@ func TestEventResponse(t *testing.T) {
nodeDerivation >= 1 && nodeDerivation >= 1 &&
nodeExhausted >= 1 nodeExhausted >= 1
}, 4*time.Second, 250*time.Millisecond) }, 4*time.Second, 250*time.Millisecond)
// recordL1 is called every time nodeExhausted is called
require.Equal(t, nodeExhausted, recordL1)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment