Commit 1cf5239c authored by Adrian Sutton, committed by GitHub

op-supervisor: Add log db (#10902)

* op-supervisor: Introduce thread-unsafe log database

* op-supervisor: Add simple r/w locking

* op-supervisor: Add comment

* op-supervisor: Start switching to multi-entry database format

* op-supervisor: Improve test to cover the case where a new block starts at a search checkpoint boundary (other than at the start of the file)

* op-supervisor: Use a flag to indicate when log index should increment rather than a 1 byte increment amount.

* op-supervisor: Comment out unused stuff to make lint happy.

* op-supervisor: Load correct block number and log idx on init

* op-supervisor: Refactor state to only hold context that can always be kept up to date.

* op-supervisor: Support rewinding

* op-supervisor: Remove TODO that probably won't be done there

* op-supervisor: Require first log in block to have logIdx 0

* op-supervisor: Remove completed TODO.

* op-supervisor: Improve testing for logs not existing

* op-supervisor: Fix typo

* op-supervisor: Tidy up TODOs and pending tests.

* op-supervisor: Add invariant assertions for db data

* op-supervisor: Lock db in ClosestBlockInfo

* op-supervisor: Label alerts

* op-supervisor: Use a TruncatedHash for logs everywhere and make it a fixed size array.

* op-supervisor: Separate serialization of initiating events

* op-supervisor: Separate serialization of other event types and enforce type code.

* op-supervisor: Introduce entry type

* op-supervisor: Split out an entry database

* op-supervisor: Introduce structs for entry types

* op-supervisor: Use a struct for CanonicalHash too
parent c54b656b
package db

import (
	"fmt"
	"io"
	"os"
	"testing"

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	"github.com/stretchr/testify/require"
)

type statInvariant func(stat os.FileInfo, m *stubMetrics) error
type entryInvariant func(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error

// checkDBInvariants reads the database log directly and asserts a set of invariants on the data.
func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) {
	stat, err := os.Stat(dbPath)
	require.NoError(t, err)

	statInvariants := []statInvariant{
		invariantFileSizeMultipleOfEntrySize,
		invariantFileSizeMatchesEntryCountMetric,
	}
	for _, invariant := range statInvariants {
		require.NoError(t, invariant(stat, m))
	}

	// Read all entries as binary blobs
	file, err := os.OpenFile(dbPath, os.O_RDONLY, 0o644)
	require.NoError(t, err)
	entries := make([]entrydb.Entry, stat.Size()/entrydb.EntrySize)
	for i := range entries {
		n, err := io.ReadFull(file, entries[i][:])
		require.NoErrorf(t, err, "failed to read entry %v", i)
		require.EqualValuesf(t, entrydb.EntrySize, n, "read wrong length for entry %v", i)
	}

	entryInvariants := []entryInvariant{
		invariantSearchCheckpointOnlyAtFrequency,
		invariantSearchCheckpointAtEverySearchCheckpointFrequency,
		invariantCanonicalHashAfterEverySearchCheckpoint,
		invariantSearchCheckpointBeforeEveryCanonicalHash,
		invariantIncrementLogIdxIfNotImmediatelyAfterCanonicalHash,
	}
	for i, entry := range entries {
		for _, invariant := range entryInvariants {
			err := invariant(i, entry, entries, m)
			if err != nil {
				require.NoErrorf(t, err, "Invariant breached: \n%v", fmtEntries(entries))
			}
		}
	}
}

func fmtEntries(entries []entrydb.Entry) string {
	out := ""
	for i, entry := range entries {
		out += fmt.Sprintf("%v: %x\n", i, entry)
	}
	return out
}

func invariantFileSizeMultipleOfEntrySize(stat os.FileInfo, _ *stubMetrics) error {
	size := stat.Size()
	if size%entrydb.EntrySize != 0 {
		return fmt.Errorf("expected file size to be a multiple of entry size (%v) but was %v", entrydb.EntrySize, size)
	}
	return nil
}

func invariantFileSizeMatchesEntryCountMetric(stat os.FileInfo, m *stubMetrics) error {
	size := stat.Size()
	if m.entryCount*entrydb.EntrySize != size {
		return fmt.Errorf("expected file size to be entryCount (%v) * entrySize (%v) = %v but was %v", m.entryCount, entrydb.EntrySize, m.entryCount*entrydb.EntrySize, size)
	}
	return nil
}

func invariantSearchCheckpointOnlyAtFrequency(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	if entry[0] != typeSearchCheckpoint {
		return nil
	}
	if entryIdx%searchCheckpointFrequency != 0 {
		return fmt.Errorf("should only have search checkpoints every %v entries but found at entry %v", searchCheckpointFrequency, entryIdx)
	}
	return nil
}

func invariantSearchCheckpointAtEverySearchCheckpointFrequency(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	if entryIdx%searchCheckpointFrequency == 0 && entry[0] != typeSearchCheckpoint {
		return fmt.Errorf("should have search checkpoints every %v entries but entry %v was %x", searchCheckpointFrequency, entryIdx, entry)
	}
	return nil
}

func invariantCanonicalHashAfterEverySearchCheckpoint(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	if entry[0] != typeSearchCheckpoint {
		return nil
	}
	if entryIdx+1 >= len(entries) {
		return fmt.Errorf("expected canonical hash after search checkpoint at entry %v but no further entries found", entryIdx)
	}
	nextEntry := entries[entryIdx+1]
	if nextEntry[0] != typeCanonicalHash {
		return fmt.Errorf("expected canonical hash after search checkpoint at entry %v but got %x", entryIdx, nextEntry)
	}
	return nil
}

// invariantSearchCheckpointBeforeEveryCanonicalHash ensures we don't have extra canonical-hash entries
func invariantSearchCheckpointBeforeEveryCanonicalHash(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	if entry[0] != typeCanonicalHash {
		return nil
	}
	if entryIdx == 0 {
		return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but no previous entries present", entryIdx)
	}
	prevEntry := entries[entryIdx-1]
	if prevEntry[0] != typeSearchCheckpoint {
		return fmt.Errorf("expected search checkpoint before canonical hash at entry %v but got %x", entryIdx, prevEntry)
	}
	return nil
}

func invariantIncrementLogIdxIfNotImmediatelyAfterCanonicalHash(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	if entry[0] != typeInitiatingEvent {
		return nil
	}
	if entryIdx == 0 {
		return fmt.Errorf("found initiating event at index %v before any search checkpoint", entryIdx)
	}
	blockDiff := entry[1]
	flags := entry[2]
	incrementsLogIdx := flags&eventFlagIncrementLogIdx != 0
	prevEntry := entries[entryIdx-1]
	prevEntryIsCanonicalHash := prevEntry[0] == typeCanonicalHash
	if incrementsLogIdx && prevEntryIsCanonicalHash {
		return fmt.Errorf("initiating event at index %v increments logIdx despite being immediately after canonical hash (prev entry %x)", entryIdx, prevEntry)
	}
	if incrementsLogIdx && blockDiff > 0 {
		return fmt.Errorf("initiating event at index %v increments logIdx despite starting a new block", entryIdx)
	}
	if !incrementsLogIdx && !prevEntryIsCanonicalHash && blockDiff == 0 {
		return fmt.Errorf("initiating event at index %v does not increment logIdx when block unchanged and not after canonical hash (prev entry %x)", entryIdx, prevEntry)
	}
	return nil
}
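For readers skimming the invariants above, the on-disk layout they enforce can be replayed in isolation. The following is a minimal standalone sketch, not part of the diff; the frequency constant and type code values are assumptions (the real constants live elsewhere in the db package):

package main

import "fmt"

// Assumed values for illustration only; the real constants are defined
// elsewhere in the db package.
const (
	searchCheckpointFrequency = 256

	typeSearchCheckpoint = 0
	typeCanonicalHash    = 1
	typeInitiatingEvent  = 2
)

// checkLayout replays the two structural invariants over a slice of entry
// type codes: a search checkpoint at every multiple of the frequency, and
// each checkpoint immediately followed by a canonical hash.
func checkLayout(types []byte) error {
	for i, typ := range types {
		atBoundary := i%searchCheckpointFrequency == 0
		if atBoundary && typ != typeSearchCheckpoint {
			return fmt.Errorf("entry %v: expected search checkpoint, got type %v", i, typ)
		}
		if typ == typeSearchCheckpoint {
			if !atBoundary {
				return fmt.Errorf("entry %v: search checkpoint off the frequency boundary", i)
			}
			if i+1 >= len(types) || types[i+1] != typeCanonicalHash {
				return fmt.Errorf("entry %v: checkpoint not followed by canonical hash", i)
			}
		}
	}
	return nil
}

func main() {
	// A minimal valid prefix: checkpoint, canonical hash, then an event.
	fmt.Println(checkLayout([]byte{typeSearchCheckpoint, typeCanonicalHash, typeInitiatingEvent}))
}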
package db

import (
	"encoding/binary"
	"fmt"
	"math"

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
)

type searchCheckpoint struct {
	blockNum  uint64
	logIdx    uint32
	timestamp uint64
}

func newSearchCheckpoint(blockNum uint64, logIdx uint32, timestamp uint64) searchCheckpoint {
	return searchCheckpoint{
		blockNum:  blockNum,
		logIdx:    logIdx,
		timestamp: timestamp,
	}
}

func newSearchCheckpointFromEntry(data entrydb.Entry) (searchCheckpoint, error) {
	if data[0] != typeSearchCheckpoint {
		return searchCheckpoint{}, fmt.Errorf("%w: attempting to decode search checkpoint but was type %v", ErrDataCorruption, data[0])
	}
	return searchCheckpoint{
		blockNum:  binary.LittleEndian.Uint64(data[1:9]),
		logIdx:    binary.LittleEndian.Uint32(data[9:13]),
		timestamp: binary.LittleEndian.Uint64(data[13:21]),
	}, nil
}

// encode creates a search checkpoint entry
// type 0: "search checkpoint" <type><uint64 block number: 8 bytes><uint32 event index offset: 4 bytes><uint64 timestamp: 8 bytes> = 21 bytes
func (s searchCheckpoint) encode() entrydb.Entry {
	var data entrydb.Entry
	data[0] = typeSearchCheckpoint
	binary.LittleEndian.PutUint64(data[1:9], s.blockNum)
	binary.LittleEndian.PutUint32(data[9:13], s.logIdx)
	binary.LittleEndian.PutUint64(data[13:21], s.timestamp)
	return data
}

type canonicalHash struct {
	hash TruncatedHash
}

func newCanonicalHash(hash TruncatedHash) canonicalHash {
	return canonicalHash{hash: hash}
}

func newCanonicalHashFromEntry(data entrydb.Entry) (canonicalHash, error) {
	if data[0] != typeCanonicalHash {
		return canonicalHash{}, fmt.Errorf("%w: attempting to decode canonical hash but was type %v", ErrDataCorruption, data[0])
	}
	var truncated TruncatedHash
	copy(truncated[:], data[1:21])
	return newCanonicalHash(truncated), nil
}

func (c canonicalHash) encode() entrydb.Entry {
	var entry entrydb.Entry
	entry[0] = typeCanonicalHash
	copy(entry[1:21], c.hash[:])
	return entry
}

type initiatingEvent struct {
	blockDiff       uint8
	incrementLogIdx bool
	logHash         TruncatedHash
}

func newInitiatingEventFromEntry(data entrydb.Entry) (initiatingEvent, error) {
	if data[0] != typeInitiatingEvent {
		return initiatingEvent{}, fmt.Errorf("%w: attempting to decode initiating event but was type %v", ErrDataCorruption, data[0])
	}
	blockNumDiff := data[1]
	flags := data[2]
	return initiatingEvent{
		blockDiff:       blockNumDiff,
		incrementLogIdx: flags&eventFlagIncrementLogIdx != 0,
		logHash:         TruncatedHash(data[3:23]),
	}, nil
}

func newInitiatingEvent(pre logContext, blockNum uint64, logIdx uint32, logHash TruncatedHash) (initiatingEvent, error) {
	blockDiff := blockNum - pre.blockNum
	if blockDiff > math.MaxUint8 {
		// TODO(optimism#10857): Need to find a way to support this.
		return initiatingEvent{}, fmt.Errorf("too many blocks skipped between %v and %v", pre.blockNum, blockNum)
	}
	currLogIdx := pre.logIdx
	if blockDiff > 0 {
		currLogIdx = 0
	}
	logDiff := logIdx - currLogIdx
	if logDiff > 1 {
		return initiatingEvent{}, fmt.Errorf("skipped logs between %v and %v", currLogIdx, logIdx)
	}
	return initiatingEvent{
		blockDiff:       uint8(blockDiff),
		incrementLogIdx: logDiff > 0,
		logHash:         logHash,
	}, nil
}

// encode creates an initiating event entry
// type 2: "initiating event" <type><blocknum diff: 1 byte><event flags: 1 byte><event-hash: 20 bytes> = 23 bytes
func (i initiatingEvent) encode() entrydb.Entry {
	var data entrydb.Entry
	data[0] = typeInitiatingEvent
	data[1] = i.blockDiff
	flags := byte(0)
	if i.incrementLogIdx {
		// Set flag to indicate log idx needs to be incremented (i.e. we're not directly after a checkpoint)
		flags = flags | eventFlagIncrementLogIdx
	}
	data[2] = flags
	copy(data[3:23], i.logHash[:])
	return data
}

func (i initiatingEvent) postContext(pre logContext) logContext {
	post := logContext{
		blockNum: pre.blockNum + uint64(i.blockDiff),
		logIdx:   pre.logIdx,
	}
	if i.blockDiff > 0 {
		// A new block starts, so the log index resets before any increment.
		post.logIdx = 0
	}
	if i.incrementLogIdx {
		post.logIdx++
	}
	return post
}
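The fixed byte offsets used by encode and the decode constructors above follow directly from the format comments (1 type byte, then little-endian fields, padded to the 24-byte entry). As a self-contained illustration using only the standard library, with arbitrary example values, a search checkpoint round-trips like this:

package main

import (
	"encoding/binary"
	"fmt"
)

// entrySize mirrors entrydb.EntrySize from the diff; the field offsets
// match the "search checkpoint" format comment above. Values are arbitrary.
const entrySize = 24

func main() {
	var entry [entrySize]byte
	entry[0] = 0                                      // typeSearchCheckpoint
	binary.LittleEndian.PutUint64(entry[1:9], 1234)   // block number
	binary.LittleEndian.PutUint32(entry[9:13], 7)     // log index
	binary.LittleEndian.PutUint64(entry[13:21], 9999) // timestamp

	// Decoding reads the same offsets back out.
	fmt.Println(binary.LittleEndian.Uint64(entry[1:9]))   // 1234
	fmt.Println(binary.LittleEndian.Uint32(entry[9:13]))  // 7
	fmt.Println(binary.LittleEndian.Uint64(entry[13:21])) // 9999
}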
package entrydb

import (
	"errors"
	"fmt"
	"io"
	"os"
)

const (
	EntrySize = 24
)

type Entry [EntrySize]byte

// dataAccess defines a minimal API required to manipulate the actual stored data.
// It is a subset of the os.File API but could (theoretically) be satisfied by an in-memory implementation for testing.
type dataAccess interface {
	io.ReaderAt
	io.Writer
	io.Closer
	Truncate(size int64) error
}

type EntryDB struct {
	data         dataAccess
	lastEntryIdx int64
}

func NewEntryDB(path string) (*EntryDB, error) {
	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o666)
	if err != nil {
		return nil, fmt.Errorf("failed to open database at %v: %w", path, err)
	}
	info, err := file.Stat()
	if err != nil {
		return nil, fmt.Errorf("failed to stat database at %v: %w", path, err)
	}
	lastEntryIdx := info.Size()/EntrySize - 1
	return &EntryDB{
		data:         file,
		lastEntryIdx: lastEntryIdx,
	}, nil
}

func (e *EntryDB) Size() int64 {
	return e.lastEntryIdx + 1
}

func (e *EntryDB) Read(idx int64) (Entry, error) {
	var out Entry
	read, err := e.data.ReadAt(out[:], idx*EntrySize)
	// Ignore io.EOF if we read the entire last entry, as ReadAt may return either io.EOF or nil when it reads the last byte
	if err != nil && !(errors.Is(err, io.EOF) && read == EntrySize) {
		return Entry{}, fmt.Errorf("failed to read entry %v: %w", idx, err)
	}
	return out, nil
}

func (e *EntryDB) Append(entries ...Entry) error {
	for _, entry := range entries {
		if _, err := e.data.Write(entry[:]); err != nil {
			// TODO(optimism#10857): When a write fails, need to revert any in memory changes and truncate back to the
			// pre-write state. Likely need to batch writes for multiple entries into a single write akin to transactions
			// to avoid leaving hanging entries without the entry that should follow them.
			return err
		}
		e.lastEntryIdx++
	}
	return nil
}

func (e *EntryDB) Truncate(idx int64) error {
	if err := e.data.Truncate((idx + 1) * EntrySize); err != nil {
		return fmt.Errorf("failed to truncate to entry %v: %w", idx, err)
	}
	// Update the lastEntryIdx cache; the caller is responsible for re-deriving any log context from the new latest entry
	e.lastEntryIdx = idx
	return nil
}

func (e *EntryDB) Close() error {
	return e.data.Close()
}
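The dataAccess comment above notes that an in-memory implementation could satisfy the interface for testing. Such an implementation is not part of this change, but a minimal hypothetical sketch might look like the following:

package entrydb

import (
	"fmt"
	"io"
)

// memAccess is a hypothetical in-memory dataAccess implementation of the
// kind the dataAccess comment alludes to. It is a sketch for tests that
// want to avoid touching disk, not part of this change.
type memAccess struct {
	buf []byte
}

func (m *memAccess) ReadAt(p []byte, off int64) (int, error) {
	if off >= int64(len(m.buf)) {
		return 0, io.EOF
	}
	n := copy(p, m.buf[off:])
	if n < len(p) {
		// Matches the io.ReaderAt contract: short reads return an error.
		return n, io.EOF
	}
	return n, nil
}

func (m *memAccess) Write(p []byte) (int, error) {
	m.buf = append(m.buf, p...)
	return len(p), nil
}

func (m *memAccess) Truncate(size int64) error {
	if size < 0 || size > int64(len(m.buf)) {
		return fmt.Errorf("invalid truncate size %v", size)
	}
	m.buf = m.buf[:size]
	return nil
}

func (m *memAccess) Close() error { return nil }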
package entrydb

import (
	"bytes"
	"io"
	"path/filepath"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestReadWrite(t *testing.T) {
	t.Run("BasicReadWrite", func(t *testing.T) {
		db := createEntryDB(t)
		require.NoError(t, db.Append(createEntry(1)))
		require.NoError(t, db.Append(createEntry(2)))
		require.NoError(t, db.Append(createEntry(3)))
		require.NoError(t, db.Append(createEntry(4)))
		requireRead(t, db, 0, createEntry(1))
		requireRead(t, db, 1, createEntry(2))
		requireRead(t, db, 2, createEntry(3))
		requireRead(t, db, 3, createEntry(4))

		// Check we can read out of order
		requireRead(t, db, 1, createEntry(2))
	})

	t.Run("ReadPastEndOfFileReturnsEOF", func(t *testing.T) {
		db := createEntryDB(t)
		_, err := db.Read(0)
		require.ErrorIs(t, err, io.EOF)
	})

	t.Run("WriteMultiple", func(t *testing.T) {
		db := createEntryDB(t)
		require.NoError(t, db.Append(
			createEntry(1),
			createEntry(2),
			createEntry(3),
		))
		requireRead(t, db, 0, createEntry(1))
		requireRead(t, db, 1, createEntry(2))
		requireRead(t, db, 2, createEntry(3))
	})
}

func TestTruncate(t *testing.T) {
	db := createEntryDB(t)
	require.NoError(t, db.Append(createEntry(1)))
	require.NoError(t, db.Append(createEntry(2)))
	require.NoError(t, db.Append(createEntry(3)))
	require.NoError(t, db.Append(createEntry(4)))
	require.NoError(t, db.Append(createEntry(5)))

	require.NoError(t, db.Truncate(3))
	requireRead(t, db, 0, createEntry(1))
	requireRead(t, db, 1, createEntry(2))
	requireRead(t, db, 2, createEntry(3))

	// 4 and 5 have been removed
	_, err := db.Read(4)
	require.ErrorIs(t, err, io.EOF)
	_, err = db.Read(5)
	require.ErrorIs(t, err, io.EOF)
}

func requireRead(t *testing.T, db *EntryDB, idx int64, expected Entry) {
	actual, err := db.Read(idx)
	require.NoError(t, err)
	require.Equal(t, expected, actual)
}

func createEntry(i byte) Entry {
	return Entry(bytes.Repeat([]byte{i}, EntrySize))
}

func createEntryDB(t *testing.T) *EntryDB {
	db, err := NewEntryDB(filepath.Join(t.TempDir(), "entries.db"))
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})
	return db
}
package db

import (
	"fmt"
	"io"
)

type iterator struct {
	db           *DB
	nextEntryIdx int64
	current      logContext
	entriesRead  int64
}

func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedHash, outErr error) {
	for i.nextEntryIdx <= i.db.lastEntryIdx() {
		entryIdx := i.nextEntryIdx
		entry, err := i.db.store.Read(entryIdx)
		if err != nil {
			outErr = fmt.Errorf("failed to read entry %v: %w", entryIdx, err)
			return
		}
		i.nextEntryIdx++
		i.entriesRead++
		switch entry[0] {
		case typeSearchCheckpoint:
			current, err := newSearchCheckpointFromEntry(entry)
			if err != nil {
				outErr = fmt.Errorf("failed to parse search checkpoint at idx %v: %w", entryIdx, err)
				return
			}
			i.current.blockNum = current.blockNum
			i.current.logIdx = current.logIdx
		case typeCanonicalHash:
			// Skip
		case typeInitiatingEvent:
			evt, err := newInitiatingEventFromEntry(entry)
			if err != nil {
				outErr = fmt.Errorf("failed to parse initiating event at idx %v: %w", entryIdx, err)
				return
			}
			i.current = evt.postContext(i.current)
			blockNum = i.current.blockNum
			logIdx = i.current.logIdx
			evtHash = evt.logHash
			return
		case typeExecutingCheck:
			// TODO(optimism#10857): Handle this properly
		case typeExecutingLink:
			// TODO(optimism#10857): Handle this properly
		default:
			outErr = fmt.Errorf("unknown entry type %v at idx %v", entry[0], entryIdx)
			return
		}
	}
	outErr = io.EOF
	return
}
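Since iterator is unexported, callers inside the db package are expected to loop on NextLog until it reports io.EOF. The following drain helper is hypothetical and illustrative only, assuming an iterator obtained from a *DB method not shown in this diff:

package db

import (
	"errors"
	"fmt"
	"io"
)

// drain shows the intended consumption pattern for iterator.NextLog:
// read initiating events until io.EOF signals the end of the log.
// It is a sketch, not part of this change.
func drain(iter *iterator) error {
	for {
		blockNum, logIdx, logHash, err := iter.NextLog()
		if errors.Is(err, io.EOF) {
			return nil // reached the end of the log
		}
		if err != nil {
			return err
		}
		fmt.Printf("block %v log %v hash %x\n", blockNum, logIdx, logHash)
	}
}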