Commit c19d51bf authored by protolambda's avatar protolambda Committed by GitHub

op-supervisor: head pointers, refactor block processor (#12031)

* op-supervisor: supervisor-head-pointers squashed

change entry indices to head pointers, refactor block processor, backend fixes
Co-authored-by: default avatarAxel Kingsley <axel.kingsley@gmail.com>

* use ticker instead of time.After

---------
Co-authored-by: default avatarAxel Kingsley <axel.kingsley@gmail.com>
parent d90e4340
...@@ -48,7 +48,7 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg ...@@ -48,7 +48,7 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
} }
// create the head tracker // create the head tracker
headTracker, err := heads.NewHeadTracker(filepath.Join(cfg.Datadir, "heads.json")) headTracker, err := heads.NewHeadTracker(logger, filepath.Join(cfg.Datadir, "heads.json"))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load existing heads: %w", err) return nil, fmt.Errorf("failed to load existing heads: %w", err)
} }
...@@ -190,7 +190,7 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa ...@@ -190,7 +190,7 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
chainID := identifier.ChainID chainID := identifier.ChainID
blockNum := identifier.BlockNumber blockNum := identifier.BlockNumber
logIdx := identifier.LogIndex logIdx := identifier.LogIndex
i, err := su.db.Check(chainID, blockNum, uint32(logIdx), payloadHash) _, err := su.db.Check(chainID, blockNum, uint32(logIdx), payloadHash)
if errors.Is(err, logs.ErrFuture) { if errors.Is(err, logs.ErrFuture) {
return types.Unsafe, nil return types.Unsafe, nil
} }
...@@ -207,8 +207,15 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa ...@@ -207,8 +207,15 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
db.NewSafetyChecker(types.Safe, su.db), db.NewSafetyChecker(types.Safe, su.db),
db.NewSafetyChecker(types.Finalized, su.db), db.NewSafetyChecker(types.Finalized, su.db),
} { } {
if i <= checker.CrossHeadForChain(chainID) { // check local safety limit first as it's more permissive
safest = checker.SafetyLevel() localPtr := checker.LocalHead(chainID)
if localPtr.WithinRange(blockNum, uint32(logIdx)) {
safest = checker.LocalSafetyLevel()
}
// check cross safety level
crossPtr := checker.CrossHead(chainID)
if crossPtr.WithinRange(blockNum, uint32(logIdx)) {
safest = checker.CrossSafetyLevel()
} }
} }
return safest, nil return safest, nil
...@@ -239,7 +246,7 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common. ...@@ -239,7 +246,7 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.
safest := types.CrossUnsafe safest := types.CrossUnsafe
// find the last log index in the block // find the last log index in the block
id := eth.BlockID{Hash: blockHash, Number: uint64(blockNumber)} id := eth.BlockID{Hash: blockHash, Number: uint64(blockNumber)}
i, err := su.db.FindSealedBlock(types.ChainID(*chainID), id) _, err := su.db.FindSealedBlock(types.ChainID(*chainID), id)
if errors.Is(err, logs.ErrFuture) { if errors.Is(err, logs.ErrFuture) {
return types.Unsafe, nil return types.Unsafe, nil
} }
...@@ -256,8 +263,15 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common. ...@@ -256,8 +263,15 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.
db.NewSafetyChecker(types.Safe, su.db), db.NewSafetyChecker(types.Safe, su.db),
db.NewSafetyChecker(types.Finalized, su.db), db.NewSafetyChecker(types.Finalized, su.db),
} { } {
if i <= checker.CrossHeadForChain(types.ChainID(*chainID)) { // check local safety limit first as it's more permissive
safest = checker.SafetyLevel() localPtr := checker.LocalHead(types.ChainID(*chainID))
if localPtr.IsSealed(uint64(blockNumber)) {
safest = checker.LocalSafetyLevel()
}
// check cross safety level
crossPtr := checker.CrossHead(types.ChainID(*chainID))
if crossPtr.IsSealed(uint64(blockNumber)) {
safest = checker.CrossSafetyLevel()
} }
} }
return safest, nil return safest, nil
......
...@@ -39,7 +39,7 @@ type LogStorage interface { ...@@ -39,7 +39,7 @@ type LogStorage interface {
// returns ErrDifferent if the known block does not match // returns ErrDifferent if the known block does not match
FindSealedBlock(block eth.BlockID) (nextEntry entrydb.EntryIdx, err error) FindSealedBlock(block eth.BlockID) (nextEntry entrydb.EntryIdx, err error)
IteratorStartingAt(i entrydb.EntryIdx) (logs.Iterator, error) IteratorStartingAt(sealedNum uint64, logsSince uint32) (logs.Iterator, error)
// returns ErrConflict if the log does not match the canonical chain. // returns ErrConflict if the log does not match the canonical chain.
// returns ErrFuture if the log is out of reach. // returns ErrFuture if the log is out of reach.
...@@ -50,8 +50,20 @@ type LogStorage interface { ...@@ -50,8 +50,20 @@ type LogStorage interface {
var _ LogStorage = (*logs.DB)(nil) var _ LogStorage = (*logs.DB)(nil)
type HeadsStorage interface { type HeadsStorage interface {
Current() *heads.Heads CrossUnsafe(id types.ChainID) heads.HeadPointer
Apply(op heads.Operation) error CrossSafe(id types.ChainID) heads.HeadPointer
CrossFinalized(id types.ChainID) heads.HeadPointer
LocalUnsafe(id types.ChainID) heads.HeadPointer
LocalSafe(id types.ChainID) heads.HeadPointer
LocalFinalized(id types.ChainID) heads.HeadPointer
UpdateCrossUnsafe(id types.ChainID, pointer heads.HeadPointer) error
UpdateCrossSafe(id types.ChainID, pointer heads.HeadPointer) error
UpdateCrossFinalized(id types.ChainID, pointer heads.HeadPointer) error
UpdateLocalUnsafe(id types.ChainID, pointer heads.HeadPointer) error
UpdateLocalSafe(id types.ChainID, pointer heads.HeadPointer) error
UpdateLocalFinalized(id types.ChainID, pointer heads.HeadPointer) error
} }
// ChainsDB is a database that stores logs and heads for multiple chains. // ChainsDB is a database that stores logs and heads for multiple chains.
...@@ -85,7 +97,7 @@ func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) { ...@@ -85,7 +97,7 @@ func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) {
func (db *ChainsDB) ResumeFromLastSealedBlock() error { func (db *ChainsDB) ResumeFromLastSealedBlock() error {
for chain, logStore := range db.logDBs { for chain, logStore := range db.logDBs {
headNum, ok := logStore.LatestSealedBlockNum() headNum, ok := logStore.LatestSealedBlockNum()
if ok { if !ok {
// db must be empty, nothing to rewind to // db must be empty, nothing to rewind to
db.logger.Info("Resuming, but found no DB contents", "chain", chain) db.logger.Info("Resuming, but found no DB contents", "chain", chain)
continue continue
...@@ -155,7 +167,7 @@ func (db *ChainsDB) updateAllHeads() error { ...@@ -155,7 +167,7 @@ func (db *ChainsDB) updateAllHeads() error {
safeChecker, safeChecker,
finalizedChecker} { finalizedChecker} {
if err := db.UpdateCrossHeads(checker); err != nil { if err := db.UpdateCrossHeads(checker); err != nil {
return fmt.Errorf("failed to update cross-heads for safety level %v: %w", checker.Name(), err) return fmt.Errorf("failed to update cross-heads for safety level %s: %w", checker, err)
} }
} }
return nil return nil
...@@ -165,13 +177,14 @@ func (db *ChainsDB) updateAllHeads() error { ...@@ -165,13 +177,14 @@ func (db *ChainsDB) updateAllHeads() error {
// the provided checker controls which heads are considered. // the provided checker controls which heads are considered.
func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error { func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error {
// start with the xsafe head of the chain // start with the xsafe head of the chain
xHead := checker.CrossHeadForChain(chainID) xHead := checker.CrossHead(chainID)
// advance as far as the local head // advance as far as the local head
localHead := checker.LocalHeadForChain(chainID) localHead := checker.LocalHead(chainID)
// get an iterator for the last checkpoint behind the x-head // get an iterator for the next item
iter, err := db.logDBs[chainID].IteratorStartingAt(xHead) iter, err := db.logDBs[chainID].IteratorStartingAt(xHead.LastSealedBlockNum, xHead.LogsSince)
if err != nil { if err != nil {
return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err) return fmt.Errorf("failed to open iterator at sealed block %d logsSince %d for chain %v: %w",
xHead.LastSealedBlockNum, xHead.LogsSince, chainID, err)
} }
// track if we updated the cross-head // track if we updated the cross-head
updated := false updated := false
...@@ -181,51 +194,92 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe ...@@ -181,51 +194,92 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
// - when we reach a message that is not safe // - when we reach a message that is not safe
// - if an error occurs // - if an error occurs
for { for {
if err := iter.NextExecMsg(); err == io.EOF { if err := iter.NextInitMsg(); errors.Is(err, logs.ErrFuture) {
// We ran out of events, but there can still be empty blocks.
// Take the last block we've processed, and try to update the x-head with it.
sealedBlockHash, sealedBlockNum, ok := iter.SealedBlock()
if !ok {
break
}
// We can only drop the logsSince value to 0 if the block is not seen.
if sealedBlockNum > xHead.LastSealedBlockNum {
// if we would exceed the local head, then abort
if !localHead.WithinRange(sealedBlockNum, 0) {
break
}
xHead = heads.HeadPointer{
LastSealedBlockHash: sealedBlockHash,
LastSealedBlockNum: sealedBlockNum,
LogsSince: 0,
}
updated = true
}
break break
} else if err != nil { } else if err != nil {
return fmt.Errorf("failed to read next executing message for chain %v: %w", chainID, err) return fmt.Errorf("failed to read next executing message for chain %v: %w", chainID, err)
} }
// if we would exceed the local head, then abort
if iter.NextIndex() > localHead { sealedBlockHash, sealedBlockNum, ok := iter.SealedBlock()
xHead = localHead // clip to local head if !ok {
updated = localHead != xHead
break break
} }
exec := iter.ExecMessage() _, logIdx, ok := iter.InitMessage()
if exec == nil { if !ok {
panic("expected executing message after traversing to one without error") break
} }
// use the checker to determine if this message is safe // if we would exceed the local head, then abort
safe := checker.Check( if !localHead.WithinRange(sealedBlockNum, logIdx) {
types.ChainIDFromUInt64(uint64(exec.Chain)),
exec.BlockNum,
exec.LogIdx,
exec.Hash)
if !safe {
break break
} }
// Check the executing message, if any
exec := iter.ExecMessage()
if exec != nil {
// Use the checker to determine if this message exists in the canonical chain,
// within the view of the checker's safety level
if err := checker.CheckCross(
types.ChainIDFromUInt64(uint64(exec.Chain)),
exec.BlockNum,
exec.LogIdx,
exec.Hash); err != nil {
if errors.Is(err, logs.ErrConflict) {
db.logger.Error("Bad executing message!", "err", err)
} else if errors.Is(err, logs.ErrFuture) {
db.logger.Warn("Executing message references future message", "err", err)
} else {
db.logger.Error("Failed to check executing message")
}
break
}
}
// if all is well, prepare the x-head update to this point // if all is well, prepare the x-head update to this point
xHead = iter.NextIndex() xHead = heads.HeadPointer{
LastSealedBlockHash: sealedBlockHash,
LastSealedBlockNum: sealedBlockNum,
LogsSince: logIdx + 1,
}
updated = true updated = true
} }
// have the checker create an update to the x-head in question, and apply that update
err = db.heads.Apply(checker.Update(chainID, xHead))
if err != nil {
return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err)
}
// if any chain was updated, we can trigger a maintenance request // if any chain was updated, we can trigger a maintenance request
// this allows for the maintenance loop to handle cascading updates // this allows for the maintenance loop to handle cascading updates
// instead of waiting for the next scheduled update // instead of waiting for the next scheduled update
if updated { if updated {
db.logger.Info("Promoting cross-head", "head", xHead, "safety-level", checker.SafetyLevel()) db.logger.Info("Promoting cross-head", "chain", chainID, "head", xHead, "safety-level", checker.CrossSafetyLevel())
err = checker.UpdateCross(chainID, xHead)
if err != nil {
return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err)
}
db.RequestMaintenance() db.RequestMaintenance()
} else { } else {
db.logger.Info("No cross-head update", "head", xHead, "safety-level", checker.SafetyLevel()) db.logger.Debug("No cross-head update", "chain", chainID, "head", xHead, "safety-level", checker.CrossSafetyLevel())
} }
return nil return nil
} }
func (db *ChainsDB) Heads() HeadsStorage {
return db.heads
}
// UpdateCrossHeads updates the cross-heads of all chains // UpdateCrossHeads updates the cross-heads of all chains
// based on the provided SafetyChecker. The SafetyChecker is used to determine // based on the provided SafetyChecker. The SafetyChecker is used to determine
// the safety of each log entry in the database, and the cross-head associated with it. // the safety of each log entry in the database, and the cross-head associated with it.
......
package db package db
/*
import ( import (
"errors" "errors"
"fmt"
"io" "io"
"math/rand" // nosemgrep "math/rand" // nosemgrep
"testing" "testing"
...@@ -182,9 +184,9 @@ func TestChainsDB_UpdateCrossHeadsError(t *testing.T) { ...@@ -182,9 +184,9 @@ func TestChainsDB_UpdateCrossHeadsError(t *testing.T) {
// but readability and maintainability would be improved by making this function more configurable. // but readability and maintainability would be improved by making this function more configurable.
func setupStubbedForUpdateHeads(chainID types.ChainID) (*stubLogDB, *stubChecker, *heads.Heads) { func setupStubbedForUpdateHeads(chainID types.ChainID) (*stubLogDB, *stubChecker, *heads.Heads) {
// the last known cross-safe head is at 20 // the last known cross-safe head is at 20
cross := entrydb.EntryIdx(20) cross := heads.HeadPointer{LastSealedBlockNum: 20}
// the local head (the limit of the update) is at 40 // the local head (the limit of the update) is at 40
local := entrydb.EntryIdx(40) local := heads.HeadPointer{LastSealedBlockNum: 40}
// the number of executing messages to make available (this should be more than the number of safety checks performed) // the number of executing messages to make available (this should be more than the number of safety checks performed)
numExecutingMessages := 30 numExecutingMessages := 30
// number of safety checks that will pass before returning false // number of safety checks that will pass before returning false
...@@ -245,39 +247,57 @@ func setupStubbedForUpdateHeads(chainID types.ChainID) (*stubLogDB, *stubChecker ...@@ -245,39 +247,57 @@ func setupStubbedForUpdateHeads(chainID types.ChainID) (*stubLogDB, *stubChecker
} }
type stubChecker struct { type stubChecker struct {
localHeadForChain entrydb.EntryIdx localHeadForChain heads.HeadPointer
crossHeadForChain entrydb.EntryIdx crossHeadForChain heads.HeadPointer
numSafe int numSafe int
checkCalls int checkCalls int
updated entrydb.EntryIdx updated heads.HeadPointer
} }
func (s *stubChecker) LocalHeadForChain(chainID types.ChainID) entrydb.EntryIdx { func (s *stubChecker) String() string {
return s.localHeadForChain return "stubChecker"
} }
func (s *stubChecker) Name() string { func (s *stubChecker) LocalSafetyLevel() types.SafetyLevel {
return "stubChecker" return types.Safe
} }
func (s *stubChecker) CrossHeadForChain(chainID types.ChainID) entrydb.EntryIdx { func (s *stubChecker) CrossSafetyLevel() types.SafetyLevel {
return types.Safe
}
func (s *stubChecker) LocalHead(chainID types.ChainID) heads.HeadPointer {
return s.localHeadForChain
}
func (s *stubChecker) CrossHead(chainID types.ChainID) heads.HeadPointer {
return s.crossHeadForChain return s.crossHeadForChain
} }
// stubbed Check returns true for the first numSafe calls, and false thereafter // stubbed Check returns true for the first numSafe calls, and false thereafter
func (s *stubChecker) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) bool { func (s *stubChecker) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) bool {
if s.checkCalls >= s.numSafe { if s.checkCalls >= s.numSafe {
return false return fmt.Errorf("safety check failed")
} }
s.checkCalls++ s.checkCalls++
return true return nil
}
func (s *stubChecker) CheckCross(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) error {
return s.check(chain, blockNum, logIdx, logHash)
}
func (s *stubChecker) CheckLocal(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) error {
return s.check(chain, blockNum, logIdx, logHash)
} }
func (s *stubChecker) Update(chain types.ChainID, index entrydb.EntryIdx) heads.OperationFn { func (s *stubChecker) Update(chain types.ChainID, h heads.HeadPointer) error {
s.updated = index s.updated = h
return func(heads *heads.Heads) error { return nil
return nil }
} func (s *stubChecker) UpdateCross(chain types.ChainID, h heads.HeadPointer) error {
return s.Update(chain, h)
}
func (s *stubChecker) UpdateLocal(chain types.ChainID, h heads.HeadPointer) error {
return s.Update(chain, h)
} }
func (s *stubChecker) SafetyLevel() types.SafetyLevel { func (s *stubChecker) SafetyLevel() types.SafetyLevel {
...@@ -288,6 +308,54 @@ type stubHeadStorage struct { ...@@ -288,6 +308,54 @@ type stubHeadStorage struct {
heads *heads.Heads heads *heads.Heads
} }
func (s *stubHeadStorage) UpdateLocalUnsafe(chainID types.ChainID, h heads.HeadPointer) error {
panic("not implemented")
}
func (s *stubHeadStorage) UpdateLocalSafe(chainID types.ChainID, h heads.HeadPointer) error {
panic("not implemented")
}
func (s *stubHeadStorage) UpdateLocalFinalized(chainID types.ChainID, h heads.HeadPointer) error {
panic("not implemented")
}
func (s *stubHeadStorage) UpdateCrossUnsafe(chainID types.ChainID, h heads.HeadPointer) error {
panic("not implemented")
}
func (s *stubHeadStorage) UpdateCrossSafe(chainID types.ChainID, h heads.HeadPointer) error {
panic("not implemented")
}
func (s *stubHeadStorage) UpdateCrossFinalized(chainID types.ChainID, h heads.HeadPointer) error {
panic("not implemented")
}
func (s *stubHeadStorage) LocalUnsafe(chainID types.ChainID) heads.HeadPointer {
panic("not implemented")
}
func (s *stubHeadStorage) LocalSafe(chainID types.ChainID) heads.HeadPointer {
panic("not implemented")
}
func (s *stubHeadStorage) LocalFinalized(chainID types.ChainID) heads.HeadPointer {
panic("not implemented")
}
func (s *stubHeadStorage) CrossUnsafe(chainID types.ChainID) heads.HeadPointer {
panic("not implemented")
}
func (s *stubHeadStorage) CrossSafe(chainID types.ChainID) heads.HeadPointer {
panic("not implemented")
}
func (s *stubHeadStorage) CrossFinalized(chainID types.ChainID) heads.HeadPointer {
panic("not implemented")
}
func (s *stubHeadStorage) Apply(heads.Operation) error { func (s *stubHeadStorage) Apply(heads.Operation) error {
return nil return nil
} }
...@@ -415,10 +483,10 @@ func (s *stubLogDB) FindSealedBlock(block eth.BlockID) (nextEntry entrydb.EntryI ...@@ -415,10 +483,10 @@ func (s *stubLogDB) FindSealedBlock(block eth.BlockID) (nextEntry entrydb.EntryI
panic("not implemented") panic("not implemented")
} }
func (s *stubLogDB) IteratorStartingAt(i entrydb.EntryIdx) (logs.Iterator, error) { func (s *stubLogDB) IteratorStartingAt(sealedNum uint64, logIndex uint32) (logs.Iterator, error) {
return &stubIterator{ return &stubIterator{
index: i - 1, //index: i - 1, // TODO broken
db: s, db: s,
}, nil }, nil
} }
...@@ -447,3 +515,4 @@ func (s *stubLogDB) LatestBlockNum() uint64 { ...@@ -447,3 +515,4 @@ func (s *stubLogDB) LatestBlockNum() uint64 {
func (s *stubLogDB) Close() error { func (s *stubLogDB) Close() error {
return nil return nil
} }
*/
...@@ -7,8 +7,12 @@ import ( ...@@ -7,8 +7,12 @@ import (
"os" "os"
"sync" "sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/ioutil" "github.com/ethereum-optimism/optimism/op-service/ioutil"
"github.com/ethereum-optimism/optimism/op-service/jsonutil" "github.com/ethereum-optimism/optimism/op-service/jsonutil"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
// HeadTracker records the current chain head pointers for a single chain. // HeadTracker records the current chain head pointers for a single chain.
...@@ -18,9 +22,95 @@ type HeadTracker struct { ...@@ -18,9 +22,95 @@ type HeadTracker struct {
path string path string
current *Heads current *Heads
logger log.Logger
}
func (t *HeadTracker) CrossUnsafe(id types.ChainID) HeadPointer {
return t.current.Get(id).CrossUnsafe
}
func (t *HeadTracker) CrossSafe(id types.ChainID) HeadPointer {
return t.current.Get(id).CrossSafe
}
func (t *HeadTracker) CrossFinalized(id types.ChainID) HeadPointer {
return t.current.Get(id).CrossFinalized
}
func (t *HeadTracker) LocalUnsafe(id types.ChainID) HeadPointer {
return t.current.Get(id).Unsafe
}
func (t *HeadTracker) LocalSafe(id types.ChainID) HeadPointer {
return t.current.Get(id).LocalSafe
}
func (t *HeadTracker) LocalFinalized(id types.ChainID) HeadPointer {
return t.current.Get(id).LocalFinalized
}
func (t *HeadTracker) UpdateCrossUnsafe(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Cross-unsafe update", "pointer", pointer)
h := heads.Get(id)
h.CrossUnsafe = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateCrossSafe(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Cross-safe update", "pointer", pointer)
h := heads.Get(id)
h.CrossSafe = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateCrossFinalized(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Cross-finalized update", "pointer", pointer)
h := heads.Get(id)
h.CrossFinalized = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateLocalUnsafe(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Local-unsafe update", "pointer", pointer)
h := heads.Get(id)
h.Unsafe = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateLocalSafe(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Local-safe update", "pointer", pointer)
h := heads.Get(id)
h.LocalSafe = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateLocalFinalized(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Local-finalized update", "pointer", pointer)
h := heads.Get(id)
h.LocalFinalized = pointer
heads.Put(id, h)
return nil
}))
} }
func NewHeadTracker(path string) (*HeadTracker, error) { func NewHeadTracker(logger log.Logger, path string) (*HeadTracker, error) {
current := NewHeads() current := NewHeads()
if data, err := os.ReadFile(path); errors.Is(err, os.ErrNotExist) { if data, err := os.ReadFile(path); errors.Is(err, os.ErrNotExist) {
// No existing file, just use empty heads // No existing file, just use empty heads
...@@ -34,6 +124,7 @@ func NewHeadTracker(path string) (*HeadTracker, error) { ...@@ -34,6 +124,7 @@ func NewHeadTracker(path string) (*HeadTracker, error) {
return &HeadTracker{ return &HeadTracker{
path: path, path: path,
current: current, current: current,
logger: logger,
}, nil }, nil
} }
......
package heads package heads
/*
import ( import (
"errors" "errors"
"os" "os"
...@@ -99,3 +100,4 @@ func TestHeads_NoChangesMadeIfWriteFails(t *testing.T) { ...@@ -99,3 +100,4 @@ func TestHeads_NoChangesMadeIfWriteFails(t *testing.T) {
require.ErrorIs(t, err, os.ErrNotExist) require.ErrorIs(t, err, os.ErrNotExist)
require.Equal(t, ChainHeads{}, orig.Current().Get(chainA)) require.Equal(t, ChainHeads{}, orig.Current().Get(chainA))
} }
*/
...@@ -3,23 +3,48 @@ package heads ...@@ -3,23 +3,48 @@ package heads
import ( import (
"encoding/json" "encoding/json"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
type HeadPointer struct {
// LastSealedBlockHash is the last fully-processed block
LastSealedBlockHash common.Hash
LastSealedBlockNum uint64
// Number of logs that have been verified since the LastSealedBlock.
// These logs are contained in the block that builds on top of the LastSealedBlock.
LogsSince uint32
}
// WithinRange checks if the given log, in the given block,
// is within range (i.e. before or equal to the head-pointer).
// This does not guarantee that the log exists.
func (ptr *HeadPointer) WithinRange(blockNum uint64, logIdx uint32) bool {
if ptr.LastSealedBlockHash == (common.Hash{}) {
return false // no block yet
}
return blockNum <= ptr.LastSealedBlockNum ||
(blockNum+1 == ptr.LastSealedBlockNum && logIdx < ptr.LogsSince)
}
func (ptr *HeadPointer) IsSealed(blockNum uint64) bool {
if ptr.LastSealedBlockHash == (common.Hash{}) {
return false // no block yet
}
return blockNum <= ptr.LastSealedBlockNum
}
// ChainHeads provides the serialization format for the current chain heads. // ChainHeads provides the serialization format for the current chain heads.
// The values here could be block numbers or just the index of entries in the log db.
// If they're log db entries, we can't detect if things changed because of a reorg though (if the logdb write succeeded and head update failed).
// So we probably need to store actual block IDs here... but then we don't have the block hash for every block in the log db.
// Only jumping the head forward on checkpoint blocks doesn't work though...
type ChainHeads struct { type ChainHeads struct {
Unsafe entrydb.EntryIdx `json:"localUnsafe"` Unsafe HeadPointer `json:"localUnsafe"`
CrossUnsafe entrydb.EntryIdx `json:"crossUnsafe"` CrossUnsafe HeadPointer `json:"crossUnsafe"`
LocalSafe entrydb.EntryIdx `json:"localSafe"` LocalSafe HeadPointer `json:"localSafe"`
CrossSafe entrydb.EntryIdx `json:"crossSafe"` CrossSafe HeadPointer `json:"crossSafe"`
LocalFinalized entrydb.EntryIdx `json:"localFinalized"` LocalFinalized HeadPointer `json:"localFinalized"`
CrossFinalized entrydb.EntryIdx `json:"crossFinalized"` CrossFinalized HeadPointer `json:"crossFinalized"`
} }
type Heads struct { type Heads struct {
...@@ -35,6 +60,26 @@ func (h *Heads) Get(id types.ChainID) ChainHeads { ...@@ -35,6 +60,26 @@ func (h *Heads) Get(id types.ChainID) ChainHeads {
if !ok { if !ok {
return ChainHeads{} return ChainHeads{}
} }
// init to genesis
if chain.LocalFinalized == (HeadPointer{}) && chain.Unsafe.LastSealedBlockNum == 0 {
chain.LocalFinalized = chain.Unsafe
}
// Make sure the data is consistent
if chain.LocalSafe == (HeadPointer{}) {
chain.LocalSafe = chain.LocalFinalized
}
if chain.Unsafe == (HeadPointer{}) {
chain.Unsafe = chain.LocalSafe
}
if chain.CrossFinalized == (HeadPointer{}) && chain.LocalFinalized.LastSealedBlockNum == 0 {
chain.CrossFinalized = chain.LocalFinalized
}
if chain.CrossSafe == (HeadPointer{}) {
chain.CrossSafe = chain.CrossFinalized
}
if chain.CrossUnsafe == (HeadPointer{}) {
chain.CrossUnsafe = chain.CrossSafe
}
return chain return chain
} }
...@@ -50,7 +95,7 @@ func (h *Heads) Copy() *Heads { ...@@ -50,7 +95,7 @@ func (h *Heads) Copy() *Heads {
return c return c
} }
func (h Heads) MarshalJSON() ([]byte, error) { func (h *Heads) MarshalJSON() ([]byte, error) {
data := make(map[hexutil.U256]ChainHeads) data := make(map[hexutil.U256]ChainHeads)
for id, heads := range h.Chains { for id, heads := range h.Chains {
data[hexutil.U256(id)] = heads data[hexutil.U256(id)] = heads
......
...@@ -3,38 +3,52 @@ package heads ...@@ -3,38 +3,52 @@ package heads
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math/rand" // nosemgrep
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
func TestHeads(t *testing.T) { func TestHeads(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
randHeadPtr := func() HeadPointer {
var h common.Hash
rng.Read(h[:])
return HeadPointer{
LastSealedBlockHash: h,
LastSealedBlockNum: rng.Uint64(),
LogsSince: rng.Uint32(),
}
}
t.Run("RoundTripViaJson", func(t *testing.T) { t.Run("RoundTripViaJson", func(t *testing.T) {
heads := NewHeads() heads := NewHeads()
heads.Put(types.ChainIDFromUInt64(3), ChainHeads{ heads.Put(types.ChainIDFromUInt64(3), ChainHeads{
Unsafe: 10, Unsafe: randHeadPtr(),
CrossUnsafe: 9, CrossUnsafe: randHeadPtr(),
LocalSafe: 8, LocalSafe: randHeadPtr(),
CrossSafe: 7, CrossSafe: randHeadPtr(),
LocalFinalized: 6, LocalFinalized: randHeadPtr(),
CrossFinalized: 5, CrossFinalized: randHeadPtr(),
}) })
heads.Put(types.ChainIDFromUInt64(9), ChainHeads{ heads.Put(types.ChainIDFromUInt64(9), ChainHeads{
Unsafe: 90, Unsafe: randHeadPtr(),
CrossUnsafe: 80, CrossUnsafe: randHeadPtr(),
LocalSafe: 70, LocalSafe: randHeadPtr(),
CrossSafe: 60, CrossSafe: randHeadPtr(),
LocalFinalized: 50, LocalFinalized: randHeadPtr(),
CrossFinalized: 40, CrossFinalized: randHeadPtr(),
}) })
heads.Put(types.ChainIDFromUInt64(4892497242424), ChainHeads{ heads.Put(types.ChainIDFromUInt64(4892497242424), ChainHeads{
Unsafe: 1000, Unsafe: randHeadPtr(),
CrossUnsafe: 900, CrossUnsafe: randHeadPtr(),
LocalSafe: 800, LocalSafe: randHeadPtr(),
CrossSafe: 700, CrossSafe: randHeadPtr(),
LocalFinalized: 600, LocalFinalized: randHeadPtr(),
CrossFinalized: 400, CrossFinalized: randHeadPtr(),
}) })
j, err := json.Marshal(heads) j, err := json.Marshal(heads)
...@@ -51,16 +65,16 @@ func TestHeads(t *testing.T) { ...@@ -51,16 +65,16 @@ func TestHeads(t *testing.T) {
chainA := types.ChainIDFromUInt64(3) chainA := types.ChainIDFromUInt64(3)
chainB := types.ChainIDFromUInt64(4) chainB := types.ChainIDFromUInt64(4)
chainAOrigHeads := ChainHeads{ chainAOrigHeads := ChainHeads{
Unsafe: 1, Unsafe: randHeadPtr(),
} }
chainAModifiedHeads1 := ChainHeads{ chainAModifiedHeads1 := ChainHeads{
Unsafe: 2, Unsafe: randHeadPtr(),
} }
chainAModifiedHeads2 := ChainHeads{ chainAModifiedHeads2 := ChainHeads{
Unsafe: 4, Unsafe: randHeadPtr(),
} }
chainBModifiedHeads := ChainHeads{ chainBModifiedHeads := ChainHeads{
Unsafe: 2, Unsafe: randHeadPtr(),
} }
heads := NewHeads() heads := NewHeads()
......
...@@ -149,37 +149,10 @@ func (db *DB) updateEntryCountMetric() { ...@@ -149,37 +149,10 @@ func (db *DB) updateEntryCountMetric() {
db.m.RecordDBEntryCount(db.store.Size()) db.m.RecordDBEntryCount(db.store.Size())
} }
func (db *DB) IteratorStartingAt(i entrydb.EntryIdx) (Iterator, error) { func (db *DB) IteratorStartingAt(sealedNum uint64, logsSince uint32) (Iterator, error) {
db.rwLock.RLock() db.rwLock.RLock()
defer db.rwLock.RUnlock() defer db.rwLock.RUnlock()
if i > db.lastEntryContext.nextEntryIndex { return db.newIteratorAt(sealedNum, logsSince)
return nil, ErrFuture
}
// TODO(#12031): Workaround while we not have IteratorStartingAt(heads.HeadPointer):
// scroll back from the index, to find block info.
idx := i
for ; idx >= 0; i-- {
entry, err := db.store.Read(idx)
if err != nil {
if errors.Is(err, io.EOF) {
continue // traverse to when we did have blocks
}
return nil, err
}
if entry.Type() == entrydb.TypeSearchCheckpoint {
break
}
if idx == 0 {
return nil, fmt.Errorf("empty DB, no block entry, cannot start at %d", i)
}
}
iter := db.newIterator(idx)
for iter.NextIndex() < i {
if _, err := iter.next(); err != nil {
return nil, errors.New("failed to process back up to the head pointer")
}
}
return iter, nil
} }
// FindSealedBlock finds the requested block, to check if it exists, // FindSealedBlock finds the requested block, to check if it exists,
......
package db package db
/*
import ( import (
"errors" "errors"
"testing" "testing"
...@@ -211,3 +212,4 @@ func TestCheck(t *testing.T) { ...@@ -211,3 +212,4 @@ func TestCheck(t *testing.T) {
}) })
} }
} }
*/
...@@ -5,16 +5,17 @@ import ( ...@@ -5,16 +5,17 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching" "github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
) )
// TODO(optimism#11032) Make these configurable and a sensible default // TODO(optimism#11032) Make these configurable and a sensible default
const epochPollInterval = 30 * time.Second const epochPollInterval = 3 * time.Second
const pollInterval = 2 * time.Second const pollInterval = 2 * time.Second
const trustRpc = false const trustRpc = false
const rpcKind = sources.RPCKindStandard const rpcKind = sources.RPCKindStandard
...@@ -25,6 +26,7 @@ type Metrics interface { ...@@ -25,6 +26,7 @@ type Metrics interface {
type Storage interface { type Storage interface {
LogStorage LogStorage
Heads() db.HeadsStorage
DatabaseRewinder DatabaseRewinder
LatestBlockNum(chainID types.ChainID) (num uint64, ok bool) LatestBlockNum(chainID types.ChainID) (num uint64, ok bool)
} }
...@@ -32,8 +34,9 @@ type Storage interface { ...@@ -32,8 +34,9 @@ type Storage interface {
// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform // ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur. // interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct { type ChainMonitor struct {
log log.Logger log log.Logger
headMonitor *HeadMonitor headMonitor *HeadMonitor
chainProcessor *ChainProcessor
} }
func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store Storage) (*ChainMonitor, error) { func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store Storage) (*ChainMonitor, error) {
...@@ -43,26 +46,26 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID ...@@ -43,26 +46,26 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID
return nil, err return nil, err
} }
latest, ok := store.LatestBlockNum(chainID) // Create the log processor and fetcher
if !ok { processLogs := newLogProcessor(chainID, store)
logger.Warn("") unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, processLogs, store)
}
startingHead := eth.L1BlockRef{ // create head processors which only update the head
Number: latest, unsafeHeadProcessor := OnNewHead(chainID, store.Heads().UpdateLocalUnsafe)
} safeHeadProcessor := OnNewHead(chainID, store.Heads().UpdateLocalSafe)
finalizedHeadProcessor := OnNewHead(chainID, store.Heads().UpdateLocalFinalized)
processLogs := newLogProcessor(chainID, store) unsafeProcessors := []HeadProcessor{unsafeBlockProcessor, unsafeHeadProcessor}
fetchReceipts := newLogFetcher(cl, processLogs) safeProcessors := []HeadProcessor{safeHeadProcessor}
unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, startingHead, fetchReceipts, store) finalizedProcessors := []HeadProcessor{finalizedHeadProcessor}
unsafeProcessors := []HeadProcessor{unsafeBlockProcessor} callback := newHeadUpdateProcessor(logger, unsafeProcessors, safeProcessors, finalizedProcessors)
callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil)
headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback) headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback)
return &ChainMonitor{ return &ChainMonitor{
log: logger, log: logger,
headMonitor: headMonitor, headMonitor: headMonitor,
chainProcessor: unsafeBlockProcessor,
}, nil }, nil
} }
...@@ -72,6 +75,7 @@ func (c *ChainMonitor) Start() error { ...@@ -72,6 +75,7 @@ func (c *ChainMonitor) Start() error {
} }
func (c *ChainMonitor) Stop() error { func (c *ChainMonitor) Stop() error {
c.chainProcessor.Close()
return c.headMonitor.Stop() return c.headMonitor.Stop()
} }
......
...@@ -2,22 +2,31 @@ package source ...@@ -2,22 +2,31 @@ package source
import ( import (
"context" "context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
) )
type BlockByNumberSource interface { type Source interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, gethtypes.Receipts, error)
} }
type BlockProcessor interface { type LogProcessor interface {
ProcessBlock(ctx context.Context, block eth.L1BlockRef) error ProcessLogs(ctx context.Context, block eth.L1BlockRef, receipts gethtypes.Receipts) error
} }
type DatabaseRewinder interface { type DatabaseRewinder interface {
Rewind(chain types.ChainID, headBlockNum uint64) error Rewind(chain types.ChainID, headBlockNum uint64) error
LatestBlockNum(chain types.ChainID) (num uint64, ok bool)
} }
type BlockProcessorFn func(ctx context.Context, block eth.L1BlockRef) error type BlockProcessorFn func(ctx context.Context, block eth.L1BlockRef) error
...@@ -29,58 +38,145 @@ func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.L1BlockRe ...@@ -29,58 +38,145 @@ func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.L1BlockRe
// ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events. // ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events.
// It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped. // It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped.
type ChainProcessor struct { type ChainProcessor struct {
log log.Logger log log.Logger
client BlockByNumberSource client Source
chain types.ChainID
lastBlock eth.L1BlockRef chain types.ChainID
processor BlockProcessor
processor LogProcessor
rewinder DatabaseRewinder rewinder DatabaseRewinder
// the last known head. May be 0 if not known.
lastHead atomic.Uint64
// channel with capacity of 1, full if there is work to do
newHead chan struct{}
// bool to indicate if calls are synchronous
synchronous bool
// channel with capacity of 1, to signal work complete if running in synchroneous mode
out chan struct{}
// lifetime management of the chain processor
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
} }
func NewChainProcessor(log log.Logger, client BlockByNumberSource, chain types.ChainID, startingHead eth.L1BlockRef, processor BlockProcessor, rewinder DatabaseRewinder) *ChainProcessor { func NewChainProcessor(log log.Logger, client Source, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder) *ChainProcessor {
return &ChainProcessor{ ctx, cancel := context.WithCancel(context.Background())
out := &ChainProcessor{
log: log, log: log,
client: client, client: client,
chain: chain, chain: chain,
lastBlock: startingHead,
processor: processor, processor: processor,
rewinder: rewinder, rewinder: rewinder,
newHead: make(chan struct{}, 1),
// default to synchronous because we want other processors to wait for this
// in the future we could make this async and have a separate mechanism which forwards the work signal to other processors
synchronous: true,
out: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
} }
out.wg.Add(1)
go out.worker()
return out
} }
func (s *ChainProcessor) OnNewHead(ctx context.Context, head eth.L1BlockRef) { func (s *ChainProcessor) nextNum() uint64 {
s.log.Debug("Processing chain", "chain", s.chain, "head", head, "last", s.lastBlock) headNum, ok := s.rewinder.LatestBlockNum(s.chain)
if head.Number <= s.lastBlock.Number { if !ok {
s.log.Info("head is not newer than last processed block", "head", head, "lastBlock", s.lastBlock) return 0 // genesis. We could change this to start at a later block.
return
} }
for s.lastBlock.Number+1 < head.Number { return headNum + 1
s.log.Debug("Filling in skipped block", "chain", s.chain, "lastBlock", s.lastBlock, "head", head) }
blockNum := s.lastBlock.Number + 1
nextBlock, err := s.client.L1BlockRefByNumber(ctx, blockNum) func (s *ChainProcessor) worker() {
if err != nil { defer s.wg.Done()
s.log.Error("Failed to fetch block info", "number", blockNum, "err", err)
delay := time.NewTicker(time.Second * 5)
for {
if s.ctx.Err() != nil { // check if we are closing down
return return
} }
if ok := s.processBlock(ctx, nextBlock); !ok { target := s.nextNum()
if err := s.update(target); err != nil {
s.log.Error("Failed to process new block", "err", err)
// idle until next update trigger
} else if x := s.lastHead.Load(); target+1 <= x {
s.log.Debug("Continuing with next block",
"newTarget", target+1, "lastHead", x)
continue // instantly continue processing, no need to idle
} else {
s.log.Debug("Idling block-processing, reached latest block", "head", target)
}
if s.synchronous {
s.out <- struct{}{}
}
// await next time we process, or detect shutdown
select {
case <-s.ctx.Done():
delay.Stop()
return return
case <-s.newHead:
s.log.Debug("Responding to new head signal")
continue
case <-delay.C:
s.log.Debug("Checking for updates")
continue
} }
} }
s.processBlock(ctx, head)
} }
func (s *ChainProcessor) processBlock(ctx context.Context, block eth.L1BlockRef) bool { func (s *ChainProcessor) update(nextNum uint64) error {
if err := s.processor.ProcessBlock(ctx, block); err != nil { ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
s.log.Error("Failed to process block", "block", block, "err", err) next, err := s.client.L1BlockRefByNumber(ctx, nextNum)
cancel()
if err != nil {
return fmt.Errorf("failed to fetch next block: %w", err)
}
// Try and fetch the receipts
ctx, cancel = context.WithTimeout(s.ctx, time.Second*10)
_, receipts, err := s.client.FetchReceipts(ctx, next.Hash)
cancel()
if err != nil {
return fmt.Errorf("failed to fetch receipts of block: %w", err)
}
if err := s.processor.ProcessLogs(ctx, next, receipts); err != nil {
s.log.Error("Failed to process block", "block", next, "err", err)
if next.Number == 0 { // cannot rewind genesis
return nil
}
// Try to rewind the database to the previous block to remove any logs from this block that were written // Try to rewind the database to the previous block to remove any logs from this block that were written
if err := s.rewinder.Rewind(s.chain, s.lastBlock.Number); err != nil { if err := s.rewinder.Rewind(s.chain, nextNum-1); err != nil {
// If any logs were written, our next attempt to write will fail and we'll retry this rewind. // If any logs were written, our next attempt to write will fail and we'll retry this rewind.
// If no logs were written successfully then the rewind wouldn't have done anything anyway. // If no logs were written successfully then the rewind wouldn't have done anything anyway.
s.log.Error("Failed to rewind after error processing block", "block", block, "err", err) s.log.Error("Failed to rewind after error processing block", "block", next, "err", err)
} }
return false // Don't update the last processed block so we will retry on next update
} }
s.lastBlock = block return nil
return true }
func (s *ChainProcessor) OnNewHead(ctx context.Context, head eth.L1BlockRef) error {
// update the latest target
s.lastHead.Store(head.Number)
// signal that we have something to process
select {
case s.newHead <- struct{}{}:
default:
// already requested an update
}
// if we are running synchronously, wait for the work to complete
if s.synchronous {
<-s.out
}
return nil
}
func (s *ChainProcessor) Close() {
s.cancel()
s.wg.Wait()
} }
package source package source
/* TODO
import ( import (
"context" "context"
"errors" "errors"
...@@ -22,7 +23,7 @@ func TestUnsafeBlocksStage(t *testing.T) { ...@@ -22,7 +23,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{} client := &stubBlockByNumberSource{}
processor := &stubBlockProcessor{} processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, processorChainID, eth.L1BlockRef{Number: 100}, processor, &stubRewinder{}) stage := NewChainProcessor(logger, client, processorChainID, processor, &stubRewinder{})
stage.OnNewHead(ctx, eth.L1BlockRef{Number: 100}) stage.OnNewHead(ctx, eth.L1BlockRef{Number: 100})
stage.OnNewHead(ctx, eth.L1BlockRef{Number: 99}) stage.OnNewHead(ctx, eth.L1BlockRef{Number: 99})
...@@ -185,3 +186,4 @@ func (s *stubRewinder) Rewind(chainID types.ChainID, headBlockNum uint64) error ...@@ -185,3 +186,4 @@ func (s *stubRewinder) Rewind(chainID types.ChainID, headBlockNum uint64) error
s.rewindCalled = true s.rewindCalled = true
return nil return nil
} }
*/
package source
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
// LogSource provides access to the receipts of a block, looked up by block hash.
type LogSource interface {
	// FetchReceipts returns the block info and all receipts for the block with the given hash.
	FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
}
// ReceiptProcessor consumes the receipts of a single block.
type ReceiptProcessor interface {
	// ProcessLogs handles the receipts fetched for the given block.
	ProcessLogs(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error
}
// ReceiptProcessorFn adapts a plain function to the ReceiptProcessor interface.
type ReceiptProcessorFn func(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error

// ProcessLogs implements ReceiptProcessor by calling the wrapped function.
func (r ReceiptProcessorFn) ProcessLogs(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
	return r(ctx, block, rcpts)
}
// logFetcher fetches the receipts of a block from a LogSource and
// hands them to a ReceiptProcessor.
type logFetcher struct {
	client    LogSource        // source to fetch block receipts from
	processor ReceiptProcessor // destination for the fetched receipts
}
// newLogFetcher wires a receipt source to a receipt processor.
func newLogFetcher(client LogSource, processor ReceiptProcessor) *logFetcher {
	fetcher := &logFetcher{client: client, processor: processor}
	return fetcher
}
// Compile-time check that logFetcher satisfies BlockProcessor.
var _ BlockProcessor = (*logFetcher)(nil)

// ProcessBlock retrieves all receipts of the given block and forwards
// them, together with the block reference, to the configured processor.
func (l *logFetcher) ProcessBlock(ctx context.Context, block eth.L1BlockRef) error {
	_, receipts, fetchErr := l.client.FetchReceipts(ctx, block.Hash)
	if fetchErr != nil {
		return fmt.Errorf("failed to fetch receipts for block %v: %w", block, fetchErr)
	}
	return l.processor.ProcessLogs(ctx, block, receipts)
}
package source
import (
"context"
"errors"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
// TestFetchLogs covers the logFetcher behavior: forwarding receipts on
// success, propagating receipt-fetch errors, and propagating processor errors.
func TestFetchLogs(t *testing.T) {
	ctx := context.Background()
	rcpts := types.Receipts{&types.Receipt{Type: 3}, &types.Receipt{Type: 4}}

	t.Run("Success", func(t *testing.T) {
		src := &stubLogSource{
			rcpts: rcpts,
		}
		var seen []types.Receipts
		sink := ReceiptProcessorFn(func(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
			seen = append(seen, rcpts)
			return nil
		})
		fetcher := newLogFetcher(src, sink)

		blk := eth.L1BlockRef{Number: 11, Hash: common.Hash{0xaa}}
		require.NoError(t, fetcher.ProcessBlock(ctx, blk))
		require.Equal(t, []types.Receipts{rcpts}, seen)
	})

	t.Run("ReceiptFetcherError", func(t *testing.T) {
		src := &stubLogSource{
			err: errors.New("boom"),
		}
		sink := ReceiptProcessorFn(func(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
			t.Fatal("should not be called")
			return nil
		})
		fetcher := newLogFetcher(src, sink)

		blk := eth.L1BlockRef{Number: 11, Hash: common.Hash{0xaa}}
		require.ErrorIs(t, fetcher.ProcessBlock(ctx, blk), src.err)
	})

	t.Run("ProcessorError", func(t *testing.T) {
		expectedErr := errors.New("boom")
		src := &stubLogSource{
			rcpts: rcpts,
		}
		sink := ReceiptProcessorFn(func(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
			return expectedErr
		})
		fetcher := newLogFetcher(src, sink)

		blk := eth.L1BlockRef{Number: 11, Hash: common.Hash{0xaa}}
		require.ErrorIs(t, fetcher.ProcessBlock(ctx, blk), expectedErr)
	})
}
// stubLogSource is a test double for LogSource that returns a fixed
// error or a fixed set of receipts.
type stubLogSource struct {
	err   error          // if non-nil, FetchReceipts fails with this error
	rcpts types.Receipts // receipts returned on success
}
// FetchReceipts returns the stubbed error if configured, otherwise the
// canned receipts. The block info result is always nil.
func (s *stubLogSource) FetchReceipts(_ context.Context, _ common.Hash) (eth.BlockInfo, types.Receipts, error) {
	if stubErr := s.err; stubErr != nil {
		return nil, nil, stubErr
	}
	return nil, s.rcpts, nil
}
...@@ -3,18 +3,21 @@ package source ...@@ -3,18 +3,21 @@ package source
import ( import (
"context" "context"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
type HeadProcessor interface { type HeadProcessor interface {
OnNewHead(ctx context.Context, head eth.L1BlockRef) OnNewHead(ctx context.Context, head eth.L1BlockRef) error
} }
type HeadProcessorFn func(ctx context.Context, head eth.L1BlockRef) type HeadProcessorFn func(ctx context.Context, head eth.L1BlockRef) error
func (f HeadProcessorFn) OnNewHead(ctx context.Context, head eth.L1BlockRef) { func (f HeadProcessorFn) OnNewHead(ctx context.Context, head eth.L1BlockRef) error {
f(ctx, head) return f(ctx, head)
} }
// headUpdateProcessor handles head update events and routes them to the appropriate handlers // headUpdateProcessor handles head update events and routes them to the appropriate handlers
...@@ -37,19 +40,37 @@ func newHeadUpdateProcessor(log log.Logger, unsafeProcessors []HeadProcessor, sa ...@@ -37,19 +40,37 @@ func newHeadUpdateProcessor(log log.Logger, unsafeProcessors []HeadProcessor, sa
func (n *headUpdateProcessor) OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef) { func (n *headUpdateProcessor) OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef) {
n.log.Debug("New unsafe head", "block", block) n.log.Debug("New unsafe head", "block", block)
for _, processor := range n.unsafeProcessors { for _, processor := range n.unsafeProcessors {
processor.OnNewHead(ctx, block) if err := processor.OnNewHead(ctx, block); err != nil {
n.log.Error("unsafe-head processing failed", "err", err)
}
} }
} }
func (n *headUpdateProcessor) OnNewSafeHead(ctx context.Context, block eth.L1BlockRef) { func (n *headUpdateProcessor) OnNewSafeHead(ctx context.Context, block eth.L1BlockRef) {
n.log.Debug("New safe head", "block", block) n.log.Debug("New safe head", "block", block)
for _, processor := range n.safeProcessors { for _, processor := range n.safeProcessors {
processor.OnNewHead(ctx, block) if err := processor.OnNewHead(ctx, block); err != nil {
n.log.Error("safe-head processing failed", "err", err)
}
} }
} }
func (n *headUpdateProcessor) OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef) { func (n *headUpdateProcessor) OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef) {
n.log.Debug("New finalized head", "block", block) n.log.Debug("New finalized head", "block", block)
for _, processor := range n.finalizedProcessors { for _, processor := range n.finalizedProcessors {
processor.OnNewHead(ctx, block) if err := processor.OnNewHead(ctx, block); err != nil {
n.log.Error("finalized-head processing failed", "err", err)
}
}
}
// OnNewHead is a util function to turn a head-signal processor into head-pointer updater
func OnNewHead(id types.ChainID, apply func(id types.ChainID, v heads.HeadPointer) error) HeadProcessorFn {
return func(ctx context.Context, head eth.L1BlockRef) error {
return apply(id, heads.HeadPointer{
LastSealedBlockHash: head.Hash,
LastSealedBlockNum: head.Number,
LogsSince: 0,
})
} }
} }
...@@ -16,8 +16,9 @@ func TestHeadUpdateProcessor(t *testing.T) { ...@@ -16,8 +16,9 @@ func TestHeadUpdateProcessor(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
processed := make([]eth.L1BlockRef, 3) processed := make([]eth.L1BlockRef, 3)
makeProcessor := func(idx int) HeadProcessor { makeProcessor := func(idx int) HeadProcessor {
return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) { return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) error {
processed[idx] = head processed[idx] = head
return nil
}) })
} }
headUpdates := newHeadUpdateProcessor(logger, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)}, nil, nil) headUpdates := newHeadUpdateProcessor(logger, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)}, nil, nil)
...@@ -30,8 +31,9 @@ func TestHeadUpdateProcessor(t *testing.T) { ...@@ -30,8 +31,9 @@ func TestHeadUpdateProcessor(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
processed := make([]eth.L1BlockRef, 3) processed := make([]eth.L1BlockRef, 3)
makeProcessor := func(idx int) HeadProcessor { makeProcessor := func(idx int) HeadProcessor {
return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) { return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) error {
processed[idx] = head processed[idx] = head
return nil
}) })
} }
headUpdates := newHeadUpdateProcessor(logger, nil, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)}, nil) headUpdates := newHeadUpdateProcessor(logger, nil, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)}, nil)
...@@ -44,8 +46,9 @@ func TestHeadUpdateProcessor(t *testing.T) { ...@@ -44,8 +46,9 @@ func TestHeadUpdateProcessor(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
processed := make([]eth.L1BlockRef, 3) processed := make([]eth.L1BlockRef, 3)
makeProcessor := func(idx int) HeadProcessor { makeProcessor := func(idx int) HeadProcessor {
return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) { return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) error {
processed[idx] = head processed[idx] = head
return nil
}) })
} }
headUpdates := newHeadUpdateProcessor(logger, nil, nil, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)}) headUpdates := newHeadUpdateProcessor(logger, nil, nil, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)})
......
...@@ -73,7 +73,7 @@ func (lvl SafetyLevel) String() string { ...@@ -73,7 +73,7 @@ func (lvl SafetyLevel) String() string {
func (lvl SafetyLevel) Valid() bool { func (lvl SafetyLevel) Valid() bool {
switch lvl { switch lvl {
case Finalized, Safe, CrossUnsafe, Unsafe: case CrossFinalized, Finalized, Safe, CrossUnsafe, Unsafe:
return true return true
default: default:
return false return false
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment