Commit 99021b5e authored by protolambda, committed by GitHub

interop: fromda DB (#12436)

* op-supervisor: cleanup, refactor to take local-safe info from op-node

* interop: draft fromda DB

* op-supervisor: fromda DB fixes and tests

* fix test warnings

* add extra check; remove LatestDerived, LatestDerivedFrom

---------
Co-authored-by: axelKingsley <axel.kingsley@gmail.com>
parent 3be573c9
@@ -9,7 +9,14 @@ import (
"github.com/ethereum/go-ethereum/log"
)
const EntrySize = 34
type EntryStore[T EntryType, E Entry[T]] interface {
Size() int64
LastEntryIdx() EntryIdx
Read(idx EntryIdx) (E, error)
Append(entries ...E) error
Truncate(idx EntryIdx) error
Close() error
}
type EntryIdx int64
@@ -18,10 +25,18 @@ type EntryType interface {
~uint8
}
type Entry[T EntryType] [EntrySize]byte
type Entry[T EntryType] interface {
Type() T
comparable
}
func (entry Entry[T]) Type() T {
return T(entry[0])
}
// Binary is the interface to encode/decode entries and to report their size.
// This should be a zero-cost abstraction; it is bundled as an interface so that the EntryDB
// has generic access to this functionality, since Go does not support const-generics for array sizes.
type Binary[T EntryType, E Entry[T]] interface {
Append(dest []byte, e *E) []byte
ReadAt(dest *E, r io.ReaderAt, at int64) (n int, err error)
EntrySize() int
}
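Implementations of Binary are instantiated as plain zero values (see the `var b B` in NewEntryDB below), so they are expected to be stateless empty structs. As a hedged illustration, a minimal codec for a hypothetical 8-byte entry could look like this (demoEntry and demoBinary are illustrative names, not part of this change):
type demoEntry [8]byte

func (e demoEntry) Type() uint8 { return e[0] }

type demoBinary struct{}

func (demoBinary) Append(dest []byte, e *demoEntry) []byte { return append(dest, e[:]...) }

func (demoBinary) ReadAt(dest *demoEntry, r io.ReaderAt, at int64) (n int, err error) {
return r.ReadAt(dest[:], at)
}

func (demoBinary) EntrySize() int { return 8 }

var _ Binary[uint8, demoEntry] = demoBinary{}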
// dataAccess defines a minimal API required to manipulate the actual stored data.
@@ -33,10 +48,12 @@ type dataAccess interface {
Truncate(size int64) error
}
type EntryDB[T EntryType] struct {
type EntryDB[T EntryType, E Entry[T], B Binary[T, E]] struct {
data dataAccess
lastEntryIdx EntryIdx
b B
cleanupFailedWrite bool
}
@@ -45,7 +62,7 @@ type EntryDB[T EntryType] struct {
// If the file exists it will be used as the existing data.
// Returns ErrRecoveryRequired if the existing file is not a valid entry db. An EntryDB is still returned, but all
// operations will return ErrRecoveryRequired until the Recover method is called.
func NewEntryDB[T EntryType](logger log.Logger, path string) (*EntryDB[T], error) {
func NewEntryDB[T EntryType, E Entry[T], B Binary[T, E]](logger log.Logger, path string) (*EntryDB[T, E, B], error) {
logger.Info("Opening entry database", "path", path)
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil {
@@ -55,13 +72,14 @@ func NewEntryDB[T EntryType](logger log.Logger, path string) (*EntryDB[T], error
if err != nil {
return nil, fmt.Errorf("failed to stat database at %v: %w", path, err)
}
size := info.Size() / EntrySize
db := &EntryDB[T]{
var b B
size := info.Size() / int64(b.EntrySize())
db := &EntryDB[T, E, B]{
data: file,
lastEntryIdx: EntryIdx(size - 1),
}
if size*EntrySize != info.Size() {
logger.Warn("File size is not a multiple of entry size. Truncating to last complete entry", "fileSize", size, "entrySize", EntrySize)
if size*int64(b.EntrySize()) != info.Size() {
logger.Warn("File size is not a multiple of entry size. Truncating to last complete entry", "fileSize", size, "entrySize", b.EntrySize())
if err := db.recover(); err != nil {
return nil, fmt.Errorf("failed to recover database at %v: %w", path, err)
}
@@ -69,24 +87,26 @@ func NewEntryDB[T EntryType](logger log.Logger, path string) (*EntryDB[T], error
return db, nil
}
func (e *EntryDB[T]) Size() int64 {
func (e *EntryDB[T, E, B]) Size() int64 {
return int64(e.lastEntryIdx) + 1
}
func (e *EntryDB[T]) LastEntryIdx() EntryIdx {
// LastEntryIdx returns the index of the last entry in the DB.
// This returns -1 if the DB is empty.
func (e *EntryDB[T, E, B]) LastEntryIdx() EntryIdx {
return e.lastEntryIdx
}
// Read an entry from the database by index. Returns io.EOF iff idx is after the last entry.
func (e *EntryDB[T]) Read(idx EntryIdx) (Entry[T], error) {
func (e *EntryDB[T, E, B]) Read(idx EntryIdx) (E, error) {
var out E
if idx > e.lastEntryIdx {
return Entry[T]{}, io.EOF
return out, io.EOF
}
var out Entry[T]
read, err := e.data.ReadAt(out[:], int64(idx)*EntrySize)
read, err := e.b.ReadAt(&out, e.data, int64(idx)*int64(e.b.EntrySize()))
// Ignore io.EOF if we read the entire last entry as ReadAt may return io.EOF or nil when it reads the last byte
if err != nil && !(errors.Is(err, io.EOF) && read == EntrySize) {
return Entry[T]{}, fmt.Errorf("failed to read entry %v: %w", idx, err)
if err != nil && !(errors.Is(err, io.EOF) && read == e.b.EntrySize()) {
return out, fmt.Errorf("failed to read entry %v: %w", idx, err)
}
return out, nil
}
@@ -95,16 +115,16 @@ func (e *EntryDB[T]) Read(idx EntryIdx) (Entry[T], error) {
// The entries are combined in memory and passed to a single Write invocation.
// If the write fails, it will attempt to truncate any partially written data.
// Subsequent writes to this instance will fail until partially written data is truncated.
func (e *EntryDB[T]) Append(entries ...Entry[T]) error {
func (e *EntryDB[T, E, B]) Append(entries ...E) error {
if e.cleanupFailedWrite {
// Try to rollback partially written data from a previous Append
if truncateErr := e.Truncate(e.lastEntryIdx); truncateErr != nil {
return fmt.Errorf("failed to recover from previous write error: %w", truncateErr)
}
}
data := make([]byte, 0, len(entries)*EntrySize)
for _, entry := range entries {
data = append(data, entry[:]...)
data := make([]byte, 0, len(entries)*e.b.EntrySize())
for i := range entries {
data = e.b.Append(data, &entries[i])
}
if n, err := e.data.Write(data); err != nil {
if n == 0 {
@@ -125,8 +145,8 @@ func (e *EntryDB[T]) Append(entries ...Entry[T]) error {
}
// Truncate the database so that the last retained entry is idx. Any entries after idx are deleted.
func (e *EntryDB[T]) Truncate(idx EntryIdx) error {
if err := e.data.Truncate((int64(idx) + 1) * EntrySize); err != nil {
func (e *EntryDB[T, E, B]) Truncate(idx EntryIdx) error {
if err := e.data.Truncate((int64(idx) + 1) * int64(e.b.EntrySize())); err != nil {
return fmt.Errorf("failed to truncate to entry %v: %w", idx, err)
}
// Update the lastEntryIdx cache
@@ -136,13 +156,13 @@ func (e *EntryDB[T]) Truncate(idx EntryIdx) error {
}
// recover an invalid database by truncating back to the last complete entry.
func (e *EntryDB[T]) recover() error {
if err := e.data.Truncate((e.Size()) * EntrySize); err != nil {
func (e *EntryDB[T, E, B]) recover() error {
if err := e.data.Truncate(e.Size() * int64(e.b.EntrySize())); err != nil {
return fmt.Errorf("failed to truncate trailing partial entries: %w", err)
}
return nil
}
func (e *EntryDB[T]) Close() error {
func (e *EntryDB[T, E, B]) Close() error {
return e.data.Close()
}
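As a hedged wiring sketch (reusing the hypothetical demoEntry/demoBinary from above; logger and dir assumed, error handling elided), the three type parameters pin down the entry type, its fixed-size representation, and its codec:
db, err := NewEntryDB[uint8, demoEntry, demoBinary](logger, filepath.Join(dir, "demo.db"))
if err != nil {
// may be ErrRecoveryRequired, in which case Recover must be called first
}
defer db.Close()
if err := db.Append(demoEntry{1}, demoEntry{2}); err != nil {
// a failed write is rolled back on the next Append via cleanupFailedWrite
}
entry, err := db.Read(0) // Read returns io.EOF once idx > LastEntryIdx()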
@@ -9,9 +9,11 @@ import (
"path/filepath"
"testing"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
type TestEntryType uint8
@@ -20,7 +22,31 @@ func (typ TestEntryType) String() string {
return fmt.Sprintf("%d", uint8(typ))
}
type TestEntry = Entry[TestEntryType]
const TestEntrySize = 34
type TestEntry [TestEntrySize]byte
func (t TestEntry) Type() TestEntryType {
return TestEntryType(t[0])
}
type TestEntryBinary struct{}
func (TestEntryBinary) Append(dest []byte, e *TestEntry) []byte {
return append(dest, e[:]...)
}
func (TestEntryBinary) ReadAt(dest *TestEntry, r io.ReaderAt, at int64) (n int, err error) {
return r.ReadAt(dest[:], at)
}
func (TestEntryBinary) EntrySize() int {
return TestEntrySize
}
var _ Binary[TestEntryType, TestEntry] = TestEntryBinary{}
type TestEntryDB = EntryDB[TestEntryType, TestEntry, TestEntryBinary]
func TestReadWrite(t *testing.T) {
t.Run("BasicReadWrite", func(t *testing.T) {
@@ -120,10 +146,10 @@ func TestTruncateTrailingPartialEntries(t *testing.T) {
entry2 := createEntry(2)
invalidData := make([]byte, len(entry1)+len(entry2)+4)
copy(invalidData, entry1[:])
copy(invalidData[EntrySize:], entry2[:])
copy(invalidData[TestEntrySize:], entry2[:])
invalidData[len(invalidData)-1] = 3 // Some invalid trailing data
require.NoError(t, os.WriteFile(file, invalidData, 0o644))
db, err := NewEntryDB[TestEntryType](logger, file)
db, err := NewEntryDB[TestEntryType, TestEntry, TestEntryBinary](logger, file)
require.NoError(t, err)
defer db.Close()
@@ -131,7 +157,7 @@ func TestTruncateTrailingPartialEntries(t *testing.T) {
require.EqualValues(t, 2, db.Size())
stat, err := os.Stat(file)
require.NoError(t, err)
require.EqualValues(t, 2*EntrySize, stat.Size())
require.EqualValues(t, 2*TestEntrySize, stat.Size())
}
func TestWriteErrors(t *testing.T) {
@@ -162,7 +188,7 @@ func TestWriteErrors(t *testing.T) {
t.Run("PartialWriteAndTruncateFails", func(t *testing.T) {
db, stubData := createEntryDBWithStubData()
stubData.writeErr = expectedErr
stubData.writeErrAfterBytes = EntrySize + 2
stubData.writeErrAfterBytes = TestEntrySize + 2
stubData.truncateErr = errors.New("boom")
err := db.Append(createEntry(1), createEntry(2))
require.ErrorIs(t, err, expectedErr)
@@ -186,19 +212,19 @@ func TestWriteErrors(t *testing.T) {
})
}
func requireRead(t *testing.T, db *EntryDB[TestEntryType], idx EntryIdx, expected TestEntry) {
func requireRead(t *testing.T, db *TestEntryDB, idx EntryIdx, expected TestEntry) {
actual, err := db.Read(idx)
require.NoError(t, err)
require.Equal(t, expected, actual)
}
func createEntry(i byte) TestEntry {
return TestEntry(bytes.Repeat([]byte{i}, EntrySize))
return TestEntry(bytes.Repeat([]byte{i}, TestEntrySize))
}
func createEntryDB(t *testing.T) *EntryDB[TestEntryType] {
func createEntryDB(t *testing.T) *TestEntryDB {
logger := testlog.Logger(t, log.LvlInfo)
db, err := NewEntryDB[TestEntryType](logger, filepath.Join(t.TempDir(), "entries.db"))
db, err := NewEntryDB[TestEntryType, TestEntry, TestEntryBinary](logger, filepath.Join(t.TempDir(), "entries.db"))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
@@ -206,9 +232,9 @@ func createEntryDB(t *testing.T) *EntryDB[TestEntryType] {
return db
}
func createEntryDBWithStubData() (*EntryDB[TestEntryType], *stubDataAccess) {
func createEntryDBWithStubData() (*TestEntryDB, *stubDataAccess) {
stubData := &stubDataAccess{}
db := &EntryDB[TestEntryType]{data: stubData, lastEntryIdx: -1}
db := &EntryDB[TestEntryType, TestEntry, TestEntryBinary]{data: stubData, lastEntryIdx: -1}
return db, stubData
}
...
package entrydb
import (
"io"
)
type MemEntryStore[T EntryType, E Entry[T]] struct {
entries []E
}
func (s *MemEntryStore[T, E]) Size() int64 {
return int64(len(s.entries))
}
func (s *MemEntryStore[T, E]) LastEntryIdx() EntryIdx {
return EntryIdx(s.Size() - 1)
}
func (s *MemEntryStore[T, E]) Read(idx EntryIdx) (E, error) {
if idx < EntryIdx(len(s.entries)) {
return s.entries[idx], nil
}
var out E
return out, io.EOF
}
func (s *MemEntryStore[T, E]) Append(entries ...E) error {
s.entries = append(s.entries, entries...)
return nil
}
func (s *MemEntryStore[T, E]) Truncate(idx EntryIdx) error {
s.entries = s.entries[:min(s.Size(), int64(idx)+1)] // keep entries 0..idx
return nil
}
func (s *MemEntryStore[T, E]) Close() error {
return nil
}
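A short usage sketch of the in-memory store as a test double (again with the hypothetical demoEntry from above):
var store EntryStore[uint8, demoEntry] = &MemEntryStore[uint8, demoEntry]{}
_ = store.Append(demoEntry{1}, demoEntry{2}, demoEntry{3})
_ = store.Truncate(1)        // keep entries 0 and 1
last := store.LastEntryIdx() // == 1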
package fromda
import (
"cmp"
"fmt"
"sort"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type Metrics interface {
RecordDBDerivedEntryCount(count int64)
}
type EntryStore interface {
Size() int64
LastEntryIdx() entrydb.EntryIdx
Read(idx entrydb.EntryIdx) (Entry, error)
Append(entries ...Entry) error
Truncate(idx entrydb.EntryIdx) error
Close() error
}
// DB implements an append-only database of L1-to-L2 derivation links.
// Each entry is fixed-size, and denotes an increment of the L1 (derived-from) and/or L2 (derived) block.
// The data is an append-only log that can be binary-searched for any derivation-link data.
type DB struct {
log log.Logger
m Metrics
store EntryStore
rwLock sync.RWMutex
}
func NewFromFile(logger log.Logger, m Metrics, path string) (*DB, error) {
store, err := entrydb.NewEntryDB[EntryType, Entry, EntryBinary](logger, path)
if err != nil {
return nil, fmt.Errorf("failed to open DB: %w", err)
}
return NewFromEntryStore(logger, m, store)
}
func NewFromEntryStore(logger log.Logger, m Metrics, store EntryStore) (*DB, error) {
db := &DB{
log: logger,
m: m,
store: store,
}
db.m.RecordDBDerivedEntryCount(db.store.Size())
return db, nil
}
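Because the store is hidden behind the EntryStore interface, tests can construct the DB against the in-memory store from earlier in this diff instead of a file; a minimal sketch (logger and a Metrics value m assumed):
store := &entrydb.MemEntryStore[EntryType, Entry]{}
db, err := NewFromEntryStore(logger, m, store)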
// Rewind to the last entry that was derived from an L1 block with the given block number.
func (db *DB) Rewind(derivedFrom uint64) error {
index, _, err := db.lastDerivedAt(derivedFrom)
if err != nil {
return fmt.Errorf("failed to find point to rewind to: %w", err)
}
err = db.store.Truncate(index)
if err != nil {
return err
}
db.m.RecordDBDerivedEntryCount(int64(index) + 1)
return nil
}
// Latest returns the last known values:
// derivedFrom: the L1 block that the L2 block is safe for (not necessarily the first, multiple L2 blocks may be derived from the same L1 block).
// derived: the L2 block that was derived (not necessarily the first, the L1 block may have been empty and repeated the last safe L2 block).
func (db *DB) Latest() (derivedFrom types.BlockSeal, derived types.BlockSeal, err error) {
db.rwLock.Lock()
defer db.rwLock.Unlock()
return db.latest()
}
// latest is like Latest, but without lock, for internal use.
func (db *DB) latest() (derivedFrom types.BlockSeal, derived types.BlockSeal, err error) {
lastIndex := db.store.LastEntryIdx()
if lastIndex < 0 {
return types.BlockSeal{}, types.BlockSeal{}, entrydb.ErrFuture
}
last, err := db.readAt(lastIndex)
if err != nil {
return types.BlockSeal{}, types.BlockSeal{}, fmt.Errorf("failed to read last derivation data: %w", err)
}
return last.derivedFrom, last.derived, nil
}
// LastDerivedAt returns the last L2 block derived from the given L1 block.
func (db *DB) LastDerivedAt(derivedFrom eth.BlockID) (derived types.BlockSeal, err error) {
db.rwLock.RLock()
defer db.rwLock.RUnlock()
_, link, err := db.lastDerivedAt(derivedFrom.Number)
if err != nil {
return types.BlockSeal{}, err
}
if link.derivedFrom.ID() != derivedFrom {
return types.BlockSeal{}, fmt.Errorf("searched for last derived-from %s but found %s: %w",
derivedFrom, link.derivedFrom, entrydb.ErrConflict)
}
return link.derived, nil
}
// DerivedFrom determines where an L2 block was first derived from.
// (An L2 block may repeat if the following L1 blocks are empty and don't produce additional L2 blocks.)
func (db *DB) DerivedFrom(derived eth.BlockID) (derivedFrom types.BlockSeal, err error) {
db.rwLock.RLock()
defer db.rwLock.RUnlock()
_, link, err := db.firstDerivedFrom(derived.Number)
if err != nil {
return types.BlockSeal{}, err
}
if link.derived.ID() != derived {
return types.BlockSeal{}, fmt.Errorf("searched for first derived %s but found %s: %w",
derived, link.derived, entrydb.ErrConflict)
}
return link.derivedFrom, nil
}
func (db *DB) firstDerivedFrom(derived uint64) (entrydb.EntryIdx, LinkEntry, error) {
return db.find(false, func(link LinkEntry) int {
return cmp.Compare(link.derived.Number, derived)
})
}
func (db *DB) lastDerivedAt(derivedFrom uint64) (entrydb.EntryIdx, LinkEntry, error) {
// Reverse: prioritize the last entry.
return db.find(true, func(link LinkEntry) int {
return cmp.Compare(derivedFrom, link.derivedFrom.Number)
})
}
// find finds the first entry for which cmpFn(link) returns 0.
// Entries to the left of the match must make cmpFn return -1, and entries to the right return +1.
// If reverse is set, cmpFn must be flipped as well, and the last entry for which cmpFn(link) is 0 is found.
func (db *DB) find(reverse bool, cmpFn func(link LinkEntry) int) (entrydb.EntryIdx, LinkEntry, error) {
n := db.store.Size()
if n == 0 {
return -1, LinkEntry{}, entrydb.ErrFuture
}
var searchErr error
// binary-search for the smallest index i for which cmp(i) >= 0
result := sort.Search(int(n), func(i int) bool {
at := entrydb.EntryIdx(i)
if reverse {
at = entrydb.EntryIdx(n) - 1 - at
}
entry, err := db.readAt(at)
if err != nil {
searchErr = err
return false
}
return cmpFn(entry) >= 0
})
if searchErr != nil {
return -1, LinkEntry{}, fmt.Errorf("failed to search: %w", searchErr)
}
if result == int(n) {
if reverse {
return -1, LinkEntry{}, fmt.Errorf("no entry found: %w", entrydb.ErrSkipped)
} else {
return -1, LinkEntry{}, fmt.Errorf("no entry found: %w", entrydb.ErrFuture)
}
}
if reverse {
result = int(n) - 1 - result
}
link, err := db.readAt(entrydb.EntryIdx(result))
if err != nil {
return -1, LinkEntry{}, fmt.Errorf("failed to read final result entry %d: %w", result, err)
}
if cmpFn(link) != 0 {
if reverse {
// The highest entry at or below the target is still below it: the target is in the future.
return -1, LinkEntry{}, fmt.Errorf("closest entry %s is too low: %w", link, entrydb.ErrFuture)
} else {
// The lowest entry at or above the target is above it: the target was skipped.
return -1, LinkEntry{}, fmt.Errorf("closest entry %s is too high: %w", link, entrydb.ErrSkipped)
}
}
return entrydb.EntryIdx(result), link, nil
}
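To make the reverse case concrete, here is a small self-contained sketch (hypothetical data) of the index mapping: sort.Search scans a virtual reversed slice, and the result is mapped back so that the last matching entry wins.
nums := []uint64{1, 2, 2, 2, 3} // e.g. derivedFrom block numbers, ascending
target := uint64(2)
n := len(nums)
r := sort.Search(n, func(i int) bool {
j := n - 1 - i           // index into the virtual reversed slice
return target >= nums[j] // comparison flipped, as in lastDerivedAt
})
last := n - 1 - r // == 3: the last index holding the target value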
func (db *DB) readAt(i entrydb.EntryIdx) (LinkEntry, error) {
entry, err := db.store.Read(i)
if err != nil {
return LinkEntry{}, err
}
var out LinkEntry
err = out.decode(entry)
return out, err
}
func (db *DB) Close() error {
db.rwLock.Lock()
defer db.rwLock.Unlock()
return db.store.Close()
}
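A hedged usage sketch of the three read paths (l1 and l2 are hypothetical eth.BlockRef values); each answers a different question about the derivation mapping:
derivedFrom, derived, err := db.Latest() // the newest (L1, L2) link in the log
lastL2, err := db.LastDerivedAt(l1.ID()) // the last L2 block derived from exactly this L1 block
firstL1, err := db.DerivedFrom(l2.ID())  // the first L1 block this L2 block was derived from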
package fromda
import (
"fmt"
"io"
"os"
"testing"
"github.com/stretchr/testify/require"
)
type statInvariant func(stat os.FileInfo, m *stubMetrics) error
type linkInvariant func(prev, current LinkEntry) error
// checkDBInvariants reads the database log directly and asserts a set of invariants on the data.
func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) {
stat, err := os.Stat(dbPath)
require.NoError(t, err)
statInvariants := []statInvariant{
invariantFileSizeMultipleOfEntrySize,
invariantFileSizeMatchesEntryCountMetric,
}
for _, invariant := range statInvariants {
require.NoError(t, invariant(stat, m))
}
// Read all entries as binary blobs
file, err := os.OpenFile(dbPath, os.O_RDONLY, 0o644)
require.NoError(t, err)
entries := make([]Entry, stat.Size()/EntrySize)
for i := range entries {
n, err := io.ReadFull(file, entries[i][:])
require.NoErrorf(t, err, "failed to read entry %v", i)
require.EqualValuesf(t, EntrySize, n, "read wrong length for entry %v", i)
}
var links []LinkEntry
for i, e := range entries {
var v LinkEntry
require.NoError(t, v.decode(e), "failed to decode entry %d", i)
links = append(links, v)
}
linkInvariants := []linkInvariant{
invariantDerivedTimestamp,
invariantDerivedFromTimestamp,
invariantNumberIncrement,
}
for i, link := range links {
if i == 0 {
continue
}
for _, invariant := range linkInvariants {
err := invariant(links[i-1], link)
if err != nil {
require.NoErrorf(t, err, "Invariant breached: \n%v", fmtEntries(entries))
}
}
}
}
func fmtEntries(entries []Entry) string {
out := ""
for i, entry := range entries {
out += fmt.Sprintf("%v: %x\n", i, entry)
}
return out
}
func invariantFileSizeMultipleOfEntrySize(stat os.FileInfo, _ *stubMetrics) error {
size := stat.Size()
if size%EntrySize != 0 {
return fmt.Errorf("expected file size to be a multiple of entry size (%v) but was %v", EntrySize, size)
}
return nil
}
func invariantFileSizeMatchesEntryCountMetric(stat os.FileInfo, m *stubMetrics) error {
size := stat.Size()
if m.DBDerivedEntryCount*EntrySize != size {
return fmt.Errorf("expected file size to be entryCount (%v) * entrySize (%v) = %v but was %v", m.DBDerivedEntryCount, EntrySize, m.DBDerivedEntryCount*EntrySize, size)
}
return nil
}
func invariantDerivedTimestamp(prev, current LinkEntry) error {
if current.derived.Timestamp < prev.derived.Timestamp {
return fmt.Errorf("derived timestamp must be >=, current: %s, prev: %s", current.derived, prev.derived)
}
return nil
}
func invariantNumberIncrement(prev, current LinkEntry) error {
// derived stays the same if the new L1 block is empty.
derivedSame := current.derived.Number == prev.derived.Number
// derivedFrom stays the same if this L2 block is derived from the same L1 block as the last L2 block
derivedFromSame := current.derivedFrom.Number == prev.derivedFrom.Number
// At least one of the two must increment, otherwise we are just repeating data in the DB.
if derivedSame && derivedFromSame {
return fmt.Errorf("expected at least either derivedFrom or derived to increment, but both have same number")
}
derivedIncrement := current.derived.Number == prev.derived.Number+1
derivedFromIncrement := current.derivedFrom.Number == prev.derivedFrom.Number+1
if !(derivedSame || derivedIncrement) {
return fmt.Errorf("expected derived to either stay the same or increment, got prev %s current %s", prev.derived, current.derived)
}
if !(derivedFromSame || derivedFromIncrement) {
return fmt.Errorf("expected derivedFrom to either stay the same or increment, got prev %s current %s", prev.derivedFrom, current.derivedFrom)
}
return nil
}
func invariantDerivedFromTimestamp(prev, current LinkEntry) error {
if current.derivedFrom.Timestamp < prev.derivedFrom.Timestamp {
return fmt.Errorf("derivedFrom timestamp must be >=, current: %s, prev: %s", current.derivedFrom, prev.derivedFrom)
}
return nil
}
package fromda
import (
"encoding/binary"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
const EntrySize = 100
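For reference, the fixed byte layout that encode and decode below assume, accounting for all 100 bytes:
// Entry layout, 100 bytes total:
//   [0]      type (DerivedFromV0)
//   [1:4]    zero padding
//   [4:12]   derivedFrom.Number    (big-endian uint64)
//   [12:20]  derivedFrom.Timestamp (big-endian uint64)
//   [20:28]  derived.Number        (big-endian uint64)
//   [28:36]  derived.Timestamp     (big-endian uint64)
//   [36:68]  derivedFrom.Hash
//   [68:100] derived.Hash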
type Entry [EntrySize]byte
func (e Entry) Type() EntryType {
return EntryType(e[0])
}
type EntryType uint8
const (
DerivedFromV0 EntryType = 0
)
func (s EntryType) String() string {
switch s {
case DerivedFromV0:
return "v0"
default:
return fmt.Sprintf("unknown(%d)", uint8(s))
}
}
type EntryBinary struct{}
func (EntryBinary) Append(dest []byte, e *Entry) []byte {
return append(dest, e[:]...)
}
func (EntryBinary) ReadAt(dest *Entry, r io.ReaderAt, at int64) (n int, err error) {
return r.ReadAt(dest[:], at)
}
func (EntryBinary) EntrySize() int {
return EntrySize
}
type LinkEntry struct {
derivedFrom types.BlockSeal
derived types.BlockSeal
}
func (d LinkEntry) String() string {
return fmt.Sprintf("LinkEntry(derivedFrom: %s, derived: %s)", d.derivedFrom, d.derived)
}
func (d *LinkEntry) decode(e Entry) error {
if e.Type() != DerivedFromV0 {
return fmt.Errorf("%w: unexpected entry type: %s", entrydb.ErrDataCorruption, e.Type())
}
if [3]byte(e[1:4]) != ([3]byte{}) {
return fmt.Errorf("%w: expected empty data, to pad entry size to round number: %x", entrydb.ErrDataCorruption, e[1:4])
}
offset := 4
d.derivedFrom.Number = binary.BigEndian.Uint64(e[offset : offset+8])
offset += 8
d.derivedFrom.Timestamp = binary.BigEndian.Uint64(e[offset : offset+8])
offset += 8
d.derived.Number = binary.BigEndian.Uint64(e[offset : offset+8])
offset += 8
d.derived.Timestamp = binary.BigEndian.Uint64(e[offset : offset+8])
offset += 8
copy(d.derivedFrom.Hash[:], e[offset:offset+32])
offset += 32
copy(d.derived.Hash[:], e[offset:offset+32])
return nil
}
func (d *LinkEntry) encode() Entry {
var out Entry
out[0] = uint8(DerivedFromV0)
offset := 4
binary.BigEndian.PutUint64(out[offset:offset+8], d.derivedFrom.Number)
offset += 8
binary.BigEndian.PutUint64(out[offset:offset+8], d.derivedFrom.Timestamp)
offset += 8
binary.BigEndian.PutUint64(out[offset:offset+8], d.derived.Number)
offset += 8
binary.BigEndian.PutUint64(out[offset:offset+8], d.derived.Timestamp)
offset += 8
copy(out[offset:offset+32], d.derivedFrom.Hash[:])
offset += 32
copy(out[offset:offset+32], d.derived.Hash[:])
return out
}
package fromda
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func FuzzRoundtripLinkEntry(f *testing.F) {
f.Fuzz(func(t *testing.T, aHash []byte, aNum uint64, aTimestamp uint64, bHash []byte, bNum uint64, bTimestamp uint64) {
x := LinkEntry{
derivedFrom: types.BlockSeal{
Hash: common.BytesToHash(aHash),
Number: aNum,
Timestamp: aTimestamp,
},
derived: types.BlockSeal{
Hash: common.BytesToHash(bHash),
Number: bNum,
Timestamp: bTimestamp,
},
}
entry := x.encode()
require.Equal(t, DerivedFromV0, entry.Type())
var y LinkEntry
err := y.decode(entry)
require.NoError(t, err)
require.Equal(t, x, y)
})
}
func TestLinkEntry(t *testing.T) {
t.Run("invalid type", func(t *testing.T) {
var entry Entry
entry[0] = 123
var x LinkEntry
require.ErrorContains(t, x.decode(entry), "unexpected")
})
}
package fromda
import (
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func (db *DB) AddDerived(derivedFrom eth.BlockRef, derived eth.BlockRef) error {
db.rwLock.Lock()
defer db.rwLock.Unlock()
// If we don't have any entries yet, allow any block to start things off
if db.store.Size() == 0 {
link := LinkEntry{
derivedFrom: types.BlockSeal{
Hash: derivedFrom.Hash,
Number: derivedFrom.Number,
Timestamp: derivedFrom.Time,
},
derived: types.BlockSeal{
Hash: derived.Hash,
Number: derived.Number,
Timestamp: derived.Time,
},
}
e := link.encode()
if err := db.store.Append(e); err != nil {
return err
}
db.m.RecordDBDerivedEntryCount(db.store.Size())
return nil
}
lastDerivedFrom, lastDerived, err := db.latest()
if err != nil {
return err
}
if lastDerived.ID() == derived.ID() && lastDerivedFrom.ID() == derivedFrom.ID() {
// This shouldn't be possible, but the ID component of a block ref doesn't include the timestamp,
// so if the timestamp doesn't match, still return no error to the caller, but at least log a warning.
if lastDerived.Timestamp != derived.Time {
db.log.Warn("Derived block already exists with different timestamp", "derived", derived, "lastDerived", lastDerived)
}
if lastDerivedFrom.Timestamp != derivedFrom.Time {
db.log.Warn("Derived-from block already exists with different timestamp", "derivedFrom", derivedFrom, "lastDerivedFrom", lastDerivedFrom)
}
// Repeat of the same information. No entries to be written.
// We can silently ignore it rather than return an error, as that leaves the caller
// in a consistent state, after which it can insert the actual new derived-from information.
return nil
}
// Check derived relation: the L2 chain has to be sequential without gaps. An L2 block may repeat if the L1 block is empty.
if lastDerived.Number == derived.Number {
// Same block height? Then it must be the same block.
// I.e. we encountered an empty L1 block, and the same L2 block continues to be the last block that was derived from it.
if lastDerived.Hash != derived.Hash {
return fmt.Errorf("derived block %s conflicts with known derived block %s at same height: %w",
derived, lastDerived, entrydb.ErrConflict)
}
} else if lastDerived.Number+1 == derived.Number {
if lastDerived.Hash != derived.ParentHash {
return fmt.Errorf("derived block %s (parent %s) does not build on %s: %w",
derived, derived.ParentHash, lastDerived, entrydb.ErrConflict)
}
} else if lastDerived.Number+1 < derived.Number {
return fmt.Errorf("derived block %s (parent: %s) is too new, expected to build on top of %s: %w",
derived, derived.ParentHash, lastDerived, entrydb.ErrOutOfOrder)
} else {
return fmt.Errorf("derived block %s is older than current derived block %s: %w",
derived, lastDerived, entrydb.ErrOutOfOrder)
}
// Check derived-from relation: multiple L2 blocks may be derived from the same L1 block. But everything in sequence.
if lastDerivedFrom.Number == derivedFrom.Number {
// Same block height? Then it must be the same block.
if lastDerivedFrom.Hash != derivedFrom.Hash {
return fmt.Errorf("cannot add block %s as derived from %s, expected to be derived from %s at this block height: %w",
derived, derivedFrom, lastDerivedFrom, entrydb.ErrConflict)
}
} else if lastDerivedFrom.Number+1 == derivedFrom.Number {
// parent hash check
if lastDerivedFrom.Hash != derivedFrom.ParentHash {
return fmt.Errorf("cannot add block %s as derived from %s (parent %s) derived on top of %s: %w",
derived, derivedFrom, derivedFrom.ParentHash, lastDerivedFrom, entrydb.ErrConflict)
}
} else if lastDerivedFrom.Number+1 < derivedFrom.Number {
// adding block that is derived from something too far into the future
return fmt.Errorf("cannot add block %s as derived from %s, still deriving from %s: %w",
derived, derivedFrom, lastDerivedFrom, entrydb.ErrOutOfOrder)
} else {
// adding block that is derived from something too old
return fmt.Errorf("cannot add block %s as derived from %s, deriving already at %s: %w",
derived, derivedFrom, lastDerivedFrom, entrydb.ErrOutOfOrder)
}
link := LinkEntry{
derivedFrom: types.BlockSeal{
Hash: derivedFrom.Hash,
Number: derivedFrom.Number,
Timestamp: derivedFrom.Time,
},
derived: types.BlockSeal{
Hash: derived.Hash,
Number: derived.Number,
Timestamp: derived.Time,
},
}
e := link.encode()
if err := db.store.Append(e); err != nil {
return err
}
db.m.RecordDBDerivedEntryCount(db.store.Size())
return nil
}
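A hedged sketch of the call patterns AddDerived accepts (l1A, l1B and l2X, l2Y are hypothetical consecutive refs): each side may stay the same or advance by exactly one block, at least one side must advance, and an exact repeat of the latest pair is a silent no-op.
_ = db.AddDerived(l1A, l2X) // first entry: any pair seeds the DB
_ = db.AddDerived(l1A, l2Y) // same L1, next L2: requires l2Y.ParentHash == l2X.Hash
_ = db.AddDerived(l1B, l2Y) // next L1, same L2: requires l1B.ParentHash == l1A.Hash
_ = db.AddDerived(l1B, l2Y) // exact repeat of the latest pair: ignored, no error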
package fromda
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type testCase struct {
name string
setupFn setupFn
assertFn assertFn
}
func TestBadUpdates(t *testing.T) {
aDerivedFrom := mockL1(1)
aDerived := mockL2(201)
bDerivedFrom := mockL1(2)
bDerived := mockL2(202)
cDerivedFrom := mockL1(3)
cDerived := mockL2(203)
dDerivedFrom := mockL1(4)
dDerived := mockL2(204)
eDerivedFrom := mockL1(5)
eDerived := mockL2(205)
fDerivedFrom := mockL1(6)
fDerived := mockL2(206)
noChange := assertFn(func(t *testing.T, db *DB, m *stubMetrics) {
derivedFrom, derived, err := db.Latest()
require.NoError(t, err)
require.Equal(t, dDerivedFrom, derivedFrom)
require.Equal(t, dDerived, derived)
})
testCases := []testCase{
{
name: "add on old derivedFrom",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(bDerivedFrom, aDerivedFrom.Hash), toRef(dDerived, cDerived.Hash)), entrydb.ErrOutOfOrder)
},
assertFn: noChange,
},
{
name: "repeat parent derivedFrom",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(cDerivedFrom, bDerivedFrom.Hash), toRef(dDerived, cDerived.Hash)), entrydb.ErrOutOfOrder)
},
assertFn: noChange,
},
{
name: "add on conflicting derivedFrom, same height. And new derived value",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(types.BlockSeal{
Hash: common.Hash{0xba, 0xd},
Number: dDerivedFrom.Number,
Timestamp: dDerivedFrom.Timestamp,
}, cDerivedFrom.Hash), toRef(eDerived, dDerived.Hash)), entrydb.ErrConflict)
},
assertFn: noChange,
},
{
name: "DerivedFrom with conflicting parent root, same L1 height, new L2: accepted, L1 parent-hash is used only on L1 increments.",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.NoError(t, db.AddDerived(toRef(dDerivedFrom, common.Hash{0x42}), toRef(eDerived, dDerived.Hash)))
},
assertFn: func(t *testing.T, db *DB, m *stubMetrics) {
derivedFrom, derived, err := db.Latest()
require.NoError(t, err)
require.Equal(t, dDerivedFrom, derivedFrom)
require.Equal(t, eDerived, derived)
},
},
{
name: "Conflicting derivedFrom parent root, new L1 height, same L2",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(eDerivedFrom, common.Hash{0x42}), toRef(dDerived, cDerived.Hash)), entrydb.ErrConflict)
},
assertFn: noChange,
},
{
name: "add on too new derivedFrom (even if parent-hash looks correct)",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(fDerivedFrom, dDerivedFrom.Hash), toRef(eDerived, dDerived.Hash)), entrydb.ErrOutOfOrder)
},
assertFn: noChange,
},
{
name: "add on old derivedFrom (even if parent-hash looks correct)",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(cDerivedFrom, bDerivedFrom.Hash), toRef(cDerived, dDerived.Hash)), entrydb.ErrOutOfOrder)
},
assertFn: noChange,
},
{
name: "add on even older derivedFrom",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(bDerivedFrom, aDerivedFrom.Hash), toRef(dDerived, cDerived.Hash)), entrydb.ErrOutOfOrder)
},
assertFn: noChange,
},
{
name: "add on conflicting derived, same L2 height, new L1 block",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(eDerivedFrom, dDerivedFrom.Hash), toRef(types.BlockSeal{
Hash: common.Hash{0x42},
Number: dDerived.Number,
Timestamp: dDerived.Timestamp,
}, cDerived.Hash)), entrydb.ErrConflict)
},
assertFn: noChange,
},
{
name: "add derived with conflicting parent hash, new L1 height, same L2 height: accepted, L2 parent-hash is only checked on L2 increments.",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.NoError(t, db.AddDerived(toRef(eDerivedFrom, dDerivedFrom.Hash), toRef(dDerived, common.Hash{0x42})))
},
assertFn: func(t *testing.T, db *DB, m *stubMetrics) {
derivedFrom, derived, err := db.Latest()
require.NoError(t, err)
require.Equal(t, eDerivedFrom, derivedFrom)
require.Equal(t, dDerived, derived)
},
},
{
name: "add derived with conflicting parent hash, same L1 height, new L2 height",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(dDerivedFrom, cDerivedFrom.Hash), toRef(eDerived, common.Hash{0x42})), entrydb.ErrConflict)
},
assertFn: noChange,
},
{
name: "add on too new derived (even if parent-hash looks correct)",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(dDerivedFrom, cDerivedFrom.Hash), toRef(fDerived, dDerived.Hash)), entrydb.ErrOutOfOrder)
},
assertFn: noChange,
},
{
name: "add on old derived (even if parent-hash looks correct)",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(dDerivedFrom, cDerivedFrom.Hash), toRef(cDerived, bDerived.Hash)), entrydb.ErrOutOfOrder)
},
assertFn: noChange,
},
{
name: "add on even older derived",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.AddDerived(toRef(dDerivedFrom, cDerivedFrom.Hash), toRef(bDerived, aDerived.Hash)), entrydb.ErrOutOfOrder)
},
assertFn: noChange,
},
{
name: "repeat self, silent no-op",
setupFn: func(t *testing.T, db *DB, m *stubMetrics) {
pre := m.DBDerivedEntryCount
require.NoError(t, db.AddDerived(toRef(dDerivedFrom, cDerivedFrom.Hash), toRef(dDerived, cDerived.Hash)))
require.Equal(t, pre, m.DBDerivedEntryCount)
},
assertFn: noChange,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
runDBTest(t,
func(t *testing.T, db *DB, m *stubMetrics) {
// Good first entry
require.NoError(t, db.AddDerived(toRef(dDerivedFrom, cDerivedFrom.Hash), toRef(dDerived, cDerived.Hash)))
// apply the test-case setup
tc.setupFn(t, db, m)
},
tc.assertFn)
})
}
}
@@ -25,15 +25,6 @@ type Metrics interface {
RecordDBSearchEntriesRead(count int64)
}
type EntryStore interface {
Size() int64
LastEntryIdx() entrydb.EntryIdx
Read(idx entrydb.EntryIdx) (Entry, error)
Append(entries ...Entry) error
Truncate(idx entrydb.EntryIdx) error
Close() error
}
// DB implements an append only database for log data and cross-chain dependencies.
//
// To keep the append-only format, reduce data size, and support reorg detection and registering of executing-messages:
@@ -44,21 +35,21 @@ type EntryStore interface {
type DB struct {
log log.Logger
m Metrics
store EntryStore
store entrydb.EntryStore[EntryType, Entry]
rwLock sync.RWMutex
lastEntryContext logContext
}
func NewFromFile(logger log.Logger, m Metrics, path string, trimToLastSealed bool) (*DB, error) {
store, err := entrydb.NewEntryDB[EntryType](logger, path)
store, err := entrydb.NewEntryDB[EntryType, Entry, EntryBinary](logger, path)
if err != nil {
return nil, fmt.Errorf("failed to open DB: %w", err)
}
return NewFromEntryStore(logger, m, store, trimToLastSealed)
}
func NewFromEntryStore(logger log.Logger, m Metrics, store EntryStore, trimToLastSealed bool) (*DB, error) {
func NewFromEntryStore(logger log.Logger, m Metrics, store entrydb.EntryStore[EntryType, Entry], trimToLastSealed bool) (*DB, error) {
db := &DB{
log: logger,
m: m,
...
@@ -8,8 +8,6 @@ import (
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
)
type statInvariant func(stat os.FileInfo, m *stubMetrics) error
@@ -31,11 +29,11 @@ func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) {
// Read all entries as binary blobs
file, err := os.OpenFile(dbPath, os.O_RDONLY, 0o644)
require.NoError(t, err)
entries := make([]Entry, stat.Size()/entrydb.EntrySize)
entries := make([]Entry, stat.Size()/EntrySize)
for i := range entries {
n, err := io.ReadFull(file, entries[i][:])
require.NoErrorf(t, err, "failed to read entry %v", i)
require.EqualValuesf(t, entrydb.EntrySize, n, "read wrong length for entry %v", i)
require.EqualValuesf(t, EntrySize, n, "read wrong length for entry %v", i)
}
entryInvariants := []entryInvariant{
@@ -67,16 +65,16 @@ func fmtEntries(entries []Entry) string {
func invariantFileSizeMultipleOfEntrySize(stat os.FileInfo, _ *stubMetrics) error {
size := stat.Size()
if size%entrydb.EntrySize != 0 {
return fmt.Errorf("expected file size to be a multiple of entry size (%v) but was %v", entrydb.EntrySize, size)
if size%EntrySize != 0 {
return fmt.Errorf("expected file size to be a multiple of entry size (%v) but was %v", EntrySize, size)
}
return nil
}
func invariantFileSizeMatchesEntryCountMetric(stat os.FileInfo, m *stubMetrics) error {
size := stat.Size()
if m.entryCount*entrydb.EntrySize != size {
return fmt.Errorf("expected file size to be entryCount (%v) * entrySize (%v) = %v but was %v", m.entryCount, entrydb.EntrySize, m.entryCount*entrydb.EntrySize, size)
if m.entryCount*EntrySize != size {
return fmt.Errorf("expected file size to be entryCount (%v) * entrySize (%v) = %v but was %v", m.entryCount, EntrySize, m.entryCount*EntrySize, size)
}
return nil
}
...
@@ -2,7 +2,6 @@ package logs
import (
"encoding/binary"
"io"
"io/fs"
"os"
"path/filepath"
@@ -91,6 +90,7 @@ func TestLatestSealedBlockNum(t *testing.T) {
require.Zero(t, n)
idx, err := db.searchCheckpoint(0, 0)
require.ErrorIs(t, err, entrydb.ErrFuture, "no checkpoint in empty db")
require.Zero(t, idx)
})
})
@@ -124,6 +124,7 @@ func TestLatestSealedBlockNum(t *testing.T) {
require.Zero(t, idx, "anchor block as checkpoint 0")
_, err = db.searchCheckpoint(0, 0)
require.ErrorIs(t, err, entrydb.ErrSkipped, "no checkpoint before genesis")
})
})
t.Run("Block 1 case", func(t *testing.T) {
@@ -176,6 +177,7 @@ func TestAddLog(t *testing.T) {
genesis := eth.BlockID{Hash: createHash(15), Number: 0}
err := db.AddLog(createHash(1), genesis, 0, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
})
})
@@ -266,6 +268,7 @@ func TestAddLog(t *testing.T) {
bl14 := eth.BlockID{Hash: createHash(14), Number: 14}
err := db.SealBlock(createHash(13), bl14, 5000)
require.ErrorIs(t, err, entrydb.ErrConflict)
})
})
@@ -283,6 +286,7 @@ func TestAddLog(t *testing.T) {
onto := eth.BlockID{Hash: createHash(14), Number: 14}
err := db.AddLog(createHash(1), onto, 0, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "cannot build logs on 14 when 15 is already sealed")
require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "cannot build logs on 14 when 15 is already sealed")
})
})
@@ -299,6 +303,7 @@ func TestAddLog(t *testing.T) {
bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
err := db.AddLog(createHash(1), bl15, 0, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "already at log index 2")
require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "already at log index 2")
})
})
@@ -314,6 +319,7 @@ func TestAddLog(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) {
err := db.AddLog(createHash(1), eth.BlockID{Hash: createHash(16), Number: 16}, 0, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
})
})
@@ -330,6 +336,7 @@ func TestAddLog(t *testing.T) {
bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
err := db.AddLog(createHash(1), bl15, 1, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "already at log index 2")
require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "already at log index 2")
})
})
@@ -346,6 +353,7 @@ func TestAddLog(t *testing.T) {
bl15 := eth.BlockID{Hash: createHash(16), Number: 16}
err := db.AddLog(createHash(1), bl15, 2, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
})
})
@@ -361,6 +369,7 @@ func TestAddLog(t *testing.T) {
bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
err := db.AddLog(createHash(1), bl15, 2, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
})
})
@@ -374,6 +383,7 @@ func TestAddLog(t *testing.T) {
bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
err := db.AddLog(createHash(1), bl15, 5, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
})
})
@@ -395,6 +405,7 @@ func TestAddLog(t *testing.T) {
require.NoError(t, err)
err = db.AddLog(createHash(1), bl16, 1, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
})
})
@@ -700,6 +711,8 @@ func TestGetBlockInfo(t *testing.T) {
func(t *testing.T, db *DB, m *stubMetrics) {
_, err := db.FindSealedBlock(10)
require.ErrorIs(t, err, entrydb.ErrFuture)
})
})
@@ -715,6 +728,8 @@ func TestGetBlockInfo(t *testing.T) {
// if the DB starts at 11, then shouldn't find 10
_, err := db.FindSealedBlock(10)
require.ErrorIs(t, err, entrydb.ErrSkipped)
})
})
@@ -725,10 +740,14 @@ func TestGetBlockInfo(t *testing.T) {
require.NoError(t, db.SealBlock(common.Hash{}, block, 500))
},
func(t *testing.T, db *DB, m *stubMetrics) {
_, err := db.FindSealedBlock(block.Number)
require.NoError(t, err)
seal, err := db.FindSealedBlock(block.Number)
require.NoError(t, err)
require.Equal(t, block, seal.ID())
require.Equal(t, uint64(500), seal.Timestamp)
})
})
}
@@ -754,6 +773,7 @@ func requireConflicts(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logH
require.True(t, ok, "Did not get the expected metrics type")
_, err := db.Contains(blockNum, logIdx, logHash)
require.ErrorIs(t, err, entrydb.ErrConflict, "canonical chain must not include this log")
require.ErrorIs(t, err, entrydb.ErrConflict, "canonical chain must not include this log")
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency*2), "Should not need to read more than between two checkpoints")
}
@@ -762,6 +782,7 @@ func requireFuture(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash
require.True(t, ok, "Did not get the expected metrics type")
_, err := db.Contains(blockNum, logIdx, logHash)
require.ErrorIs(t, err, entrydb.ErrFuture, "canonical chain does not yet include this log")
require.ErrorIs(t, err, entrydb.ErrFuture, "canonical chain does not yet include this log")
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency*2), "Should not need to read more than between two checkpoints")
}
@@ -782,16 +803,16 @@ func requireExecutingMessage(t *testing.T, db *DB, blockNum uint64, logIdx uint3
}
func TestRecoverOnCreate(t *testing.T) {
createDb := func(t *testing.T, store *stubEntryStore) (*DB, *stubMetrics, error) {
createDb := func(t *testing.T, store *entrydb.MemEntryStore[EntryType, Entry]) (*DB, *stubMetrics, error) {
logger := testlog.Logger(t, log.LvlInfo)
m := &stubMetrics{}
db, err := NewFromEntryStore(logger, m, store, true)
return db, m, err
}
storeWithEvents := func(evts ...Entry) *stubEntryStore {
store := &stubEntryStore{}
store.entries = append(store.entries, evts...)
storeWithEvents := func(evts ...Entry) *entrydb.MemEntryStore[EntryType, Entry] {
store := &entrydb.MemEntryStore[EntryType, Entry]{}
_ = store.Append(evts...)
return store
}
t.Run("NoTruncateWhenLastEntryIsLogWithNoExecMessageSealed", func(t *testing.T) {
@@ -922,9 +943,11 @@ func TestRewind(t *testing.T) {
t.Run("WhenEmpty", func(t *testing.T) {
runDBTest(t, func(t *testing.T, db *DB, m *stubMetrics) {},
func(t *testing.T, db *DB, m *stubMetrics) {
require.ErrorIs(t, db.Rewind(100), entrydb.ErrFuture)
// Genesis is a block too, not present in an empty DB
require.ErrorIs(t, db.Rewind(0), entrydb.ErrFuture)
})
})
@@ -943,6 +966,7 @@ func TestRewind(t *testing.T) {
require.NoError(t, db.AddLog(createHash(4), bl52, 0, nil))
// cannot rewind to a block that is not sealed yet
require.ErrorIs(t, db.Rewind(53), entrydb.ErrFuture)
},
func(t *testing.T, db *DB, m *stubMetrics) {
requireContains(t, db, 51, 0, createHash(1))
@@ -962,6 +986,7 @@ func TestRewind(t *testing.T) {
require.NoError(t, db.AddLog(createHash(2), bl50, 1, nil))
// cannot go back to an unknown block
require.ErrorIs(t, db.Rewind(25), entrydb.ErrSkipped)
},
func(t *testing.T, db *DB, m *stubMetrics) {
requireContains(t, db, 51, 0, createHash(1))
@@ -1087,11 +1112,13 @@ func TestRewind(t *testing.T) {
// 29 was deleted
err := db.AddLog(createHash(2), bl29, 1, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "Cannot add log on removed block")
require.ErrorIs(t, err, entrydb.ErrOutOfOrder, "Cannot add log on removed block")
// 15 is older, we have up to 16
bl15 := eth.BlockID{Hash: createHash(15), Number: 15}
// try to add a third log to 15
err = db.AddLog(createHash(10), bl15, 2, nil)
require.ErrorIs(t, err, entrydb.ErrOutOfOrder)
bl16 := eth.BlockID{Hash: createHash(16), Number: 16}
// try to add a log to 17, on top of 16
err = db.AddLog(createHash(42), bl16, 0, nil)
@@ -1116,37 +1143,4 @@ func (s *stubMetrics) RecordDBSearchEntriesRead(count int64) {
var _ Metrics = (*stubMetrics)(nil)
type stubEntryStore struct {
entries []Entry
}
func (s *stubEntryStore) Size() int64 {
return int64(len(s.entries))
}
func (s *stubEntryStore) LastEntryIdx() entrydb.EntryIdx {
return entrydb.EntryIdx(s.Size() - 1)
}
func (s *stubEntryStore) Read(idx entrydb.EntryIdx) (Entry, error) {
if idx < entrydb.EntryIdx(len(s.entries)) {
return s.entries[idx], nil
}
return Entry{}, io.EOF
}
func (s *stubEntryStore) Append(entries ...Entry) error {
s.entries = append(s.entries, entries...)
return nil
}
func (s *stubEntryStore) Truncate(idx entrydb.EntryIdx) error {
s.entries = s.entries[:min(s.Size()-1, int64(idx+1))]
return nil
}
func (s *stubEntryStore) Close() error {
return nil
}
var _ EntryStore = (*stubEntryStore)(nil)
var _ entrydb.EntryStore[EntryType, Entry] = (*entrydb.MemEntryStore[EntryType, Entry])(nil)
@@ -2,16 +2,35 @@ package logs
import (
"fmt"
"io"
"strings"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
)
type EntryObj interface {
encode() Entry
}
type Entry = entrydb.Entry[EntryType]
const EntrySize = 34
type Entry [EntrySize]byte
func (e Entry) Type() EntryType {
return EntryType(e[0])
}
type EntryBinary struct{}
func (EntryBinary) Append(dest []byte, e *Entry) []byte {
return append(dest, e[:]...)
}
func (EntryBinary) ReadAt(dest *Entry, r io.ReaderAt, at int64) (n int, err error) {
return r.ReadAt(dest[:], at)
}
func (EntryBinary) EntrySize() int {
return EntrySize
}
type EntryTypeFlag uint8
...