Commit 745b251d authored by protolambda's avatar protolambda Committed by GitHub

op-supervisor: cleanup, refactor, local-safe info from op-node (#12427)

* op-supervisor: cleanup, refactor to take local-safe info from op-node

* Refactor ChainProcessor Worker

* remove unneeded err check

* semgrep

---------
Co-authored-by: default avataraxelKingsley <axel.kingsley@gmail.com>
parent 77289937
...@@ -2,7 +2,6 @@ package interop ...@@ -2,7 +2,6 @@ package interop
import ( import (
"context" "context"
"fmt"
"math/big" "math/big"
"testing" "testing"
"time" "time"
...@@ -86,14 +85,12 @@ func TestInteropTrivial(t *testing.T) { ...@@ -86,14 +85,12 @@ func TestInteropTrivial(t *testing.T) {
require.Equal(t, expectedBalance, bobBalance) require.Equal(t, expectedBalance, bobBalance)
s2.DeployEmitterContract(chainA, "Alice") s2.DeployEmitterContract(chainA, "Alice")
rec := s2.EmitData(chainA, "Alice", "0x1234567890abcdef")
fmt.Println("Result of emitting event:", rec)
s2.DeployEmitterContract(chainB, "Alice") s2.DeployEmitterContract(chainB, "Alice")
rec = s2.EmitData(chainB, "Alice", "0x1234567890abcdef") for i := 0; i < 1; i++ {
s2.EmitData(chainA, "Alice", "0x1234567890abcdef")
fmt.Println("Result of emitting event:", rec) s2.EmitData(chainB, "Alice", "0x1234567890abcdef")
}
time.Sleep(60 * time.Second) time.Sleep(60 * time.Second)
......
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -68,7 +67,12 @@ func (cl *SupervisorClient) AddL2RPC( ...@@ -68,7 +67,12 @@ func (cl *SupervisorClient) AddL2RPC(
func (cl *SupervisorClient) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) { func (cl *SupervisorClient) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
var result types.ReferenceView var result types.ReferenceView
err := cl.client.CallContext(ctx, &result, "supervisor_unsafeView", (*hexutil.U256)(&chainID), unsafe) err := cl.client.CallContext(
ctx,
&result,
"supervisor_unsafeView",
chainID,
unsafe)
if err != nil { if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to share unsafe block view %s (chain %s): %w", unsafe, chainID, err) return types.ReferenceView{}, fmt.Errorf("failed to share unsafe block view %s (chain %s): %w", unsafe, chainID, err)
} }
...@@ -77,7 +81,12 @@ func (cl *SupervisorClient) UnsafeView(ctx context.Context, chainID types.ChainI ...@@ -77,7 +81,12 @@ func (cl *SupervisorClient) UnsafeView(ctx context.Context, chainID types.ChainI
func (cl *SupervisorClient) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) { func (cl *SupervisorClient) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
var result types.ReferenceView var result types.ReferenceView
err := cl.client.CallContext(ctx, &result, "supervisor_safeView", (*hexutil.U256)(&chainID), safe) err := cl.client.CallContext(
ctx,
&result,
"supervisor_safeView",
chainID,
safe)
if err != nil { if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to share safe block view %s (chain %s): %w", safe, chainID, err) return types.ReferenceView{}, fmt.Errorf("failed to share safe block view %s (chain %s): %w", safe, chainID, err)
} }
......
This diff is collapsed.
...@@ -4,20 +4,17 @@ import ( ...@@ -4,20 +4,17 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/safety"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
var ( var ErrUnknownChain = errors.New("unknown chain")
ErrUnknownChain = errors.New("unknown chain")
)
type LogStorage interface { type LogStorage interface {
io.Closer io.Closer
...@@ -31,58 +28,92 @@ type LogStorage interface { ...@@ -31,58 +28,92 @@ type LogStorage interface {
LatestSealedBlockNum() (n uint64, ok bool) LatestSealedBlockNum() (n uint64, ok bool)
// FindSealedBlock finds the requested block, to check if it exists, // FindSealedBlock finds the requested block by number, to check if it exists,
// returning the next index after it where things continue from. // returning the block seal if it was found.
// returns ErrFuture if the block is too new to be able to tell // returns ErrFuture if the block is too new to be able to tell.
// returns ErrDifferent if the known block does not match FindSealedBlock(number uint64) (block types.BlockSeal, err error)
FindSealedBlock(block eth.BlockID) (nextEntry entrydb.EntryIdx, err error)
IteratorStartingAt(sealedNum uint64, logsSince uint32) (logs.Iterator, error) IteratorStartingAt(sealedNum uint64, logsSince uint32) (logs.Iterator, error)
// returns ErrConflict if the log does not match the canonical chain. // Contains returns no error iff the specified logHash is recorded in the specified blockNum and logIdx.
// returns ErrFuture if the log is out of reach. // If the log is out of reach, then ErrFuture is returned.
// returns nil if the log is known and matches the canonical chain. // If the log is determined to conflict with the canonical chain, then ErrConflict is returned.
Contains(blockNum uint64, logIdx uint32, logHash common.Hash) (nextIndex entrydb.EntryIdx, err error) // logIdx is the index of the log in the array of all logs in the block.
// This can be used to check the validity of cross-chain interop events.
// The block-seal of the blockNum block, that the log was included in, is returned.
// This seal may be fully zeroed, without error, if the block isn't fully known yet.
Contains(blockNum uint64, logIdx uint32, logHash common.Hash) (includedIn types.BlockSeal, err error)
}
type LocalDerivedFromStorage interface {
Last() (derivedFrom eth.BlockRef, derived eth.BlockRef, err error)
AddDerived(derivedFrom eth.BlockRef, derived eth.BlockRef) error
LastDerived(derivedFrom eth.BlockID) (derived eth.BlockID, err error)
DerivedFrom(derived eth.BlockID) (derivedFrom eth.BlockID, err error)
}
type CrossDerivedFromStorage interface {
LocalDerivedFromStorage
// This will start to differ with reorg support
} }
var _ LogStorage = (*logs.DB)(nil) var _ LogStorage = (*logs.DB)(nil)
// ChainsDB is a database that stores logs and heads for multiple chains. // ChainsDB is a database that stores logs and derived-from data for multiple chains.
// it implements the ChainsStorage interface. // it implements the ChainsStorage interface.
type ChainsDB struct { type ChainsDB struct {
logDBs map[types.ChainID]LogStorage // RW mutex:
safetyIndex safety.SafetyIndex // Read = chains can be read / mutated.
logger log.Logger // Write = set of chains is changing.
mu sync.RWMutex
// unsafe info: the sequence of block seals and events
logDBs map[types.ChainID]LogStorage
// cross-unsafe: how far we have processed the unsafe data.
crossUnsafe map[types.ChainID]types.BlockSeal
// local-safe: index of what we optimistically know about L2 blocks being derived from L1
localDBs map[types.ChainID]LocalDerivedFromStorage
// cross-safe: index of L2 blocks we know to only have cross-L2 valid dependencies
crossDBs map[types.ChainID]CrossDerivedFromStorage
// finalized: the L1 finality progress. This can be translated into what may be considered as finalized in L2.
// It is initially zeroed, and the L2 finality query will return
// an error until it has this L1 finality to work with.
finalizedL1 eth.L1BlockRef
logger log.Logger
} }
func NewChainsDB(logDBs map[types.ChainID]LogStorage, l log.Logger) *ChainsDB { func NewChainsDB(l log.Logger) *ChainsDB {
ret := &ChainsDB{ return &ChainsDB{
logDBs: logDBs, logDBs: make(map[types.ChainID]LogStorage),
logger: l, logger: l,
localDBs: make(map[types.ChainID]LocalDerivedFromStorage),
crossDBs: make(map[types.ChainID]CrossDerivedFromStorage),
crossUnsafe: make(map[types.ChainID]types.BlockSeal),
} }
ret.safetyIndex = safety.NewSafetyIndex(l, ret)
return ret
} }
func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) { func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) {
db.mu.Lock()
defer db.mu.Unlock()
if db.logDBs[chain] != nil { if db.logDBs[chain] != nil {
log.Warn("overwriting existing logDB for chain", "chain", chain) log.Warn("overwriting existing logDB for chain", "chain", chain)
} }
db.logDBs[chain] = logDB db.logDBs[chain] = logDB
} }
func (db *ChainsDB) IteratorStartingAt(chain types.ChainID, sealedNum uint64, logIndex uint32) (logs.Iterator, error) {
logDB, ok := db.logDBs[chain]
if !ok {
return nil, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.IteratorStartingAt(sealedNum, logIndex)
}
// ResumeFromLastSealedBlock prepares the chains db to resume recording events after a restart. // ResumeFromLastSealedBlock prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database, // It rewinds the database to the last block that is guaranteed to have been fully recorded to the database,
// to ensure it can resume recording from the first log of the next block. // to ensure it can resume recording from the first log of the next block.
func (db *ChainsDB) ResumeFromLastSealedBlock() error { func (db *ChainsDB) ResumeFromLastSealedBlock() error {
db.mu.RLock()
defer db.mu.RUnlock()
for chain, logStore := range db.logDBs { for chain, logStore := range db.logDBs {
headNum, ok := logStore.LatestSealedBlockNum() headNum, ok := logStore.LatestSealedBlockNum()
if !ok { if !ok {
...@@ -98,100 +129,10 @@ func (db *ChainsDB) ResumeFromLastSealedBlock() error { ...@@ -98,100 +129,10 @@ func (db *ChainsDB) ResumeFromLastSealedBlock() error {
return nil return nil
} }
// Check calls the underlying logDB to determine if the given log entry is safe with respect to the checker's criteria.
func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) (common.Hash, error) {
logDB, ok := db.logDBs[chain]
if !ok {
return common.Hash{}, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
_, err := logDB.Contains(blockNum, logIdx, logHash)
if err != nil {
return common.Hash{}, err
}
// TODO(#11693): need to get the actual block hash for this log entry for reorg detection
return common.Hash{}, nil
}
// Safest returns the strongest safety level that can be guaranteed for the given log entry.
// it assumes the log entry has already been checked and is valid, this funcion only checks safety levels.
func (db *ChainsDB) Safest(chainID types.ChainID, blockNum uint64, index uint32) (safest types.SafetyLevel) {
safest = types.LocalUnsafe
if crossUnsafe, err := db.safetyIndex.CrossUnsafeL2(chainID); err == nil && crossUnsafe.WithinRange(blockNum, index) {
safest = types.CrossUnsafe
}
if localSafe, err := db.safetyIndex.LocalSafeL2(chainID); err == nil && localSafe.WithinRange(blockNum, index) {
safest = types.LocalSafe
}
if crossSafe, err := db.safetyIndex.LocalSafeL2(chainID); err == nil && crossSafe.WithinRange(blockNum, index) {
safest = types.CrossSafe
}
if finalized, err := db.safetyIndex.FinalizedL2(chainID); err == nil {
if finalized.Number >= blockNum {
safest = types.Finalized
}
}
return
}
func (db *ChainsDB) FindSealedBlock(chain types.ChainID, block eth.BlockID) (nextEntry entrydb.EntryIdx, err error) {
logDB, ok := db.logDBs[chain]
if !ok {
return 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.FindSealedBlock(block)
}
// LatestBlockNum returns the latest fully-sealed block number that has been recorded to the logs db
// for the given chain. It does not contain safety guarantees.
// The block number might not be available (empty database, or non-existent chain).
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) (num uint64, ok bool) {
logDB, knownChain := db.logDBs[chain]
if !knownChain {
return 0, false
}
return logDB.LatestSealedBlockNum()
}
func (db *ChainsDB) AddLog(
chain types.ChainID,
logHash common.Hash,
parentBlock eth.BlockID,
logIdx uint32,
execMsg *types.ExecutingMessage) error {
logDB, ok := db.logDBs[chain]
if !ok {
return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.AddLog(logHash, parentBlock, logIdx, execMsg)
}
func (db *ChainsDB) SealBlock(
chain types.ChainID,
block eth.BlockRef) error {
logDB, ok := db.logDBs[chain]
if !ok {
return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
err := logDB.SealBlock(block.ParentHash, block.ID(), block.Time)
if err != nil {
return fmt.Errorf("failed to seal block %v: %w", block, err)
}
err = db.safetyIndex.UpdateLocalUnsafe(chain, block)
if err != nil {
return fmt.Errorf("failed to update local-unsafe: %w", err)
}
return nil
}
func (db *ChainsDB) Rewind(chain types.ChainID, headBlockNum uint64) error {
logDB, ok := db.logDBs[chain]
if !ok {
return fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.Rewind(headBlockNum)
}
func (db *ChainsDB) Close() error { func (db *ChainsDB) Close() error {
db.mu.Lock()
defer db.mu.Unlock()
var combined error var combined error
for id, logDB := range db.logDBs { for id, logDB := range db.logDBs {
if err := logDB.Close(); err != nil { if err := logDB.Close(); err != nil {
......
This diff is collapsed.
...@@ -9,71 +9,19 @@ import ( ...@@ -9,71 +9,19 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
const ( const EntrySize = 34
EntrySize = 34
)
type EntryIdx int64 type EntryIdx int64
type Entry [EntrySize]byte type EntryType interface {
String() string
func (entry Entry) Type() EntryType { ~uint8
return EntryType(entry[0])
} }
type EntryTypeFlag uint8 type Entry[T EntryType] [EntrySize]byte
const (
FlagSearchCheckpoint EntryTypeFlag = 1 << TypeSearchCheckpoint
FlagCanonicalHash EntryTypeFlag = 1 << TypeCanonicalHash
FlagInitiatingEvent EntryTypeFlag = 1 << TypeInitiatingEvent
FlagExecutingLink EntryTypeFlag = 1 << TypeExecutingLink
FlagExecutingCheck EntryTypeFlag = 1 << TypeExecutingCheck
FlagPadding EntryTypeFlag = 1 << TypePadding
// for additional padding
FlagPadding2 EntryTypeFlag = FlagPadding << 1
)
func (ex EntryTypeFlag) Any(v EntryTypeFlag) bool {
return ex&v != 0
}
func (ex *EntryTypeFlag) Add(v EntryTypeFlag) {
*ex = *ex | v
}
func (ex *EntryTypeFlag) Remove(v EntryTypeFlag) { func (entry Entry[T]) Type() T {
*ex = *ex &^ v return T(entry[0])
}
type EntryType uint8
const (
TypeSearchCheckpoint EntryType = iota
TypeCanonicalHash
TypeInitiatingEvent
TypeExecutingLink
TypeExecutingCheck
TypePadding
)
func (d EntryType) String() string {
switch d {
case TypeSearchCheckpoint:
return "searchCheckpoint"
case TypeCanonicalHash:
return "canonicalHash"
case TypeInitiatingEvent:
return "initiatingEvent"
case TypeExecutingLink:
return "executingLink"
case TypeExecutingCheck:
return "executingCheck"
case TypePadding:
return "padding"
default:
return fmt.Sprintf("unknown-%d", uint8(d))
}
} }
// dataAccess defines a minimal API required to manipulate the actual stored data. // dataAccess defines a minimal API required to manipulate the actual stored data.
...@@ -85,7 +33,7 @@ type dataAccess interface { ...@@ -85,7 +33,7 @@ type dataAccess interface {
Truncate(size int64) error Truncate(size int64) error
} }
type EntryDB struct { type EntryDB[T EntryType] struct {
data dataAccess data dataAccess
lastEntryIdx EntryIdx lastEntryIdx EntryIdx
...@@ -97,7 +45,7 @@ type EntryDB struct { ...@@ -97,7 +45,7 @@ type EntryDB struct {
// If the file exists it will be used as the existing data. // If the file exists it will be used as the existing data.
// Returns ErrRecoveryRequired if the existing file is not a valid entry db. A EntryDB is still returned but all // Returns ErrRecoveryRequired if the existing file is not a valid entry db. A EntryDB is still returned but all
// operations will return ErrRecoveryRequired until the Recover method is called. // operations will return ErrRecoveryRequired until the Recover method is called.
func NewEntryDB(logger log.Logger, path string) (*EntryDB, error) { func NewEntryDB[T EntryType](logger log.Logger, path string) (*EntryDB[T], error) {
logger.Info("Opening entry database", "path", path) logger.Info("Opening entry database", "path", path)
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil { if err != nil {
...@@ -108,7 +56,7 @@ func NewEntryDB(logger log.Logger, path string) (*EntryDB, error) { ...@@ -108,7 +56,7 @@ func NewEntryDB(logger log.Logger, path string) (*EntryDB, error) {
return nil, fmt.Errorf("failed to stat database at %v: %w", path, err) return nil, fmt.Errorf("failed to stat database at %v: %w", path, err)
} }
size := info.Size() / EntrySize size := info.Size() / EntrySize
db := &EntryDB{ db := &EntryDB[T]{
data: file, data: file,
lastEntryIdx: EntryIdx(size - 1), lastEntryIdx: EntryIdx(size - 1),
} }
...@@ -121,24 +69,24 @@ func NewEntryDB(logger log.Logger, path string) (*EntryDB, error) { ...@@ -121,24 +69,24 @@ func NewEntryDB(logger log.Logger, path string) (*EntryDB, error) {
return db, nil return db, nil
} }
func (e *EntryDB) Size() int64 { func (e *EntryDB[T]) Size() int64 {
return int64(e.lastEntryIdx) + 1 return int64(e.lastEntryIdx) + 1
} }
func (e *EntryDB) LastEntryIdx() EntryIdx { func (e *EntryDB[T]) LastEntryIdx() EntryIdx {
return e.lastEntryIdx return e.lastEntryIdx
} }
// Read an entry from the database by index. Returns io.EOF iff idx is after the last entry. // Read an entry from the database by index. Returns io.EOF iff idx is after the last entry.
func (e *EntryDB) Read(idx EntryIdx) (Entry, error) { func (e *EntryDB[T]) Read(idx EntryIdx) (Entry[T], error) {
if idx > e.lastEntryIdx { if idx > e.lastEntryIdx {
return Entry{}, io.EOF return Entry[T]{}, io.EOF
} }
var out Entry var out Entry[T]
read, err := e.data.ReadAt(out[:], int64(idx)*EntrySize) read, err := e.data.ReadAt(out[:], int64(idx)*EntrySize)
// Ignore io.EOF if we read the entire last entry as ReadAt may return io.EOF or nil when it reads the last byte // Ignore io.EOF if we read the entire last entry as ReadAt may return io.EOF or nil when it reads the last byte
if err != nil && !(errors.Is(err, io.EOF) && read == EntrySize) { if err != nil && !(errors.Is(err, io.EOF) && read == EntrySize) {
return Entry{}, fmt.Errorf("failed to read entry %v: %w", idx, err) return Entry[T]{}, fmt.Errorf("failed to read entry %v: %w", idx, err)
} }
return out, nil return out, nil
} }
...@@ -147,7 +95,7 @@ func (e *EntryDB) Read(idx EntryIdx) (Entry, error) { ...@@ -147,7 +95,7 @@ func (e *EntryDB) Read(idx EntryIdx) (Entry, error) {
// The entries are combined in memory and passed to a single Write invocation. // The entries are combined in memory and passed to a single Write invocation.
// If the write fails, it will attempt to truncate any partially written data. // If the write fails, it will attempt to truncate any partially written data.
// Subsequent writes to this instance will fail until partially written data is truncated. // Subsequent writes to this instance will fail until partially written data is truncated.
func (e *EntryDB) Append(entries ...Entry) error { func (e *EntryDB[T]) Append(entries ...Entry[T]) error {
if e.cleanupFailedWrite { if e.cleanupFailedWrite {
// Try to rollback partially written data from a previous Append // Try to rollback partially written data from a previous Append
if truncateErr := e.Truncate(e.lastEntryIdx); truncateErr != nil { if truncateErr := e.Truncate(e.lastEntryIdx); truncateErr != nil {
...@@ -177,7 +125,7 @@ func (e *EntryDB) Append(entries ...Entry) error { ...@@ -177,7 +125,7 @@ func (e *EntryDB) Append(entries ...Entry) error {
} }
// Truncate the database so that the last retained entry is idx. Any entries after idx are deleted. // Truncate the database so that the last retained entry is idx. Any entries after idx are deleted.
func (e *EntryDB) Truncate(idx EntryIdx) error { func (e *EntryDB[T]) Truncate(idx EntryIdx) error {
if err := e.data.Truncate((int64(idx) + 1) * EntrySize); err != nil { if err := e.data.Truncate((int64(idx) + 1) * EntrySize); err != nil {
return fmt.Errorf("failed to truncate to entry %v: %w", idx, err) return fmt.Errorf("failed to truncate to entry %v: %w", idx, err)
} }
...@@ -188,13 +136,13 @@ func (e *EntryDB) Truncate(idx EntryIdx) error { ...@@ -188,13 +136,13 @@ func (e *EntryDB) Truncate(idx EntryIdx) error {
} }
// recover an invalid database by truncating back to the last complete event. // recover an invalid database by truncating back to the last complete event.
func (e *EntryDB) recover() error { func (e *EntryDB[T]) recover() error {
if err := e.data.Truncate((e.Size()) * EntrySize); err != nil { if err := e.data.Truncate((e.Size()) * EntrySize); err != nil {
return fmt.Errorf("failed to truncate trailing partial entries: %w", err) return fmt.Errorf("failed to truncate trailing partial entries: %w", err)
} }
return nil return nil
} }
func (e *EntryDB) Close() error { func (e *EntryDB[T]) Close() error {
return e.data.Close() return e.data.Close()
} }
...@@ -3,6 +3,7 @@ package entrydb ...@@ -3,6 +3,7 @@ package entrydb
import ( import (
"bytes" "bytes"
"errors" "errors"
"fmt"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
...@@ -13,6 +14,14 @@ import ( ...@@ -13,6 +14,14 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type TestEntryType uint8
func (typ TestEntryType) String() string {
return fmt.Sprintf("%d", uint8(typ))
}
type TestEntry = Entry[TestEntryType]
func TestReadWrite(t *testing.T) { func TestReadWrite(t *testing.T) {
t.Run("BasicReadWrite", func(t *testing.T) { t.Run("BasicReadWrite", func(t *testing.T) {
db := createEntryDB(t) db := createEntryDB(t)
...@@ -114,7 +123,7 @@ func TestTruncateTrailingPartialEntries(t *testing.T) { ...@@ -114,7 +123,7 @@ func TestTruncateTrailingPartialEntries(t *testing.T) {
copy(invalidData[EntrySize:], entry2[:]) copy(invalidData[EntrySize:], entry2[:])
invalidData[len(invalidData)-1] = 3 // Some invalid trailing data invalidData[len(invalidData)-1] = 3 // Some invalid trailing data
require.NoError(t, os.WriteFile(file, invalidData, 0o644)) require.NoError(t, os.WriteFile(file, invalidData, 0o644))
db, err := NewEntryDB(logger, file) db, err := NewEntryDB[TestEntryType](logger, file)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
...@@ -177,19 +186,19 @@ func TestWriteErrors(t *testing.T) { ...@@ -177,19 +186,19 @@ func TestWriteErrors(t *testing.T) {
}) })
} }
func requireRead(t *testing.T, db *EntryDB, idx EntryIdx, expected Entry) { func requireRead(t *testing.T, db *EntryDB[TestEntryType], idx EntryIdx, expected TestEntry) {
actual, err := db.Read(idx) actual, err := db.Read(idx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expected, actual) require.Equal(t, expected, actual)
} }
func createEntry(i byte) Entry { func createEntry(i byte) TestEntry {
return Entry(bytes.Repeat([]byte{i}, EntrySize)) return TestEntry(bytes.Repeat([]byte{i}, EntrySize))
} }
func createEntryDB(t *testing.T) *EntryDB { func createEntryDB(t *testing.T) *EntryDB[TestEntryType] {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
db, err := NewEntryDB(logger, filepath.Join(t.TempDir(), "entries.db")) db, err := NewEntryDB[TestEntryType](logger, filepath.Join(t.TempDir(), "entries.db"))
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, db.Close()) require.NoError(t, db.Close())
...@@ -197,9 +206,9 @@ func createEntryDB(t *testing.T) *EntryDB { ...@@ -197,9 +206,9 @@ func createEntryDB(t *testing.T) *EntryDB {
return db return db
} }
func createEntryDBWithStubData() (*EntryDB, *stubDataAccess) { func createEntryDBWithStubData() (*EntryDB[TestEntryType], *stubDataAccess) {
stubData := &stubDataAccess{} stubData := &stubDataAccess{}
db := &EntryDB{data: stubData, lastEntryIdx: -1} db := &EntryDB[TestEntryType]{data: stubData, lastEntryIdx: -1}
return db, stubData return db, stubData
} }
......
package entrydb
import "errors"
var (
// ErrOutOfOrder happens when you try to add data to the DB,
// but it does not actually fit onto the latest data (by being too old or new).
ErrOutOfOrder = errors.New("data out of order")
// ErrDataCorruption happens when the underlying DB has some I/O issue
ErrDataCorruption = errors.New("data corruption")
// ErrSkipped happens when we try to retrieve data that is not available (pruned)
// It may also happen if we erroneously skip data, that was not considered a conflict, if the DB is corrupted.
ErrSkipped = errors.New("skipped data")
// ErrFuture happens when data is just not yet available
ErrFuture = errors.New("future data")
// ErrConflict happens when we know for sure that there is different canonical data
ErrConflict = errors.New("conflicting data")
// ErrStop can be used in iterators to indicate iteration has to stop
ErrStop = errors.New("iter stop")
)
package heads
import (
"encoding/json"
"errors"
"fmt"
"os"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/ioutil"
"github.com/ethereum-optimism/optimism/op-service/jsonutil"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// HeadTracker records the current chain head pointers for a single chain.
type HeadTracker struct {
rwLock sync.RWMutex
path string
current *Heads
logger log.Logger
}
func (t *HeadTracker) CrossUnsafe(id types.ChainID) HeadPointer {
return t.current.Get(id).CrossUnsafe
}
func (t *HeadTracker) CrossSafe(id types.ChainID) HeadPointer {
return t.current.Get(id).CrossSafe
}
func (t *HeadTracker) CrossFinalized(id types.ChainID) HeadPointer {
return t.current.Get(id).CrossFinalized
}
func (t *HeadTracker) LocalUnsafe(id types.ChainID) HeadPointer {
return t.current.Get(id).Unsafe
}
func (t *HeadTracker) LocalSafe(id types.ChainID) HeadPointer {
return t.current.Get(id).LocalSafe
}
func (t *HeadTracker) LocalFinalized(id types.ChainID) HeadPointer {
return t.current.Get(id).LocalFinalized
}
func (t *HeadTracker) UpdateCrossUnsafe(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Cross-unsafe update", "pointer", pointer)
h := heads.Get(id)
h.CrossUnsafe = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateCrossSafe(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Cross-safe update", "pointer", pointer)
h := heads.Get(id)
h.CrossSafe = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateCrossFinalized(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Cross-finalized update", "pointer", pointer)
h := heads.Get(id)
h.CrossFinalized = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateLocalUnsafe(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Local-unsafe update", "pointer", pointer)
h := heads.Get(id)
h.Unsafe = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateLocalSafe(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Local-safe update", "pointer", pointer)
h := heads.Get(id)
h.LocalSafe = pointer
heads.Put(id, h)
return nil
}))
}
func (t *HeadTracker) UpdateLocalFinalized(id types.ChainID, pointer HeadPointer) error {
return t.Apply(OperationFn(func(heads *Heads) error {
t.logger.Info("Local-finalized update", "pointer", pointer)
h := heads.Get(id)
h.LocalFinalized = pointer
heads.Put(id, h)
return nil
}))
}
func NewHeadTracker(logger log.Logger, path string) (*HeadTracker, error) {
current := NewHeads()
if data, err := os.ReadFile(path); errors.Is(err, os.ErrNotExist) {
// No existing file, just use empty heads
} else if err != nil {
return nil, fmt.Errorf("failed to read existing heads from %v: %w", path, err)
} else {
if err := json.Unmarshal(data, current); err != nil {
return nil, fmt.Errorf("invalid existing heads file %v: %w", path, err)
}
}
return &HeadTracker{
path: path,
current: current,
logger: logger,
}, nil
}
func (t *HeadTracker) Apply(op Operation) error {
t.rwLock.Lock()
defer t.rwLock.Unlock()
// Store a copy of the heads prior to changing so we can roll back if needed.
modified := t.current.Copy()
if err := op.Apply(modified); err != nil {
return fmt.Errorf("operation failed: %w", err)
}
if err := t.write(modified); err != nil {
return fmt.Errorf("failed to store updated heads: %w", err)
}
t.current = modified
return nil
}
func (t *HeadTracker) Current() *Heads {
t.rwLock.RLock()
defer t.rwLock.RUnlock()
return t.current.Copy()
}
func (t *HeadTracker) write(heads *Heads) error {
if err := jsonutil.WriteJSON(heads, ioutil.ToAtomicFile(t.path, 0o644)); err != nil {
return fmt.Errorf("failed to write new heads: %w", err)
}
return nil
}
func (t *HeadTracker) Close() error {
return nil
}
package heads
/*
import (
"errors"
"os"
"path/filepath"
"testing"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/stretchr/testify/require"
)
func TestHeads_SaveAndReload(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "heads.json")
chainA := types.ChainIDFromUInt64(3)
chainAHeads := ChainHeads{
Unsafe: 1,
CrossUnsafe: 2,
LocalSafe: 3,
CrossSafe: 4,
LocalFinalized: 5,
CrossFinalized: 6,
}
chainB := types.ChainIDFromUInt64(5)
chainBHeads := ChainHeads{
Unsafe: 11,
CrossUnsafe: 12,
LocalSafe: 13,
CrossSafe: 14,
LocalFinalized: 15,
CrossFinalized: 16,
}
orig, err := NewHeadTracker(path)
require.NoError(t, err)
err = orig.Apply(OperationFn(func(heads *Heads) error {
heads.Put(chainA, chainAHeads)
heads.Put(chainB, chainBHeads)
return nil
}))
require.NoError(t, err)
require.Equal(t, orig.Current().Get(chainA), chainAHeads)
require.Equal(t, orig.Current().Get(chainB), chainBHeads)
loaded, err := NewHeadTracker(path)
require.NoError(t, err)
require.EqualValues(t, loaded.Current(), orig.Current())
}
func TestHeads_NoChangesMadeIfOperationFails(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "heads.json")
chainA := types.ChainIDFromUInt64(3)
chainAHeads := ChainHeads{
Unsafe: 1,
CrossUnsafe: 2,
LocalSafe: 3,
CrossSafe: 4,
LocalFinalized: 5,
CrossFinalized: 6,
}
orig, err := NewHeadTracker(path)
require.NoError(t, err)
boom := errors.New("boom")
err = orig.Apply(OperationFn(func(heads *Heads) error {
heads.Put(chainA, chainAHeads)
return boom
}))
require.ErrorIs(t, err, boom)
require.Equal(t, ChainHeads{}, orig.Current().Get(chainA))
// Should be able to load from disk too
loaded, err := NewHeadTracker(path)
require.NoError(t, err)
require.EqualValues(t, loaded.Current(), orig.Current())
}
func TestHeads_NoChangesMadeIfWriteFails(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "invalid/heads.json")
chainA := types.ChainIDFromUInt64(3)
chainAHeads := ChainHeads{
Unsafe: 1,
CrossUnsafe: 2,
LocalSafe: 3,
CrossSafe: 4,
LocalFinalized: 5,
CrossFinalized: 6,
}
orig, err := NewHeadTracker(path)
require.NoError(t, err)
err = orig.Apply(OperationFn(func(heads *Heads) error {
heads.Put(chainA, chainAHeads)
return nil
}))
require.ErrorIs(t, err, os.ErrNotExist)
require.Equal(t, ChainHeads{}, orig.Current().Get(chainA))
}
*/
package heads
import (
"encoding/json"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type HeadPointer struct {
// LastSealedBlockHash is the last fully-processed block
LastSealedBlockHash common.Hash
LastSealedBlockNum uint64
LastSealedTimestamp uint64
// Number of logs that have been verified since the LastSealedBlock.
// These logs are contained in the block that builds on top of the LastSealedBlock.
LogsSince uint32
}
// WithinRange checks if the given log, in the given block,
// is within range (i.e. before or equal to the head-pointer).
// This does not guarantee that the log exists.
func (ptr *HeadPointer) WithinRange(blockNum uint64, logIdx uint32) bool {
if ptr.LastSealedBlockHash == (common.Hash{}) {
return false // no block yet
}
return blockNum <= ptr.LastSealedBlockNum ||
(blockNum+1 == ptr.LastSealedBlockNum && logIdx < ptr.LogsSince)
}
func (ptr *HeadPointer) IsSealed(blockNum uint64) bool {
if ptr.LastSealedBlockHash == (common.Hash{}) {
return false // no block yet
}
return blockNum <= ptr.LastSealedBlockNum
}
// ChainHeads provides the serialization format for the current chain heads.
type ChainHeads struct {
Unsafe HeadPointer `json:"localUnsafe"`
CrossUnsafe HeadPointer `json:"crossUnsafe"`
LocalSafe HeadPointer `json:"localSafe"`
CrossSafe HeadPointer `json:"crossSafe"`
LocalFinalized HeadPointer `json:"localFinalized"`
CrossFinalized HeadPointer `json:"crossFinalized"`
}
type Heads struct {
Chains map[types.ChainID]ChainHeads
}
func NewHeads() *Heads {
return &Heads{Chains: make(map[types.ChainID]ChainHeads)}
}
func (h *Heads) Get(id types.ChainID) ChainHeads {
chain, ok := h.Chains[id]
if !ok {
return ChainHeads{}
}
// init to genesis
if chain.LocalFinalized == (HeadPointer{}) && chain.Unsafe.LastSealedBlockNum == 0 {
chain.LocalFinalized = chain.Unsafe
}
// Make sure the data is consistent
if chain.LocalSafe == (HeadPointer{}) {
chain.LocalSafe = chain.LocalFinalized
}
if chain.Unsafe == (HeadPointer{}) {
chain.Unsafe = chain.LocalSafe
}
if chain.CrossFinalized == (HeadPointer{}) && chain.LocalFinalized.LastSealedBlockNum == 0 {
chain.CrossFinalized = chain.LocalFinalized
}
if chain.CrossSafe == (HeadPointer{}) {
chain.CrossSafe = chain.CrossFinalized
}
if chain.CrossUnsafe == (HeadPointer{}) {
chain.CrossUnsafe = chain.CrossSafe
}
return chain
}
func (h *Heads) Put(id types.ChainID, head ChainHeads) {
h.Chains[id] = head
}
func (h *Heads) Copy() *Heads {
c := &Heads{Chains: make(map[types.ChainID]ChainHeads)}
for id, heads := range h.Chains {
c.Chains[id] = heads
}
return c
}
func (h *Heads) MarshalJSON() ([]byte, error) {
data := make(map[hexutil.U256]ChainHeads)
for id, heads := range h.Chains {
data[hexutil.U256(id)] = heads
}
return json.Marshal(data)
}
func (h *Heads) UnmarshalJSON(data []byte) error {
hexData := make(map[hexutil.U256]ChainHeads)
if err := json.Unmarshal(data, &hexData); err != nil {
return err
}
h.Chains = make(map[types.ChainID]ChainHeads)
for id, heads := range hexData {
h.Put(types.ChainID(id), heads)
}
return nil
}
type Operation interface {
Apply(head *Heads) error
}
type OperationFn func(heads *Heads) error
func (f OperationFn) Apply(heads *Heads) error {
return f(heads)
}
package heads
import (
"encoding/json"
"fmt"
"math/rand" // nosemgrep
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func TestHeads(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
randHeadPtr := func() HeadPointer {
var h common.Hash
rng.Read(h[:])
return HeadPointer{
LastSealedBlockHash: h,
LastSealedBlockNum: rng.Uint64(),
LogsSince: rng.Uint32(),
}
}
t.Run("RoundTripViaJson", func(t *testing.T) {
heads := NewHeads()
heads.Put(types.ChainIDFromUInt64(3), ChainHeads{
Unsafe: randHeadPtr(),
CrossUnsafe: randHeadPtr(),
LocalSafe: randHeadPtr(),
CrossSafe: randHeadPtr(),
LocalFinalized: randHeadPtr(),
CrossFinalized: randHeadPtr(),
})
heads.Put(types.ChainIDFromUInt64(9), ChainHeads{
Unsafe: randHeadPtr(),
CrossUnsafe: randHeadPtr(),
LocalSafe: randHeadPtr(),
CrossSafe: randHeadPtr(),
LocalFinalized: randHeadPtr(),
CrossFinalized: randHeadPtr(),
})
heads.Put(types.ChainIDFromUInt64(4892497242424), ChainHeads{
Unsafe: randHeadPtr(),
CrossUnsafe: randHeadPtr(),
LocalSafe: randHeadPtr(),
CrossSafe: randHeadPtr(),
LocalFinalized: randHeadPtr(),
CrossFinalized: randHeadPtr(),
})
j, err := json.Marshal(heads)
require.NoError(t, err)
fmt.Println(string(j))
var result Heads
err = json.Unmarshal(j, &result)
require.NoError(t, err)
require.Equal(t, heads.Chains, result.Chains)
})
t.Run("Copy", func(t *testing.T) {
chainA := types.ChainIDFromUInt64(3)
chainB := types.ChainIDFromUInt64(4)
chainAOrigHeads := ChainHeads{
Unsafe: randHeadPtr(),
}
chainAModifiedHeads1 := ChainHeads{
Unsafe: randHeadPtr(),
}
chainAModifiedHeads2 := ChainHeads{
Unsafe: randHeadPtr(),
}
chainBModifiedHeads := ChainHeads{
Unsafe: randHeadPtr(),
}
heads := NewHeads()
heads.Put(chainA, chainAOrigHeads)
otherHeads := heads.Copy()
otherHeads.Put(chainA, chainAModifiedHeads1)
otherHeads.Put(chainB, chainBModifiedHeads)
require.Equal(t, heads.Get(chainA), chainAOrigHeads)
require.Equal(t, heads.Get(chainB), ChainHeads{})
heads.Put(chainA, chainAModifiedHeads2)
require.Equal(t, heads.Get(chainA), chainAModifiedHeads2)
require.Equal(t, otherHeads.Get(chainA), chainAModifiedHeads1)
require.Equal(t, otherHeads.Get(chainB), chainBModifiedHeads)
})
}
...@@ -7,12 +7,13 @@ import ( ...@@ -7,12 +7,13 @@ import (
"os" "os"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
) )
type statInvariant func(stat os.FileInfo, m *stubMetrics) error type statInvariant func(stat os.FileInfo, m *stubMetrics) error
type entryInvariant func(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error type entryInvariant func(entryIdx int, entry Entry, entries []Entry, m *stubMetrics) error
// checkDBInvariants reads the database log directly and asserts a set of invariants on the data. // checkDBInvariants reads the database log directly and asserts a set of invariants on the data.
func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) { func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) {
...@@ -30,7 +31,7 @@ func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) { ...@@ -30,7 +31,7 @@ func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) {
// Read all entries as binary blobs // Read all entries as binary blobs
file, err := os.OpenFile(dbPath, os.O_RDONLY, 0o644) file, err := os.OpenFile(dbPath, os.O_RDONLY, 0o644)
require.NoError(t, err) require.NoError(t, err)
entries := make([]entrydb.Entry, stat.Size()/entrydb.EntrySize) entries := make([]Entry, stat.Size()/entrydb.EntrySize)
for i := range entries { for i := range entries {
n, err := io.ReadFull(file, entries[i][:]) n, err := io.ReadFull(file, entries[i][:])
require.NoErrorf(t, err, "failed to read entry %v", i) require.NoErrorf(t, err, "failed to read entry %v", i)
...@@ -56,7 +57,7 @@ func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) { ...@@ -56,7 +57,7 @@ func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) {
} }
} }
func fmtEntries(entries []entrydb.Entry) string { func fmtEntries(entries []Entry) string {
out := "" out := ""
for i, entry := range entries { for i, entry := range entries {
out += fmt.Sprintf("%v: %x\n", i, entry) out += fmt.Sprintf("%v: %x\n", i, entry)
...@@ -80,44 +81,44 @@ func invariantFileSizeMatchesEntryCountMetric(stat os.FileInfo, m *stubMetrics) ...@@ -80,44 +81,44 @@ func invariantFileSizeMatchesEntryCountMetric(stat os.FileInfo, m *stubMetrics)
return nil return nil
} }
func invariantSearchCheckpointAtEverySearchCheckpointFrequency(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantSearchCheckpointAtEverySearchCheckpointFrequency(entryIdx int, entry Entry, entries []Entry, m *stubMetrics) error {
if entryIdx%searchCheckpointFrequency == 0 && entry.Type() != entrydb.TypeSearchCheckpoint { if entryIdx%searchCheckpointFrequency == 0 && entry.Type() != TypeSearchCheckpoint {
return fmt.Errorf("should have search checkpoints every %v entries but entry %v was %x", searchCheckpointFrequency, entryIdx, entry) return fmt.Errorf("should have search checkpoints every %v entries but entry %v was %x", searchCheckpointFrequency, entryIdx, entry)
} }
return nil return nil
} }
func invariantCanonicalHashOrCheckpointAfterEverySearchCheckpoint(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantCanonicalHashOrCheckpointAfterEverySearchCheckpoint(entryIdx int, entry Entry, entries []Entry, m *stubMetrics) error {
if entry.Type() != entrydb.TypeSearchCheckpoint { if entry.Type() != TypeSearchCheckpoint {
return nil return nil
} }
if entryIdx+1 >= len(entries) { if entryIdx+1 >= len(entries) {
return fmt.Errorf("expected canonical hash or checkpoint after search checkpoint at entry %v but no further entries found", entryIdx) return fmt.Errorf("expected canonical hash or checkpoint after search checkpoint at entry %v but no further entries found", entryIdx)
} }
nextEntry := entries[entryIdx+1] nextEntry := entries[entryIdx+1]
if nextEntry.Type() != entrydb.TypeCanonicalHash && nextEntry.Type() != entrydb.TypeSearchCheckpoint { if nextEntry.Type() != TypeCanonicalHash && nextEntry.Type() != TypeSearchCheckpoint {
return fmt.Errorf("expected canonical hash or checkpoint after search checkpoint at entry %v but got %x", entryIdx, nextEntry) return fmt.Errorf("expected canonical hash or checkpoint after search checkpoint at entry %v but got %x", entryIdx, nextEntry)
} }
return nil return nil
} }
// invariantSearchCheckpointBeforeEveryCanonicalHash ensures we don't have extra canonical-hash entries // invariantSearchCheckpointBeforeEveryCanonicalHash ensures we don't have extra canonical-hash entries
func invariantSearchCheckpointBeforeEveryCanonicalHash(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantSearchCheckpointBeforeEveryCanonicalHash(entryIdx int, entry Entry, entries []Entry, m *stubMetrics) error {
if entry.Type() != entrydb.TypeCanonicalHash { if entry.Type() != TypeCanonicalHash {
return nil return nil
} }
if entryIdx == 0 { if entryIdx == 0 {
return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but no previous entries present", entryIdx) return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but no previous entries present", entryIdx)
} }
prevEntry := entries[entryIdx-1] prevEntry := entries[entryIdx-1]
if prevEntry.Type() != entrydb.TypeSearchCheckpoint { if prevEntry.Type() != TypeSearchCheckpoint {
return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but got %x", entryIdx, prevEntry) return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but got %x", entryIdx, prevEntry)
} }
return nil return nil
} }
func invariantExecLinkAfterInitEventWithFlagSet(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantExecLinkAfterInitEventWithFlagSet(entryIdx int, entry Entry, entries []Entry, m *stubMetrics) error {
if entry.Type() != entrydb.TypeInitiatingEvent { if entry.Type() != TypeInitiatingEvent {
return nil return nil
} }
hasExecMessage := entry[1]&eventFlagHasExecutingMessage != 0 hasExecMessage := entry[1]&eventFlagHasExecutingMessage != 0
...@@ -131,14 +132,14 @@ func invariantExecLinkAfterInitEventWithFlagSet(entryIdx int, entry entrydb.Entr ...@@ -131,14 +132,14 @@ func invariantExecLinkAfterInitEventWithFlagSet(entryIdx int, entry entrydb.Entr
if len(entries) <= linkIdx { if len(entries) <= linkIdx {
return fmt.Errorf("expected executing link after initiating event with exec msg flag set at entry %v but there were no more events", entryIdx) return fmt.Errorf("expected executing link after initiating event with exec msg flag set at entry %v but there were no more events", entryIdx)
} }
if entries[linkIdx].Type() != entrydb.TypeExecutingLink { if entries[linkIdx].Type() != TypeExecutingLink {
return fmt.Errorf("expected executing link at idx %v after initiating event with exec msg flag set at entry %v but got type %v", linkIdx, entryIdx, entries[linkIdx][0]) return fmt.Errorf("expected executing link at idx %v after initiating event with exec msg flag set at entry %v but got type %v", linkIdx, entryIdx, entries[linkIdx][0])
} }
return nil return nil
} }
func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry Entry, entries []Entry, m *stubMetrics) error {
if entry.Type() != entrydb.TypeExecutingLink { if entry.Type() != TypeExecutingLink {
return nil return nil
} }
if entryIdx == 0 { if entryIdx == 0 {
...@@ -152,7 +153,7 @@ func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry en ...@@ -152,7 +153,7 @@ func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry en
return fmt.Errorf("found executing link without a preceding initiating event at entry %v", entryIdx) return fmt.Errorf("found executing link without a preceding initiating event at entry %v", entryIdx)
} }
initEntry := entries[initIdx] initEntry := entries[initIdx]
if initEntry.Type() != entrydb.TypeInitiatingEvent { if initEntry.Type() != TypeInitiatingEvent {
return fmt.Errorf("expected initiating event at entry %v prior to executing link at %v but got %x", initIdx, entryIdx, initEntry[0]) return fmt.Errorf("expected initiating event at entry %v prior to executing link at %v but got %x", initIdx, entryIdx, initEntry[0])
} }
flags := initEntry[1] flags := initEntry[1]
...@@ -162,8 +163,8 @@ func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry en ...@@ -162,8 +163,8 @@ func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry en
return nil return nil
} }
func invariantExecCheckAfterExecLink(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantExecCheckAfterExecLink(entryIdx int, entry Entry, entries []Entry, m *stubMetrics) error {
if entry.Type() != entrydb.TypeExecutingLink { if entry.Type() != TypeExecutingLink {
return nil return nil
} }
checkIdx := entryIdx + 1 checkIdx := entryIdx + 1
...@@ -174,14 +175,14 @@ func invariantExecCheckAfterExecLink(entryIdx int, entry entrydb.Entry, entries ...@@ -174,14 +175,14 @@ func invariantExecCheckAfterExecLink(entryIdx int, entry entrydb.Entry, entries
return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but ran out of entries", entryIdx, checkIdx) return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but ran out of entries", entryIdx, checkIdx)
} }
checkEntry := entries[checkIdx] checkEntry := entries[checkIdx]
if checkEntry.Type() != entrydb.TypeExecutingCheck { if checkEntry.Type() != TypeExecutingCheck {
return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but got type %v", entryIdx, checkIdx, checkEntry[0]) return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but got type %v", entryIdx, checkIdx, checkEntry[0])
} }
return nil return nil
} }
func invariantExecCheckOnlyAfterExecLink(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantExecCheckOnlyAfterExecLink(entryIdx int, entry Entry, entries []Entry, m *stubMetrics) error {
if entry.Type() != entrydb.TypeExecutingCheck { if entry.Type() != TypeExecutingCheck {
return nil return nil
} }
if entryIdx == 0 { if entryIdx == 0 {
...@@ -195,7 +196,7 @@ func invariantExecCheckOnlyAfterExecLink(entryIdx int, entry entrydb.Entry, entr ...@@ -195,7 +196,7 @@ func invariantExecCheckOnlyAfterExecLink(entryIdx int, entry entrydb.Entry, entr
return fmt.Errorf("found executing link without a preceding initiating event at entry %v", entryIdx) return fmt.Errorf("found executing link without a preceding initiating event at entry %v", entryIdx)
} }
linkEntry := entries[linkIdx] linkEntry := entries[linkIdx]
if linkEntry.Type() != entrydb.TypeExecutingLink { if linkEntry.Type() != TypeExecutingLink {
return fmt.Errorf("expected executing link at entry %v prior to executing check at %v but got %x", linkIdx, entryIdx, linkEntry[0]) return fmt.Errorf("expected executing link at entry %v prior to executing check at %v but got %x", linkIdx, entryIdx, linkEntry[0])
} }
return nil return nil
......
...@@ -90,7 +90,7 @@ func TestLatestSealedBlockNum(t *testing.T) { ...@@ -90,7 +90,7 @@ func TestLatestSealedBlockNum(t *testing.T) {
require.False(t, ok, "empty db expected") require.False(t, ok, "empty db expected")
require.Zero(t, n) require.Zero(t, n)
idx, err := db.searchCheckpoint(0, 0) idx, err := db.searchCheckpoint(0, 0)
require.ErrorIs(t, err, ErrFuture, "no checkpoint in empty db") require.ErrorIs(t, err, entrydb.ErrFuture, "no checkpoint in empty db")
require.Zero(t, idx) require.Zero(t, idx)
}) })
}) })
...@@ -123,7 +123,7 @@ func TestLatestSealedBlockNum(t *testing.T) { ...@@ -123,7 +123,7 @@ func TestLatestSealedBlockNum(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Zero(t, idx, "anchor block as checkpoint 0") require.Zero(t, idx, "anchor block as checkpoint 0")
_, err = db.searchCheckpoint(0, 0) _, err = db.searchCheckpoint(0, 0)
require.ErrorIs(t, err, ErrSkipped, "no checkpoint before genesis") require.ErrorIs(t, err, entrydb.ErrSkipped, "no checkpoint before genesis")
}) })
}) })
t.Run("Block 1 case", func(t *testing.T) { t.Run("Block 1 case", func(t *testing.T) {
...@@ -175,7 +175,7 @@ func TestAddLog(t *testing.T) { ...@@ -175,7 +175,7 @@ func TestAddLog(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
genesis := eth.BlockID{Hash: createHash(15), Number: 0} genesis := eth.BlockID{Hash: createHash(15), Number: 0}
err := db.AddLog(createHash(1), genesis, 0, nil) err := db.AddLog(createHash(1), genesis, 0, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder) require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
}) })
}) })
...@@ -265,7 +265,7 @@ func TestAddLog(t *testing.T) { ...@@ -265,7 +265,7 @@ func TestAddLog(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
bl14 := eth.BlockID{Hash: createHash(14), Number: 14} bl14 := eth.BlockID{Hash: createHash(14), Number: 14}
err := db.SealBlock(createHash(13), bl14, 5000) err := db.SealBlock(createHash(13), bl14, 5000)
require.ErrorIs(t, err, ErrConflict) require.ErrorIs(t, err, entrydb.ErrConflict)
}) })
}) })
...@@ -282,7 +282,7 @@ func TestAddLog(t *testing.T) { ...@@ -282,7 +282,7 @@ func TestAddLog(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
onto := eth.BlockID{Hash: createHash(14), Number: 14} onto := eth.BlockID{Hash: createHash(14), Number: 14}
err := db.AddLog(createHash(1), onto, 0, nil) err := db.AddLog(createHash(1), onto, 0, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder, "cannot build logs on 14 when 15 is already sealed") require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "cannot build logs on 14 when 15 is already sealed")
}) })
}) })
...@@ -298,7 +298,7 @@ func TestAddLog(t *testing.T) { ...@@ -298,7 +298,7 @@ func TestAddLog(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
bl15 := eth.BlockID{Hash: createHash(15), Number: 15} bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
err := db.AddLog(createHash(1), bl15, 0, nil) err := db.AddLog(createHash(1), bl15, 0, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder, "already at log index 2") require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "already at log index 2")
}) })
}) })
...@@ -313,7 +313,7 @@ func TestAddLog(t *testing.T) { ...@@ -313,7 +313,7 @@ func TestAddLog(t *testing.T) {
}, },
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
err := db.AddLog(createHash(1), eth.BlockID{Hash: createHash(16), Number: 16}, 0, nil) err := db.AddLog(createHash(1), eth.BlockID{Hash: createHash(16), Number: 16}, 0, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder) require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
}) })
}) })
...@@ -329,7 +329,7 @@ func TestAddLog(t *testing.T) { ...@@ -329,7 +329,7 @@ func TestAddLog(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
bl15 := eth.BlockID{Hash: createHash(15), Number: 15} bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
err := db.AddLog(createHash(1), bl15, 1, nil) err := db.AddLog(createHash(1), bl15, 1, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder, "already at log index 2") require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "already at log index 2")
}) })
}) })
...@@ -345,7 +345,7 @@ func TestAddLog(t *testing.T) { ...@@ -345,7 +345,7 @@ func TestAddLog(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
bl15 := eth.BlockID{Hash: createHash(16), Number: 16} bl15 := eth.BlockID{Hash: createHash(16), Number: 16}
err := db.AddLog(createHash(1), bl15, 2, nil) err := db.AddLog(createHash(1), bl15, 2, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder) require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
}) })
}) })
...@@ -360,7 +360,7 @@ func TestAddLog(t *testing.T) { ...@@ -360,7 +360,7 @@ func TestAddLog(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
bl15 := eth.BlockID{Hash: createHash(15), Number: 15} bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
err := db.AddLog(createHash(1), bl15, 2, nil) err := db.AddLog(createHash(1), bl15, 2, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder) require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
}) })
}) })
...@@ -373,7 +373,7 @@ func TestAddLog(t *testing.T) { ...@@ -373,7 +373,7 @@ func TestAddLog(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
bl15 := eth.BlockID{Hash: createHash(15), Number: 15} bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
err := db.AddLog(createHash(1), bl15, 5, nil) err := db.AddLog(createHash(1), bl15, 5, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder) require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
}) })
}) })
...@@ -394,7 +394,7 @@ func TestAddLog(t *testing.T) { ...@@ -394,7 +394,7 @@ func TestAddLog(t *testing.T) {
err = db.SealBlock(bl15.Hash, bl16, 5001) err = db.SealBlock(bl15.Hash, bl16, 5001)
require.NoError(t, err) require.NoError(t, err)
err = db.AddLog(createHash(1), bl16, 1, nil) err = db.AddLog(createHash(1), bl16, 1, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder) require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
}) })
}) })
...@@ -698,9 +698,8 @@ func TestGetBlockInfo(t *testing.T) { ...@@ -698,9 +698,8 @@ func TestGetBlockInfo(t *testing.T) {
runDBTest(t, runDBTest(t,
func(t *testing.T, db *DB, m *stubMetrics) {}, func(t *testing.T, db *DB, m *stubMetrics) {},
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
bl10 := eth.BlockID{Hash: createHash(10), Number: 10} _, err := db.FindSealedBlock(10)
_, err := db.FindSealedBlock(bl10) require.ErrorIs(t, err, entrydb.ErrFuture)
require.ErrorIs(t, err, ErrFuture)
}) })
}) })
...@@ -714,9 +713,8 @@ func TestGetBlockInfo(t *testing.T) { ...@@ -714,9 +713,8 @@ func TestGetBlockInfo(t *testing.T) {
}, },
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
// if the DB starts at 11, then shouldn't find 10 // if the DB starts at 11, then shouldn't find 10
bl10 := eth.BlockID{Hash: createHash(10), Number: 10} _, err := db.FindSealedBlock(10)
_, err := db.FindSealedBlock(bl10) require.ErrorIs(t, err, entrydb.ErrSkipped)
require.ErrorIs(t, err, ErrSkipped)
}) })
}) })
...@@ -727,10 +725,10 @@ func TestGetBlockInfo(t *testing.T) { ...@@ -727,10 +725,10 @@ func TestGetBlockInfo(t *testing.T) {
require.NoError(t, db.SealBlock(common.Hash{}, block, 500)) require.NoError(t, db.SealBlock(common.Hash{}, block, 500))
}, },
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
index, err := db.FindSealedBlock(block) seal, err := db.FindSealedBlock(block.Number)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, entrydb.EntryIdx(2), index, require.Equal(t, block, seal.ID())
"expecting to continue after search checkpoint that declared the block") require.Equal(t, uint64(500), seal.Timestamp)
}) })
}) })
} }
...@@ -755,7 +753,7 @@ func requireConflicts(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logH ...@@ -755,7 +753,7 @@ func requireConflicts(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logH
m, ok := db.m.(*stubMetrics) m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type") require.True(t, ok, "Did not get the expected metrics type")
_, err := db.Contains(blockNum, logIdx, logHash) _, err := db.Contains(blockNum, logIdx, logHash)
require.ErrorIs(t, err, ErrConflict, "canonical chain must not include this log") require.ErrorIs(t, err, entrydb.ErrConflict, "canonical chain must not include this log")
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency*2), "Should not need to read more than between two checkpoints") require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency*2), "Should not need to read more than between two checkpoints")
} }
...@@ -763,7 +761,7 @@ func requireFuture(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash ...@@ -763,7 +761,7 @@ func requireFuture(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash
m, ok := db.m.(*stubMetrics) m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type") require.True(t, ok, "Did not get the expected metrics type")
_, err := db.Contains(blockNum, logIdx, logHash) _, err := db.Contains(blockNum, logIdx, logHash)
require.ErrorIs(t, err, ErrFuture, "canonical chain does not yet include this log") require.ErrorIs(t, err, entrydb.ErrFuture, "canonical chain does not yet include this log")
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency*2), "Should not need to read more than between two checkpoints") require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency*2), "Should not need to read more than between two checkpoints")
} }
...@@ -791,7 +789,7 @@ func TestRecoverOnCreate(t *testing.T) { ...@@ -791,7 +789,7 @@ func TestRecoverOnCreate(t *testing.T) {
return db, m, err return db, m, err
} }
storeWithEvents := func(evts ...entrydb.Entry) *stubEntryStore { storeWithEvents := func(evts ...Entry) *stubEntryStore {
store := &stubEntryStore{} store := &stubEntryStore{}
store.entries = append(store.entries, evts...) store.entries = append(store.entries, evts...)
return store return store
...@@ -924,9 +922,9 @@ func TestRewind(t *testing.T) { ...@@ -924,9 +922,9 @@ func TestRewind(t *testing.T) {
t.Run("WhenEmpty", func(t *testing.T) { t.Run("WhenEmpty", func(t *testing.T) {
runDBTest(t, func(t *testing.T, db *DB, m *stubMetrics) {}, runDBTest(t, func(t *testing.T, db *DB, m *stubMetrics) {},
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.Rewind(100), ErrFuture) require.ErrorIs(t, db.Rewind(100), entrydb.ErrFuture)
// Genesis is a block to, not present in an empty DB // Genesis is a block to, not present in an empty DB
require.ErrorIs(t, db.Rewind(0), ErrFuture) require.ErrorIs(t, db.Rewind(0), entrydb.ErrFuture)
}) })
}) })
...@@ -944,7 +942,7 @@ func TestRewind(t *testing.T) { ...@@ -944,7 +942,7 @@ func TestRewind(t *testing.T) {
require.NoError(t, db.SealBlock(bl51.Hash, bl52, 504)) require.NoError(t, db.SealBlock(bl51.Hash, bl52, 504))
require.NoError(t, db.AddLog(createHash(4), bl52, 0, nil)) require.NoError(t, db.AddLog(createHash(4), bl52, 0, nil))
// cannot rewind to a block that is not sealed yet // cannot rewind to a block that is not sealed yet
require.ErrorIs(t, db.Rewind(53), ErrFuture) require.ErrorIs(t, db.Rewind(53), entrydb.ErrFuture)
}, },
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
requireContains(t, db, 51, 0, createHash(1)) requireContains(t, db, 51, 0, createHash(1))
...@@ -963,7 +961,7 @@ func TestRewind(t *testing.T) { ...@@ -963,7 +961,7 @@ func TestRewind(t *testing.T) {
require.NoError(t, db.AddLog(createHash(1), bl50, 0, nil)) require.NoError(t, db.AddLog(createHash(1), bl50, 0, nil))
require.NoError(t, db.AddLog(createHash(2), bl50, 1, nil)) require.NoError(t, db.AddLog(createHash(2), bl50, 1, nil))
// cannot go back to an unknown block // cannot go back to an unknown block
require.ErrorIs(t, db.Rewind(25), ErrSkipped) require.ErrorIs(t, db.Rewind(25), entrydb.ErrSkipped)
}, },
func(t *testing.T, db *DB, m *stubMetrics) { func(t *testing.T, db *DB, m *stubMetrics) {
requireContains(t, db, 51, 0, createHash(1)) requireContains(t, db, 51, 0, createHash(1))
...@@ -1088,12 +1086,12 @@ func TestRewind(t *testing.T) { ...@@ -1088,12 +1086,12 @@ func TestRewind(t *testing.T) {
bl29 := eth.BlockID{Hash: createHash(29), Number: 29} bl29 := eth.BlockID{Hash: createHash(29), Number: 29}
// 29 was deleted // 29 was deleted
err := db.AddLog(createHash(2), bl29, 1, nil) err := db.AddLog(createHash(2), bl29, 1, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder, "Cannot add log on removed block") require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "Cannot add log on removed block")
// 15 is older, we have up to 16 // 15 is older, we have up to 16
bl15 := eth.BlockID{Hash: createHash(15), Number: 15} bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
// try to add a third log to 15 // try to add a third log to 15
err = db.AddLog(createHash(10), bl15, 2, nil) err = db.AddLog(createHash(10), bl15, 2, nil)
require.ErrorIs(t, err, ErrLogOutOfOrder) require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
bl16 := eth.BlockID{Hash: createHash(16), Number: 16} bl16 := eth.BlockID{Hash: createHash(16), Number: 16}
// try to add a log to 17, on top of 16 // try to add a log to 17, on top of 16
err = db.AddLog(createHash(42), bl16, 0, nil) err = db.AddLog(createHash(42), bl16, 0, nil)
...@@ -1119,7 +1117,7 @@ func (s *stubMetrics) RecordDBSearchEntriesRead(count int64) { ...@@ -1119,7 +1117,7 @@ func (s *stubMetrics) RecordDBSearchEntriesRead(count int64) {
var _ Metrics = (*stubMetrics)(nil) var _ Metrics = (*stubMetrics)(nil)
type stubEntryStore struct { type stubEntryStore struct {
entries []entrydb.Entry entries []Entry
} }
func (s *stubEntryStore) Size() int64 { func (s *stubEntryStore) Size() int64 {
...@@ -1130,14 +1128,14 @@ func (s *stubEntryStore) LastEntryIdx() entrydb.EntryIdx { ...@@ -1130,14 +1128,14 @@ func (s *stubEntryStore) LastEntryIdx() entrydb.EntryIdx {
return entrydb.EntryIdx(s.Size() - 1) return entrydb.EntryIdx(s.Size() - 1)
} }
func (s *stubEntryStore) Read(idx entrydb.EntryIdx) (entrydb.Entry, error) { func (s *stubEntryStore) Read(idx entrydb.EntryIdx) (Entry, error) {
if idx < entrydb.EntryIdx(len(s.entries)) { if idx < entrydb.EntryIdx(len(s.entries)) {
return s.entries[idx], nil return s.entries[idx], nil
} }
return entrydb.Entry{}, io.EOF return Entry{}, io.EOF
} }
func (s *stubEntryStore) Append(entries ...entrydb.Entry) error { func (s *stubEntryStore) Append(entries ...Entry) error {
s.entries = append(s.entries, entries...) s.entries = append(s.entries, entries...)
return nil return nil
} }
......
...@@ -27,9 +27,9 @@ func newSearchCheckpoint(blockNum uint64, logsSince uint32, timestamp uint64) se ...@@ -27,9 +27,9 @@ func newSearchCheckpoint(blockNum uint64, logsSince uint32, timestamp uint64) se
} }
} }
func newSearchCheckpointFromEntry(data entrydb.Entry) (searchCheckpoint, error) { func newSearchCheckpointFromEntry(data Entry) (searchCheckpoint, error) {
if data.Type() != entrydb.TypeSearchCheckpoint { if data.Type() != TypeSearchCheckpoint {
return searchCheckpoint{}, fmt.Errorf("%w: attempting to decode search checkpoint but was type %s", ErrDataCorruption, data.Type()) return searchCheckpoint{}, fmt.Errorf("%w: attempting to decode search checkpoint but was type %s", entrydb.ErrDataCorruption, data.Type())
} }
return searchCheckpoint{ return searchCheckpoint{
blockNum: binary.LittleEndian.Uint64(data[1:9]), blockNum: binary.LittleEndian.Uint64(data[1:9]),
...@@ -40,9 +40,9 @@ func newSearchCheckpointFromEntry(data entrydb.Entry) (searchCheckpoint, error) ...@@ -40,9 +40,9 @@ func newSearchCheckpointFromEntry(data entrydb.Entry) (searchCheckpoint, error)
// encode creates a checkpoint entry // encode creates a checkpoint entry
// type 0: "search checkpoint" <type><uint64 block number: 8 bytes><uint32 logsSince count: 4 bytes><uint64 timestamp: 8 bytes> = 21 bytes // type 0: "search checkpoint" <type><uint64 block number: 8 bytes><uint32 logsSince count: 4 bytes><uint64 timestamp: 8 bytes> = 21 bytes
func (s searchCheckpoint) encode() entrydb.Entry { func (s searchCheckpoint) encode() Entry {
var data entrydb.Entry var data Entry
data[0] = uint8(entrydb.TypeSearchCheckpoint) data[0] = uint8(TypeSearchCheckpoint)
binary.LittleEndian.PutUint64(data[1:9], s.blockNum) binary.LittleEndian.PutUint64(data[1:9], s.blockNum)
binary.LittleEndian.PutUint32(data[9:13], s.logsSince) binary.LittleEndian.PutUint32(data[9:13], s.logsSince)
binary.LittleEndian.PutUint64(data[13:21], s.timestamp) binary.LittleEndian.PutUint64(data[13:21], s.timestamp)
...@@ -57,16 +57,16 @@ func newCanonicalHash(hash common.Hash) canonicalHash { ...@@ -57,16 +57,16 @@ func newCanonicalHash(hash common.Hash) canonicalHash {
return canonicalHash{hash: hash} return canonicalHash{hash: hash}
} }
func newCanonicalHashFromEntry(data entrydb.Entry) (canonicalHash, error) { func newCanonicalHashFromEntry(data Entry) (canonicalHash, error) {
if data.Type() != entrydb.TypeCanonicalHash { if data.Type() != TypeCanonicalHash {
return canonicalHash{}, fmt.Errorf("%w: attempting to decode canonical hash but was type %s", ErrDataCorruption, data.Type()) return canonicalHash{}, fmt.Errorf("%w: attempting to decode canonical hash but was type %s", entrydb.ErrDataCorruption, data.Type())
} }
return newCanonicalHash(common.Hash(data[1:33])), nil return newCanonicalHash(common.Hash(data[1:33])), nil
} }
func (c canonicalHash) encode() entrydb.Entry { func (c canonicalHash) encode() Entry {
var entry entrydb.Entry var entry Entry
entry[0] = uint8(entrydb.TypeCanonicalHash) entry[0] = uint8(TypeCanonicalHash)
copy(entry[1:33], c.hash[:]) copy(entry[1:33], c.hash[:])
return entry return entry
} }
...@@ -76,9 +76,9 @@ type initiatingEvent struct { ...@@ -76,9 +76,9 @@ type initiatingEvent struct {
logHash common.Hash logHash common.Hash
} }
func newInitiatingEventFromEntry(data entrydb.Entry) (initiatingEvent, error) { func newInitiatingEventFromEntry(data Entry) (initiatingEvent, error) {
if data.Type() != entrydb.TypeInitiatingEvent { if data.Type() != TypeInitiatingEvent {
return initiatingEvent{}, fmt.Errorf("%w: attempting to decode initiating event but was type %s", ErrDataCorruption, data.Type()) return initiatingEvent{}, fmt.Errorf("%w: attempting to decode initiating event but was type %s", entrydb.ErrDataCorruption, data.Type())
} }
flags := data[1] flags := data[1]
return initiatingEvent{ return initiatingEvent{
...@@ -96,9 +96,9 @@ func newInitiatingEvent(logHash common.Hash, hasExecMsg bool) initiatingEvent { ...@@ -96,9 +96,9 @@ func newInitiatingEvent(logHash common.Hash, hasExecMsg bool) initiatingEvent {
// encode creates an initiating event entry // encode creates an initiating event entry
// type 2: "initiating event" <type><flags><event-hash: 20 bytes> = 22 bytes // type 2: "initiating event" <type><flags><event-hash: 20 bytes> = 22 bytes
func (i initiatingEvent) encode() entrydb.Entry { func (i initiatingEvent) encode() Entry {
var data entrydb.Entry var data Entry
data[0] = uint8(entrydb.TypeInitiatingEvent) data[0] = uint8(TypeInitiatingEvent)
flags := byte(0) flags := byte(0)
if i.hasExecMsg { if i.hasExecMsg {
flags = flags | eventFlagHasExecutingMessage flags = flags | eventFlagHasExecutingMessage
...@@ -127,9 +127,9 @@ func newExecutingLink(msg types.ExecutingMessage) (executingLink, error) { ...@@ -127,9 +127,9 @@ func newExecutingLink(msg types.ExecutingMessage) (executingLink, error) {
}, nil }, nil
} }
func newExecutingLinkFromEntry(data entrydb.Entry) (executingLink, error) { func newExecutingLinkFromEntry(data Entry) (executingLink, error) {
if data.Type() != entrydb.TypeExecutingLink { if data.Type() != TypeExecutingLink {
return executingLink{}, fmt.Errorf("%w: attempting to decode executing link but was type %s", ErrDataCorruption, data.Type()) return executingLink{}, fmt.Errorf("%w: attempting to decode executing link but was type %s", entrydb.ErrDataCorruption, data.Type())
} }
timestamp := binary.LittleEndian.Uint64(data[16:24]) timestamp := binary.LittleEndian.Uint64(data[16:24])
return executingLink{ return executingLink{
...@@ -142,9 +142,9 @@ func newExecutingLinkFromEntry(data entrydb.Entry) (executingLink, error) { ...@@ -142,9 +142,9 @@ func newExecutingLinkFromEntry(data entrydb.Entry) (executingLink, error) {
// encode creates an executing link entry // encode creates an executing link entry
// type 3: "executing link" <type><chain: 4 bytes><blocknum: 8 bytes><event index: 3 bytes><uint64 timestamp: 8 bytes> = 24 bytes // type 3: "executing link" <type><chain: 4 bytes><blocknum: 8 bytes><event index: 3 bytes><uint64 timestamp: 8 bytes> = 24 bytes
func (e executingLink) encode() entrydb.Entry { func (e executingLink) encode() Entry {
var entry entrydb.Entry var entry Entry
entry[0] = uint8(entrydb.TypeExecutingLink) entry[0] = uint8(TypeExecutingLink)
binary.LittleEndian.PutUint32(entry[1:5], e.chain) binary.LittleEndian.PutUint32(entry[1:5], e.chain)
binary.LittleEndian.PutUint64(entry[5:13], e.blockNum) binary.LittleEndian.PutUint64(entry[5:13], e.blockNum)
...@@ -164,18 +164,18 @@ func newExecutingCheck(hash common.Hash) executingCheck { ...@@ -164,18 +164,18 @@ func newExecutingCheck(hash common.Hash) executingCheck {
return executingCheck{hash: hash} return executingCheck{hash: hash}
} }
func newExecutingCheckFromEntry(data entrydb.Entry) (executingCheck, error) { func newExecutingCheckFromEntry(data Entry) (executingCheck, error) {
if data.Type() != entrydb.TypeExecutingCheck { if data.Type() != TypeExecutingCheck {
return executingCheck{}, fmt.Errorf("%w: attempting to decode executing check but was type %s", ErrDataCorruption, data.Type()) return executingCheck{}, fmt.Errorf("%w: attempting to decode executing check but was type %s", entrydb.ErrDataCorruption, data.Type())
} }
return newExecutingCheck(common.Hash(data[1:33])), nil return newExecutingCheck(common.Hash(data[1:33])), nil
} }
// encode creates an executing check entry // encode creates an executing check entry
// type 4: "executing check" <type><event-hash: 32 bytes> = 33 bytes // type 4: "executing check" <type><event-hash: 32 bytes> = 33 bytes
func (e executingCheck) encode() entrydb.Entry { func (e executingCheck) encode() Entry {
var entry entrydb.Entry var entry Entry
entry[0] = uint8(entrydb.TypeExecutingCheck) entry[0] = uint8(TypeExecutingCheck)
copy(entry[1:33], e.hash[:]) copy(entry[1:33], e.hash[:])
return entry return entry
} }
...@@ -184,8 +184,8 @@ type paddingEntry struct{} ...@@ -184,8 +184,8 @@ type paddingEntry struct{}
// encoding of the padding entry // encoding of the padding entry
// type 5: "padding" <type><padding: 33 bytes> = 34 bytes // type 5: "padding" <type><padding: 33 bytes> = 34 bytes
func (e paddingEntry) encode() entrydb.Entry { func (e paddingEntry) encode() Entry {
var entry entrydb.Entry var entry Entry
entry[0] = uint8(entrydb.TypePadding) entry[0] = uint8(TypePadding)
return entry return entry
} }
package logs
import (
"fmt"
"strings"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
)
type EntryObj interface {
encode() Entry
}
type Entry = entrydb.Entry[EntryType]
type EntryTypeFlag uint8
const (
FlagSearchCheckpoint EntryTypeFlag = 1 << TypeSearchCheckpoint
FlagCanonicalHash EntryTypeFlag = 1 << TypeCanonicalHash
FlagInitiatingEvent EntryTypeFlag = 1 << TypeInitiatingEvent
FlagExecutingLink EntryTypeFlag = 1 << TypeExecutingLink
FlagExecutingCheck EntryTypeFlag = 1 << TypeExecutingCheck
FlagPadding EntryTypeFlag = 1 << TypePadding
// for additional padding
FlagPadding2 EntryTypeFlag = FlagPadding << 1
)
func (x EntryTypeFlag) String() string {
var out []string
for i := EntryTypeFlag(1); i != 0; i <<= 1 { // iterate to bitmask
if x.Any(i) {
out = append(out, i.String())
}
}
return strings.Join(out, "|")
}
func (x EntryTypeFlag) Any(v EntryTypeFlag) bool {
return x&v != 0
}
func (x *EntryTypeFlag) Add(v EntryTypeFlag) {
*x = *x | v
}
func (x *EntryTypeFlag) Remove(v EntryTypeFlag) {
*x = *x &^ v
}
type EntryType uint8
const (
TypeSearchCheckpoint EntryType = iota
TypeCanonicalHash
TypeInitiatingEvent
TypeExecutingLink
TypeExecutingCheck
TypePadding
)
func (x EntryType) String() string {
switch x {
case TypeSearchCheckpoint:
return "searchCheckpoint"
case TypeCanonicalHash:
return "canonicalHash"
case TypeInitiatingEvent:
return "initiatingEvent"
case TypeExecutingLink:
return "executingLink"
case TypeExecutingCheck:
return "executingCheck"
case TypePadding:
return "padding"
default:
return fmt.Sprintf("unknown-%d", uint8(x))
}
}
...@@ -8,14 +8,13 @@ import ( ...@@ -8,14 +8,13 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
type IteratorState interface { type IteratorState interface {
NextIndex() entrydb.EntryIdx NextIndex() entrydb.EntryIdx
HeadPointer() (heads.HeadPointer, error)
SealedBlock() (hash common.Hash, num uint64, ok bool) SealedBlock() (hash common.Hash, num uint64, ok bool)
SealedTimestamp() (timestamp uint64, ok bool)
InitMessage() (hash common.Hash, logIndex uint32, ok bool) InitMessage() (hash common.Hash, logIndex uint32, ok bool)
ExecMessage() *types.ExecutingMessage ExecMessage() *types.ExecutingMessage
} }
...@@ -42,7 +41,7 @@ type traverseConditionalFn func(state IteratorState) error ...@@ -42,7 +41,7 @@ type traverseConditionalFn func(state IteratorState) error
func (i *iterator) End() error { func (i *iterator) End() error {
for { for {
_, err := i.next() _, err := i.next()
if errors.Is(err, ErrFuture) { if errors.Is(err, entrydb.ErrFuture) {
return nil return nil
} else if err != nil { } else if err != nil {
return err return err
...@@ -59,7 +58,7 @@ func (i *iterator) NextInitMsg() error { ...@@ -59,7 +58,7 @@ func (i *iterator) NextInitMsg() error {
if err != nil { if err != nil {
return err return err
} }
if typ == entrydb.TypeInitiatingEvent { if typ == TypeInitiatingEvent {
seenLog = true seenLog = true
} }
if !i.current.hasCompleteBlock() { if !i.current.hasCompleteBlock() {
...@@ -98,7 +97,7 @@ func (i *iterator) NextBlock() error { ...@@ -98,7 +97,7 @@ func (i *iterator) NextBlock() error {
if err != nil { if err != nil {
return err return err
} }
if typ == entrydb.TypeSearchCheckpoint { if typ == TypeSearchCheckpoint {
seenBlock = true seenBlock = true
} }
if !i.current.hasCompleteBlock() { if !i.current.hasCompleteBlock() {
...@@ -130,12 +129,12 @@ func (i *iterator) TraverseConditional(fn traverseConditionalFn) error { ...@@ -130,12 +129,12 @@ func (i *iterator) TraverseConditional(fn traverseConditionalFn) error {
} }
// Read and apply the next entry. // Read and apply the next entry.
func (i *iterator) next() (entrydb.EntryType, error) { func (i *iterator) next() (EntryType, error) {
index := i.current.nextEntryIndex index := i.current.nextEntryIndex
entry, err := i.db.store.Read(index) entry, err := i.db.store.Read(index)
if err != nil { if err != nil {
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
return 0, ErrFuture return 0, entrydb.ErrFuture
} }
return 0, fmt.Errorf("failed to read entry %d: %w", index, err) return 0, fmt.Errorf("failed to read entry %d: %w", index, err)
} }
...@@ -157,6 +156,11 @@ func (i *iterator) SealedBlock() (hash common.Hash, num uint64, ok bool) { ...@@ -157,6 +156,11 @@ func (i *iterator) SealedBlock() (hash common.Hash, num uint64, ok bool) {
return i.current.SealedBlock() return i.current.SealedBlock()
} }
// SealedTimestamp returns the timestamp of SealedBlock
func (i *iterator) SealedTimestamp() (timestamp uint64, ok bool) {
return i.current.SealedTimestamp()
}
// InitMessage returns the current initiating message, if any is available. // InitMessage returns the current initiating message, if any is available.
func (i *iterator) InitMessage() (hash common.Hash, logIndex uint32, ok bool) { func (i *iterator) InitMessage() (hash common.Hash, logIndex uint32, ok bool) {
return i.current.InitMessage() return i.current.InitMessage()
...@@ -166,7 +170,3 @@ func (i *iterator) InitMessage() (hash common.Hash, logIndex uint32, ok bool) { ...@@ -166,7 +170,3 @@ func (i *iterator) InitMessage() (hash common.Hash, logIndex uint32, ok bool) {
func (i *iterator) ExecMessage() *types.ExecutingMessage { func (i *iterator) ExecMessage() *types.ExecutingMessage {
return i.current.ExecMessage() return i.current.ExecMessage()
} }
func (i *iterator) HeadPointer() (heads.HeadPointer, error) {
return i.current.HeadPointer()
}
package db
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func (db *ChainsDB) FindSealedBlock(chain types.ChainID, number uint64) (seal types.BlockSeal, err error) {
db.mu.RLock()
defer db.mu.RUnlock()
logDB, ok := db.logDBs[chain]
if !ok {
return types.BlockSeal{}, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.FindSealedBlock(number)
}
// LatestBlockNum returns the latest fully-sealed block number that has been recorded to the logs db
// for the given chain. It does not contain safety guarantees.
// The block number might not be available (empty database, or non-existent chain).
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) (num uint64, ok bool) {
db.mu.RLock()
defer db.mu.RUnlock()
logDB, knownChain := db.logDBs[chain]
if !knownChain {
return 0, false
}
return logDB.LatestSealedBlockNum()
}
func (db *ChainsDB) LocalUnsafe(chainID types.ChainID) (types.BlockSeal, error) {
db.mu.RLock()
defer db.mu.RUnlock()
eventsDB, ok := db.logDBs[chainID]
if !ok {
return types.BlockSeal{}, ErrUnknownChain
}
n, ok := eventsDB.LatestSealedBlockNum()
if !ok {
return types.BlockSeal{}, entrydb.ErrFuture
}
return eventsDB.FindSealedBlock(n)
}
func (db *ChainsDB) CrossUnsafe(chainID types.ChainID) (types.BlockSeal, error) {
db.mu.RLock()
defer db.mu.RUnlock()
result, ok := db.crossUnsafe[chainID]
if !ok {
return types.BlockSeal{}, ErrUnknownChain
}
return result, nil
}
func (db *ChainsDB) LocalSafe(chainID types.ChainID) (derivedFrom eth.BlockRef, derived eth.BlockRef, err error) {
db.mu.RLock()
defer db.mu.RUnlock()
localDB, ok := db.localDBs[chainID]
if !ok {
return eth.BlockRef{}, eth.BlockRef{}, ErrUnknownChain
}
return localDB.Last()
}
func (db *ChainsDB) CrossSafe(chainID types.ChainID) (derivedFrom eth.BlockRef, derived eth.BlockRef, err error) {
db.mu.RLock()
defer db.mu.RUnlock()
crossDB, ok := db.crossDBs[chainID]
if !ok {
return eth.BlockRef{}, eth.BlockRef{}, ErrUnknownChain
}
return crossDB.Last()
}
func (db *ChainsDB) Finalized(chainID types.ChainID) (eth.BlockID, error) {
db.mu.RLock()
defer db.mu.RUnlock()
finalizedL1 := db.finalizedL1
if finalizedL1 == (eth.L1BlockRef{}) {
return eth.BlockID{}, errors.New("no finalized L1 signal, cannot determine L2 finality yet")
}
derived, err := db.LastDerivedFrom(chainID, finalizedL1.ID())
if err != nil {
return eth.BlockID{}, errors.New("could not find what was last derived from the finalized L1 block")
}
return derived, nil
}
func (db *ChainsDB) LastDerivedFrom(chainID types.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error) {
crossDB, ok := db.crossDBs[chainID]
if !ok {
return eth.BlockID{}, ErrUnknownChain
}
return crossDB.LastDerived(derivedFrom)
}
func (db *ChainsDB) DerivedFrom(chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error) {
db.mu.RLock()
defer db.mu.RUnlock()
localDB, ok := db.localDBs[chainID]
if !ok {
return eth.BlockID{}, ErrUnknownChain
}
return localDB.DerivedFrom(derived)
}
// Check calls the underlying logDB to determine if the given log entry exists at the given location.
// If the block-seal of the block that includes the log is known, it is returned. It is fully zeroed otherwise, if the block is in-progress.
func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) (includedIn types.BlockSeal, err error) {
db.mu.RLock()
defer db.mu.RUnlock()
logDB, ok := db.logDBs[chain]
if !ok {
return types.BlockSeal{}, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.Contains(blockNum, logIdx, logHash)
}
// Safest returns the strongest safety level that can be guaranteed for the given log entry.
// it assumes the log entry has already been checked and is valid, this function only checks safety levels.
// Cross-safety levels are all considered to be more safe than any form of local-safety.
func (db *ChainsDB) Safest(chainID types.ChainID, blockNum uint64, index uint32) (safest types.SafetyLevel, err error) {
db.mu.RLock()
defer db.mu.RUnlock()
if finalized, err := db.Finalized(chainID); err == nil {
if finalized.Number >= blockNum {
return types.Finalized, nil
}
}
_, crossSafe, err := db.CrossSafe(chainID)
if err != nil {
return types.Invalid, err
}
if crossSafe.Number >= blockNum {
return types.CrossSafe, nil
}
crossUnsafe, err := db.CrossUnsafe(chainID)
if err != nil {
return types.Invalid, err
}
// TODO(#12425): API: "index" for in-progress block building shouldn't be exposed from DB.
// For now we're not counting anything cross-safe until the block is sealed.
if blockNum <= crossUnsafe.Number {
return types.CrossUnsafe, nil
}
_, localSafe, err := db.LocalSafe(chainID)
if err != nil {
return types.Invalid, err
}
if blockNum <= localSafe.Number {
return types.LocalSafe, nil
}
return types.LocalUnsafe, nil
}
func (db *ChainsDB) IteratorStartingAt(chain types.ChainID, sealedNum uint64, logIndex uint32) (logs.Iterator, error) {
logDB, ok := db.logDBs[chain]
if !ok {
return nil, fmt.Errorf("%w: %v", ErrUnknownChain, chain)
}
return logDB.IteratorStartingAt(sealedNum, logIndex)
}
package db
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func (db *ChainsDB) AddLog(
chain types.ChainID,
logHash common.Hash,
parentBlock eth.BlockID,
logIdx uint32,
execMsg *types.ExecutingMessage) error {
db.mu.RLock()
defer db.mu.RUnlock()
logDB, ok := db.logDBs[chain]
if !ok {
return fmt.Errorf("cannot AddLog: %w: %v", ErrUnknownChain, chain)
}
return logDB.AddLog(logHash, parentBlock, logIdx, execMsg)
}
func (db *ChainsDB) SealBlock(chain types.ChainID, block eth.BlockRef) error {
db.mu.RLock()
defer db.mu.RUnlock()
logDB, ok := db.logDBs[chain]
if !ok {
return fmt.Errorf("cannot SealBlock: %w: %v", ErrUnknownChain, chain)
}
err := logDB.SealBlock(block.ParentHash, block.ID(), block.Time)
if err != nil {
return fmt.Errorf("failed to seal block %v: %w", block, err)
}
return nil
}
func (db *ChainsDB) Rewind(chain types.ChainID, headBlockNum uint64) error {
db.mu.RLock()
defer db.mu.RUnlock()
logDB, ok := db.logDBs[chain]
if !ok {
return fmt.Errorf("cannot Rewind: %w: %s", ErrUnknownChain, chain)
}
return logDB.Rewind(headBlockNum)
}
func (db *ChainsDB) UpdateLocalSafe(chain types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
db.mu.RLock()
defer db.mu.RUnlock()
localDB, ok := db.localDBs[chain]
if !ok {
return fmt.Errorf("cannot UpdateLocalSafe: %w: %v", ErrUnknownChain, chain)
}
return localDB.AddDerived(derivedFrom, lastDerived)
}
func (db *ChainsDB) UpdateCrossUnsafe(chain types.ChainID, crossUnsafe types.BlockSeal) error {
db.mu.RLock()
defer db.mu.RUnlock()
if _, ok := db.crossUnsafe[chain]; !ok {
return fmt.Errorf("cannot UpdateCrossUnsafe: %w: %s", ErrUnknownChain, chain)
}
db.crossUnsafe[chain] = crossUnsafe
return nil
}
func (db *ChainsDB) UpdateCrossSafe(chain types.ChainID, l1View eth.BlockRef, lastCrossDerived eth.BlockRef) error {
db.mu.RLock()
defer db.mu.RUnlock()
crossDB, ok := db.crossDBs[chain]
if !ok {
return fmt.Errorf("cannot UpdateCrossSafe: %w: %s", ErrUnknownChain, chain)
}
return crossDB.AddDerived(l1View, lastCrossDerived)
}
func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
db.mu.RLock()
defer db.mu.RUnlock()
if db.finalizedL1.Number > finalized.Number {
return fmt.Errorf("cannot rewind finalized L1 head from %s to %s", db.finalizedL1, finalized)
}
db.finalizedL1 = finalized
return nil
}
...@@ -6,12 +6,10 @@ import ( ...@@ -6,12 +6,10 @@ import (
"io" "io"
"sync/atomic" "sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
) )
type MockBackend struct { type MockBackend struct {
...@@ -52,12 +50,32 @@ func (m *MockBackend) CheckMessages(messages []types.Message, minSafety types.Sa ...@@ -52,12 +50,32 @@ func (m *MockBackend) CheckMessages(messages []types.Message, minSafety types.Sa
return nil return nil
} }
func (m *MockBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) { func (m *MockBackend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
return types.CrossUnsafe, nil return types.ReferenceView{}, nil
} }
func (m *MockBackend) DerivedFrom(ctx context.Context, t types.ChainID, parentHash common.Hash, n uint64) (eth.BlockRef, error) { func (m *MockBackend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
return eth.BlockRef{}, nil return types.ReferenceView{}, nil
}
func (m *MockBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
return eth.BlockID{}, nil
}
func (m *MockBackend) DerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error) {
return eth.BlockID{}, nil
}
func (m *MockBackend) UpdateLocalUnsafe(chainID types.ChainID, head eth.BlockRef) error {
return nil
}
func (m *MockBackend) UpdateLocalSafe(chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
return nil
}
func (m *MockBackend) UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) error {
return nil
} }
func (m *MockBackend) Close() error { func (m *MockBackend) Close() error {
......
package source package processors
import ( import (
"context" "context"
...@@ -91,10 +91,35 @@ func (s *ChainProcessor) nextNum() uint64 { ...@@ -91,10 +91,35 @@ func (s *ChainProcessor) nextNum() uint64 {
return headNum + 1 return headNum + 1
} }
// worker is the main loop of the chain processor's worker
// it manages work by request or on a timer, and watches for shutdown
func (s *ChainProcessor) worker() { func (s *ChainProcessor) worker() {
defer s.wg.Done() defer s.wg.Done()
delay := time.NewTicker(time.Second * 5) delay := time.NewTicker(time.Second * 5)
for {
// await next time we process, or detect shutdown
select {
case <-s.ctx.Done():
delay.Stop()
return
case <-s.newHead:
s.log.Debug("Responding to new head signal")
s.work()
// if this chain processor is synchronous, signal completion
// to be picked up by the caller (ChainProcessor.OnNewHead)
if s.synchronous {
s.out <- struct{}{}
}
case <-delay.C:
s.log.Debug("Checking for updates")
s.work()
}
}
}
// work processes the next block in the chain repeatedly until it reaches the head
func (s *ChainProcessor) work() {
for { for {
if s.ctx.Err() != nil { // check if we are closing down if s.ctx.Err() != nil { // check if we are closing down
return return
...@@ -104,27 +129,12 @@ func (s *ChainProcessor) worker() { ...@@ -104,27 +129,12 @@ func (s *ChainProcessor) worker() {
s.log.Error("Failed to process new block", "err", err) s.log.Error("Failed to process new block", "err", err)
// idle until next update trigger // idle until next update trigger
} else if x := s.lastHead.Load(); target+1 <= x { } else if x := s.lastHead.Load(); target+1 <= x {
s.log.Debug("Continuing with next block", s.log.Debug("Continuing with next block", "newTarget", target+1, "lastHead", x)
"newTarget", target+1, "lastHead", x)
continue // instantly continue processing, no need to idle continue // instantly continue processing, no need to idle
} else { } else {
s.log.Debug("Idling block-processing, reached latest block", "head", target) s.log.Debug("Idling block-processing, reached latest block", "head", target)
} }
if s.synchronous { return
s.out <- struct{}{}
}
// await next time we process, or detect shutdown
select {
case <-s.ctx.Done():
delay.Stop()
return
case <-s.newHead:
s.log.Debug("Responding to new head signal")
continue
case <-delay.C:
s.log.Debug("Checking for updates")
continue
}
} }
} }
...@@ -166,7 +176,7 @@ func (s *ChainProcessor) update(nextNum uint64) error { ...@@ -166,7 +176,7 @@ func (s *ChainProcessor) update(nextNum uint64) error {
return nil return nil
} }
func (s *ChainProcessor) OnNewHead(ctx context.Context, head eth.BlockRef) error { func (s *ChainProcessor) OnNewHead(head eth.BlockRef) error {
// update the latest target // update the latest target
s.lastHead.Store(head.Number) s.lastHead.Store(head.Number)
// signal that we have something to process // signal that we have something to process
......
package processors
import (
"context"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
)
// NewEthClient creates an Eth RPC client for event-log fetching.
func NewEthClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC,
pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
c, err := client.NewRPCWithClient(ctx, logger, rpc, rpcClient, pollRate)
if err != nil {
return nil, fmt.Errorf("failed to create new RPC client: %w", err)
}
l1Client, err := sources.NewL1Client(c, logger, m, sources.L1ClientSimpleConfig(trustRPC, kind, 100))
if err != nil {
return nil, fmt.Errorf("failed to connect client: %w", err)
}
return l1Client, nil
}
...@@ -123,7 +123,7 @@ func identifierFromBytes(identifierBytes io.Reader) (contractIdentifier, error) ...@@ -123,7 +123,7 @@ func identifierFromBytes(identifierBytes io.Reader) (contractIdentifier, error)
// which is then hashed again. This is the hash that is stored in the log storage. // which is then hashed again. This is the hash that is stored in the log storage.
// The logHash can then be used to traverse from the executing message // The logHash can then be used to traverse from the executing message
// to the log the referenced initiating message. // to the log the referenced initiating message.
// TODO: this function is duplicated between contracts and backend/source/log_processor.go // TODO(#12424): this function is duplicated between contracts and backend/source/log_processor.go
// to avoid a circular dependency. It should be reorganized to avoid this duplication. // to avoid a circular dependency. It should be reorganized to avoid this duplication.
func payloadHashToLogHash(payloadHash common.Hash, addr common.Address) common.Hash { func payloadHashToLogHash(payloadHash common.Hash, addr common.Address) common.Hash {
msg := make([]byte, 0, 2*common.HashLength) msg := make([]byte, 0, 2*common.HashLength)
......
package source package processors
import ( import (
"context" "context"
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source/contracts" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors/contracts"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
...@@ -34,7 +34,7 @@ type logProcessor struct { ...@@ -34,7 +34,7 @@ type logProcessor struct {
eventDecoder EventDecoder eventDecoder EventDecoder
} }
func newLogProcessor(chain types.ChainID, logStore LogStorage) *logProcessor { func NewLogProcessor(chain types.ChainID, logStore LogStorage) LogProcessor {
return &logProcessor{ return &logProcessor{
chain: chain, chain: chain,
logStore: logStore, logStore: logStore,
......
package source package processors
import ( import (
"context" "context"
...@@ -25,7 +25,7 @@ func TestLogProcessor(t *testing.T) { ...@@ -25,7 +25,7 @@ func TestLogProcessor(t *testing.T) {
} }
t.Run("NoOutputWhenLogsAreEmpty", func(t *testing.T) { t.Run("NoOutputWhenLogsAreEmpty", func(t *testing.T) {
store := &stubLogStorage{} store := &stubLogStorage{}
processor := newLogProcessor(logProcessorChainID, store) processor := NewLogProcessor(logProcessorChainID, store)
err := processor.ProcessLogs(ctx, block1, ethTypes.Receipts{}) err := processor.ProcessLogs(ctx, block1, ethTypes.Receipts{})
require.NoError(t, err) require.NoError(t, err)
...@@ -59,7 +59,7 @@ func TestLogProcessor(t *testing.T) { ...@@ -59,7 +59,7 @@ func TestLogProcessor(t *testing.T) {
}, },
} }
store := &stubLogStorage{} store := &stubLogStorage{}
processor := newLogProcessor(logProcessorChainID, store) processor := NewLogProcessor(logProcessorChainID, store)
err := processor.ProcessLogs(ctx, block1, rcpts) err := processor.ProcessLogs(ctx, block1, rcpts)
require.NoError(t, err) require.NoError(t, err)
...@@ -115,7 +115,7 @@ func TestLogProcessor(t *testing.T) { ...@@ -115,7 +115,7 @@ func TestLogProcessor(t *testing.T) {
Hash: common.Hash{0xaa}, Hash: common.Hash{0xaa},
} }
store := &stubLogStorage{} store := &stubLogStorage{}
processor := newLogProcessor(types.ChainID{4}, store) processor := NewLogProcessor(types.ChainID{4}, store).(*logProcessor)
processor.eventDecoder = EventDecoderFn(func(l *ethTypes.Log) (types.ExecutingMessage, error) { processor.eventDecoder = EventDecoderFn(func(l *ethTypes.Log) (types.ExecutingMessage, error) {
require.Equal(t, rcpts[0].Logs[0], l) require.Equal(t, rcpts[0].Logs[0], l)
return execMsg, nil return execMsg, nil
......
This diff is collapsed.
package safety
import (
"errors"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type View struct {
chainID types.ChainID
iter logs.Iterator
localView heads.HeadPointer
localDerivedFrom eth.BlockRef
validWithinView func(l1View uint64, execMsg *types.ExecutingMessage) error
}
func (vi *View) Cross() (heads.HeadPointer, error) {
return vi.iter.HeadPointer()
}
func (vi *View) Local() (heads.HeadPointer, error) {
if vi.localView == (heads.HeadPointer{}) {
return heads.HeadPointer{}, logs.ErrFuture
}
return vi.localView, nil
}
func (vi *View) UpdateLocal(at eth.BlockRef, ref eth.BlockRef) error {
vi.localView = heads.HeadPointer{
LastSealedBlockHash: ref.Hash,
LastSealedBlockNum: ref.Number,
//LastSealedTimestamp: ref.Time,
LogsSince: 0,
}
vi.localDerivedFrom = at
// TODO(#11693): reorg check against existing DB
// TODO(#12186): localView may be larger than what DB contents we have
return nil
}
func (vi *View) Process() error {
err := vi.iter.TraverseConditional(func(state logs.IteratorState) error {
hash, num, ok := state.SealedBlock()
if !ok {
return logs.ErrFuture // maybe a more specific error for no-genesis case?
}
// TODO(#11693): reorg check in the future. To make sure that what we traverse is still canonical.
_ = hash
// check if L2 block is within view
if !vi.localView.WithinRange(num, 0) {
return logs.ErrFuture
}
_, initLogIndex, ok := state.InitMessage()
if !ok {
return nil // no readable message, just an empty block
}
// check if the message is within view
if !vi.localView.WithinRange(num, initLogIndex) {
return logs.ErrFuture
}
// check if it is an executing message. If so, check the dependency
if execMsg := state.ExecMessage(); execMsg != nil {
// Check if executing message is within cross L2 view,
// relative to the L1 view of current message.
// And check if the message is valid to execute at all
// (i.e. if it exists on the initiating side).
// TODO(#12187): it's inaccurate to check with the view of the local-unsafe
// it should be limited to the L1 view at the time of the inclusion of execution of the message.
err := vi.validWithinView(vi.localDerivedFrom.Number, execMsg)
if err != nil {
return err
}
}
return nil
})
if err == nil {
panic("expected reader to complete with an exit-error")
}
if errors.Is(err, logs.ErrFuture) {
// register the new cross-safe block as cross-safe up to the current L1 view
return nil
}
return err
}
package source
import (
"context"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// TODO(optimism#11032) Make these configurable and a sensible default
const epochPollInterval = 3 * time.Second
const pollInterval = 2 * time.Second
const trustRpc = false
const rpcKind = sources.RPCKindStandard
type Metrics interface {
caching.Metrics
}
type Storage interface {
ChainsDBClientForLogProcessor
DatabaseRewinder
LatestBlockNum(chainID types.ChainID) (num uint64, ok bool)
}
// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct {
log log.Logger
headMonitor *HeadMonitor
chainProcessor *ChainProcessor
}
func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store Storage) (*ChainMonitor, error) {
logger = logger.New("chainID", chainID)
cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind)
if err != nil {
return nil, err
}
// Create the log processor and fetcher
processLogs := newLogProcessor(chainID, store)
unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, processLogs, store)
unsafeProcessors := []HeadProcessor{unsafeBlockProcessor}
callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil)
headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback)
return &ChainMonitor{
log: logger,
headMonitor: headMonitor,
chainProcessor: unsafeBlockProcessor,
}, nil
}
func (c *ChainMonitor) Start() error {
c.log.Info("Started monitoring chain")
return c.headMonitor.Start()
}
func (c *ChainMonitor) Stop() error {
c.chainProcessor.Close()
return c.headMonitor.Stop()
}
func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
c, err := client.NewRPCWithClient(ctx, logger, rpc, rpcClient, pollRate)
if err != nil {
return nil, fmt.Errorf("failed to create new RPC client: %w", err)
}
l1Client, err := sources.NewL1Client(c, logger, m, sources.L1ClientSimpleConfig(trustRPC, kind, 100))
if err != nil {
return nil, fmt.Errorf("failed to connect client: %w", err)
}
return l1Client, nil
}
package source
/* TODO
import (
"context"
"errors"
"fmt"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
var processorChainID = types.ChainIDFromUInt64(4)
func TestUnsafeBlocksStage(t *testing.T) {
t.Run("IgnoreEventsAtOrPriorToStartingHead", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, processorChainID, processor, &stubRewinder{})
stage.OnNewHead(ctx, eth.L1BlockRef{Number: 100})
stage.OnNewHead(ctx, eth.L1BlockRef{Number: 99})
require.Empty(t, processor.processed)
require.Zero(t, client.calls)
})
t.Run("OutputNewHeadsWithNoMissedBlocks", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block1 := eth.L1BlockRef{Number: 101}
block2 := eth.L1BlockRef{Number: 102}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, processorChainID, block0, processor, &stubRewinder{})
stage.OnNewHead(ctx, block1)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
stage.OnNewHead(ctx, block2)
require.Equal(t, []eth.L1BlockRef{block1, block2}, processor.processed)
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{block1, block2, block3}, processor.processed)
require.Zero(t, client.calls, "should not need to request block info")
})
t.Run("IgnoreEventsAtOrPriorToPreviousHead", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block1 := eth.L1BlockRef{Number: 101}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, processorChainID, block0, processor, &stubRewinder{})
stage.OnNewHead(ctx, block1)
require.NotEmpty(t, processor.processed)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
stage.OnNewHead(ctx, block0)
stage.OnNewHead(ctx, block1)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
require.Zero(t, client.calls, "should not need to request block info")
})
t.Run("OutputSkippedBlocks", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, processorChainID, block0, processor, &stubRewinder{})
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
require.Equal(t, 2, client.calls, "should only request the two missing blocks")
})
t.Run("DoNotUpdateLastBlockOnFetchError", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{err: errors.New("boom")}
block0 := eth.L1BlockRef{Number: 100}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{}
rewinder := &stubRewinder{}
stage := NewChainProcessor(logger, client, processorChainID, block0, processor, rewinder)
stage.OnNewHead(ctx, block3)
require.Empty(t, processor.processed, "should not update any blocks because backfill failed")
client.err = nil
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
require.False(t, rewinder.rewindCalled, "should not rewind because no logs could have been written")
})
t.Run("DoNotUpdateLastBlockOnProcessorError", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{err: errors.New("boom")}
rewinder := &stubRewinder{}
stage := NewChainProcessor(logger, client, processorChainID, block0, processor, rewinder)
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101)}, processor.processed, "Attempted to process block 101")
require.Equal(t, block0.Number, rewinder.rewoundTo, "should rewind to block before error")
processor.err = nil
stage.OnNewHead(ctx, block3)
// Attempts to process block 101 again, then carries on
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
})
t.Run("RewindWhenNewHeadProcessingFails", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block1 := eth.L1BlockRef{Number: 101}
processor := &stubBlockProcessor{err: errors.New("boom")}
rewinder := &stubRewinder{}
stage := NewChainProcessor(logger, client, processorChainID, block0, processor, rewinder)
// No skipped blocks
stage.OnNewHead(ctx, block1)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed, "Attempted to process block 101")
require.Equal(t, block0.Number, rewinder.rewoundTo, "should rewind to block before error")
})
}
type stubBlockByNumberSource struct {
calls int
err error
}
func (s *stubBlockByNumberSource) L1BlockRefByNumber(_ context.Context, number uint64) (eth.L1BlockRef, error) {
s.calls++
if s.err != nil {
return eth.L1BlockRef{}, s.err
}
return makeBlockRef(number), nil
}
type stubBlockProcessor struct {
processed []eth.L1BlockRef
err error
}
func (s *stubBlockProcessor) ProcessBlock(_ context.Context, block eth.L1BlockRef) error {
s.processed = append(s.processed, block)
return s.err
}
func makeBlockRef(number uint64) eth.L1BlockRef {
return eth.L1BlockRef{
Number: number,
Hash: common.Hash{byte(number)},
ParentHash: common.Hash{byte(number - 1)},
Time: number * 1000,
}
}
type stubRewinder struct {
rewoundTo uint64
rewindCalled bool
}
func (s *stubRewinder) Rewind(chainID types.ChainID, headBlockNum uint64) error {
if chainID != processorChainID {
return fmt.Errorf("chainID mismatch, expected %v but was %v", processorChainID, chainID)
}
s.rewoundTo = headBlockNum
s.rewindCalled = true
return nil
}
*/
package source
import (
"context"
"errors"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
type HeadMonitorClient interface {
eth.NewHeadSource
eth.L1BlockRefsSource
}
type HeadChangeCallback interface {
OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef)
OnNewSafeHead(ctx context.Context, block eth.L1BlockRef)
OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef)
}
// HeadMonitor monitors an L2 chain and sends notifications when the unsafe, safe or finalized head changes.
// Head updates may be coalesced, allowing the head block to skip forward multiple blocks.
// Reorgs are not identified.
type HeadMonitor struct {
log log.Logger
epochPollInterval time.Duration
rpc HeadMonitorClient
callback HeadChangeCallback
started atomic.Bool
headsSub event.Subscription
safeSub ethereum.Subscription
finalizedSub ethereum.Subscription
}
func NewHeadMonitor(logger log.Logger, epochPollInterval time.Duration, rpc HeadMonitorClient, callback HeadChangeCallback) *HeadMonitor {
return &HeadMonitor{
log: logger,
epochPollInterval: epochPollInterval,
rpc: rpc,
callback: callback,
}
}
func (h *HeadMonitor) Start() error {
if !h.started.CompareAndSwap(false, true) {
return errors.New("already started")
}
// Keep subscribed to the unsafe head, which changes frequently.
h.headsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
h.log.Warn("Resubscribing after failed heads subscription", "err", err)
}
return eth.WatchHeadChanges(ctx, h.rpc, h.callback.OnNewUnsafeHead)
})
go func() {
err, ok := <-h.headsSub.Err()
if !ok {
return
}
h.log.Error("Heads subscription error", "err", err)
}()
// Poll for the safe block and finalized block, which only change once per epoch at most and may be delayed.
h.safeSub = eth.PollBlockChanges(h.log, h.rpc, h.callback.OnNewSafeHead, eth.Safe,
h.epochPollInterval, time.Second*10)
h.finalizedSub = eth.PollBlockChanges(h.log, h.rpc, h.callback.OnNewFinalizedHead, eth.Finalized,
h.epochPollInterval, time.Second*10)
h.log.Info("Chain head monitoring started")
return nil
}
func (h *HeadMonitor) Stop() error {
if !h.started.CompareAndSwap(true, false) {
return errors.New("already stopped")
}
// stop heads feed
if h.headsSub != nil {
h.headsSub.Unsubscribe()
}
// stop polling for safe-head changes
if h.safeSub != nil {
h.safeSub.Unsubscribe()
}
// stop polling for finalized-head changes
if h.finalizedSub != nil {
h.finalizedSub.Unsubscribe()
}
return nil
}
package source
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
const waitDuration = 10 * time.Second
const checkInterval = 10 * time.Millisecond
func TestUnsafeHeadUpdates(t *testing.T) {
rng := rand.New(rand.NewSource(0x1337))
header1 := testutils.RandomHeader(rng)
header2 := testutils.RandomHeader(rng)
t.Run("NotifyOfNewHeads", func(t *testing.T) {
rpc, callback := startHeadMonitor(t)
rpc.NewUnsafeHead(t, header1)
callback.RequireUnsafeHeaders(t, header1)
rpc.NewUnsafeHead(t, header2)
callback.RequireUnsafeHeaders(t, header1, header2)
})
t.Run("ResubscribeOnError", func(t *testing.T) {
rpc, callback := startHeadMonitor(t)
rpc.SubscriptionError(t)
rpc.NewUnsafeHead(t, header1)
callback.RequireUnsafeHeaders(t, header1)
})
}
func TestSafeHeadUpdates(t *testing.T) {
rpc, callback := startHeadMonitor(t)
head1 := eth.L1BlockRef{
Hash: common.Hash{0xaa},
Number: 1,
}
head2 := eth.L1BlockRef{
Hash: common.Hash{0xbb},
Number: 2,
}
rpc.SetSafeHead(head1)
callback.RequireSafeHeaders(t, head1)
rpc.SetSafeHead(head2)
callback.RequireSafeHeaders(t, head1, head2)
}
func TestFinalizedHeadUpdates(t *testing.T) {
rpc, callback := startHeadMonitor(t)
head1 := eth.L1BlockRef{
Hash: common.Hash{0xaa},
Number: 1,
}
head2 := eth.L1BlockRef{
Hash: common.Hash{0xbb},
Number: 2,
}
rpc.SetFinalizedHead(head1)
callback.RequireFinalizedHeaders(t, head1)
rpc.SetFinalizedHead(head2)
callback.RequireFinalizedHeaders(t, head1, head2)
}
func startHeadMonitor(t *testing.T) (*stubRPC, *stubCallback) {
logger := testlog.Logger(t, log.LvlInfo)
rpc := &stubRPC{}
callback := &stubCallback{}
monitor := NewHeadMonitor(logger, 50*time.Millisecond, rpc, callback)
require.NoError(t, monitor.Start())
t.Cleanup(func() {
require.NoError(t, monitor.Stop())
})
return rpc, callback
}
type stubCallback struct {
sync.Mutex
unsafe []eth.L1BlockRef
safe []eth.L1BlockRef
finalized []eth.L1BlockRef
}
func (s *stubCallback) RequireUnsafeHeaders(t *testing.T, heads ...*types.Header) {
expected := make([]eth.L1BlockRef, len(heads))
for i, head := range heads {
expected[i] = eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head))
}
s.requireHeaders(t, func(s *stubCallback) []eth.L1BlockRef { return s.unsafe }, expected)
}
func (s *stubCallback) RequireSafeHeaders(t *testing.T, expected ...eth.L1BlockRef) {
s.requireHeaders(t, func(s *stubCallback) []eth.L1BlockRef { return s.safe }, expected)
}
func (s *stubCallback) RequireFinalizedHeaders(t *testing.T, expected ...eth.L1BlockRef) {
s.requireHeaders(t, func(s *stubCallback) []eth.L1BlockRef { return s.finalized }, expected)
}
func (s *stubCallback) requireHeaders(t *testing.T, getter func(*stubCallback) []eth.L1BlockRef, expected []eth.L1BlockRef) {
require.Eventually(t, func() bool {
s.Lock()
defer s.Unlock()
return len(getter(s)) >= len(expected)
}, waitDuration, checkInterval)
s.Lock()
defer s.Unlock()
require.Equal(t, expected, getter(s))
}
func (s *stubCallback) OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.unsafe = append(s.unsafe, block)
}
func (s *stubCallback) OnNewSafeHead(ctx context.Context, block eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.safe = append(s.safe, block)
}
func (s *stubCallback) OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.finalized = append(s.finalized, block)
}
var _ HeadChangeCallback = (*stubCallback)(nil)
type stubRPC struct {
sync.Mutex
sub *mockSubscription
safeHead eth.L1BlockRef
finalizedHead eth.L1BlockRef
}
func (s *stubRPC) SubscribeNewHead(_ context.Context, unsafeCh chan<- *types.Header) (ethereum.Subscription, error) {
s.Lock()
defer s.Unlock()
if s.sub != nil {
return nil, errors.New("already subscribed to unsafe heads")
}
errChan := make(chan error)
s.sub = &mockSubscription{errChan, unsafeCh, s}
return s.sub, nil
}
func (s *stubRPC) SetSafeHead(head eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.safeHead = head
}
func (s *stubRPC) SetFinalizedHead(head eth.L1BlockRef) {
s.Lock()
defer s.Unlock()
s.finalizedHead = head
}
func (s *stubRPC) L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) {
s.Lock()
defer s.Unlock()
switch label {
case eth.Safe:
if s.safeHead == (eth.L1BlockRef{}) {
return eth.L1BlockRef{}, errors.New("no unsafe head")
}
return s.safeHead, nil
case eth.Finalized:
if s.finalizedHead == (eth.L1BlockRef{}) {
return eth.L1BlockRef{}, errors.New("no finalized head")
}
return s.finalizedHead, nil
default:
return eth.L1BlockRef{}, fmt.Errorf("unknown label: %v", label)
}
}
func (s *stubRPC) NewUnsafeHead(t *testing.T, header *types.Header) {
s.WaitForSub(t)
s.Lock()
defer s.Unlock()
require.NotNil(t, s.sub, "Attempting to publish a header with no subscription")
s.sub.headers <- header
}
func (s *stubRPC) SubscriptionError(t *testing.T) {
s.WaitForSub(t)
s.Lock()
defer s.Unlock()
s.sub.errChan <- errors.New("subscription error")
s.sub = nil
}
func (s *stubRPC) WaitForSub(t *testing.T) {
require.Eventually(t, func() bool {
s.Lock()
defer s.Unlock()
return s.sub != nil
}, waitDuration, checkInterval, "Head monitor did not subscribe to unsafe head")
}
var _ HeadMonitorClient = (*stubRPC)(nil)
type mockSubscription struct {
errChan chan error
headers chan<- *types.Header
rpc *stubRPC
}
func (m *mockSubscription) Unsubscribe() {
fmt.Println("Unsubscribed")
m.rpc.Lock()
defer m.rpc.Unlock()
m.rpc.sub = nil
}
func (m *mockSubscription) Err() <-chan error {
return m.errChan
}
package source
import (
"context"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type HeadProcessor interface {
OnNewHead(ctx context.Context, head eth.L1BlockRef) error
}
type HeadProcessorFn func(ctx context.Context, head eth.L1BlockRef) error
func (f HeadProcessorFn) OnNewHead(ctx context.Context, head eth.L1BlockRef) error {
return f(ctx, head)
}
// headUpdateProcessor handles head update events and routes them to the appropriate handlers
type headUpdateProcessor struct {
log log.Logger
unsafeProcessors []HeadProcessor
safeProcessors []HeadProcessor
finalizedProcessors []HeadProcessor
}
func newHeadUpdateProcessor(log log.Logger, unsafeProcessors []HeadProcessor, safeProcessors []HeadProcessor, finalizedProcessors []HeadProcessor) *headUpdateProcessor {
return &headUpdateProcessor{
log: log,
unsafeProcessors: unsafeProcessors,
safeProcessors: safeProcessors,
finalizedProcessors: finalizedProcessors,
}
}
func (n *headUpdateProcessor) OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef) {
n.log.Debug("New unsafe head", "block", block)
for _, processor := range n.unsafeProcessors {
if err := processor.OnNewHead(ctx, block); err != nil {
n.log.Error("unsafe-head processing failed", "err", err)
}
}
}
func (n *headUpdateProcessor) OnNewSafeHead(ctx context.Context, block eth.L1BlockRef) {
n.log.Debug("New safe head", "block", block)
for _, processor := range n.safeProcessors {
if err := processor.OnNewHead(ctx, block); err != nil {
n.log.Error("safe-head processing failed", "err", err)
}
}
}
func (n *headUpdateProcessor) OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef) {
n.log.Debug("New finalized head", "block", block)
for _, processor := range n.finalizedProcessors {
if err := processor.OnNewHead(ctx, block); err != nil {
n.log.Error("finalized-head processing failed", "err", err)
}
}
}
// OnNewHead is a util function to turn a head-signal processor into head-pointer updater
func OnNewHead(id types.ChainID, apply func(id types.ChainID, v heads.HeadPointer) error) HeadProcessorFn {
return func(ctx context.Context, head eth.L1BlockRef) error {
return apply(id, heads.HeadPointer{
LastSealedBlockHash: head.Hash,
LastSealedBlockNum: head.Number,
LogsSince: 0,
})
}
}
package source
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestHeadUpdateProcessor(t *testing.T) {
t.Run("NotifyUnsafeHeadProcessors", func(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
processed := make([]eth.L1BlockRef, 3)
makeProcessor := func(idx int) HeadProcessor {
return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) error {
processed[idx] = head
return nil
})
}
headUpdates := newHeadUpdateProcessor(logger, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)}, nil, nil)
block := eth.L1BlockRef{Number: 110, Hash: common.Hash{0xaa}}
headUpdates.OnNewUnsafeHead(context.Background(), block)
require.Equal(t, []eth.L1BlockRef{block, block, block}, processed)
})
t.Run("NotifySafeHeadProcessors", func(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
processed := make([]eth.L1BlockRef, 3)
makeProcessor := func(idx int) HeadProcessor {
return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) error {
processed[idx] = head
return nil
})
}
headUpdates := newHeadUpdateProcessor(logger, nil, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)}, nil)
block := eth.L1BlockRef{Number: 110, Hash: common.Hash{0xaa}}
headUpdates.OnNewSafeHead(context.Background(), block)
require.Equal(t, []eth.L1BlockRef{block, block, block}, processed)
})
t.Run("NotifyFinalizedHeadProcessors", func(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
processed := make([]eth.L1BlockRef, 3)
makeProcessor := func(idx int) HeadProcessor {
return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) error {
processed[idx] = head
return nil
})
}
headUpdates := newHeadUpdateProcessor(logger, nil, nil, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)})
block := eth.L1BlockRef{Number: 110, Hash: common.Hash{0xaa}}
headUpdates.OnNewFinalizedHead(context.Background(), block)
require.Equal(t, []eth.L1BlockRef{block, block, block}, processed)
})
}
...@@ -3,11 +3,9 @@ package frontend ...@@ -3,11 +3,9 @@ package frontend
import ( import (
"context" "context"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
) )
type AdminBackend interface { type AdminBackend interface {
...@@ -19,19 +17,22 @@ type AdminBackend interface { ...@@ -19,19 +17,22 @@ type AdminBackend interface {
type QueryBackend interface { type QueryBackend interface {
CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error)
CheckMessages(messages []types.Message, minSafety types.SafetyLevel) error CheckMessages(messages []types.Message, minSafety types.SafetyLevel) error
CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) DerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error)
DerivedFrom(ctx context.Context, chainID types.ChainID, blockHash common.Hash, blockNumber uint64) (eth.BlockRef, error) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error)
SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error)
Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error)
} }
type UpdatesBackend interface { type UpdatesBackend interface {
UpdateLocalUnsafe(chainID types.ChainID, head eth.BlockRef) UpdateLocalUnsafe(chainID types.ChainID, head eth.BlockRef) error
UpdateLocalSafe(chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) UpdateLocalSafe(chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error
UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) error
} }
type Backend interface { type Backend interface {
AdminBackend AdminBackend
QueryBackend QueryBackend
UpdatesBackend
} }
type QueryFrontend struct { type QueryFrontend struct {
...@@ -53,23 +54,19 @@ func (q *QueryFrontend) CheckMessages( ...@@ -53,23 +54,19 @@ func (q *QueryFrontend) CheckMessages(
} }
func (q *QueryFrontend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) { func (q *QueryFrontend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
// TODO(#12358): attach to backend return q.Supervisor.UnsafeView(ctx, chainID, unsafe)
return types.ReferenceView{}, nil
} }
func (q *QueryFrontend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) { func (q *QueryFrontend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
// TODO(#12358): attach to backend return q.Supervisor.SafeView(ctx, chainID, safe)
return types.ReferenceView{}, nil
} }
func (q *QueryFrontend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) { func (q *QueryFrontend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
// TODO(#12358): attach to backend return q.Supervisor.Finalized(ctx, chainID)
return eth.BlockID{}, nil
} }
func (q *QueryFrontend) DerivedFrom(ctx context.Context, chainID types.ChainID, blockHash common.Hash, blockNumber uint64) (eth.BlockRef, error) { func (q *QueryFrontend) DerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error) {
// TODO(#12358): attach to backend return q.Supervisor.DerivedFrom(ctx, chainID, derived)
return eth.BlockRef{}, nil
} }
type AdminFrontend struct { type AdminFrontend struct {
...@@ -95,14 +92,14 @@ type UpdatesFrontend struct { ...@@ -95,14 +92,14 @@ type UpdatesFrontend struct {
Supervisor UpdatesBackend Supervisor UpdatesBackend
} }
func (u *UpdatesFrontend) UpdateLocalUnsafe(chainID types.ChainID, head eth.BlockRef) { func (u *UpdatesFrontend) UpdateLocalUnsafe(chainID types.ChainID, head eth.BlockRef) error {
u.Supervisor.UpdateLocalUnsafe(chainID, head) return u.Supervisor.UpdateLocalUnsafe(chainID, head)
} }
func (u *UpdatesFrontend) UpdateLocalSafe(chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) { func (u *UpdatesFrontend) UpdateLocalSafe(chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
u.Supervisor.UpdateLocalSafe(chainID, derivedFrom, lastDerived) return u.Supervisor.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
} }
func (u *UpdatesFrontend) UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) { func (u *UpdatesFrontend) UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) error {
u.Supervisor.UpdateFinalizedL1(chainID, finalized) return u.Supervisor.UpdateFinalizedL1(chainID, finalized)
} }
...@@ -4,10 +4,8 @@ import ( ...@@ -4,10 +4,8 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"sync/atomic" "sync/atomic"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
...@@ -16,6 +14,7 @@ import ( ...@@ -16,6 +14,7 @@ import (
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof" "github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics" "github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
...@@ -23,7 +22,6 @@ import ( ...@@ -23,7 +22,6 @@ import (
type Backend interface { type Backend interface {
frontend.Backend frontend.Backend
io.Closer
} }
// SupervisorService implements the full-environment bells and whistles around the Supervisor. // SupervisorService implements the full-environment bells and whistles around the Supervisor.
...@@ -149,6 +147,11 @@ func (su *SupervisorService) initRPCServer(cfg *config.Config) error { ...@@ -149,6 +147,11 @@ func (su *SupervisorService) initRPCServer(cfg *config.Config) error {
Service: &frontend.QueryFrontend{Supervisor: su.backend}, Service: &frontend.QueryFrontend{Supervisor: su.backend},
Authenticated: false, Authenticated: false,
}) })
server.AddAPI(rpc.API{
Namespace: "supervisor",
Service: &frontend.UpdatesFrontend{Supervisor: su.backend},
Authenticated: false,
})
su.rpcServer = server su.rpcServer = server
return nil return nil
} }
...@@ -179,16 +182,19 @@ func (su *SupervisorService) Stop(ctx context.Context) error { ...@@ -179,16 +182,19 @@ func (su *SupervisorService) Stop(ctx context.Context) error {
result = errors.Join(result, fmt.Errorf("failed to stop RPC server: %w", err)) result = errors.Join(result, fmt.Errorf("failed to stop RPC server: %w", err))
} }
} }
su.log.Info("Stopped RPC Server")
if su.backend != nil { if su.backend != nil {
if err := su.backend.Close(); err != nil { if err := su.backend.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close supervisor backend: %w", err)) result = errors.Join(result, fmt.Errorf("failed to close supervisor backend: %w", err))
} }
} }
su.log.Info("Stopped Backend")
if su.pprofService != nil { if su.pprofService != nil {
if err := su.pprofService.Stop(ctx); err != nil { if err := su.pprofService.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop PProf server: %w", err)) result = errors.Join(result, fmt.Errorf("failed to stop PProf server: %w", err))
} }
} }
su.log.Info("Stopped PProf")
if su.metricsSrv != nil { if su.metricsSrv != nil {
if err := su.metricsSrv.Stop(ctx); err != nil { if err := su.metricsSrv.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err)) result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
......
...@@ -23,6 +23,11 @@ type ExecutingMessage struct { ...@@ -23,6 +23,11 @@ type ExecutingMessage struct {
Hash common.Hash Hash common.Hash
} }
func (s *ExecutingMessage) String() string {
return fmt.Sprintf("ExecMsg(chain: %d, block: %d, log: %d, time: %d, logHash: %s)",
s.Chain, s.BlockNum, s.LogIdx, s.Timestamp, s.Hash)
}
type Message struct { type Message struct {
Identifier Identifier `json:"identifier"` Identifier Identifier `json:"identifier"`
PayloadHash common.Hash `json:"payloadHash"` PayloadHash common.Hash `json:"payloadHash"`
...@@ -171,3 +176,17 @@ type ReferenceView struct { ...@@ -171,3 +176,17 @@ type ReferenceView struct {
func (v ReferenceView) String() string { func (v ReferenceView) String() string {
return fmt.Sprintf("View(local: %s, cross: %s)", v.Local, v.Cross) return fmt.Sprintf("View(local: %s, cross: %s)", v.Local, v.Cross)
} }
type BlockSeal struct {
Hash common.Hash
Number uint64
Timestamp uint64
}
func (s BlockSeal) String() string {
return fmt.Sprintf("BlockSeal(hash:%s, number:%d, time:%d)", s.Hash, s.Number, s.Timestamp)
}
func (s BlockSeal) ID() eth.BlockID {
return eth.BlockID{Hash: s.Hash, Number: s.Number}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment