Commit 0543bf7a authored by protolambda's avatar protolambda Committed by GitHub

op-supervisor: store block-checkpoints per block, implement block iteration (#11989)

* op-supervisor: register each L2 block, refactor
Signed-off-by: default avatarprotolambda <proto@protolambda.com>

* op-supervisor: fix lint and tests

* op-supervisor: minor fixes, logging

* op-supervisor: fix semgrep

---------
Signed-off-by: default avatarprotolambda <proto@protolambda.com>
parent 53d95228
...@@ -416,7 +416,8 @@ func (s *interopE2ESystem) newL2(id string, l2Out *interopgen.L2Output) l2Set { ...@@ -416,7 +416,8 @@ func (s *interopE2ESystem) newL2(id string, l2Out *interopgen.L2Output) l2Set {
// prepareSupervisor creates a new supervisor for the system // prepareSupervisor creates a new supervisor for the system
func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService { func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
logger := s.logger.New("role", "supervisor") // Be verbose with op-supervisor, it's in early test phase
logger := testlog.Logger(s.t, log.LevelDebug).New("role", "supervisor")
cfg := supervisorConfig.Config{ cfg := supervisorConfig.Config{
MetricsConfig: metrics.CLIConfig{ MetricsConfig: metrics.CLIConfig{
Enabled: false, Enabled: false,
......
...@@ -9,8 +9,13 @@ import ( ...@@ -9,8 +9,13 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
...@@ -19,9 +24,6 @@ import ( ...@@ -19,9 +24,6 @@ import (
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
) )
type SupervisorBackend struct { type SupervisorBackend struct {
...@@ -94,7 +96,7 @@ func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, ...@@ -94,7 +96,7 @@ func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger,
if err != nil { if err != nil {
return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err) return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
} }
logDB, err := logs.NewFromFile(logger, cm, path) logDB, err := logs.NewFromFile(logger, cm, path, true)
if err != nil { if err != nil {
return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err) return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
} }
...@@ -133,8 +135,9 @@ func (su *SupervisorBackend) Start(ctx context.Context) error { ...@@ -133,8 +135,9 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
if !su.started.CompareAndSwap(false, true) { if !su.started.CompareAndSwap(false, true) {
return errors.New("already started") return errors.New("already started")
} }
// initiate "Resume" on the chains db, which rewinds the database to the last block that is guaranteed to have been fully recorded // initiate "ResumeFromLastSealedBlock" on the chains db,
if err := su.db.Resume(); err != nil { // which rewinds the database to the last block that is guaranteed to have been fully recorded
if err := su.db.ResumeFromLastSealedBlock(); err != nil {
return fmt.Errorf("failed to resume chains db: %w", err) return fmt.Errorf("failed to resume chains db: %w", err)
} }
// start chain monitors // start chain monitors
...@@ -144,8 +147,8 @@ func (su *SupervisorBackend) Start(ctx context.Context) error { ...@@ -144,8 +147,8 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
} }
} }
// start db maintenance loop // start db maintenance loop
maintinenceCtx, cancel := context.WithCancel(context.Background()) maintenanceCtx, cancel := context.WithCancel(context.Background())
su.db.StartCrossHeadMaintenance(maintinenceCtx) su.db.StartCrossHeadMaintenance(maintenanceCtx)
su.maintenanceCancel = cancel su.maintenanceCancel = cancel
return nil return nil
} }
...@@ -188,13 +191,16 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa ...@@ -188,13 +191,16 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
chainID := identifier.ChainID chainID := identifier.ChainID
blockNum := identifier.BlockNumber blockNum := identifier.BlockNumber
logIdx := identifier.LogIndex logIdx := identifier.LogIndex
ok, i, err := su.db.Check(chainID, blockNum, uint32(logIdx), backendTypes.TruncateHash(payloadHash)) i, err := su.db.Check(chainID, blockNum, uint32(logIdx), backendTypes.TruncateHash(payloadHash))
if err != nil { if errors.Is(err, logs.ErrFuture) {
return types.Invalid, fmt.Errorf("failed to check log: %w", err) return types.Unsafe, nil
} }
if !ok { if errors.Is(err, logs.ErrConflict) {
return types.Invalid, nil return types.Invalid, nil
} }
if err != nil {
return types.Invalid, fmt.Errorf("failed to check log: %w", err)
}
safest := types.CrossUnsafe safest := types.CrossUnsafe
// at this point we have the log entry, and we can check if it is safe by various criteria // at this point we have the log entry, and we can check if it is safe by various criteria
for _, checker := range []db.SafetyChecker{ for _, checker := range []db.SafetyChecker{
...@@ -231,16 +237,19 @@ func (su *SupervisorBackend) CheckMessages( ...@@ -231,16 +237,19 @@ func (su *SupervisorBackend) CheckMessages(
// The block is considered safe if all logs in the block are safe // The block is considered safe if all logs in the block are safe
// this is decided by finding the last log in the block and // this is decided by finding the last log in the block and
func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) { func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
// TODO(#11612): this function ignores blockHash and assumes that the block in the db is the one we are looking for
// In order to check block hash, the database must *always* insert a block hash checkpoint, which is not currently done
safest := types.CrossUnsafe safest := types.CrossUnsafe
// find the last log index in the block // find the last log index in the block
i, err := su.db.LastLogInBlock(types.ChainID(*chainID), uint64(blockNumber)) id := eth.BlockID{Hash: blockHash, Number: uint64(blockNumber)}
// TODO(#11836) checking for EOF as a non-error case is a bit of a code smell i, err := su.db.FindSealedBlock(types.ChainID(*chainID), id)
// and could potentially be incorrect if the given block number is intentionally far in the future if errors.Is(err, logs.ErrFuture) {
if err != nil && !errors.Is(err, io.EOF) { return types.Unsafe, nil
}
if errors.Is(err, logs.ErrConflict) {
return types.Invalid, nil
}
if err != nil {
su.logger.Error("failed to scan block", "err", err) su.logger.Error("failed to scan block", "err", err)
return types.Invalid, fmt.Errorf("failed to scan block: %w", err) return "", err
} }
// at this point we have the extent of the block, and we can check if it is safe by various criteria // at this point we have the extent of the block, and we can check if it is safe by various criteria
for _, checker := range []db.SafetyChecker{ for _, checker := range []db.SafetyChecker{
......
This diff is collapsed.
...@@ -17,6 +17,65 @@ type EntryIdx int64 ...@@ -17,6 +17,65 @@ type EntryIdx int64
type Entry [EntrySize]byte type Entry [EntrySize]byte
func (entry Entry) Type() EntryType {
return EntryType(entry[0])
}
type EntryTypeFlag uint8
const (
FlagSearchCheckpoint EntryTypeFlag = 1 << TypeSearchCheckpoint
FlagCanonicalHash EntryTypeFlag = 1 << TypeCanonicalHash
FlagInitiatingEvent EntryTypeFlag = 1 << TypeInitiatingEvent
FlagExecutingLink EntryTypeFlag = 1 << TypeExecutingLink
FlagExecutingCheck EntryTypeFlag = 1 << TypeExecutingCheck
FlagPadding EntryTypeFlag = 1 << TypePadding
// for additional padding
FlagPadding2 EntryTypeFlag = FlagPadding << 1
)
func (ex EntryTypeFlag) Any(v EntryTypeFlag) bool {
return ex&v != 0
}
func (ex *EntryTypeFlag) Add(v EntryTypeFlag) {
*ex = *ex | v
}
func (ex *EntryTypeFlag) Remove(v EntryTypeFlag) {
*ex = *ex &^ v
}
type EntryType uint8
const (
TypeSearchCheckpoint EntryType = iota
TypeCanonicalHash
TypeInitiatingEvent
TypeExecutingLink
TypeExecutingCheck
TypePadding
)
func (d EntryType) String() string {
switch d {
case TypeSearchCheckpoint:
return "searchCheckpoint"
case TypeCanonicalHash:
return "canonicalHash"
case TypeInitiatingEvent:
return "initiatingEvent"
case TypeExecutingLink:
return "executingLink"
case TypeExecutingCheck:
return "executingCheck"
case TypePadding:
return "padding"
default:
return fmt.Sprintf("unknown-%d", uint8(d))
}
}
// dataAccess defines a minimal API required to manipulate the actual stored data. // dataAccess defines a minimal API required to manipulate the actual stored data.
// It is a subset of the os.File API but could (theoretically) be satisfied by an in-memory implementation for testing. // It is a subset of the os.File API but could (theoretically) be satisfied by an in-memory implementation for testing.
type dataAccess interface { type dataAccess interface {
......
package db
import (
"errors"
"fmt"
"io"
"math"
)
// Resume prepares the given LogStore to resume recording events.
// It returns the block number of the last block that is guaranteed to have been fully recorded to the database
// and rewinds the database to ensure it can resume recording from the first log of the next block.
func Resume(logDB LogStorage) error {
// Get the last checkpoint that was written then Rewind the db
// to the block prior to that block and start from there.
// Guarantees we will always roll back at least one block
// so we know we're always starting from a fully written block.
checkPointBlock, _, err := logDB.ClosestBlockInfo(math.MaxUint64)
if errors.Is(err, io.EOF) {
// No blocks recorded in the database, start from genesis
return nil
} else if err != nil {
return fmt.Errorf("failed to get block from checkpoint: %w", err)
}
if checkPointBlock == 0 {
return nil
}
block := checkPointBlock - 1
err = logDB.Rewind(block)
if err != nil {
return fmt.Errorf("failed to rewind the database: %w", err)
}
return nil
}
package db
import (
"fmt"
"io"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/stretchr/testify/require"
)
func TestRecover(t *testing.T) {
tests := []struct {
name string
stubDB *stubLogStore
expectRewoundTo uint64
}{
{
name: "emptydb",
stubDB: &stubLogStore{closestBlockErr: fmt.Errorf("no entries: %w", io.EOF)},
expectRewoundTo: 0,
},
{
name: "genesis",
stubDB: &stubLogStore{},
expectRewoundTo: 0,
},
{
name: "with_blocks",
stubDB: &stubLogStore{closestBlockNumber: 15},
expectRewoundTo: 14,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
err := Resume(test.stubDB)
require.NoError(t, err)
require.Equal(t, test.expectRewoundTo, test.stubDB.rewoundTo)
})
}
}
type stubLogStore struct {
closestBlockNumber uint64
closestBlockErr error
rewoundTo uint64
}
func (s *stubLogStore) Contains(blockNum uint64, logIdx uint32, loghash types.TruncatedHash) (bool, entrydb.EntryIdx, error) {
panic("not supported")
}
func (s *stubLogStore) ClosestBlockIterator(blockNum uint64) (logs.Iterator, error) {
panic("not supported")
}
func (s *stubLogStore) LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error) {
panic("not supported")
}
func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, error) {
if s.closestBlockErr != nil {
return 0, types.TruncatedHash{}, s.closestBlockErr
}
return s.closestBlockNumber, types.TruncatedHash{}, nil
}
func (s *stubLogStore) NextExecutingMessage(logs.Iterator) (types.ExecutingMessage, error) {
panic("not supported")
}
func (s *stubLogStore) Rewind(headBlockNum uint64) error {
s.rewoundTo = headBlockNum
return nil
}
func (s *stubLogStore) AddLog(logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error {
panic("not supported")
}
func (s *stubLogStore) LatestBlockNum() uint64 {
panic("not supported")
}
func (s *stubLogStore) Close() error {
return nil
}
...@@ -38,16 +38,13 @@ func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) { ...@@ -38,16 +38,13 @@ func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) {
} }
entryInvariants := []entryInvariant{ entryInvariants := []entryInvariant{
invariantSearchCheckpointOnlyAtFrequency,
invariantSearchCheckpointAtEverySearchCheckpointFrequency, invariantSearchCheckpointAtEverySearchCheckpointFrequency,
invariantCanonicalHashAfterEverySearchCheckpoint, invariantCanonicalHashOrCheckpointAfterEverySearchCheckpoint,
invariantSearchCheckpointBeforeEveryCanonicalHash, invariantSearchCheckpointBeforeEveryCanonicalHash,
invariantIncrementLogIdxIfNotImmediatelyAfterCanonicalHash,
invariantExecLinkAfterInitEventWithFlagSet, invariantExecLinkAfterInitEventWithFlagSet,
invariantExecLinkOnlyAfterInitiatingEventWithFlagSet, invariantExecLinkOnlyAfterInitiatingEventWithFlagSet,
invariantExecCheckAfterExecLink, invariantExecCheckAfterExecLink,
invariantExecCheckOnlyAfterExecLink, invariantExecCheckOnlyAfterExecLink,
invariantValidLastEntry,
} }
for i, entry := range entries { for i, entry := range entries {
for _, invariant := range entryInvariants { for _, invariant := range entryInvariants {
...@@ -83,81 +80,47 @@ func invariantFileSizeMatchesEntryCountMetric(stat os.FileInfo, m *stubMetrics) ...@@ -83,81 +80,47 @@ func invariantFileSizeMatchesEntryCountMetric(stat os.FileInfo, m *stubMetrics)
return nil return nil
} }
func invariantSearchCheckpointOnlyAtFrequency(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entry[0] != typeSearchCheckpoint {
return nil
}
if entryIdx%searchCheckpointFrequency != 0 {
return fmt.Errorf("should only have search checkpoints every %v entries but found at entry %v", searchCheckpointFrequency, entryIdx)
}
return nil
}
func invariantSearchCheckpointAtEverySearchCheckpointFrequency(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantSearchCheckpointAtEverySearchCheckpointFrequency(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entryIdx%searchCheckpointFrequency == 0 && entry[0] != typeSearchCheckpoint { if entryIdx%searchCheckpointFrequency == 0 && entry.Type() != entrydb.TypeSearchCheckpoint {
return fmt.Errorf("should have search checkpoints every %v entries but entry %v was %x", searchCheckpointFrequency, entryIdx, entry) return fmt.Errorf("should have search checkpoints every %v entries but entry %v was %x", searchCheckpointFrequency, entryIdx, entry)
} }
return nil return nil
} }
func invariantCanonicalHashAfterEverySearchCheckpoint(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantCanonicalHashOrCheckpointAfterEverySearchCheckpoint(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entry[0] != typeSearchCheckpoint { if entry.Type() != entrydb.TypeSearchCheckpoint {
return nil return nil
} }
if entryIdx+1 >= len(entries) { if entryIdx+1 >= len(entries) {
return fmt.Errorf("expected canonical hash after search checkpoint at entry %v but no further entries found", entryIdx) return fmt.Errorf("expected canonical hash or checkpoint after search checkpoint at entry %v but no further entries found", entryIdx)
} }
nextEntry := entries[entryIdx+1] nextEntry := entries[entryIdx+1]
if nextEntry[0] != typeCanonicalHash { if nextEntry.Type() != entrydb.TypeCanonicalHash && nextEntry.Type() != entrydb.TypeSearchCheckpoint {
return fmt.Errorf("expected canonical hash after search checkpoint at entry %v but got %x", entryIdx, nextEntry) return fmt.Errorf("expected canonical hash or checkpoint after search checkpoint at entry %v but got %x", entryIdx, nextEntry)
} }
return nil return nil
} }
// invariantSearchCheckpointBeforeEveryCanonicalHash ensures we don't have extra canonical-hash entries // invariantSearchCheckpointBeforeEveryCanonicalHash ensures we don't have extra canonical-hash entries
func invariantSearchCheckpointBeforeEveryCanonicalHash(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantSearchCheckpointBeforeEveryCanonicalHash(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entry[0] != typeCanonicalHash { if entry.Type() != entrydb.TypeCanonicalHash {
return nil return nil
} }
if entryIdx == 0 { if entryIdx == 0 {
return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but no previous entries present", entryIdx) return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but no previous entries present", entryIdx)
} }
prevEntry := entries[entryIdx-1] prevEntry := entries[entryIdx-1]
if prevEntry[0] != typeSearchCheckpoint { if prevEntry.Type() != entrydb.TypeSearchCheckpoint {
return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but got %x", entryIdx, prevEntry) return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but got %x", entryIdx, prevEntry)
} }
return nil return nil
} }
func invariantIncrementLogIdxIfNotImmediatelyAfterCanonicalHash(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entry[0] != typeInitiatingEvent {
return nil
}
if entryIdx == 0 {
return fmt.Errorf("found initiating event at index %v before any search checkpoint", entryIdx)
}
blockDiff := entry[1]
flags := entry[2]
incrementsLogIdx := flags&eventFlagIncrementLogIdx != 0
prevEntry := entries[entryIdx-1]
prevEntryIsCanonicalHash := prevEntry[0] == typeCanonicalHash
if incrementsLogIdx && prevEntryIsCanonicalHash {
return fmt.Errorf("initiating event at index %v increments logIdx despite being immediately after canonical hash (prev entry %x)", entryIdx, prevEntry)
}
if incrementsLogIdx && blockDiff > 0 {
return fmt.Errorf("initiating event at index %v increments logIdx despite starting a new block", entryIdx)
}
if !incrementsLogIdx && !prevEntryIsCanonicalHash && blockDiff == 0 {
return fmt.Errorf("initiating event at index %v does not increment logIdx when block unchanged and not after canonical hash (prev entry %x)", entryIdx, prevEntry)
}
return nil
}
func invariantExecLinkAfterInitEventWithFlagSet(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantExecLinkAfterInitEventWithFlagSet(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entry[0] != typeInitiatingEvent { if entry.Type() != entrydb.TypeInitiatingEvent {
return nil return nil
} }
hasExecMessage := entry[2]&eventFlagHasExecutingMessage != 0 hasExecMessage := entry[1]&eventFlagHasExecutingMessage != 0
if !hasExecMessage { if !hasExecMessage {
return nil return nil
} }
...@@ -168,14 +131,14 @@ func invariantExecLinkAfterInitEventWithFlagSet(entryIdx int, entry entrydb.Entr ...@@ -168,14 +131,14 @@ func invariantExecLinkAfterInitEventWithFlagSet(entryIdx int, entry entrydb.Entr
if len(entries) <= linkIdx { if len(entries) <= linkIdx {
return fmt.Errorf("expected executing link after initiating event with exec msg flag set at entry %v but there were no more events", entryIdx) return fmt.Errorf("expected executing link after initiating event with exec msg flag set at entry %v but there were no more events", entryIdx)
} }
if entries[linkIdx][0] != typeExecutingLink { if entries[linkIdx].Type() != entrydb.TypeExecutingLink {
return fmt.Errorf("expected executing link at idx %v after initiating event with exec msg flag set at entry %v but got type %v", linkIdx, entryIdx, entries[linkIdx][0]) return fmt.Errorf("expected executing link at idx %v after initiating event with exec msg flag set at entry %v but got type %v", linkIdx, entryIdx, entries[linkIdx][0])
} }
return nil return nil
} }
func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entry[0] != typeExecutingLink { if entry.Type() != entrydb.TypeExecutingLink {
return nil return nil
} }
if entryIdx == 0 { if entryIdx == 0 {
...@@ -189,10 +152,10 @@ func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry en ...@@ -189,10 +152,10 @@ func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry en
return fmt.Errorf("found executing link without a preceding initiating event at entry %v", entryIdx) return fmt.Errorf("found executing link without a preceding initiating event at entry %v", entryIdx)
} }
initEntry := entries[initIdx] initEntry := entries[initIdx]
if initEntry[0] != typeInitiatingEvent { if initEntry.Type() != entrydb.TypeInitiatingEvent {
return fmt.Errorf("expected initiating event at entry %v prior to executing link at %v but got %x", initIdx, entryIdx, initEntry[0]) return fmt.Errorf("expected initiating event at entry %v prior to executing link at %v but got %x", initIdx, entryIdx, initEntry[0])
} }
flags := initEntry[2] flags := initEntry[1]
if flags&eventFlagHasExecutingMessage == 0 { if flags&eventFlagHasExecutingMessage == 0 {
return fmt.Errorf("initiating event at %v prior to executing link at %v does not have flag set to indicate needing a executing event: %x", initIdx, entryIdx, initEntry) return fmt.Errorf("initiating event at %v prior to executing link at %v does not have flag set to indicate needing a executing event: %x", initIdx, entryIdx, initEntry)
} }
...@@ -200,7 +163,7 @@ func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry en ...@@ -200,7 +163,7 @@ func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry en
} }
func invariantExecCheckAfterExecLink(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantExecCheckAfterExecLink(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entry[0] != typeExecutingLink { if entry.Type() != entrydb.TypeExecutingLink {
return nil return nil
} }
checkIdx := entryIdx + 1 checkIdx := entryIdx + 1
...@@ -211,14 +174,14 @@ func invariantExecCheckAfterExecLink(entryIdx int, entry entrydb.Entry, entries ...@@ -211,14 +174,14 @@ func invariantExecCheckAfterExecLink(entryIdx int, entry entrydb.Entry, entries
return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but ran out of entries", entryIdx, checkIdx) return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but ran out of entries", entryIdx, checkIdx)
} }
checkEntry := entries[checkIdx] checkEntry := entries[checkIdx]
if checkEntry[0] != typeExecutingCheck { if checkEntry.Type() != entrydb.TypeExecutingCheck {
return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but got type %v", entryIdx, checkIdx, checkEntry[0]) return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but got type %v", entryIdx, checkIdx, checkEntry[0])
} }
return nil return nil
} }
func invariantExecCheckOnlyAfterExecLink(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error { func invariantExecCheckOnlyAfterExecLink(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entry[0] != typeExecutingCheck { if entry.Type() != entrydb.TypeExecutingCheck {
return nil return nil
} }
if entryIdx == 0 { if entryIdx == 0 {
...@@ -232,29 +195,8 @@ func invariantExecCheckOnlyAfterExecLink(entryIdx int, entry entrydb.Entry, entr ...@@ -232,29 +195,8 @@ func invariantExecCheckOnlyAfterExecLink(entryIdx int, entry entrydb.Entry, entr
return fmt.Errorf("found executing link without a preceding initiating event at entry %v", entryIdx) return fmt.Errorf("found executing link without a preceding initiating event at entry %v", entryIdx)
} }
linkEntry := entries[linkIdx] linkEntry := entries[linkIdx]
if linkEntry[0] != typeExecutingLink { if linkEntry.Type() != entrydb.TypeExecutingLink {
return fmt.Errorf("expected executing link at entry %v prior to executing check at %v but got %x", linkIdx, entryIdx, linkEntry[0]) return fmt.Errorf("expected executing link at entry %v prior to executing check at %v but got %x", linkIdx, entryIdx, linkEntry[0])
} }
return nil return nil
} }
// invariantValidLastEntry checks that the last entry is either a executing check or initiating event with no exec message
func invariantValidLastEntry(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
if entryIdx+1 < len(entries) {
return nil
}
if entry[0] == typeExecutingCheck {
return nil
}
if entry[0] != typeInitiatingEvent {
return fmt.Errorf("invalid final event type: %v", entry[0])
}
evt, err := newInitiatingEventFromEntry(entry)
if err != nil {
return fmt.Errorf("final event was invalid: %w", err)
}
if evt.hasExecMsg {
return errors.New("ends with init event that should have exec msg but no exec msg follows")
}
return nil
}
...@@ -3,44 +3,46 @@ package logs ...@@ -3,44 +3,46 @@ package logs
import ( import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"math"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
) )
// searchCheckpoint is both a checkpoint for searching, as well as a checkpoint for sealing blocks.
type searchCheckpoint struct { type searchCheckpoint struct {
blockNum uint64 blockNum uint64
logIdx uint32 // seen logs *after* the seal of the mentioned block, i.e. not part of this block, but building on top of it.
// There is at least one checkpoint per L2 block with logsSince == 0, i.e. the exact block boundary.
logsSince uint32
timestamp uint64 timestamp uint64
} }
func newSearchCheckpoint(blockNum uint64, logIdx uint32, timestamp uint64) searchCheckpoint { func newSearchCheckpoint(blockNum uint64, logsSince uint32, timestamp uint64) searchCheckpoint {
return searchCheckpoint{ return searchCheckpoint{
blockNum: blockNum, blockNum: blockNum,
logIdx: logIdx, logsSince: logsSince,
timestamp: timestamp, timestamp: timestamp,
} }
} }
func newSearchCheckpointFromEntry(data entrydb.Entry) (searchCheckpoint, error) { func newSearchCheckpointFromEntry(data entrydb.Entry) (searchCheckpoint, error) {
if data[0] != typeSearchCheckpoint { if data.Type() != entrydb.TypeSearchCheckpoint {
return searchCheckpoint{}, fmt.Errorf("%w: attempting to decode search checkpoint but was type %v", ErrDataCorruption, data[0]) return searchCheckpoint{}, fmt.Errorf("%w: attempting to decode search checkpoint but was type %s", ErrDataCorruption, data.Type())
} }
return searchCheckpoint{ return searchCheckpoint{
blockNum: binary.LittleEndian.Uint64(data[1:9]), blockNum: binary.LittleEndian.Uint64(data[1:9]),
logIdx: binary.LittleEndian.Uint32(data[9:13]), logsSince: binary.LittleEndian.Uint32(data[9:13]),
timestamp: binary.LittleEndian.Uint64(data[13:21]), timestamp: binary.LittleEndian.Uint64(data[13:21]),
}, nil }, nil
} }
// encode creates a search checkpoint entry // encode creates a checkpoint entry
// type 0: "search checkpoint" <type><uint64 block number: 8 bytes><uint32 event index offset: 4 bytes><uint64 timestamp: 8 bytes> = 20 bytes // type 0: "search checkpoint" <type><uint64 block number: 8 bytes><uint32 logsSince count: 4 bytes><uint64 timestamp: 8 bytes> = 21 bytes
func (s searchCheckpoint) encode() entrydb.Entry { func (s searchCheckpoint) encode() entrydb.Entry {
var data entrydb.Entry var data entrydb.Entry
data[0] = typeSearchCheckpoint data[0] = uint8(entrydb.TypeSearchCheckpoint)
binary.LittleEndian.PutUint64(data[1:9], s.blockNum) binary.LittleEndian.PutUint64(data[1:9], s.blockNum)
binary.LittleEndian.PutUint32(data[9:13], s.logIdx) binary.LittleEndian.PutUint32(data[9:13], s.logsSince)
binary.LittleEndian.PutUint64(data[13:21], s.timestamp) binary.LittleEndian.PutUint64(data[13:21], s.timestamp)
return data return data
} }
...@@ -54,8 +56,8 @@ func newCanonicalHash(hash types.TruncatedHash) canonicalHash { ...@@ -54,8 +56,8 @@ func newCanonicalHash(hash types.TruncatedHash) canonicalHash {
} }
func newCanonicalHashFromEntry(data entrydb.Entry) (canonicalHash, error) { func newCanonicalHashFromEntry(data entrydb.Entry) (canonicalHash, error) {
if data[0] != typeCanonicalHash { if data.Type() != entrydb.TypeCanonicalHash {
return canonicalHash{}, fmt.Errorf("%w: attempting to decode canonical hash but was type %v", ErrDataCorruption, data[0]) return canonicalHash{}, fmt.Errorf("%w: attempting to decode canonical hash but was type %s", ErrDataCorruption, data.Type())
} }
var truncated types.TruncatedHash var truncated types.TruncatedHash
copy(truncated[:], data[1:21]) copy(truncated[:], data[1:21])
...@@ -64,100 +66,48 @@ func newCanonicalHashFromEntry(data entrydb.Entry) (canonicalHash, error) { ...@@ -64,100 +66,48 @@ func newCanonicalHashFromEntry(data entrydb.Entry) (canonicalHash, error) {
func (c canonicalHash) encode() entrydb.Entry { func (c canonicalHash) encode() entrydb.Entry {
var entry entrydb.Entry var entry entrydb.Entry
entry[0] = typeCanonicalHash entry[0] = uint8(entrydb.TypeCanonicalHash)
copy(entry[1:21], c.hash[:]) copy(entry[1:21], c.hash[:])
return entry return entry
} }
type initiatingEvent struct { type initiatingEvent struct {
blockDiff uint8 hasExecMsg bool
incrementLogIdx bool logHash types.TruncatedHash
hasExecMsg bool
logHash types.TruncatedHash
} }
func newInitiatingEventFromEntry(data entrydb.Entry) (initiatingEvent, error) { func newInitiatingEventFromEntry(data entrydb.Entry) (initiatingEvent, error) {
if data[0] != typeInitiatingEvent { if data.Type() != entrydb.TypeInitiatingEvent {
return initiatingEvent{}, fmt.Errorf("%w: attempting to decode initiating event but was type %v", ErrDataCorruption, data[0]) return initiatingEvent{}, fmt.Errorf("%w: attempting to decode initiating event but was type %s", ErrDataCorruption, data.Type())
} }
blockNumDiff := data[1] flags := data[1]
flags := data[2]
return initiatingEvent{ return initiatingEvent{
blockDiff: blockNumDiff, hasExecMsg: flags&eventFlagHasExecutingMessage != 0,
incrementLogIdx: flags&eventFlagIncrementLogIdx != 0, logHash: types.TruncatedHash(data[2:22]),
hasExecMsg: flags&eventFlagHasExecutingMessage != 0,
logHash: types.TruncatedHash(data[3:23]),
}, nil }, nil
} }
func newInitiatingEvent(pre logContext, blockNum uint64, logIdx uint32, logHash types.TruncatedHash, hasExecMsg bool) (initiatingEvent, error) { func newInitiatingEvent(logHash types.TruncatedHash, hasExecMsg bool) initiatingEvent {
blockDiff := blockNum - pre.blockNum
if blockDiff > math.MaxUint8 {
// TODO(optimism#11091): Need to find a way to support this.
return initiatingEvent{}, fmt.Errorf("too many block skipped between %v and %v", pre.blockNum, blockNum)
}
currLogIdx := pre.logIdx
if blockDiff > 0 {
currLogIdx = 0
}
logDiff := logIdx - currLogIdx
if logDiff > 1 {
return initiatingEvent{}, fmt.Errorf("skipped logs between %v and %v", currLogIdx, logIdx)
}
return initiatingEvent{ return initiatingEvent{
blockDiff: uint8(blockDiff), hasExecMsg: hasExecMsg,
incrementLogIdx: logDiff > 0, logHash: logHash,
hasExecMsg: hasExecMsg, }
logHash: logHash,
}, nil
} }
// encode creates an initiating event entry // encode creates an initiating event entry
// type 2: "initiating event" <type><blocknum diff: 1 byte><event flags: 1 byte><event-hash: 20 bytes> = 23 bytes // type 2: "initiating event" <type><flags><event-hash: 20 bytes> = 22 bytes
func (i initiatingEvent) encode() entrydb.Entry { func (i initiatingEvent) encode() entrydb.Entry {
var data entrydb.Entry var data entrydb.Entry
data[0] = typeInitiatingEvent data[0] = uint8(entrydb.TypeInitiatingEvent)
data[1] = i.blockDiff
flags := byte(0) flags := byte(0)
if i.incrementLogIdx {
// Set flag to indicate log idx needs to be incremented (ie we're not directly after a checkpoint)
flags = flags | eventFlagIncrementLogIdx
}
if i.hasExecMsg { if i.hasExecMsg {
flags = flags | eventFlagHasExecutingMessage flags = flags | eventFlagHasExecutingMessage
} }
data[2] = flags data[1] = flags
copy(data[3:23], i.logHash[:]) copy(data[2:22], i.logHash[:])
return data return data
} }
func (i initiatingEvent) postContext(pre logContext) logContext {
post := logContext{
blockNum: pre.blockNum + uint64(i.blockDiff),
logIdx: pre.logIdx,
}
if i.blockDiff > 0 {
post.logIdx = 0
}
if i.incrementLogIdx {
post.logIdx++
}
return post
}
// preContext is the reverse of postContext and calculates the logContext required as input to get the specified post
// context after applying this init event.
func (i initiatingEvent) preContext(post logContext) logContext {
pre := post
pre.blockNum = post.blockNum - uint64(i.blockDiff)
if i.incrementLogIdx {
pre.logIdx--
}
return pre
}
type executingLink struct { type executingLink struct {
chain uint32 chain uint32
blockNum uint64 blockNum uint64
...@@ -178,8 +128,8 @@ func newExecutingLink(msg types.ExecutingMessage) (executingLink, error) { ...@@ -178,8 +128,8 @@ func newExecutingLink(msg types.ExecutingMessage) (executingLink, error) {
} }
func newExecutingLinkFromEntry(data entrydb.Entry) (executingLink, error) { func newExecutingLinkFromEntry(data entrydb.Entry) (executingLink, error) {
if data[0] != typeExecutingLink { if data.Type() != entrydb.TypeExecutingLink {
return executingLink{}, fmt.Errorf("%w: attempting to decode executing link but was type %v", ErrDataCorruption, data[0]) return executingLink{}, fmt.Errorf("%w: attempting to decode executing link but was type %s", ErrDataCorruption, data.Type())
} }
timestamp := binary.LittleEndian.Uint64(data[16:24]) timestamp := binary.LittleEndian.Uint64(data[16:24])
return executingLink{ return executingLink{
...@@ -194,7 +144,7 @@ func newExecutingLinkFromEntry(data entrydb.Entry) (executingLink, error) { ...@@ -194,7 +144,7 @@ func newExecutingLinkFromEntry(data entrydb.Entry) (executingLink, error) {
// type 3: "executing link" <type><chain: 4 bytes><blocknum: 8 bytes><event index: 3 bytes><uint64 timestamp: 8 bytes> = 24 bytes // type 3: "executing link" <type><chain: 4 bytes><blocknum: 8 bytes><event index: 3 bytes><uint64 timestamp: 8 bytes> = 24 bytes
func (e executingLink) encode() entrydb.Entry { func (e executingLink) encode() entrydb.Entry {
var entry entrydb.Entry var entry entrydb.Entry
entry[0] = typeExecutingLink entry[0] = uint8(entrydb.TypeExecutingLink)
binary.LittleEndian.PutUint32(entry[1:5], e.chain) binary.LittleEndian.PutUint32(entry[1:5], e.chain)
binary.LittleEndian.PutUint64(entry[5:13], e.blockNum) binary.LittleEndian.PutUint64(entry[5:13], e.blockNum)
...@@ -214,12 +164,12 @@ func newExecutingCheck(hash types.TruncatedHash) executingCheck { ...@@ -214,12 +164,12 @@ func newExecutingCheck(hash types.TruncatedHash) executingCheck {
return executingCheck{hash: hash} return executingCheck{hash: hash}
} }
func newExecutingCheckFromEntry(entry entrydb.Entry) (executingCheck, error) { func newExecutingCheckFromEntry(data entrydb.Entry) (executingCheck, error) {
if entry[0] != typeExecutingCheck { if data.Type() != entrydb.TypeExecutingCheck {
return executingCheck{}, fmt.Errorf("%w: attempting to decode executing check but was type %v", ErrDataCorruption, entry[0]) return executingCheck{}, fmt.Errorf("%w: attempting to decode executing check but was type %s", ErrDataCorruption, data.Type())
} }
var hash types.TruncatedHash var hash types.TruncatedHash
copy(hash[:], entry[1:21]) copy(hash[:], data[1:21])
return newExecutingCheck(hash), nil return newExecutingCheck(hash), nil
} }
...@@ -227,25 +177,17 @@ func newExecutingCheckFromEntry(entry entrydb.Entry) (executingCheck, error) { ...@@ -227,25 +177,17 @@ func newExecutingCheckFromEntry(entry entrydb.Entry) (executingCheck, error) {
// type 4: "executing check" <type><event-hash: 20 bytes> = 21 bytes // type 4: "executing check" <type><event-hash: 20 bytes> = 21 bytes
func (e executingCheck) encode() entrydb.Entry { func (e executingCheck) encode() entrydb.Entry {
var entry entrydb.Entry var entry entrydb.Entry
entry[0] = typeExecutingCheck entry[0] = uint8(entrydb.TypeExecutingCheck)
copy(entry[1:21], e.hash[:]) copy(entry[1:21], e.hash[:])
return entry return entry
} }
func newExecutingMessageFromEntries(linkEntry entrydb.Entry, checkEntry entrydb.Entry) (types.ExecutingMessage, error) { type paddingEntry struct{}
link, err := newExecutingLinkFromEntry(linkEntry)
if err != nil { // encoding of the padding entry
return types.ExecutingMessage{}, fmt.Errorf("invalid executing link: %w", err) // type 5: "padding" <type><padding: 23 bytes> = 24 bytes
} func (e paddingEntry) encode() entrydb.Entry {
check, err := newExecutingCheckFromEntry(checkEntry) var entry entrydb.Entry
if err != nil { entry[0] = uint8(entrydb.TypePadding)
return types.ExecutingMessage{}, fmt.Errorf("invalid executing check: %w", err) return entry
}
return types.ExecutingMessage{
Chain: link.chain,
BlockNum: link.blockNum,
LogIdx: link.logIdx,
Timestamp: link.timestamp,
Hash: check.hash,
}, nil
} }
...@@ -9,106 +9,134 @@ import ( ...@@ -9,106 +9,134 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
) )
type IteratorState interface {
NextIndex() entrydb.EntryIdx
SealedBlock() (hash types.TruncatedHash, num uint64, ok bool)
InitMessage() (hash types.TruncatedHash, logIndex uint32, ok bool)
ExecMessage() *types.ExecutingMessage
}
type Iterator interface { type Iterator interface {
NextLog() (blockNum uint64, logIdx uint32, evtHash types.TruncatedHash, outErr error) End() error
Index() entrydb.EntryIdx NextInitMsg() error
ExecMessage() (types.ExecutingMessage, error) NextExecMsg() error
NextBlock() error
IteratorState
} }
type iterator struct { type iterator struct {
db *DB db *DB
nextEntryIdx entrydb.EntryIdx current logContext
current logContext
hasExecMsg bool
entriesRead int64 entriesRead int64
} }
// NextLog returns the next log in the iterator. // End traverses the iterator to the end of the DB.
// It scans forward until it finds an initiating event, returning the block number, log index, and event hash. // It does not return io.EOF or ErrFuture.
func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash types.TruncatedHash, outErr error) { func (i *iterator) End() error {
for i.nextEntryIdx <= i.db.lastEntryIdx() { for {
entryIdx := i.nextEntryIdx _, err := i.next()
entry, err := i.db.store.Read(entryIdx) if errors.Is(err, ErrFuture) {
return nil
} else if err != nil {
return err
}
}
}
// NextInitMsg returns the next initiating message in the iterator.
// It scans forward until it finds and fully reads an initiating event, skipping any blocks.
func (i *iterator) NextInitMsg() error {
seenLog := false
for {
typ, err := i.next()
if err != nil { if err != nil {
outErr = fmt.Errorf("failed to read entry %v: %w", i, err) return err
return }
if typ == entrydb.TypeInitiatingEvent {
seenLog = true
} }
i.nextEntryIdx++ if !i.current.hasCompleteBlock() {
i.entriesRead++ continue // must know the block we're building on top of
i.hasExecMsg = false }
switch entry[0] { if i.current.hasIncompleteLog() {
case typeSearchCheckpoint: continue // didn't finish processing the log yet
current, err := newSearchCheckpointFromEntry(entry) }
if err != nil { if seenLog {
outErr = fmt.Errorf("failed to parse search checkpoint at idx %v: %w", entryIdx, err) return nil
return
}
i.current.blockNum = current.blockNum
i.current.logIdx = current.logIdx
case typeInitiatingEvent:
evt, err := newInitiatingEventFromEntry(entry)
if err != nil {
outErr = fmt.Errorf("failed to parse initiating event at idx %v: %w", entryIdx, err)
return
}
i.current = evt.postContext(i.current)
blockNum = i.current.blockNum
logIdx = i.current.logIdx
evtHash = evt.logHash
i.hasExecMsg = evt.hasExecMsg
return
case typeCanonicalHash: // Skip
case typeExecutingCheck: // Skip
case typeExecutingLink: // Skip
default:
outErr = fmt.Errorf("unknown entry type at idx %v %v", entryIdx, entry[0])
return
} }
} }
outErr = io.EOF
return
} }
func (i *iterator) Index() entrydb.EntryIdx { // NextExecMsg returns the next executing message in the iterator.
return i.nextEntryIdx - 1 // It scans forward until it finds and fully reads an initiating event, skipping any blocks.
// This does not stay at the executing message of the current initiating message, if there is any.
func (i *iterator) NextExecMsg() error {
for {
err := i.NextInitMsg()
if err != nil {
return err
}
if i.current.execMsg != nil {
return nil // found a new executing message!
}
}
} }
func (i *iterator) ExecMessage() (types.ExecutingMessage, error) { // NextBlock returns the next block in the iterator.
if !i.hasExecMsg { // It scans forward until it finds and fully reads a block, skipping any events.
return types.ExecutingMessage{}, nil func (i *iterator) NextBlock() error {
} seenBlock := false
// Look ahead to find the exec message info for {
logEntryIdx := i.nextEntryIdx - 1 typ, err := i.next()
execMsg, err := i.readExecMessage(logEntryIdx) if err != nil {
if err != nil { return err
return types.ExecutingMessage{}, fmt.Errorf("failed to read exec message for initiating event at %v: %w", logEntryIdx, err) }
if typ == entrydb.TypeSearchCheckpoint {
seenBlock = true
}
if !i.current.hasCompleteBlock() {
continue // need the full block content
}
if seenBlock {
return nil
}
} }
return execMsg, nil
} }
func (i *iterator) readExecMessage(initEntryIdx entrydb.EntryIdx) (types.ExecutingMessage, error) { // Read and apply the next entry.
linkIdx := initEntryIdx + 1 func (i *iterator) next() (entrydb.EntryType, error) {
if linkIdx%searchCheckpointFrequency == 0 { index := i.current.nextEntryIndex
linkIdx += 2 // skip the search checkpoint and canonical hash entries entry, err := i.db.store.Read(index)
if err != nil {
if errors.Is(err, io.EOF) {
return 0, ErrFuture
}
return 0, fmt.Errorf("failed to read entry %d: %w", index, err)
} }
linkEntry, err := i.db.store.Read(linkIdx) if err := i.current.ApplyEntry(entry); err != nil {
if errors.Is(err, io.EOF) { return entry.Type(), fmt.Errorf("failed to apply entry %d to iterator state: %w", index, err)
return types.ExecutingMessage{}, fmt.Errorf("%w: missing expected executing link event at idx %v", ErrDataCorruption, linkIdx)
} else if err != nil {
return types.ExecutingMessage{}, fmt.Errorf("failed to read executing link event at idx %v: %w", linkIdx, err)
} }
checkIdx := linkIdx + 1 i.entriesRead++
if checkIdx%searchCheckpointFrequency == 0 { return entry.Type(), nil
checkIdx += 2 // skip the search checkpoint and canonical hash entries }
}
checkEntry, err := i.db.store.Read(checkIdx) func (i *iterator) NextIndex() entrydb.EntryIdx {
if errors.Is(err, io.EOF) { return i.current.NextIndex()
return types.ExecutingMessage{}, fmt.Errorf("%w: missing expected executing check event at idx %v", ErrDataCorruption, checkIdx) }
} else if err != nil {
return types.ExecutingMessage{}, fmt.Errorf("failed to read executing check event at idx %v: %w", checkIdx, err) // SealedBlock returns the sealed block that we are appending logs after, if any is available.
} // I.e. the block is the parent block of the block containing the logs that are currently appending to it.
return newExecutingMessageFromEntries(linkEntry, checkEntry) func (i *iterator) SealedBlock() (hash types.TruncatedHash, num uint64, ok bool) {
return i.current.SealedBlock()
}
// InitMessage returns the current initiating message, if any is available.
func (i *iterator) InitMessage() (hash types.TruncatedHash, logIndex uint32, ok bool) {
return i.current.InitMessage()
}
// ExecMessage returns the current executing message, if any is available.
func (i *iterator) ExecMessage() *types.ExecutingMessage {
return i.current.ExecMessage()
} }
This diff is collapsed.
package db package db
import ( import (
"errors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
...@@ -132,11 +135,17 @@ func check( ...@@ -132,11 +135,17 @@ func check(
// exist at the blockNum and logIdx // exist at the blockNum and logIdx
// have a hash that matches the provided hash (implicit in the Contains call), and // have a hash that matches the provided hash (implicit in the Contains call), and
// be less than or equal to the local head for the chain // be less than or equal to the local head for the chain
exists, index, err := chainsDB.logDBs[chain].Contains(blockNum, logIdx, logHash) index, err := chainsDB.logDBs[chain].Contains(blockNum, logIdx, logHash)
if err != nil { if err != nil {
if errors.Is(err, logs.ErrFuture) {
return false // TODO(#12031)
}
if errors.Is(err, logs.ErrConflict) {
return false // TODO(#12031)
}
return false return false
} }
return exists && index <= localHead return index <= localHead
} }
// Check checks if the log entry is safe, provided a local head for the chain // Check checks if the log entry is safe, provided a local head for the chain
......
package db package db
import ( import (
"fmt" "errors"
"testing" "testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
) )
// TestHeadsForChain tests the heads for a chain, // TestHeadsForChain tests the heads for a chain,
...@@ -115,7 +118,7 @@ func TestCheck(t *testing.T) { ...@@ -115,7 +118,7 @@ func TestCheck(t *testing.T) {
1, 1,
1, 1,
backendTypes.TruncatedHash{1, 2, 3}, backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(6), nil}, containsResponse{entrydb.EntryIdx(6), nil},
true, true,
}, },
{ {
...@@ -126,7 +129,7 @@ func TestCheck(t *testing.T) { ...@@ -126,7 +129,7 @@ func TestCheck(t *testing.T) {
1, 1,
1, 1,
backendTypes.TruncatedHash{1, 2, 3}, backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(3), nil}, containsResponse{entrydb.EntryIdx(3), nil},
true, true,
}, },
{ {
...@@ -137,7 +140,7 @@ func TestCheck(t *testing.T) { ...@@ -137,7 +140,7 @@ func TestCheck(t *testing.T) {
1, 1,
1, 1,
backendTypes.TruncatedHash{1, 2, 3}, backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(1), nil}, containsResponse{entrydb.EntryIdx(1), nil},
true, true,
}, },
{ {
...@@ -148,7 +151,7 @@ func TestCheck(t *testing.T) { ...@@ -148,7 +151,7 @@ func TestCheck(t *testing.T) {
1, 1,
1, 1,
backendTypes.TruncatedHash{1, 2, 3}, backendTypes.TruncatedHash{1, 2, 3},
containsResponse{false, entrydb.EntryIdx(1), nil}, containsResponse{entrydb.EntryIdx(1), logs.ErrConflict},
false, false,
}, },
{ {
...@@ -159,7 +162,7 @@ func TestCheck(t *testing.T) { ...@@ -159,7 +162,7 @@ func TestCheck(t *testing.T) {
1, 1,
1, 1,
backendTypes.TruncatedHash{1, 2, 3}, backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(100), nil}, containsResponse{entrydb.EntryIdx(100), nil},
false, false,
}, },
{ {
...@@ -170,7 +173,7 @@ func TestCheck(t *testing.T) { ...@@ -170,7 +173,7 @@ func TestCheck(t *testing.T) {
1, 1,
1, 1,
backendTypes.TruncatedHash{1, 2, 3}, backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(5), nil}, containsResponse{entrydb.EntryIdx(5), nil},
false, false,
}, },
{ {
...@@ -181,7 +184,7 @@ func TestCheck(t *testing.T) { ...@@ -181,7 +184,7 @@ func TestCheck(t *testing.T) {
1, 1,
1, 1,
backendTypes.TruncatedHash{1, 2, 3}, backendTypes.TruncatedHash{1, 2, 3},
containsResponse{true, entrydb.EntryIdx(3), nil}, containsResponse{entrydb.EntryIdx(3), nil},
false, false,
}, },
{ {
...@@ -192,7 +195,7 @@ func TestCheck(t *testing.T) { ...@@ -192,7 +195,7 @@ func TestCheck(t *testing.T) {
1, 1,
1, 1,
backendTypes.TruncatedHash{1, 2, 3}, backendTypes.TruncatedHash{1, 2, 3},
containsResponse{false, entrydb.EntryIdx(0), fmt.Errorf("error")}, containsResponse{entrydb.EntryIdx(0), errors.New("error")},
false, false,
}, },
} }
......
...@@ -26,7 +26,7 @@ type Metrics interface { ...@@ -26,7 +26,7 @@ type Metrics interface {
type Storage interface { type Storage interface {
LogStorage LogStorage
DatabaseRewinder DatabaseRewinder
LatestBlockNum(chainID types.ChainID) uint64 LatestBlockNum(chainID types.ChainID) (num uint64, ok bool)
} }
// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform // ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
...@@ -43,8 +43,13 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID ...@@ -43,8 +43,13 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID
return nil, err return nil, err
} }
latest, ok := store.LatestBlockNum(chainID)
if !ok {
logger.Warn("")
}
startingHead := eth.L1BlockRef{ startingHead := eth.L1BlockRef{
Number: store.LatestBlockNum(chainID), Number: latest,
} }
processLogs := newLogProcessor(chainID, store) processLogs := newLogProcessor(chainID, store)
......
...@@ -49,7 +49,7 @@ func NewChainProcessor(log log.Logger, client BlockByNumberSource, chain types.C ...@@ -49,7 +49,7 @@ func NewChainProcessor(log log.Logger, client BlockByNumberSource, chain types.C
} }
func (s *ChainProcessor) OnNewHead(ctx context.Context, head eth.L1BlockRef) { func (s *ChainProcessor) OnNewHead(ctx context.Context, head eth.L1BlockRef) {
s.log.Debug("Processing chain", "chain", s.chain, "head", head) s.log.Debug("Processing chain", "chain", s.chain, "head", head, "last", s.lastBlock)
if head.Number <= s.lastBlock.Number { if head.Number <= s.lastBlock.Number {
s.log.Info("head is not newer than last processed block", "head", head, "lastBlock", s.lastBlock) s.log.Info("head is not newer than last processed block", "head", head, "lastBlock", s.lastBlock)
return return
......
...@@ -5,17 +5,19 @@ import ( ...@@ -5,17 +5,19 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source/contracts" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source/contracts"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types" backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
supTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" supTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
) )
type LogStorage interface { type LogStorage interface {
AddLog(chain supTypes.ChainID, logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error SealBlock(chain supTypes.ChainID, parentHash common.Hash, block eth.BlockID, timestamp uint64) error
AddLog(chain supTypes.ChainID, logHash backendTypes.TruncatedHash, parentBlock eth.BlockID, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error
} }
type EventDecoder interface { type EventDecoder interface {
...@@ -53,13 +55,15 @@ func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpt ...@@ -53,13 +55,15 @@ func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpt
} }
// executing messages have multiple entries in the database // executing messages have multiple entries in the database
// they should start with the initiating message and then include the execution // they should start with the initiating message and then include the execution
fmt.Println("p.chain", p.chain) err = p.logStore.AddLog(p.chain, logHash, block.ParentID(), uint32(l.Index), execMsg)
err = p.logStore.AddLog(p.chain, logHash, block.ID(), block.Time, uint32(l.Index), execMsg)
if err != nil { if err != nil {
return fmt.Errorf("failed to add log %d from block %v: %w", l.Index, block.ID(), err) return fmt.Errorf("failed to add log %d from block %v: %w", l.Index, block.ID(), err)
} }
} }
} }
if err := p.logStore.SealBlock(p.chain, block.ParentHash, block.ID(), block.Time); err != nil {
return fmt.Errorf("failed to seal block %s: %w", block.ID(), err)
}
return nil return nil
} }
......
...@@ -18,7 +18,12 @@ var logProcessorChainID = supTypes.ChainIDFromUInt64(4) ...@@ -18,7 +18,12 @@ var logProcessorChainID = supTypes.ChainIDFromUInt64(4)
func TestLogProcessor(t *testing.T) { func TestLogProcessor(t *testing.T) {
ctx := context.Background() ctx := context.Background()
block1 := eth.L1BlockRef{Number: 100, Hash: common.Hash{0x11}, Time: 1111} block1 := eth.L1BlockRef{
ParentHash: common.Hash{0x42},
Number: 100,
Hash: common.Hash{0x11},
Time: 1111,
}
t.Run("NoOutputWhenLogsAreEmpty", func(t *testing.T) { t.Run("NoOutputWhenLogsAreEmpty", func(t *testing.T) {
store := &stubLogStorage{} store := &stubLogStorage{}
processor := newLogProcessor(logProcessorChainID, store) processor := newLogProcessor(logProcessorChainID, store)
...@@ -59,30 +64,36 @@ func TestLogProcessor(t *testing.T) { ...@@ -59,30 +64,36 @@ func TestLogProcessor(t *testing.T) {
err := processor.ProcessLogs(ctx, block1, rcpts) err := processor.ProcessLogs(ctx, block1, rcpts)
require.NoError(t, err) require.NoError(t, err)
expected := []storedLog{ expectedLogs := []storedLog{
{ {
block: block1.ID(), parent: block1.ParentID(),
timestamp: block1.Time, logIdx: 0,
logIdx: 0, logHash: logToLogHash(rcpts[0].Logs[0]),
logHash: logToLogHash(rcpts[0].Logs[0]), execMsg: nil,
execMsg: nil,
}, },
{ {
block: block1.ID(), parent: block1.ParentID(),
timestamp: block1.Time, logIdx: 0,
logIdx: 0, logHash: logToLogHash(rcpts[0].Logs[1]),
logHash: logToLogHash(rcpts[0].Logs[1]), execMsg: nil,
execMsg: nil,
}, },
{ {
parent: block1.ParentID(),
logIdx: 0,
logHash: logToLogHash(rcpts[1].Logs[0]),
execMsg: nil,
},
}
require.Equal(t, expectedLogs, store.logs)
expectedBlocks := []storedSeal{
{
parent: block1.ParentHash,
block: block1.ID(), block: block1.ID(),
timestamp: block1.Time, timestamp: block1.Time,
logIdx: 0,
logHash: logToLogHash(rcpts[1].Logs[0]),
execMsg: nil,
}, },
} }
require.Equal(t, expected, store.logs) require.Equal(t, expectedBlocks, store.seals)
}) })
t.Run("IncludeExecutingMessage", func(t *testing.T) { t.Run("IncludeExecutingMessage", func(t *testing.T) {
...@@ -115,14 +126,22 @@ func TestLogProcessor(t *testing.T) { ...@@ -115,14 +126,22 @@ func TestLogProcessor(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
expected := []storedLog{ expected := []storedLog{
{ {
parent: block1.ParentID(),
logIdx: 0,
logHash: logToLogHash(rcpts[0].Logs[0]),
execMsg: &execMsg,
},
}
require.Equal(t, expected, store.logs)
expectedBlocks := []storedSeal{
{
parent: block1.ParentHash,
block: block1.ID(), block: block1.ID(),
timestamp: block1.Time, timestamp: block1.Time,
logIdx: 0,
logHash: logToLogHash(rcpts[0].Logs[0]),
execMsg: &execMsg,
}, },
} }
require.Equal(t, expected, store.logs) require.Equal(t, expectedBlocks, store.seals)
}) })
} }
...@@ -183,29 +202,46 @@ func TestToLogHash(t *testing.T) { ...@@ -183,29 +202,46 @@ func TestToLogHash(t *testing.T) {
} }
type stubLogStorage struct { type stubLogStorage struct {
logs []storedLog logs []storedLog
seals []storedSeal
} }
func (s *stubLogStorage) AddLog(chainID supTypes.ChainID, logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error { func (s *stubLogStorage) SealBlock(chainID supTypes.ChainID, parentHash common.Hash, block eth.BlockID, timestamp uint64) error {
if logProcessorChainID != chainID { if logProcessorChainID != chainID {
return fmt.Errorf("chain id mismatch, expected %v but got %v", logProcessorChainID, chainID) return fmt.Errorf("chain id mismatch, expected %v but got %v", logProcessorChainID, chainID)
} }
s.logs = append(s.logs, storedLog{ s.seals = append(s.seals, storedSeal{
parent: parentHash,
block: block, block: block,
timestamp: timestamp, timestamp: timestamp,
logIdx: logIdx,
logHash: logHash,
execMsg: execMsg,
}) })
return nil return nil
} }
type storedLog struct { func (s *stubLogStorage) AddLog(chainID supTypes.ChainID, logHash backendTypes.TruncatedHash, parentBlock eth.BlockID, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error {
if logProcessorChainID != chainID {
return fmt.Errorf("chain id mismatch, expected %v but got %v", logProcessorChainID, chainID)
}
s.logs = append(s.logs, storedLog{
parent: parentBlock,
logIdx: logIdx,
logHash: logHash,
execMsg: execMsg,
})
return nil
}
type storedSeal struct {
parent common.Hash
block eth.BlockID block eth.BlockID
timestamp uint64 timestamp uint64
logIdx uint32 }
logHash backendTypes.TruncatedHash
execMsg *backendTypes.ExecutingMessage type storedLog struct {
parent eth.BlockID
logIdx uint32
logHash backendTypes.TruncatedHash
execMsg *backendTypes.ExecutingMessage
} }
type EventDecoderFn func(*ethTypes.Log) (backendTypes.ExecutingMessage, error) type EventDecoderFn func(*ethTypes.Log) (backendTypes.ExecutingMessage, error)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment