Commit d5109103 authored by Adrian Sutton, committed by GitHub

op-supervisor: Refactor types (#11132)

* op-supervisor: Move TruncatedHash and ExecutingMessage out of db package

* op-supervisor: Introduce EntryIdx type

This will make it clear whether a value is a block number or a db entry index once head tracking is added.
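As a rough, standalone illustration of that motivation (not part of the commit; readEntry and BlockNum are hypothetical names), a defined index type lets the compiler reject accidental mixing of entry indices and block numbers:

package main

import "fmt"

// EntryIdx mirrors the defined type introduced in entrydb; it is distinct from
// a block number even though both are integers underneath.
type EntryIdx int64

// BlockNum is a hypothetical defined type for block numbers, used only for contrast.
type BlockNum uint64

// readEntry accepts only an EntryIdx, so passing a BlockNum variable is a
// compile-time error rather than a silent logic bug.
func readEntry(idx EntryIdx) string {
	return fmt.Sprintf("entry %d", idx)
}

func main() {
	head := EntryIdx(1023)
	fmt.Println(readEntry(head)) // ok
	// blk := BlockNum(42894)
	// readEntry(blk) // compile error: cannot use blk (BlockNum) as EntryIdx
}

Conversions between the two must then be written explicitly, e.g. entrydb.EntryIdx(n), which keeps the intent visible at call sites.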
parent acafdb9e
......@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
......@@ -22,7 +23,7 @@ import (
type LogStore interface {
io.Closer
ClosestBlockInfo(blockNum uint64) (uint64, db.TruncatedHash, error)
ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error)
Rewind(headBlockNum uint64) error
}
......
......@@ -5,7 +5,7 @@ import (
"io"
"testing"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/stretchr/testify/require"
)
......@@ -56,11 +56,11 @@ func (s *stubLogStore) Close() error {
return nil
}
func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, db.TruncatedHash, error) {
func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, error) {
if s.closestBlockErr != nil {
return 0, db.TruncatedHash{}, s.closestBlockErr
return 0, types.TruncatedHash{}, s.closestBlockErr
}
return s.closestBlockNumber, db.TruncatedHash{}, nil
return s.closestBlockNumber, types.TruncatedHash{}, nil
}
func (s *stubLogStore) Rewind(headBlockNum uint64) error {
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum/go-ethereum/log"
)
......@@ -27,6 +28,12 @@ const (
typeExecutingCheck
)
var (
ErrLogOutOfOrder = errors.New("log out of order")
ErrDataCorruption = errors.New("data corruption")
ErrNotFound = errors.New("not found")
)
type Metrics interface {
RecordDBEntryCount(count int64)
RecordDBSearchEntriesRead(count int64)
......@@ -39,9 +46,10 @@ type logContext struct {
type EntryStore interface {
Size() int64
Read(idx int64) (entrydb.Entry, error)
LastEntryIdx() entrydb.EntryIdx
Read(idx entrydb.EntryIdx) (entrydb.Entry, error)
Append(entries ...entrydb.Entry) error
Truncate(idx int64) error
Truncate(idx entrydb.EntryIdx) error
Close() error
}
......@@ -103,8 +111,8 @@ func NewFromEntryStore(logger log.Logger, m Metrics, store EntryStore) (*DB, err
return db, nil
}
func (db *DB) lastEntryIdx() int64 {
return db.store.Size() - 1
func (db *DB) lastEntryIdx() entrydb.EntryIdx {
return db.store.LastEntryIdx()
}
func (db *DB) init() error {
......@@ -166,26 +174,26 @@ func (db *DB) trimInvalidTrailingEntries() error {
}
func (db *DB) updateEntryCountMetric() {
db.m.RecordDBEntryCount(db.lastEntryIdx() + 1)
db.m.RecordDBEntryCount(db.store.Size())
}
// ClosestBlockInfo returns the block number and hash of the highest recorded block at or before blockNum.
// Since block data is only recorded in search checkpoints, this may return an earlier block even if log data is
// recorded for the requested block.
func (db *DB) ClosestBlockInfo(blockNum uint64) (uint64, TruncatedHash, error) {
func (db *DB) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, error) {
db.rwLock.RLock()
defer db.rwLock.RUnlock()
checkpointIdx, err := db.searchCheckpoint(blockNum, math.MaxUint32)
if err != nil {
return 0, TruncatedHash{}, fmt.Errorf("no checkpoint at or before block %v found: %w", blockNum, err)
return 0, types.TruncatedHash{}, fmt.Errorf("no checkpoint at or before block %v found: %w", blockNum, err)
}
checkpoint, err := db.readSearchCheckpoint(checkpointIdx)
if err != nil {
return 0, TruncatedHash{}, fmt.Errorf("failed to reach checkpoint: %w", err)
return 0, types.TruncatedHash{}, fmt.Errorf("failed to reach checkpoint: %w", err)
}
entry, err := db.readCanonicalHash(checkpointIdx + 1)
if err != nil {
return 0, TruncatedHash{}, fmt.Errorf("failed to read canonical hash: %w", err)
return 0, types.TruncatedHash{}, fmt.Errorf("failed to read canonical hash: %w", err)
}
return checkpoint.blockNum, entry.hash, nil
}
......@@ -193,7 +201,7 @@ func (db *DB) ClosestBlockInfo(blockNum uint64) (uint64, TruncatedHash, error) {
// Contains returns true iff the specified logHash is recorded in the specified blockNum and logIdx.
// logIdx is the index of the log in the array of all logs in the block.
// This can be used to check the validity of cross-chain interop events.
func (db *DB) Contains(blockNum uint64, logIdx uint32, logHash TruncatedHash) (bool, error) {
func (db *DB) Contains(blockNum uint64, logIdx uint32, logHash types.TruncatedHash) (bool, error) {
db.rwLock.RLock()
defer db.rwLock.RUnlock()
db.log.Trace("Checking for log", "blockNum", blockNum, "logIdx", logIdx, "hash", logHash)
......@@ -215,32 +223,32 @@ func (db *DB) Contains(blockNum uint64, logIdx uint32, logHash TruncatedHash) (b
// logIdx is the index of the log in the array of all logs in the block.
// Returns the ExecutingMessage if it exists, or ExecutingMessage{} if the log is found but has no ExecutingMessage.
// Returns ErrNotFound if the specified log does not exist in the database.
func (db *DB) Executes(blockNum uint64, logIdx uint32) (ExecutingMessage, error) {
func (db *DB) Executes(blockNum uint64, logIdx uint32) (types.ExecutingMessage, error) {
db.rwLock.RLock()
defer db.rwLock.RUnlock()
_, iter, err := db.findLogInfo(blockNum, logIdx)
if err != nil {
return ExecutingMessage{}, err
return types.ExecutingMessage{}, err
}
execMsg, err := iter.ExecMessage()
if err != nil {
return ExecutingMessage{}, fmt.Errorf("failed to read executing message: %w", err)
return types.ExecutingMessage{}, fmt.Errorf("failed to read executing message: %w", err)
}
return execMsg, nil
}
func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (TruncatedHash, *iterator, error) {
func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (types.TruncatedHash, *iterator, error) {
entryIdx, err := db.searchCheckpoint(blockNum, logIdx)
if errors.Is(err, io.EOF) {
// Did not find a checkpoint to start reading from so the log cannot be present.
return TruncatedHash{}, nil, ErrNotFound
return types.TruncatedHash{}, nil, ErrNotFound
} else if err != nil {
return TruncatedHash{}, nil, err
return types.TruncatedHash{}, nil, err
}
i, err := db.newIterator(entryIdx)
if err != nil {
return TruncatedHash{}, nil, fmt.Errorf("failed to create iterator: %w", err)
return types.TruncatedHash{}, nil, fmt.Errorf("failed to create iterator: %w", err)
}
db.log.Trace("Starting search", "entry", entryIdx, "blockNum", i.current.blockNum, "logIdx", i.current.logIdx)
defer func() {
......@@ -250,9 +258,9 @@ func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (TruncatedHash, *itera
evtBlockNum, evtLogIdx, evtHash, err := i.NextLog()
if errors.Is(err, io.EOF) {
// Reached end of log without finding the event
return TruncatedHash{}, nil, ErrNotFound
return types.TruncatedHash{}, nil, ErrNotFound
} else if err != nil {
return TruncatedHash{}, nil, fmt.Errorf("failed to read next log: %w", err)
return types.TruncatedHash{}, nil, fmt.Errorf("failed to read next log: %w", err)
}
if evtBlockNum == blockNum && evtLogIdx == logIdx {
db.log.Trace("Found initiatingEvent", "blockNum", evtBlockNum, "logIdx", evtLogIdx, "hash", evtHash)
......@@ -260,12 +268,12 @@ func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (TruncatedHash, *itera
}
if evtBlockNum > blockNum || (evtBlockNum == blockNum && evtLogIdx > logIdx) {
// Progressed past the requested log without finding it.
return TruncatedHash{}, nil, ErrNotFound
return types.TruncatedHash{}, nil, ErrNotFound
}
}
}
func (db *DB) newIterator(startCheckpointEntry int64) (*iterator, error) {
func (db *DB) newIterator(startCheckpointEntry entrydb.EntryIdx) (*iterator, error) {
checkpoint, err := db.readSearchCheckpoint(startCheckpointEntry)
if err != nil {
return nil, fmt.Errorf("failed to read search checkpoint entry %v: %w", startCheckpointEntry, err)
......@@ -315,13 +323,13 @@ func (db *DB) newIterator(startCheckpointEntry int64) (*iterator, error) {
// searchCheckpoint performs a binary search of the searchCheckpoint entries to find the closest one at or before
// the requested log.
// Returns the index of the searchCheckpoint to begin reading from or an error
func (db *DB) searchCheckpoint(blockNum uint64, logIdx uint32) (int64, error) {
func (db *DB) searchCheckpoint(blockNum uint64, logIdx uint32) (entrydb.EntryIdx, error) {
n := (db.lastEntryIdx() / searchCheckpointFrequency) + 1
// Define x[-1] < target and x[n] >= target.
// Invariant: x[i-1] < target, x[j] >= target.
i, j := int64(0), n
i, j := entrydb.EntryIdx(0), n
for i < j {
h := int64(uint64(i+j) >> 1) // avoid overflow when computing h
h := entrydb.EntryIdx(uint64(i+j) >> 1) // avoid overflow when computing h
checkpoint, err := db.readSearchCheckpoint(h * searchCheckpointFrequency)
if err != nil {
return 0, fmt.Errorf("failed to read entry %v: %w", h, err)
......@@ -351,7 +359,7 @@ func (db *DB) searchCheckpoint(blockNum uint64, logIdx uint32) (int64, error) {
return (i - 1) * searchCheckpointFrequency, nil
}
func (db *DB) AddLog(logHash TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *ExecutingMessage) error {
func (db *DB) AddLog(logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error {
db.rwLock.Lock()
defer db.rwLock.Unlock()
postState := logContext{
......@@ -381,7 +389,7 @@ func (db *DB) AddLog(logHash TruncatedHash, block eth.BlockID, timestamp uint64,
maybeAddCheckpoint := func() {
if (lastEntryIdx+1)%searchCheckpointFrequency == 0 {
addEntry(newSearchCheckpoint(block.Number, logIdx, timestamp).encode())
addEntry(newCanonicalHash(TruncateHash(block.Hash)).encode())
addEntry(newCanonicalHash(types.TruncateHash(block.Hash)).encode())
newContext = postState
}
}
......@@ -465,7 +473,7 @@ func (db *DB) Rewind(headBlockNum uint64) error {
return nil
}
func (db *DB) readSearchCheckpoint(entryIdx int64) (searchCheckpoint, error) {
func (db *DB) readSearchCheckpoint(entryIdx entrydb.EntryIdx) (searchCheckpoint, error) {
data, err := db.store.Read(entryIdx)
if err != nil {
return searchCheckpoint{}, fmt.Errorf("failed to read entry %v: %w", entryIdx, err)
......@@ -473,7 +481,7 @@ func (db *DB) readSearchCheckpoint(entryIdx int64) (searchCheckpoint, error) {
return newSearchCheckpointFromEntry(data)
}
func (db *DB) readCanonicalHash(entryIdx int64) (canonicalHash, error) {
func (db *DB) readCanonicalHash(entryIdx entrydb.EntryIdx) (canonicalHash, error) {
data, err := db.store.Read(entryIdx)
if err != nil {
return canonicalHash{}, fmt.Errorf("failed to read entry %v: %w", entryIdx, err)
......
......@@ -12,13 +12,14 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func createTruncatedHash(i int) TruncatedHash {
return TruncateHash(createHash(i))
func createTruncatedHash(i int) types.TruncatedHash {
return types.TruncateHash(createHash(i))
}
func createHash(i int) common.Hash {
......@@ -311,12 +312,12 @@ func TestAddLog(t *testing.T) {
}
func TestAddDependentLog(t *testing.T) {
execMsg := ExecutingMessage{
execMsg := types.ExecutingMessage{
Chain: 3,
BlockNum: 42894,
LogIdx: 42,
Timestamp: 8742482,
Hash: TruncateHash(createHash(8844)),
Hash: types.TruncateHash(createHash(8844)),
}
t.Run("FirstEntry", func(t *testing.T) {
runDBTest(t,
......@@ -418,26 +419,26 @@ func TestContains(t *testing.T) {
requireNotContains(t, db, 50, 3, createHash(2))
// Should not find log when hash doesn't match log at block number and index
requireWrongHash(t, db, 50, 0, createHash(5), ExecutingMessage{})
requireWrongHash(t, db, 50, 0, createHash(5), types.ExecutingMessage{})
})
}
func TestExecutes(t *testing.T) {
execMsg1 := ExecutingMessage{
execMsg1 := types.ExecutingMessage{
Chain: 33,
BlockNum: 22,
LogIdx: 99,
Timestamp: 948294,
Hash: createTruncatedHash(332299),
}
execMsg2 := ExecutingMessage{
execMsg2 := types.ExecutingMessage{
Chain: 44,
BlockNum: 55,
LogIdx: 66,
Timestamp: 77777,
Hash: createTruncatedHash(445566),
}
execMsg3 := ExecutingMessage{
execMsg3 := types.ExecutingMessage{
Chain: 77,
BlockNum: 88,
LogIdx: 89,
......@@ -454,9 +455,9 @@ func TestExecutes(t *testing.T) {
},
func(t *testing.T, db *DB, m *stubMetrics) {
// Should find added logs
requireExecutingMessage(t, db, 50, 0, ExecutingMessage{})
requireExecutingMessage(t, db, 50, 0, types.ExecutingMessage{})
requireExecutingMessage(t, db, 50, 1, execMsg1)
requireExecutingMessage(t, db, 50, 2, ExecutingMessage{})
requireExecutingMessage(t, db, 50, 2, types.ExecutingMessage{})
requireExecutingMessage(t, db, 52, 0, execMsg2)
requireExecutingMessage(t, db, 52, 1, execMsg3)
......@@ -540,20 +541,20 @@ func requireClosestBlockInfo(t *testing.T, db *DB, searchFor uint64, expectedBlo
blockNum, hash, err := db.ClosestBlockInfo(searchFor)
require.NoError(t, err)
require.Equal(t, expectedBlockNum, blockNum)
require.Equal(t, TruncateHash(expectedHash), hash)
require.Equal(t, types.TruncateHash(expectedHash), hash)
}
func requireContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash, execMsg ...ExecutingMessage) {
func requireContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash, execMsg ...types.ExecutingMessage) {
require.LessOrEqual(t, len(execMsg), 1, "cannot have multiple executing messages for a single log")
m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type")
result, err := db.Contains(blockNum, logIdx, TruncateHash(logHash))
result, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum)
require.Truef(t, result, "Did not find log %v in block %v with hash %v", logIdx, blockNum, logHash)
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency), "Should not need to read more than between two checkpoints")
require.NotZero(t, m.entriesReadForSearch, "Must read at least some entries to find the log")
var expectedExecMsg ExecutingMessage
var expectedExecMsg types.ExecutingMessage
if len(execMsg) == 1 {
expectedExecMsg = execMsg[0]
}
......@@ -563,7 +564,7 @@ func requireContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHa
func requireNotContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash) {
m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type")
result, err := db.Contains(blockNum, logIdx, TruncateHash(logHash))
result, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum)
require.Falsef(t, result, "Found unexpected log %v in block %v with hash %v", logIdx, blockNum, logHash)
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency), "Should not need to read more than between two checkpoints")
......@@ -573,7 +574,7 @@ func requireNotContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, lo
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency), "Should not need to read more than between two checkpoints")
}
func requireExecutingMessage(t *testing.T, db *DB, blockNum uint64, logIdx uint32, execMsg ExecutingMessage) {
func requireExecutingMessage(t *testing.T, db *DB, blockNum uint64, logIdx uint32, execMsg types.ExecutingMessage) {
m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type")
actualExecMsg, err := db.Executes(blockNum, logIdx)
......@@ -583,10 +584,10 @@ func requireExecutingMessage(t *testing.T, db *DB, blockNum uint64, logIdx uint3
require.NotZero(t, m.entriesReadForSearch, "Must read at least some entries to find the log")
}
func requireWrongHash(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash, execMsg ExecutingMessage) {
func requireWrongHash(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash, execMsg types.ExecutingMessage) {
m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type")
result, err := db.Contains(blockNum, logIdx, TruncateHash(logHash))
result, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum)
require.Falsef(t, result, "Found unexpected log %v in block %v with hash %v", logIdx, blockNum, logHash)
......@@ -637,7 +638,7 @@ func TestRecoverOnCreate(t *testing.T) {
t.Run("NoTruncateWhenLastEntryIsExecutingCheck", func(t *testing.T) {
initEvent, err := newInitiatingEvent(logContext{blockNum: 3, logIdx: 0}, 3, 0, createTruncatedHash(1), true)
execMsg := ExecutingMessage{
execMsg := types.ExecutingMessage{
Chain: 4,
BlockNum: 10,
LogIdx: 4,
......@@ -693,7 +694,7 @@ func TestRecoverOnCreate(t *testing.T) {
t.Run("TruncateWhenLastEntryInitEventWithExecLink", func(t *testing.T) {
initEvent, err := newInitiatingEvent(logContext{blockNum: 3, logIdx: 0}, 3, 0, createTruncatedHash(1), true)
require.NoError(t, err)
execMsg := ExecutingMessage{
execMsg := types.ExecutingMessage{
Chain: 4,
BlockNum: 10,
LogIdx: 4,
......@@ -899,8 +900,12 @@ func (s *stubEntryStore) Size() int64 {
return int64(len(s.entries))
}
func (s *stubEntryStore) Read(idx int64) (entrydb.Entry, error) {
if idx < int64(len(s.entries)) {
func (s *stubEntryStore) LastEntryIdx() entrydb.EntryIdx {
return entrydb.EntryIdx(s.Size() - 1)
}
func (s *stubEntryStore) Read(idx entrydb.EntryIdx) (entrydb.Entry, error) {
if idx < entrydb.EntryIdx(len(s.entries)) {
return s.entries[idx], nil
}
return entrydb.Entry{}, io.EOF
......@@ -911,8 +916,8 @@ func (s *stubEntryStore) Append(entries ...entrydb.Entry) error {
return nil
}
func (s *stubEntryStore) Truncate(idx int64) error {
s.entries = s.entries[:min(s.Size()-1, idx+1)]
func (s *stubEntryStore) Truncate(idx entrydb.EntryIdx) error {
s.entries = s.entries[:min(s.Size()-1, int64(idx+1))]
return nil
}
......
......@@ -6,6 +6,7 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
)
type searchCheckpoint struct {
......@@ -45,10 +46,10 @@ func (s searchCheckpoint) encode() entrydb.Entry {
}
type canonicalHash struct {
hash TruncatedHash
hash types.TruncatedHash
}
func newCanonicalHash(hash TruncatedHash) canonicalHash {
func newCanonicalHash(hash types.TruncatedHash) canonicalHash {
return canonicalHash{hash: hash}
}
......@@ -56,7 +57,7 @@ func newCanonicalHashFromEntry(data entrydb.Entry) (canonicalHash, error) {
if data[0] != typeCanonicalHash {
return canonicalHash{}, fmt.Errorf("%w: attempting to decode canonical hash but was type %v", ErrDataCorruption, data[0])
}
var truncated TruncatedHash
var truncated types.TruncatedHash
copy(truncated[:], data[1:21])
return newCanonicalHash(truncated), nil
}
......@@ -72,7 +73,7 @@ type initiatingEvent struct {
blockDiff uint8
incrementLogIdx bool
hasExecMsg bool
logHash TruncatedHash
logHash types.TruncatedHash
}
func newInitiatingEventFromEntry(data entrydb.Entry) (initiatingEvent, error) {
......@@ -85,11 +86,11 @@ func newInitiatingEventFromEntry(data entrydb.Entry) (initiatingEvent, error) {
blockDiff: blockNumDiff,
incrementLogIdx: flags&eventFlagIncrementLogIdx != 0,
hasExecMsg: flags&eventFlagHasExecutingMessage != 0,
logHash: TruncatedHash(data[3:23]),
logHash: types.TruncatedHash(data[3:23]),
}, nil
}
func newInitiatingEvent(pre logContext, blockNum uint64, logIdx uint32, logHash TruncatedHash, hasExecMsg bool) (initiatingEvent, error) {
func newInitiatingEvent(pre logContext, blockNum uint64, logIdx uint32, logHash types.TruncatedHash, hasExecMsg bool) (initiatingEvent, error) {
blockDiff := blockNum - pre.blockNum
if blockDiff > math.MaxUint8 {
// TODO(optimism#11091): Need to find a way to support this.
......@@ -164,7 +165,7 @@ type executingLink struct {
timestamp uint64
}
func newExecutingLink(msg ExecutingMessage) (executingLink, error) {
func newExecutingLink(msg types.ExecutingMessage) (executingLink, error) {
if msg.LogIdx > 1<<24 {
return executingLink{}, fmt.Errorf("log idx is too large (%v)", msg.LogIdx)
}
......@@ -206,10 +207,10 @@ func (e executingLink) encode() entrydb.Entry {
}
type executingCheck struct {
hash TruncatedHash
hash types.TruncatedHash
}
func newExecutingCheck(hash TruncatedHash) executingCheck {
func newExecutingCheck(hash types.TruncatedHash) executingCheck {
return executingCheck{hash: hash}
}
......@@ -217,7 +218,7 @@ func newExecutingCheckFromEntry(entry entrydb.Entry) (executingCheck, error) {
if entry[0] != typeExecutingCheck {
return executingCheck{}, fmt.Errorf("%w: attempting to decode executing check but was type %v", ErrDataCorruption, entry[0])
}
var hash TruncatedHash
var hash types.TruncatedHash
copy(hash[:], entry[1:21])
return newExecutingCheck(hash), nil
}
......@@ -231,16 +232,16 @@ func (e executingCheck) encode() entrydb.Entry {
return entry
}
func newExecutingMessageFromEntries(linkEntry entrydb.Entry, checkEntry entrydb.Entry) (ExecutingMessage, error) {
func newExecutingMessageFromEntries(linkEntry entrydb.Entry, checkEntry entrydb.Entry) (types.ExecutingMessage, error) {
link, err := newExecutingLinkFromEntry(linkEntry)
if err != nil {
return ExecutingMessage{}, fmt.Errorf("invalid executing link: %w", err)
return types.ExecutingMessage{}, fmt.Errorf("invalid executing link: %w", err)
}
check, err := newExecutingCheckFromEntry(checkEntry)
if err != nil {
return ExecutingMessage{}, fmt.Errorf("invalid executing check: %w", err)
return types.ExecutingMessage{}, fmt.Errorf("invalid executing check: %w", err)
}
return ExecutingMessage{
return types.ExecutingMessage{
Chain: link.chain,
BlockNum: link.blockNum,
LogIdx: link.logIdx,
......
......@@ -13,6 +13,8 @@ const (
EntrySize = 24
)
type EntryIdx int64
type Entry [EntrySize]byte
// dataAccess defines a minimal API required to manipulate the actual stored data.
......@@ -25,8 +27,8 @@ type dataAccess interface {
}
type EntryDB struct {
data dataAccess
size int64
data dataAccess
lastEntryIdx EntryIdx
cleanupFailedWrite bool
}
......@@ -48,8 +50,8 @@ func NewEntryDB(logger log.Logger, path string) (*EntryDB, error) {
}
size := info.Size() / EntrySize
db := &EntryDB{
data: file,
size: size,
data: file,
lastEntryIdx: EntryIdx(size - 1),
}
if size*EntrySize != info.Size() {
logger.Warn("File size is nut a multiple of entry size. Truncating to last complete entry", "fileSize", size, "entrySize", EntrySize)
......@@ -61,16 +63,20 @@ func NewEntryDB(logger log.Logger, path string) (*EntryDB, error) {
}
func (e *EntryDB) Size() int64 {
return e.size
return int64(e.lastEntryIdx) + 1
}
func (e *EntryDB) LastEntryIdx() EntryIdx {
return e.lastEntryIdx
}
// Read an entry from the database by index. Returns io.EOF iff idx is after the last entry.
func (e *EntryDB) Read(idx int64) (Entry, error) {
if idx >= e.size {
func (e *EntryDB) Read(idx EntryIdx) (Entry, error) {
if idx > e.lastEntryIdx {
return Entry{}, io.EOF
}
var out Entry
read, err := e.data.ReadAt(out[:], idx*EntrySize)
read, err := e.data.ReadAt(out[:], int64(idx)*EntrySize)
// Ignore io.EOF if we read the entire last entry as ReadAt may return io.EOF or nil when it reads the last byte
if err != nil && !(errors.Is(err, io.EOF) && read == EntrySize) {
return Entry{}, fmt.Errorf("failed to read entry %v: %w", idx, err)
......@@ -85,7 +91,7 @@ func (e *EntryDB) Read(idx int64) (Entry, error) {
func (e *EntryDB) Append(entries ...Entry) error {
if e.cleanupFailedWrite {
// Try to rollback partially written data from a previous Append
if truncateErr := e.Truncate(e.size - 1); truncateErr != nil {
if truncateErr := e.Truncate(e.lastEntryIdx); truncateErr != nil {
return fmt.Errorf("failed to recover from previous write error: %w", truncateErr)
}
}
......@@ -99,7 +105,7 @@ func (e *EntryDB) Append(entries ...Entry) error {
return err
}
// Try to rollback the partially written data
if truncateErr := e.Truncate(e.size - 1); truncateErr != nil {
if truncateErr := e.Truncate(e.lastEntryIdx); truncateErr != nil {
// Failed to rollback, set a flag to attempt the clean up on the next write
e.cleanupFailedWrite = true
return errors.Join(err, fmt.Errorf("failed to remove partially written data: %w", truncateErr))
......@@ -107,24 +113,24 @@ func (e *EntryDB) Append(entries ...Entry) error {
// Successfully rolled back the changes, still report the failed write
return err
}
e.size += int64(len(entries))
e.lastEntryIdx += EntryIdx(len(entries))
return nil
}
// Truncate the database so that the last retained entry is idx. Any entries after idx are deleted.
func (e *EntryDB) Truncate(idx int64) error {
if err := e.data.Truncate((idx + 1) * EntrySize); err != nil {
func (e *EntryDB) Truncate(idx EntryIdx) error {
if err := e.data.Truncate((int64(idx) + 1) * EntrySize); err != nil {
return fmt.Errorf("failed to truncate to entry %v: %w", idx, err)
}
// Update the lastEntryIdx cache
e.size = idx + 1
e.lastEntryIdx = idx
e.cleanupFailedWrite = false
return nil
}
// recover an invalid database by truncating back to the last complete event.
func (e *EntryDB) recover() error {
if err := e.data.Truncate((e.size) * EntrySize); err != nil {
if err := e.data.Truncate((e.Size()) * EntrySize); err != nil {
return fmt.Errorf("failed to truncate trailing partial entries: %w", err)
}
return nil
......
......@@ -177,7 +177,7 @@ func TestWriteErrors(t *testing.T) {
})
}
func requireRead(t *testing.T, db *EntryDB, idx int64, expected Entry) {
func requireRead(t *testing.T, db *EntryDB, idx EntryIdx, expected Entry) {
actual, err := db.Read(idx)
require.NoError(t, err)
require.Equal(t, expected, actual)
......@@ -199,7 +199,7 @@ func createEntryDB(t *testing.T) *EntryDB {
func createEntryDBWithStubData() (*EntryDB, *stubDataAccess) {
stubData := &stubDataAccess{}
db := &EntryDB{data: stubData, size: 0}
db := &EntryDB{data: stubData, lastEntryIdx: -1}
return db, stubData
}
......
......@@ -4,11 +4,14 @@ import (
"errors"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
)
type iterator struct {
db *DB
nextEntryIdx int64
nextEntryIdx entrydb.EntryIdx
current logContext
hasExecMsg bool
......@@ -16,7 +19,7 @@ type iterator struct {
entriesRead int64
}
func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedHash, outErr error) {
func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash types.TruncatedHash, outErr error) {
for i.nextEntryIdx <= i.db.lastEntryIdx() {
entryIdx := i.nextEntryIdx
entry, err := i.db.store.Read(entryIdx)
......@@ -60,29 +63,29 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedH
return
}
func (i *iterator) ExecMessage() (ExecutingMessage, error) {
func (i *iterator) ExecMessage() (types.ExecutingMessage, error) {
if !i.hasExecMsg {
return ExecutingMessage{}, nil
return types.ExecutingMessage{}, nil
}
// Look ahead to find the exec message info
logEntryIdx := i.nextEntryIdx - 1
execMsg, err := i.readExecMessage(logEntryIdx)
if err != nil {
return ExecutingMessage{}, fmt.Errorf("failed to read exec message for initiating event at %v: %w", logEntryIdx, err)
return types.ExecutingMessage{}, fmt.Errorf("failed to read exec message for initiating event at %v: %w", logEntryIdx, err)
}
return execMsg, nil
}
func (i *iterator) readExecMessage(initEntryIdx int64) (ExecutingMessage, error) {
func (i *iterator) readExecMessage(initEntryIdx entrydb.EntryIdx) (types.ExecutingMessage, error) {
linkIdx := initEntryIdx + 1
if linkIdx%searchCheckpointFrequency == 0 {
linkIdx += 2 // skip the search checkpoint and canonical hash entries
}
linkEntry, err := i.db.store.Read(linkIdx)
if errors.Is(err, io.EOF) {
return ExecutingMessage{}, fmt.Errorf("%w: missing expected executing link event at idx %v", ErrDataCorruption, linkIdx)
return types.ExecutingMessage{}, fmt.Errorf("%w: missing expected executing link event at idx %v", ErrDataCorruption, linkIdx)
} else if err != nil {
return ExecutingMessage{}, fmt.Errorf("failed to read executing link event at idx %v: %w", linkIdx, err)
return types.ExecutingMessage{}, fmt.Errorf("failed to read executing link event at idx %v: %w", linkIdx, err)
}
checkIdx := linkIdx + 1
......@@ -91,9 +94,9 @@ func (i *iterator) readExecMessage(initEntryIdx int64) (ExecutingMessage, error)
}
checkEntry, err := i.db.store.Read(checkIdx)
if errors.Is(err, io.EOF) {
return ExecutingMessage{}, fmt.Errorf("%w: missing expected executing check event at idx %v", ErrDataCorruption, checkIdx)
return types.ExecutingMessage{}, fmt.Errorf("%w: missing expected executing check event at idx %v", ErrDataCorruption, checkIdx)
} else if err != nil {
return ExecutingMessage{}, fmt.Errorf("failed to read executing check event at idx %v: %w", checkIdx, err)
return types.ExecutingMessage{}, fmt.Errorf("failed to read executing check event at idx %v: %w", checkIdx, err)
}
return newExecutingMessageFromEntries(linkEntry, checkEntry)
}
......@@ -5,14 +5,14 @@ import (
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
)
type LogStorage interface {
AddLog(logHash db.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *db.ExecutingMessage) error
AddLog(logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error
}
type logProcessor struct {
......@@ -23,7 +23,7 @@ func newLogProcessor(logStore LogStorage) *logProcessor {
return &logProcessor{logStore}
}
func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpts ethTypes.Receipts) error {
for _, rcpt := range rcpts {
for _, l := range rcpt.Logs {
logHash := logToHash(l)
......@@ -36,15 +36,15 @@ func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpt
return nil
}
func logToHash(l *types.Log) db.TruncatedHash {
func logToHash(l *ethTypes.Log) types.TruncatedHash {
payloadHash := crypto.Keccak256(logToPayload(l))
msg := make([]byte, 0, 2*common.HashLength)
msg = append(msg, l.Address.Bytes()...)
msg = append(msg, payloadHash...)
return db.TruncateHash(crypto.Keccak256Hash(msg))
return types.TruncateHash(crypto.Keccak256Hash(msg))
}
func logToPayload(l *types.Log) []byte {
func logToPayload(l *ethTypes.Log) []byte {
msg := make([]byte, 0)
for _, topic := range l.Topics {
msg = append(msg, topic.Bytes()...)
......
......@@ -5,9 +5,9 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
......@@ -18,15 +18,15 @@ func TestLogProcessor(t *testing.T) {
store := &stubLogStorage{}
processor := newLogProcessor(store)
err := processor.ProcessLogs(ctx, block1, types.Receipts{})
err := processor.ProcessLogs(ctx, block1, ethTypes.Receipts{})
require.NoError(t, err)
require.Empty(t, store.logs)
})
t.Run("OutputLogs", func(t *testing.T) {
rcpts := types.Receipts{
rcpts := ethTypes.Receipts{
{
Logs: []*types.Log{
Logs: []*ethTypes.Log{
{
Address: common.Address{0x11},
Topics: []common.Hash{{0xaa}},
......@@ -40,7 +40,7 @@ func TestLogProcessor(t *testing.T) {
},
},
{
Logs: []*types.Log{
Logs: []*ethTypes.Log{
{
Address: common.Address{0x33},
Topics: []common.Hash{{0xee}},
......@@ -82,8 +82,8 @@ func TestLogProcessor(t *testing.T) {
}
func TestToLogHash(t *testing.T) {
mkLog := func() *types.Log {
return &types.Log{
mkLog := func() *ethTypes.Log {
return &ethTypes.Log{
Address: common.Address{0xaa, 0xbb},
Topics: []common.Hash{
{0xcc},
......@@ -98,27 +98,27 @@ func TestToLogHash(t *testing.T) {
Removed: false,
}
}
relevantMods := []func(l *types.Log){
func(l *types.Log) { l.Address = common.Address{0xab, 0xcd} },
func(l *types.Log) { l.Topics = append(l.Topics, common.Hash{0x12, 0x34}) },
func(l *types.Log) { l.Topics = l.Topics[:len(l.Topics)-1] },
func(l *types.Log) { l.Topics[0] = common.Hash{0x12, 0x34} },
func(l *types.Log) { l.Data = append(l.Data, 0x56) },
func(l *types.Log) { l.Data = l.Data[:len(l.Data)-1] },
func(l *types.Log) { l.Data[0] = 0x45 },
relevantMods := []func(l *ethTypes.Log){
func(l *ethTypes.Log) { l.Address = common.Address{0xab, 0xcd} },
func(l *ethTypes.Log) { l.Topics = append(l.Topics, common.Hash{0x12, 0x34}) },
func(l *ethTypes.Log) { l.Topics = l.Topics[:len(l.Topics)-1] },
func(l *ethTypes.Log) { l.Topics[0] = common.Hash{0x12, 0x34} },
func(l *ethTypes.Log) { l.Data = append(l.Data, 0x56) },
func(l *ethTypes.Log) { l.Data = l.Data[:len(l.Data)-1] },
func(l *ethTypes.Log) { l.Data[0] = 0x45 },
}
irrelevantMods := []func(l *types.Log){
func(l *types.Log) { l.BlockNumber = 987 },
func(l *types.Log) { l.TxHash = common.Hash{0xab, 0xcd} },
func(l *types.Log) { l.TxIndex = 99 },
func(l *types.Log) { l.BlockHash = common.Hash{0xab, 0xcd} },
func(l *types.Log) { l.Index = 98 },
func(l *types.Log) { l.Removed = true },
irrelevantMods := []func(l *ethTypes.Log){
func(l *ethTypes.Log) { l.BlockNumber = 987 },
func(l *ethTypes.Log) { l.TxHash = common.Hash{0xab, 0xcd} },
func(l *ethTypes.Log) { l.TxIndex = 99 },
func(l *ethTypes.Log) { l.BlockHash = common.Hash{0xab, 0xcd} },
func(l *ethTypes.Log) { l.Index = 98 },
func(l *ethTypes.Log) { l.Removed = true },
}
refHash := logToHash(mkLog())
// The log hash is stored in the database so test that it matches the actual value.
// If this changes compatibility with existing databases may be affected
expectedRefHash := db.TruncateHash(common.HexToHash("0x4e1dc08fddeb273275f787762cdfe945cf47bb4e80a1fabbc7a825801e81b73f"))
expectedRefHash := types.TruncateHash(common.HexToHash("0x4e1dc08fddeb273275f787762cdfe945cf47bb4e80a1fabbc7a825801e81b73f"))
require.Equal(t, expectedRefHash, refHash, "reference hash changed, check that database compatibility is not broken")
// Check that the hash is changed when any data it should include changes
......@@ -141,7 +141,7 @@ type stubLogStorage struct {
logs []storedLog
}
func (s *stubLogStorage) AddLog(logHash db.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *db.ExecutingMessage) error {
func (s *stubLogStorage) AddLog(logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error {
s.logs = append(s.logs, storedLog{
block: block,
timestamp: timestamp,
......@@ -156,6 +156,6 @@ type storedLog struct {
block eth.BlockID
timestamp uint64
logIdx uint32
logHash db.TruncatedHash
execMsg *db.ExecutingMessage
logHash types.TruncatedHash
execMsg *types.ExecutingMessage
}
package db
package types
import (
"encoding/hex"
"errors"
"github.com/ethereum/go-ethereum/common"
)
var (
ErrLogOutOfOrder = errors.New("log out of order")
ErrDataCorruption = errors.New("data corruption")
ErrNotFound = errors.New("not found")
)
type TruncatedHash [20]byte
func TruncateHash(hash common.Hash) TruncatedHash {
......
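For orientation, a minimal sketch of the consolidated types package follows. This is an assumption-laden illustration, not the commit's code: the real TruncateHash body is elided above, and the 20-byte prefix truncation and the ExecutingMessage field widths are inferred from the 20-byte copies and field usage seen elsewhere in this diff.

package types

import "github.com/ethereum/go-ethereum/common"

// TruncatedHash is the 20-byte hash form declared above.
type TruncatedHash [20]byte

// TruncateHash keeps a 20-byte prefix of the full hash.
// Assumption for illustration; the actual body is not shown in this diff.
func TruncateHash(hash common.Hash) TruncatedHash {
	var out TruncatedHash
	copy(out[:], hash[:20])
	return out
}

// ExecutingMessage carries the fields referenced throughout the diff
// (Chain, BlockNum, LogIdx, Timestamp, Hash); the integer widths here are assumptions.
type ExecutingMessage struct {
	Chain     uint32
	BlockNum  uint64
	LogIdx    uint32
	Timestamp uint64
	Hash      TruncatedHash
}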