Commit 58793000 authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

Interop: XSafe Head Maintainer (#11458)

* WIP: Cross-Head Maintenance

* Add NextExecutingMessage ; Add ChainsDB Tests

* Add Tests for SafetyCheckers

* spelling

* correct test

* add safety_checkers_test.go

* Address Coments From Proto
parent 01306200
...@@ -6,6 +6,9 @@ import ( ...@@ -6,6 +6,9 @@ import (
"io" "io"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
...@@ -20,11 +23,18 @@ type LogStorage interface { ...@@ -20,11 +23,18 @@ type LogStorage interface {
Rewind(newHeadBlockNum uint64) error Rewind(newHeadBlockNum uint64) error
LatestBlockNum() uint64 LatestBlockNum() uint64
ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error) ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error)
Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error)
LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error)
NextExecutingMessage(logs.Iterator) (backendTypes.ExecutingMessage, error)
} }
type HeadsStorage interface { type HeadsStorage interface {
Current() *heads.Heads
Apply(op heads.Operation) error
} }
// ChainsDB is a database that stores logs and heads for multiple chains.
// it implements the ChainsStorage interface.
type ChainsDB struct { type ChainsDB struct {
logDBs map[types.ChainID]LogStorage logDBs map[types.ChainID]LogStorage
heads HeadsStorage heads HeadsStorage
...@@ -49,6 +59,79 @@ func (db *ChainsDB) Resume() error { ...@@ -49,6 +59,79 @@ func (db *ChainsDB) Resume() error {
return nil return nil
} }
// UpdateCrossSafeHeads updates the cross-heads of all chains
// this is an example of how to use the SafetyChecker to update the cross-heads
func (db *ChainsDB) UpdateCrossSafeHeads() error {
checker := NewSafetyChecker(Safe, *db)
return db.UpdateCrossHeads(checker)
}
// UpdateCrossHeadsForChain updates the cross-head for a single chain.
// the provided checker controls which heads are considered.
// TODO: we should invert control and have the underlying logDB call their own update
// for now, monolithic control is fine. There may be a stronger reason to refactor if the API needs it.
func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error {
// start with the xsafe head of the chain
xHead := checker.CrossHeadForChain(chainID)
// advance as far as the local head
localHead := checker.LocalHeadForChain(chainID)
// get an iterator for the last checkpoint behind the x-head
i, err := db.logDBs[chainID].LastCheckpointBehind(xHead)
if err != nil {
return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err)
}
// advance the logDB through all executing messages we can
// this loop will break:
// - when we reach the local head
// - when we reach a message that is not safe
// - if an error occurs
for {
exec, err := db.logDBs[chainID].NextExecutingMessage(i)
if err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("failed to read next executing message for chain %v: %w", chainID, err)
}
// if we are now beyond the local head, stop
if i.Index() > localHead {
break
}
// use the checker to determine if this message is safe
safe := checker.Check(
types.ChainIDFromUInt64(uint64(exec.Chain)),
exec.BlockNum,
exec.LogIdx,
exec.Hash)
if !safe {
break
}
// if all is well, prepare the x-head update to this point
xHead = i.Index()
}
// have the checker create an update to the x-head in question, and apply that update
err = db.heads.Apply(checker.Update(chainID, xHead))
if err != nil {
return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err)
}
return nil
}
// UpdateCrossSafeHeads updates the cross-heads of all chains
// based on the provided SafetyChecker. The SafetyChecker is used to determine
// the safety of each log entry in the database, and the cross-head associated with it.
func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error {
currentHeads := db.heads.Current()
for chainID := range currentHeads.Chains {
if err := db.UpdateCrossHeadsForChain(chainID, checker); err != nil {
return err
}
}
return nil
}
// LatestBlockNum returns the latest block number that has been recorded to the logs db
// for the given chain. It does not contain safety guarantees.
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) uint64 { func (db *ChainsDB) LatestBlockNum(chain types.ChainID) uint64 {
logDB, ok := db.logDBs[chain] logDB, ok := db.logDBs[chain]
if !ok { if !ok {
......
package db package db
import ( import (
"fmt"
"io"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -47,11 +52,244 @@ func TestChainsDB_Rewind(t *testing.T) { ...@@ -47,11 +52,244 @@ func TestChainsDB_Rewind(t *testing.T) {
}) })
} }
type stubHeadStorage struct{} func TestChainsDB_UpdateCrossHeads(t *testing.T) {
// using a chainID of 1 for simplicity
chainID := types.ChainIDFromUInt64(1)
// get default stubbed components
logDB, checker, h := setupStubbedForUpdateHeads(chainID)
// The ChainsDB is real, but uses only stubbed components
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 15)
// 2. progress the iterator to the next log (16) because the first safety check will pass
// 3. fail the second safety check
// 4. update the cross-heads to the last successful safety check (16)
err := db.UpdateCrossHeads(checker)
require.NoError(t, err)
require.Equal(t, entrydb.EntryIdx(16), checker.updated)
}
func TestChainsDB_UpdateCrossHeadsBeyondLocal(t *testing.T) {
// using a chainID of 1 for simplicity
chainID := types.ChainIDFromUInt64(1)
// get default stubbed components
logDB, checker, h := setupStubbedForUpdateHeads(chainID)
// set the safety checker to pass 99 times, effeciively allowing all messages to be safe
checker.numSafe = 99
// The ChainsDB is real, but uses only stubbed components
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 15)
// 2. progress the iterator to repeatedly, as the safety check will pass 99 times.
// 3. exceed the local head, and update the cross-head to the local head (40)
err := db.UpdateCrossHeads(checker)
require.NoError(t, err)
require.Equal(t, entrydb.EntryIdx(40), checker.updated)
}
func TestChainsDB_UpdateCrossHeadsEOF(t *testing.T) {
// using a chainID of 1 for simplicity
chainID := types.ChainIDFromUInt64(1)
// get default stubbed components
logDB, checker, h := setupStubbedForUpdateHeads(chainID)
// set the log DB to return an EOF error when trying to get the next executing message
// after processing 10 messages as safe (with more messages available to be safe)
logDB.errOverload = io.EOF
logDB.errAfter = 10
checker.numSafe = 99
// The ChainsDB is real, but uses only stubbed components
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 15)
// 2. after processing 10 messages as safe, fail to find any executing messages (EOF)
// 3. update to the last successful safety check (25) without returning an error
err := db.UpdateCrossHeads(checker)
require.NoError(t, err)
require.Equal(t, entrydb.EntryIdx(25), checker.updated)
}
func TestChainsDB_UpdateCrossHeadsError(t *testing.T) {
// using a chainID of 1 for simplicity
chainID := types.ChainIDFromUInt64(1)
// get default stubbed components
logDB, checker, h := setupStubbedForUpdateHeads(chainID)
// set the log DB to return an error when trying to get the next executing message
// after processing 3 messages as safe (with more messages available to be safe)
logDB.errOverload = fmt.Errorf("some error")
logDB.errAfter = 3
checker.numSafe = 99
// The ChainsDB is real, but uses only stubbed components
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 10)
// 2. fail during execution, even after processing 3 messages as safe
// 3. exit without updating, returning the error
err := db.UpdateCrossHeads(checker)
require.Error(t, err)
// the update was never set (aka 0-value)
require.Equal(t, entrydb.EntryIdx(0), checker.updated)
}
// setupStubbedForUpdateHeads sets up stubbed components for testing the UpdateCrossHeads method
// it returns stubbed structs which are suitable for their interfaces, and can be modified before testing
// TODO: the variables at the top of this function should be configurable by the caller.
// this isn't an issue for now, as all tests can modify the stubbed components directly after calling this function.
// but readability and maintainability would be improved by making this function more configurable.
func setupStubbedForUpdateHeads(chainID types.ChainID) (*stubLogDB, *stubChecker, *heads.Heads) {
// the checkpoint starts somewhere behind the last known cross-safe head
checkpoint := entrydb.EntryIdx(15)
// the last known cross-safe head is at 20
cross := entrydb.EntryIdx(20)
// the local head (the limit of the update) is at 40
local := entrydb.EntryIdx(40)
// the number of executing messages to make available (this should be more than the number of safety checks performed)
numExecutingMessages := 30
// number of safety checks that will pass before returning false
numSafe := 1
// number of calls to nextExecutingMessage before potentially returning an error
errAfter := 4
// set up stubbed logDB
logDB := &stubLogDB{}
// the log DB will start the iterator at the checkpoint index
logDB.lastCheckpointBehind = &stubIterator{checkpoint}
// rig the log DB to return an error after a certain number of calls to NextExecutingMessage
logDB.errAfter = errAfter
// set up stubbed executing messages that the ChainsDB can pass to the checker
logDB.executingMessages = []*backendTypes.ExecutingMessage{}
for i := 0; i < numExecutingMessages; i++ {
// executing messages are packed in groups of 3, with block numbers increasing by 1
logDB.executingMessages = append(logDB.executingMessages, &backendTypes.ExecutingMessage{
BlockNum: uint64(100 + int(i/3)),
LogIdx: uint32(i),
Hash: backendTypes.TruncatedHash{},
})
}
// set up stubbed checker
checker := &stubChecker{
localHeadForChain: local,
crossHeadForChain: cross,
// the first safety check will return true, the second false
numSafe: numSafe,
}
// set up stubbed heads with sample values
h := heads.NewHeads()
h.Chains[chainID] = heads.ChainHeads{}
return logDB, checker, h
}
type stubChecker struct {
localHeadForChain entrydb.EntryIdx
crossHeadForChain entrydb.EntryIdx
numSafe int
checkCalls int
updated entrydb.EntryIdx
}
func (s *stubChecker) LocalHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
return s.localHeadForChain
}
func (s *stubChecker) CrossHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
return s.crossHeadForChain
}
// stubbed Check returns true for the first numSafe calls, and false thereafter
func (s *stubChecker) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) bool {
if s.checkCalls >= s.numSafe {
return false
}
s.checkCalls++
return true
}
func (s *stubChecker) Update(chain types.ChainID, index entrydb.EntryIdx) heads.OperationFn {
s.updated = index
return func(heads *heads.Heads) error {
return nil
}
}
type stubHeadStorage struct {
heads *heads.Heads
}
func (s *stubHeadStorage) Apply(heads.Operation) error {
return nil
}
func (s *stubHeadStorage) Current() *heads.Heads {
if s.heads == nil {
s.heads = heads.NewHeads()
}
return s.heads.Copy()
}
type stubIterator struct {
index entrydb.EntryIdx
}
func (s *stubIterator) NextLog() (uint64, uint32, backendTypes.TruncatedHash, error) {
panic("not implemented")
}
func (s *stubIterator) Index() entrydb.EntryIdx {
return s.index
}
func (s *stubIterator) ExecMessage() (backendTypes.ExecutingMessage, error) {
panic("not implemented")
}
type stubLogDB struct { type stubLogDB struct {
addLogCalls int addLogCalls int
headBlockNum uint64 headBlockNum uint64
emIndex int
executingMessages []*backendTypes.ExecutingMessage
lastCheckpointBehind *stubIterator
errOverload error
errAfter int
containsResponse containsResponse
}
// stubbed LastCheckpointBehind returns a stubbed iterator which was passed in to the struct
func (s *stubLogDB) LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error) {
return s.lastCheckpointBehind, nil
}
func (s *stubLogDB) NextExecutingMessage(i logs.Iterator) (backendTypes.ExecutingMessage, error) {
// if error overload is set, return it to simulate a failure condition
if s.errOverload != nil && s.emIndex >= s.errAfter {
return backendTypes.ExecutingMessage{}, s.errOverload
}
// increment the iterator to mark advancement
i.(*stubIterator).index += 1
// return the next executing message
m := *s.executingMessages[s.emIndex]
// and increment to the next message for the next call
s.emIndex++
return m, nil
} }
func (s *stubLogDB) ClosestBlockInfo(_ uint64) (uint64, backendTypes.TruncatedHash, error) { func (s *stubLogDB) ClosestBlockInfo(_ uint64) (uint64, backendTypes.TruncatedHash, error) {
...@@ -63,6 +301,18 @@ func (s *stubLogDB) AddLog(logHash backendTypes.TruncatedHash, block eth.BlockID ...@@ -63,6 +301,18 @@ func (s *stubLogDB) AddLog(logHash backendTypes.TruncatedHash, block eth.BlockID
return nil return nil
} }
type containsResponse struct {
contains bool
index entrydb.EntryIdx
err error
}
// stubbed Contains records the arguments passed to it
// it returns the response set in the struct, or an empty response
func (s *stubLogDB) Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error) {
return s.containsResponse.contains, s.containsResponse.index, s.containsResponse.err
}
func (s *stubLogDB) Rewind(newHeadBlockNum uint64) error { func (s *stubLogDB) Rewind(newHeadBlockNum uint64) error {
s.headBlockNum = newHeadBlockNum s.headBlockNum = newHeadBlockNum
return nil return nil
......
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -48,6 +50,14 @@ type stubLogStore struct { ...@@ -48,6 +50,14 @@ type stubLogStore struct {
rewoundTo uint64 rewoundTo uint64
} }
func (s *stubLogStore) Contains(blockNum uint64, logIdx uint32, loghash types.TruncatedHash) (bool, entrydb.EntryIdx, error) {
panic("not supported")
}
func (s *stubLogStore) LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error) {
panic("not supported")
}
func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, error) { func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, error) {
if s.closestBlockErr != nil { if s.closestBlockErr != nil {
return 0, types.TruncatedHash{}, s.closestBlockErr return 0, types.TruncatedHash{}, s.closestBlockErr
...@@ -55,6 +65,10 @@ func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, types.Truncate ...@@ -55,6 +65,10 @@ func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, types.Truncate
return s.closestBlockNumber, types.TruncatedHash{}, nil return s.closestBlockNumber, types.TruncatedHash{}, nil
} }
func (s *stubLogStore) NextExecutingMessage(logs.Iterator) (types.ExecutingMessage, error) {
panic("not supported")
}
func (s *stubLogStore) Rewind(headBlockNum uint64) error { func (s *stubLogStore) Rewind(headBlockNum uint64) error {
s.rewoundTo = headBlockNum s.rewoundTo = headBlockNum
return nil return nil
......
...@@ -202,29 +202,42 @@ func (db *DB) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, er ...@@ -202,29 +202,42 @@ func (db *DB) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, er
return checkpoint.blockNum, entry.hash, nil return checkpoint.blockNum, entry.hash, nil
} }
// Get returns the truncated hash of the log at the specified blockNum and logIdx,
// or an error if the log is not found.
func (db *DB) Get(blockNum uint64, logiIdx uint32) (types.TruncatedHash, error) {
db.rwLock.RLock()
defer db.rwLock.RUnlock()
hash, _, err := db.findLogInfo(blockNum, logiIdx)
return hash, err
}
// Contains return true iff the specified logHash is recorded in the specified blockNum and logIdx. // Contains return true iff the specified logHash is recorded in the specified blockNum and logIdx.
// logIdx is the index of the log in the array of all logs the block. // If the log is found, the entry index of the log is returned, too.
// logIdx is the index of the log in the array of all logs in the block.
// This can be used to check the validity of cross-chain interop events. // This can be used to check the validity of cross-chain interop events.
func (db *DB) Contains(blockNum uint64, logIdx uint32, logHash types.TruncatedHash) (bool, error) { func (db *DB) Contains(blockNum uint64, logIdx uint32, logHash types.TruncatedHash) (bool, entrydb.EntryIdx, error) {
db.rwLock.RLock() db.rwLock.RLock()
defer db.rwLock.RUnlock() defer db.rwLock.RUnlock()
db.log.Trace("Checking for log", "blockNum", blockNum, "logIdx", logIdx, "hash", logHash) db.log.Trace("Checking for log", "blockNum", blockNum, "logIdx", logIdx, "hash", logHash)
evtHash, _, err := db.findLogInfo(blockNum, logIdx) evtHash, iter, err := db.findLogInfo(blockNum, logIdx)
if errors.Is(err, ErrNotFound) { if errors.Is(err, ErrNotFound) {
// Did not find a log at blockNum and logIdx // Did not find a log at blockNum and logIdx
return false, nil return false, 0, nil
} else if err != nil { } else if err != nil {
return false, err return false, 0, err
} }
db.log.Trace("Found initiatingEvent", "blockNum", blockNum, "logIdx", logIdx, "hash", evtHash) db.log.Trace("Found initiatingEvent", "blockNum", blockNum, "logIdx", logIdx, "hash", evtHash)
// Found the requested block and log index, check if the hash matches // Found the requested block and log index, check if the hash matches
return evtHash == logHash, nil if evtHash == logHash {
return true, iter.Index(), nil
}
return false, 0, nil
} }
// Executes checks if the log identified by the specific block number and log index, has an ExecutingMessage associated // Executes checks if the log identified by the specific block number and log index, has an ExecutingMessage associated
// with it that needs to be checked as part of interop validation. // with it that needs to be checked as part of interop validation.
// logIdx is the index of the log in the array of all logs the block. // logIdx is the index of the log in the array of all logs in the block.
// Returns the ExecutingMessage if it exists, or ExecutingMessage{} if the log is found but has no ExecutingMessage. // Returns the ExecutingMessage if it exists, or ExecutingMessage{} if the log is found but has no ExecutingMessage.
// Returns ErrNotFound if the specified log does not exist in the database. // Returns ErrNotFound if the specified log does not exist in the database.
func (db *DB) Executes(blockNum uint64, logIdx uint32) (types.ExecutingMessage, error) { func (db *DB) Executes(blockNum uint64, logIdx uint32) (types.ExecutingMessage, error) {
...@@ -241,7 +254,7 @@ func (db *DB) Executes(blockNum uint64, logIdx uint32) (types.ExecutingMessage, ...@@ -241,7 +254,7 @@ func (db *DB) Executes(blockNum uint64, logIdx uint32) (types.ExecutingMessage,
return execMsg, nil return execMsg, nil
} }
func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (types.TruncatedHash, *iterator, error) { func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (types.TruncatedHash, Iterator, error) {
entryIdx, err := db.searchCheckpoint(blockNum, logIdx) entryIdx, err := db.searchCheckpoint(blockNum, logIdx)
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
// Did not find a checkpoint to start reading from so the log cannot be present. // Did not find a checkpoint to start reading from so the log cannot be present.
...@@ -477,6 +490,58 @@ func (db *DB) Rewind(headBlockNum uint64) error { ...@@ -477,6 +490,58 @@ func (db *DB) Rewind(headBlockNum uint64) error {
return nil return nil
} }
// NextExecutingMessage returns the next executing message in the log database.
// it skips over any non-executing messages, and will return an error if encountered.
// the iterator is modified in the process.
func (db *DB) NextExecutingMessage(iter Iterator) (types.ExecutingMessage, error) {
db.rwLock.RLock()
defer db.rwLock.RUnlock()
// this for-loop will break:
// - when the iterator reaches the end of the log
// - when the iterator reaches an executing message
// - if an error occurs
for {
_, _, _, err := iter.NextLog()
if err != nil {
return types.ExecutingMessage{}, err
}
// if the log is not an executing message, both exec and err are empty
exec, err := iter.ExecMessage()
if err != nil {
return types.ExecutingMessage{}, fmt.Errorf("failed to get executing message: %w", err)
}
if exec != (types.ExecutingMessage{}) {
return exec, nil
}
}
}
// LastCheckpointBehind returns an iterator for the last checkpoint behind the specified entry index.
// If the entry index is a search checkpoint, the iterator will start at that checkpoint.
// After searching back long enough (the searchCheckpointFrequency), an error is returned,
// as checkpoints are expected to be found within the frequency.
func (db *DB) LastCheckpointBehind(entryIdx entrydb.EntryIdx) (Iterator, error) {
for attempts := 0; attempts < searchCheckpointFrequency; attempts++ {
// attempt to read the index entry as a search checkpoint
_, err := db.readSearchCheckpoint(entryIdx)
if err == nil {
return db.newIterator(entryIdx)
}
// ErrDataCorruption is the return value if the entry is not a search checkpoint
// if it's not that type of error, we should return it instead of continuing
if !errors.Is(err, ErrDataCorruption) {
return nil, err
}
// don't attempt to read behind the start of the data
if entryIdx == 0 {
break
}
// reverse if we haven't found it yet
entryIdx -= 1
}
return nil, fmt.Errorf("failed to find a search checkpoint in the last %v entries", searchCheckpointFrequency)
}
func (db *DB) readSearchCheckpoint(entryIdx entrydb.EntryIdx) (searchCheckpoint, error) { func (db *DB) readSearchCheckpoint(entryIdx entrydb.EntryIdx) (searchCheckpoint, error) {
data, err := db.store.Read(entryIdx) data, err := db.store.Read(entryIdx)
if err != nil { if err != nil {
......
...@@ -548,7 +548,7 @@ func requireContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHa ...@@ -548,7 +548,7 @@ func requireContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHa
require.LessOrEqual(t, len(execMsg), 1, "cannot have multiple executing messages for a single log") require.LessOrEqual(t, len(execMsg), 1, "cannot have multiple executing messages for a single log")
m, ok := db.m.(*stubMetrics) m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type") require.True(t, ok, "Did not get the expected metrics type")
result, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash)) result, _, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum) require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum)
require.Truef(t, result, "Did not find log %v in block %v with hash %v", logIdx, blockNum, logHash) require.Truef(t, result, "Did not find log %v in block %v with hash %v", logIdx, blockNum, logHash)
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency), "Should not need to read more than between two checkpoints") require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency), "Should not need to read more than between two checkpoints")
...@@ -564,7 +564,7 @@ func requireContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHa ...@@ -564,7 +564,7 @@ func requireContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHa
func requireNotContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash) { func requireNotContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash) {
m, ok := db.m.(*stubMetrics) m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type") require.True(t, ok, "Did not get the expected metrics type")
result, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash)) result, _, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum) require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum)
require.Falsef(t, result, "Found unexpected log %v in block %v with hash %v", logIdx, blockNum, logHash) require.Falsef(t, result, "Found unexpected log %v in block %v with hash %v", logIdx, blockNum, logHash)
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency), "Should not need to read more than between two checkpoints") require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency), "Should not need to read more than between two checkpoints")
...@@ -584,10 +584,10 @@ func requireExecutingMessage(t *testing.T, db *DB, blockNum uint64, logIdx uint3 ...@@ -584,10 +584,10 @@ func requireExecutingMessage(t *testing.T, db *DB, blockNum uint64, logIdx uint3
require.NotZero(t, m.entriesReadForSearch, "Must read at least some entries to find the log") require.NotZero(t, m.entriesReadForSearch, "Must read at least some entries to find the log")
} }
func requireWrongHash(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash, execMsg types.ExecutingMessage) { func requireWrongHash(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash, _ types.ExecutingMessage) {
m, ok := db.m.(*stubMetrics) m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type") require.True(t, ok, "Did not get the expected metrics type")
result, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash)) result, _, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum) require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum)
require.Falsef(t, result, "Found unexpected log %v in block %v with hash %v", logIdx, blockNum, logHash) require.Falsef(t, result, "Found unexpected log %v in block %v with hash %v", logIdx, blockNum, logHash)
......
...@@ -9,6 +9,12 @@ import ( ...@@ -9,6 +9,12 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
) )
type Iterator interface {
NextLog() (blockNum uint64, logIdx uint32, evtHash types.TruncatedHash, outErr error)
Index() entrydb.EntryIdx
ExecMessage() (types.ExecutingMessage, error)
}
type iterator struct { type iterator struct {
db *DB db *DB
nextEntryIdx entrydb.EntryIdx nextEntryIdx entrydb.EntryIdx
...@@ -63,6 +69,10 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash types.Trun ...@@ -63,6 +69,10 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash types.Trun
return return
} }
func (i *iterator) Index() entrydb.EntryIdx {
return i.nextEntryIdx - 1
}
func (i *iterator) ExecMessage() (types.ExecutingMessage, error) { func (i *iterator) ExecMessage() (types.ExecutingMessage, error) {
if !i.hasExecMsg { if !i.hasExecMsg {
return types.ExecutingMessage{}, nil return types.ExecutingMessage{}, nil
......
package db
import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
const (
Unsafe = "unsafe"
Safe = "safe"
Finalized = "finalized"
)
// SafetyChecker is an interface for checking the safety of a log entry
// and updating the local head for a chain.
type SafetyChecker interface {
LocalHeadForChain(chainID types.ChainID) entrydb.EntryIdx
CrossHeadForChain(chainID types.ChainID) entrydb.EntryIdx
Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) bool
Update(chain types.ChainID, index entrydb.EntryIdx) heads.OperationFn
}
// unsafeChecker is a SafetyChecker that uses the unsafe head as the view into the database
type unsafeChecker struct {
chainsDB ChainsDB
}
// safeChecker is a SafetyChecker that uses the safe head as the view into the database
type safeChecker struct {
chainsDB ChainsDB
}
// finalizedChecker is a SafetyChecker that uses the finalized head as the view into the database
type finalizedChecker struct {
chainsDB ChainsDB
}
// NewSafetyChecker creates a new SafetyChecker of the given type
func NewSafetyChecker(t string, chainsDB ChainsDB) SafetyChecker {
switch t {
case Unsafe:
return &unsafeChecker{
chainsDB: chainsDB,
}
case Safe:
return &safeChecker{
chainsDB: chainsDB,
}
case Finalized:
return &finalizedChecker{
chainsDB: chainsDB,
}
default:
panic("unknown safety checker type")
}
}
// LocalHeadForChain returns the local head for the given chain
// based on the type of SafetyChecker
func (c *unsafeChecker) LocalHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
heads := c.chainsDB.heads.Current().Get(chainID)
return heads.Unsafe
}
func (c *safeChecker) LocalHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
heads := c.chainsDB.heads.Current().Get(chainID)
return heads.LocalSafe
}
func (c *finalizedChecker) LocalHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
heads := c.chainsDB.heads.Current().Get(chainID)
return heads.LocalFinalized
}
// CrossHeadForChain returns the x-head for the given chain
// based on the type of SafetyChecker
func (c *unsafeChecker) CrossHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
heads := c.chainsDB.heads.Current().Get(chainID)
return heads.CrossUnsafe
}
func (c *safeChecker) CrossHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
heads := c.chainsDB.heads.Current().Get(chainID)
return heads.CrossSafe
}
func (c *finalizedChecker) CrossHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
heads := c.chainsDB.heads.Current().Get(chainID)
return heads.CrossFinalized
}
// check checks if the log entry is safe, provided a local head for the chain
// it is used by the individual SafetyCheckers to determine if a log entry is safe
func check(
chainsDB ChainsDB,
localHead entrydb.EntryIdx,
chain types.ChainID,
blockNum uint64,
logIdx uint32,
logHash backendTypes.TruncatedHash) bool {
// for the Check to be valid, the log must:
// exist at the blockNum and logIdx
// have a hash that matches the provided hash (implicit in the Contains call), and
// be less than or equal to the local head for the chain
exists, index, err := chainsDB.logDBs[chain].Contains(blockNum, logIdx, logHash)
if err != nil {
return false
}
return exists && index <= localHead
}
// Check checks if the log entry is safe, provided a local head for the chain
// it passes on the local head this checker is concerned with, along with its view of the database
func (c *unsafeChecker) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) bool {
return check(c.chainsDB, c.LocalHeadForChain(chain), chain, blockNum, logIdx, logHash)
}
func (c *safeChecker) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) bool {
return check(c.chainsDB, c.LocalHeadForChain(chain), chain, blockNum, logIdx, logHash)
}
func (c *finalizedChecker) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) bool {
return check(c.chainsDB, c.LocalHeadForChain(chain), chain, blockNum, logIdx, logHash)
}
// Update creates an Operation that updates the x-head for the chain, given an index to set it to
func (c *unsafeChecker) Update(chain types.ChainID, index entrydb.EntryIdx) heads.OperationFn {
return func(heads *heads.Heads) error {
chainHeads := heads.Get(chain)
chainHeads.CrossUnsafe = index
heads.Put(chain, chainHeads)
return nil
}
}
func (c *safeChecker) Update(chain types.ChainID, index entrydb.EntryIdx) heads.OperationFn {
return func(heads *heads.Heads) error {
chainHeads := heads.Get(chain)
chainHeads.CrossSafe = index
heads.Put(chain, chainHeads)
return nil
}
}
func (c *finalizedChecker) Update(chain types.ChainID, index entrydb.EntryIdx) heads.OperationFn {
return func(heads *heads.Heads) error {
chainHeads := heads.Get(chain)
chainHeads.CrossFinalized = index
heads.Put(chain, chainHeads)
return nil
}
}
package db
import (
"fmt"
"testing"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/stretchr/testify/require"
)
// TestHeadsForChain tests the heads for a chain,
// confirming the Unsafe, Safe and Finalized all return the correct head for the chain.
// and confirming that the chainID matters when finding the value
func TestHeadsForChain(t *testing.T) {
h := heads.NewHeads()
chainHeads := heads.ChainHeads{
Unsafe: entrydb.EntryIdx(1),
CrossUnsafe: entrydb.EntryIdx(2),
LocalSafe: entrydb.EntryIdx(3),
CrossSafe: entrydb.EntryIdx(4),
LocalFinalized: entrydb.EntryIdx(5),
CrossFinalized: entrydb.EntryIdx(6),
}
h.Put(types.ChainIDFromUInt64(1), chainHeads)
chainsDB := NewChainsDB(nil, &stubHeadStorage{h})
tcases := []struct {
name string
chainID types.ChainID
checkerType string
expectedLocal entrydb.EntryIdx
expectedCross entrydb.EntryIdx
}{
{
"Unsafe Head",
types.ChainIDFromUInt64(1),
Unsafe,
entrydb.EntryIdx(1),
entrydb.EntryIdx(2),
},
{
"Safe Head",
types.ChainIDFromUInt64(1),
Safe,
entrydb.EntryIdx(3),
entrydb.EntryIdx(4),
},
{
"Finalized Head",
types.ChainIDFromUInt64(1),
Finalized,
entrydb.EntryIdx(5),
entrydb.EntryIdx(6),
},
{
"Incorrect Chain",
types.ChainIDFromUInt64(100),
Safe,
entrydb.EntryIdx(0),
entrydb.EntryIdx(0),
},
}
for _, c := range tcases {
t.Run(c.name, func(t *testing.T) {
checker := NewSafetyChecker(c.checkerType, *chainsDB)
localHead := checker.LocalHeadForChain(c.chainID)
crossHead := checker.CrossHeadForChain(c.chainID)
require.Equal(t, c.expectedLocal, localHead)
require.Equal(t, c.expectedCross, crossHead)
})
}
}
func TestCheck(t *testing.T) {
h := heads.NewHeads()
chainHeads := heads.ChainHeads{
Unsafe: entrydb.EntryIdx(6),
CrossUnsafe: entrydb.EntryIdx(5),
LocalSafe: entrydb.EntryIdx(4),
CrossSafe: entrydb.EntryIdx(3),
LocalFinalized: entrydb.EntryIdx(2),
CrossFinalized: entrydb.EntryIdx(1),
}
h.Put(types.ChainIDFromUInt64(1), chainHeads)
// the logStore contains just a single stubbed log DB
logDB := &stubLogDB{}
logsStore := map[types.ChainID]LogStorage{
types.ChainIDFromUInt64(1): logDB,
}
chainsDB := NewChainsDB(logsStore, &stubHeadStorage{h})
tcases := []struct {
name string
checkerType string
chainID types.ChainID
blockNum uint64
logIdx uint32
loghash backendTypes.TruncatedHash
containsResponse containsResponse
expected bool
}{
{
// confirm that checking Unsafe uses the unsafe head,
// and that we can find logs even *at* the unsafe head index
"Unsafe Log at Head",
Unsafe,
types.ChainIDFromUInt64(1),
1,
1,
backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(6), nil},
true,
},
{
// confirm that checking the Safe head works
"Safe Log",
Safe,
types.ChainIDFromUInt64(1),
1,
1,
backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(3), nil},
true,
},
{
// confirm that checking the Finalized head works
"Finalized Log",
Finalized,
types.ChainIDFromUInt64(1),
1,
1,
backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(1), nil},
true,
},
{
// confirm that when exists is false, we return false
"Does not Exist",
Safe,
types.ChainIDFromUInt64(1),
1,
1,
backendTypes.TruncatedHash{1, 2, 3},
containsResponse{false, entrydb.EntryIdx(1), nil},
false,
},
{
// confirm that when a head is out of range, we return false
"Unsafe Out of Range",
Unsafe,
types.ChainIDFromUInt64(1),
1,
1,
backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(100), nil},
false,
},
{
// confirm that when a head is out of range, we return false
"Safe Out of Range",
Safe,
types.ChainIDFromUInt64(1),
1,
1,
backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(5), nil},
false,
},
{
// confirm that when a head is out of range, we return false
"Finalized Out of Range",
Finalized,
types.ChainIDFromUInt64(1),
1,
1,
backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(3), nil},
false,
},
{
// confirm that when Contains returns an error, we return false
"Error",
Safe,
types.ChainIDFromUInt64(1),
1,
1,
backendTypes.TruncatedHash{1, 2, 3},
containsResponse{false, entrydb.EntryIdx(0), fmt.Errorf("error")},
false,
},
}
for _, c := range tcases {
t.Run(c.name, func(t *testing.T) {
// rig the logStore to return the expected response
logDB.containsResponse = c.containsResponse
checker := NewSafetyChecker(c.checkerType, *chainsDB)
r := checker.Check(c.chainID, c.blockNum, c.logIdx, c.loghash)
// confirm that the expected outcome is correct
require.Equal(t, c.expected, r)
})
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment