Commit f3da9a53, authored by Adrian Sutton, committed by GitHub

Record executing messages in log db and handle write failures (#10973)

* op-supervisor: Support recording executing message info

* op-supervisor: Reduce permissions

* op-supervisor: Implement recovery for entry db

* op-supervisor: Track size in entrydb instead of last entry idx

* op-supervisor: Trim log entries back to the last valid ending point

* op-supervisor: Remove the manual recover operations since recovery at startup is automatic

* op-supervisor: Add support for writing multiple entries in a single write

* op-supervisor: Write all entries for a log in one append call.

Only update in-memory state after the write succeeds.

* op-supervisor: Handle partial writes

* op-supervisor: Extract logic to reverse an init event

* op-supervisor: Use errors.New

* op-supervisor: Combine the two AddLog variants.

* op-supervisor: Remove place holder tests.

* op-supervisor: Separate Executes and Contains queries

* op-supervisor: Only read executing message when it is actually used.
parent 1e0ea43e
This diff is collapsed.
package db package db
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
...@@ -42,6 +43,11 @@ func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) { ...@@ -42,6 +43,11 @@ func checkDBInvariants(t *testing.T, dbPath string, m *stubMetrics) {
invariantCanonicalHashAfterEverySearchCheckpoint, invariantCanonicalHashAfterEverySearchCheckpoint,
invariantSearchCheckpointBeforeEveryCanonicalHash, invariantSearchCheckpointBeforeEveryCanonicalHash,
invariantIncrementLogIdxIfNotImmediatelyAfterCanonicalHash, invariantIncrementLogIdxIfNotImmediatelyAfterCanonicalHash,
invariantExecLinkAfterInitEventWithFlagSet,
invariantExecLinkOnlyAfterInitiatingEventWithFlagSet,
invariantExecCheckAfterExecLink,
invariantExecCheckOnlyAfterExecLink,
invariantValidLastEntry,
} }
for i, entry := range entries { for i, entry := range entries {
for _, invariant := range entryInvariants { for _, invariant := range entryInvariants {
...@@ -146,3 +152,109 @@ func invariantIncrementLogIdxIfNotImmediatelyAfterCanonicalHash(entryIdx int, en ...@@ -146,3 +152,109 @@ func invariantIncrementLogIdxIfNotImmediatelyAfterCanonicalHash(entryIdx int, en
} }
return nil return nil
} }
// invariantExecLinkAfterInitEventWithFlagSet checks that an initiating event with the
// eventFlagHasExecutingMessage flag set is followed by an executing link entry.
// Entries of any other type, and initiating events without the flag, always pass.
func invariantExecLinkAfterInitEventWithFlagSet(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	// Only flagged initiating events are constrained by this invariant.
	if entry[0] != typeInitiatingEvent || entry[2]&eventFlagHasExecutingMessage == 0 {
		return nil
	}

	expectedIdx := entryIdx + 1
	if expectedIdx%searchCheckpointFrequency == 0 {
		// The link is pushed back past the search checkpoint and canonical hash entries.
		expectedIdx += 2
	}

	if expectedIdx >= len(entries) {
		return fmt.Errorf("expected executing link after initiating event with exec msg flag set at entry %v but there were no more events", entryIdx)
	}
	if actualType := entries[expectedIdx][0]; actualType != typeExecutingLink {
		return fmt.Errorf("expected executing link at idx %v after initiating event with exec msg flag set at entry %v but got type %v", expectedIdx, entryIdx, actualType)
	}
	return nil
}
// invariantExecLinkOnlyAfterInitiatingEventWithFlagSet checks that every executing link entry
// is preceded by an initiating event whose eventFlagHasExecutingMessage flag is set.
// Entries that are not executing links always pass.
func invariantExecLinkOnlyAfterInitiatingEventWithFlagSet(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	if entry[0] != typeExecutingLink {
		return nil
	}
	if entryIdx == 0 {
		return errors.New("found executing link as first entry")
	}

	prevIdx := entryIdx - 1
	if prevIdx%searchCheckpointFrequency == 1 {
		// Step back over the canonical hash and search checkpoint entries.
		prevIdx -= 2
	}
	if prevIdx < 0 {
		return fmt.Errorf("found executing link without a preceding initiating event at entry %v", entryIdx)
	}

	prev := entries[prevIdx]
	switch {
	case prev[0] != typeInitiatingEvent:
		return fmt.Errorf("expected initiating event at entry %v prior to executing link at %v but got %x", prevIdx, entryIdx, prev[0])
	case prev[2]&eventFlagHasExecutingMessage == 0:
		return fmt.Errorf("initiating event at %v prior to executing link at %v does not have flag set to indicate needing a executing event: %x", prevIdx, entryIdx, prev)
	}
	return nil
}
// invariantExecCheckAfterExecLink checks that every executing link entry is followed by an
// executing check entry. Entries that are not executing links always pass.
func invariantExecCheckAfterExecLink(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	if entry[0] != typeExecutingLink {
		return nil
	}

	followIdx := entryIdx + 1
	if followIdx%searchCheckpointFrequency == 0 {
		// The check is pushed back past the search checkpoint and canonical hash entries.
		followIdx += 2
	}

	if len(entries) <= followIdx {
		return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but ran out of entries", entryIdx, followIdx)
	}
	if followType := entries[followIdx][0]; followType != typeExecutingCheck {
		return fmt.Errorf("expected executing link at %v to be followed by executing check at %v but got type %v", entryIdx, followIdx, followType)
	}
	return nil
}
// invariantExecCheckOnlyAfterExecLink checks that every executing check entry is preceded by
// an executing link entry. Entries that are not executing checks always pass.
func invariantExecCheckOnlyAfterExecLink(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	if entry[0] != typeExecutingCheck {
		return nil
	}
	if entryIdx == 0 {
		return errors.New("found executing check as first entry")
	}
	linkIdx := entryIdx - 1
	if linkIdx%searchCheckpointFrequency == 1 {
		linkIdx -= 2 // Skip the canonical hash and search checkpoint entries
	}
	if linkIdx < 0 {
		// The message names the entry types this invariant actually checks: an executing
		// check that has no executing link before it.
		return fmt.Errorf("found executing check without a preceding executing link at entry %v", entryIdx)
	}
	linkEntry := entries[linkIdx]
	if linkEntry[0] != typeExecutingLink {
		return fmt.Errorf("expected executing link at entry %v prior to executing check at %v but got %x", linkIdx, entryIdx, linkEntry[0])
	}
	return nil
}
// invariantValidLastEntry checks that the final entry is either an executing check or an
// initiating event that has no executing message. Entries before the last one always pass.
func invariantValidLastEntry(entryIdx int, entry entrydb.Entry, entries []entrydb.Entry, m *stubMetrics) error {
	isLast := entryIdx+1 >= len(entries)
	if !isLast {
		return nil
	}

	switch entry[0] {
	case typeExecutingCheck:
		return nil
	case typeInitiatingEvent:
		// Needs the flag inspection below.
	default:
		return fmt.Errorf("invalid final event type: %v", entry[0])
	}

	initEvt, decodeErr := newInitiatingEventFromEntry(entry)
	if decodeErr != nil {
		return fmt.Errorf("final event was invalid: %w", decodeErr)
	}
	if initEvt.hasExecMsg {
		return errors.New("ends with init event that should have exec msg but no exec msg follows")
	}
	return nil
}
...@@ -71,6 +71,7 @@ func (c canonicalHash) encode() entrydb.Entry { ...@@ -71,6 +71,7 @@ func (c canonicalHash) encode() entrydb.Entry {
type initiatingEvent struct { type initiatingEvent struct {
blockDiff uint8 blockDiff uint8
incrementLogIdx bool incrementLogIdx bool
hasExecMsg bool
logHash TruncatedHash logHash TruncatedHash
} }
...@@ -83,11 +84,12 @@ func newInitiatingEventFromEntry(data entrydb.Entry) (initiatingEvent, error) { ...@@ -83,11 +84,12 @@ func newInitiatingEventFromEntry(data entrydb.Entry) (initiatingEvent, error) {
return initiatingEvent{ return initiatingEvent{
blockDiff: blockNumDiff, blockDiff: blockNumDiff,
incrementLogIdx: flags&eventFlagIncrementLogIdx != 0, incrementLogIdx: flags&eventFlagIncrementLogIdx != 0,
hasExecMsg: flags&eventFlagHasExecutingMessage != 0,
logHash: TruncatedHash(data[3:23]), logHash: TruncatedHash(data[3:23]),
}, nil }, nil
} }
func newInitiatingEvent(pre logContext, blockNum uint64, logIdx uint32, logHash TruncatedHash) (initiatingEvent, error) { func newInitiatingEvent(pre logContext, blockNum uint64, logIdx uint32, logHash TruncatedHash, hasExecMsg bool) (initiatingEvent, error) {
blockDiff := blockNum - pre.blockNum blockDiff := blockNum - pre.blockNum
if blockDiff > math.MaxUint8 { if blockDiff > math.MaxUint8 {
// TODO(optimism#10857): Need to find a way to support this. // TODO(optimism#10857): Need to find a way to support this.
...@@ -106,6 +108,7 @@ func newInitiatingEvent(pre logContext, blockNum uint64, logIdx uint32, logHash ...@@ -106,6 +108,7 @@ func newInitiatingEvent(pre logContext, blockNum uint64, logIdx uint32, logHash
return initiatingEvent{ return initiatingEvent{
blockDiff: uint8(blockDiff), blockDiff: uint8(blockDiff),
incrementLogIdx: logDiff > 0, incrementLogIdx: logDiff > 0,
hasExecMsg: hasExecMsg,
logHash: logHash, logHash: logHash,
}, nil }, nil
} }
...@@ -121,6 +124,9 @@ func (i initiatingEvent) encode() entrydb.Entry { ...@@ -121,6 +124,9 @@ func (i initiatingEvent) encode() entrydb.Entry {
// Set flag to indicate log idx needs to be incremented (ie we're not directly after a checkpoint) // Set flag to indicate log idx needs to be incremented (ie we're not directly after a checkpoint)
flags = flags | eventFlagIncrementLogIdx flags = flags | eventFlagIncrementLogIdx
} }
if i.hasExecMsg {
flags = flags | eventFlagHasExecutingMessage
}
data[2] = flags data[2] = flags
copy(data[3:23], i.logHash[:]) copy(data[3:23], i.logHash[:])
return data return data
...@@ -139,3 +145,106 @@ func (i initiatingEvent) postContext(pre logContext) logContext { ...@@ -139,3 +145,106 @@ func (i initiatingEvent) postContext(pre logContext) logContext {
} }
return post return post
} }
// preContext undoes postContext: given the logContext that results from applying this init
// event, it returns the logContext that must have been the input.
func (i initiatingEvent) preContext(post logContext) logContext {
	before := post
	// Remove the block number delta this event carries.
	before.blockNum -= uint64(i.blockDiff)
	if i.incrementLogIdx {
		before.logIdx--
	}
	return before
}
// executingLink is the part of an executing message that is stored in an "executing link"
// entry: the chain, block number, log index and timestamp. The hash of the message is
// stored separately in an executing check entry.
type executingLink struct {
chain uint32
blockNum uint64
// logIdx is written by encode using only 3 bytes of the entry.
logIdx uint32
timestamp uint64
}
// newExecutingLink builds the executing link for msg by copying its chain, block number,
// log index and timestamp.
//
// An error is returned when msg.LogIdx does not fit in the 3 bytes the encoded entry
// reserves for it, i.e. when it is 1<<24 or larger. Rejecting 1<<24 itself matters: its low
// 24 bits are all zero, so it would otherwise be silently encoded as log index 0.
func newExecutingLink(msg ExecutingMessage) (executingLink, error) {
	if msg.LogIdx >= 1<<24 {
		return executingLink{}, fmt.Errorf("log idx is too large (%v)", msg.LogIdx)
	}
	return executingLink{
		chain:     msg.Chain,
		blockNum:  msg.BlockNum,
		logIdx:    msg.LogIdx,
		timestamp: msg.Timestamp,
	}, nil
}
// newExecutingLinkFromEntry decodes an executing link from data.
// It returns an error wrapping ErrDataCorruption if the entry's type byte is not
// typeExecutingLink.
func newExecutingLinkFromEntry(data entrydb.Entry) (executingLink, error) {
	if entryType := data[0]; entryType != typeExecutingLink {
		return executingLink{}, fmt.Errorf("%w: attempting to decode executing link but was type %v", ErrDataCorruption, entryType)
	}

	var link executingLink
	link.chain = binary.LittleEndian.Uint32(data[1:5])
	link.blockNum = binary.LittleEndian.Uint64(data[5:13])
	// The log index is a 3 byte little endian value.
	link.logIdx = uint32(data[13]) | uint32(data[14])<<8 | uint32(data[15])<<16
	link.timestamp = binary.LittleEndian.Uint64(data[16:24])
	return link, nil
}
// encode serialises the executing link into an entry with the layout
// type 3: "executing link" <type><chain: 4 bytes><blocknum: 8 bytes><event index: 3 bytes><uint64 timestamp: 8 bytes> = 24 bytes
func (e executingLink) encode() entrydb.Entry {
	var out entrydb.Entry
	le := binary.LittleEndian

	out[0] = typeExecutingLink
	le.PutUint32(out[1:5], e.chain)
	le.PutUint64(out[5:13], e.blockNum)
	// Only the low three bytes of the log index are stored, least significant first.
	for b := uint(0); b < 3; b++ {
		out[13+b] = byte(e.logIdx >> (8 * b))
	}
	le.PutUint64(out[16:24], e.timestamp)
	return out
}
// executingCheck holds the truncated hash that is stored in an "executing check" entry.
type executingCheck struct {
hash TruncatedHash
}
// newExecutingCheck wraps hash in an executingCheck.
func newExecutingCheck(hash TruncatedHash) executingCheck {
	var check executingCheck
	check.hash = hash
	return check
}
// newExecutingCheckFromEntry decodes an executing check from entry, taking the hash from
// bytes 1 to 20. It returns an error wrapping ErrDataCorruption if the entry's type byte is
// not typeExecutingCheck.
func newExecutingCheckFromEntry(entry entrydb.Entry) (executingCheck, error) {
	if entryType := entry[0]; entryType != typeExecutingCheck {
		return executingCheck{}, fmt.Errorf("%w: attempting to decode executing check but was type %v", ErrDataCorruption, entryType)
	}
	return newExecutingCheck(TruncatedHash(entry[1:21])), nil
}
// encode serialises the executing check into an entry with the layout
// type 4: "executing check" <type><event-hash: 20 bytes> = 21 bytes
func (e executingCheck) encode() entrydb.Entry {
	var out entrydb.Entry
	out[0] = typeExecutingCheck
	// The hash directly follows the type byte.
	copy(out[1:1+len(e.hash)], e.hash[:])
	return out
}
// newExecutingMessageFromEntries reassembles an ExecutingMessage from its two entries:
// linkEntry supplies the chain, block number, log index and timestamp, and checkEntry
// supplies the hash. An error is returned if either entry fails to decode.
func newExecutingMessageFromEntries(linkEntry entrydb.Entry, checkEntry entrydb.Entry) (ExecutingMessage, error) {
	decodedLink, err := newExecutingLinkFromEntry(linkEntry)
	if err != nil {
		return ExecutingMessage{}, fmt.Errorf("invalid executing link: %w", err)
	}
	decodedCheck, err := newExecutingCheckFromEntry(checkEntry)
	if err != nil {
		return ExecutingMessage{}, fmt.Errorf("invalid executing check: %w", err)
	}

	var msg ExecutingMessage
	msg.Chain = decodedLink.chain
	msg.BlockNum = decodedLink.blockNum
	msg.LogIdx = decodedLink.logIdx
	msg.Timestamp = decodedLink.timestamp
	msg.Hash = decodedCheck.hash
	return msg, nil
}
...@@ -5,6 +5,8 @@ import ( ...@@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"github.com/ethereum/go-ethereum/log"
) )
const ( const (
...@@ -23,12 +25,20 @@ type dataAccess interface { ...@@ -23,12 +25,20 @@ type dataAccess interface {
} }
type EntryDB struct { type EntryDB struct {
data dataAccess data dataAccess
lastEntryIdx int64 size int64
cleanupFailedWrite bool
} }
func NewEntryDB(path string) (*EntryDB, error) { // NewEntryDB creates an EntryDB. A new file will be created if the specified path does not exist,
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o666) // but parent directories will not be created.
// If the file exists it will be used as the existing data.
// Returns ErrRecoveryRequired if the existing file is not a valid entry db. A EntryDB is still returned but all
// operations will return ErrRecoveryRequired until the Recover method is called.
func NewEntryDB(logger log.Logger, path string) (*EntryDB, error) {
logger.Info("Opening entry database", "path", path)
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open database at %v: %w", path, err) return nil, fmt.Errorf("failed to open database at %v: %w", path, err)
} }
...@@ -36,18 +46,29 @@ func NewEntryDB(path string) (*EntryDB, error) { ...@@ -36,18 +46,29 @@ func NewEntryDB(path string) (*EntryDB, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to stat database at %v: %w", path, err) return nil, fmt.Errorf("failed to stat database at %v: %w", path, err)
} }
lastEntryIdx := info.Size()/EntrySize - 1 size := info.Size() / EntrySize
return &EntryDB{ db := &EntryDB{
data: file, data: file,
lastEntryIdx: lastEntryIdx, size: size,
}, nil }
if size*EntrySize != info.Size() {
logger.Warn("File size (%v) is nut a multiple of entry size %v. Truncating to last complete entry", size, EntrySize)
if err := db.recover(); err != nil {
return nil, fmt.Errorf("failed to recover database at %v: %w", path, err)
}
}
return db, nil
} }
func (e *EntryDB) Size() int64 { func (e *EntryDB) Size() int64 {
return e.lastEntryIdx + 1 return e.size
} }
// Read an entry from the database by index. Returns io.EOF iff idx is after the last entry.
func (e *EntryDB) Read(idx int64) (Entry, error) { func (e *EntryDB) Read(idx int64) (Entry, error) {
if idx >= e.size {
return Entry{}, io.EOF
}
var out Entry var out Entry
read, err := e.data.ReadAt(out[:], idx*EntrySize) read, err := e.data.ReadAt(out[:], idx*EntrySize)
// Ignore io.EOF if we read the entire last entry as ReadAt may return io.EOF or nil when it reads the last byte // Ignore io.EOF if we read the entire last entry as ReadAt may return io.EOF or nil when it reads the last byte
...@@ -57,25 +78,55 @@ func (e *EntryDB) Read(idx int64) (Entry, error) { ...@@ -57,25 +78,55 @@ func (e *EntryDB) Read(idx int64) (Entry, error) {
return out, nil return out, nil
} }
// Append entries to the database.
// The entries are combined in memory and passed to a single Write invocation.
// If the write fails, it will attempt to truncate any partially written data.
// Subsequent writes to this instance will fail until partially written data is truncated.
func (e *EntryDB) Append(entries ...Entry) error { func (e *EntryDB) Append(entries ...Entry) error {
if e.cleanupFailedWrite {
// Try to rollback partially written data from a previous Append
if truncateErr := e.Truncate(e.size - 1); truncateErr != nil {
return fmt.Errorf("failed to recover from previous write error: %w", truncateErr)
}
}
data := make([]byte, 0, len(entries)*EntrySize)
for _, entry := range entries { for _, entry := range entries {
if _, err := e.data.Write(entry[:]); err != nil { data = append(data, entry[:]...)
// TODO(optimism#10857): When a write fails, need to revert any in memory changes and truncate back to the }
// pre-write state. Likely need to batch writes for multiple entries into a single write akin to transactions if n, err := e.data.Write(data); err != nil {
// to avoid leaving hanging entries without the entry that should follow them. if n == 0 {
// Didn't write any data, so no recovery required
return err return err
} }
e.lastEntryIdx++ // Try to rollback the partially written data
if truncateErr := e.Truncate(e.size - 1); truncateErr != nil {
// Failed to rollback, set a flag to attempt the clean up on the next write
e.cleanupFailedWrite = true
return errors.Join(err, fmt.Errorf("failed to remove partially written data: %w", truncateErr))
}
// Successfully rolled back the changes, still report the failed write
return err
} }
e.size += int64(len(entries))
return nil return nil
} }
// Truncate the database so that the last retained entry is idx. Any entries after idx are deleted.
func (e *EntryDB) Truncate(idx int64) error { func (e *EntryDB) Truncate(idx int64) error {
if err := e.data.Truncate((idx + 1) * EntrySize); err != nil { if err := e.data.Truncate((idx + 1) * EntrySize); err != nil {
return fmt.Errorf("failed to truncate to entry %v: %w", idx, err) return fmt.Errorf("failed to truncate to entry %v: %w", idx, err)
} }
// Update the lastEntryIdx cache and then use db.init() to find the log context for the new latest log entry // Update the lastEntryIdx cache
e.lastEntryIdx = idx e.size = idx + 1
e.cleanupFailedWrite = false
return nil
}
// recover an invalid database by truncating back to the last complete event.
func (e *EntryDB) recover() error {
if err := e.data.Truncate((e.size) * EntrySize); err != nil {
return fmt.Errorf("failed to truncate trailing partial entries: %w", err)
}
return nil return nil
} }
......
...@@ -2,20 +2,30 @@ package entrydb ...@@ -2,20 +2,30 @@ package entrydb
import ( import (
"bytes" "bytes"
"errors"
"io" "io"
"os"
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestReadWrite(t *testing.T) { func TestReadWrite(t *testing.T) {
t.Run("BasicReadWrite", func(t *testing.T) { t.Run("BasicReadWrite", func(t *testing.T) {
db := createEntryDB(t) db := createEntryDB(t)
require.EqualValues(t, 0, db.Size())
require.NoError(t, db.Append(createEntry(1))) require.NoError(t, db.Append(createEntry(1)))
require.EqualValues(t, 1, db.Size())
require.NoError(t, db.Append(createEntry(2))) require.NoError(t, db.Append(createEntry(2)))
require.EqualValues(t, 2, db.Size())
require.NoError(t, db.Append(createEntry(3))) require.NoError(t, db.Append(createEntry(3)))
require.EqualValues(t, 3, db.Size())
require.NoError(t, db.Append(createEntry(4))) require.NoError(t, db.Append(createEntry(4)))
require.EqualValues(t, 4, db.Size())
requireRead(t, db, 0, createEntry(1)) requireRead(t, db, 0, createEntry(1))
requireRead(t, db, 1, createEntry(2)) requireRead(t, db, 1, createEntry(2))
requireRead(t, db, 2, createEntry(3)) requireRead(t, db, 2, createEntry(3))
...@@ -33,11 +43,8 @@ func TestReadWrite(t *testing.T) { ...@@ -33,11 +43,8 @@ func TestReadWrite(t *testing.T) {
t.Run("WriteMultiple", func(t *testing.T) { t.Run("WriteMultiple", func(t *testing.T) {
db := createEntryDB(t) db := createEntryDB(t)
require.NoError(t, db.Append( require.NoError(t, db.Append(createEntry(1), createEntry(2), createEntry(3)))
createEntry(1), require.EqualValues(t, 3, db.Size())
createEntry(2),
createEntry(3),
))
requireRead(t, db, 0, createEntry(1)) requireRead(t, db, 0, createEntry(1))
requireRead(t, db, 1, createEntry(2)) requireRead(t, db, 1, createEntry(2))
requireRead(t, db, 2, createEntry(3)) requireRead(t, db, 2, createEntry(3))
...@@ -45,23 +52,129 @@ func TestReadWrite(t *testing.T) { ...@@ -45,23 +52,129 @@ func TestReadWrite(t *testing.T) {
} }
func TestTruncate(t *testing.T) { func TestTruncate(t *testing.T) {
db := createEntryDB(t) t.Run("Partial", func(t *testing.T) {
require.NoError(t, db.Append(createEntry(1))) db := createEntryDB(t)
require.NoError(t, db.Append(createEntry(2))) require.NoError(t, db.Append(createEntry(1)))
require.NoError(t, db.Append(createEntry(3))) require.NoError(t, db.Append(createEntry(2)))
require.NoError(t, db.Append(createEntry(4))) require.NoError(t, db.Append(createEntry(3)))
require.NoError(t, db.Append(createEntry(5))) require.NoError(t, db.Append(createEntry(4)))
require.NoError(t, db.Append(createEntry(5)))
require.NoError(t, db.Truncate(3)) require.EqualValues(t, 5, db.Size())
requireRead(t, db, 0, createEntry(1))
requireRead(t, db, 1, createEntry(2)) require.NoError(t, db.Truncate(3))
requireRead(t, db, 2, createEntry(3)) require.EqualValues(t, 4, db.Size()) // 0, 1, 2 and 3 are preserved
requireRead(t, db, 0, createEntry(1))
// 4 and 5 have been removed requireRead(t, db, 1, createEntry(2))
_, err := db.Read(4) requireRead(t, db, 2, createEntry(3))
require.ErrorIs(t, err, io.EOF)
_, err = db.Read(5) // 4 and 5 have been removed
require.ErrorIs(t, err, io.EOF) _, err := db.Read(4)
require.ErrorIs(t, err, io.EOF)
_, err = db.Read(5)
require.ErrorIs(t, err, io.EOF)
})
t.Run("Complete", func(t *testing.T) {
db := createEntryDB(t)
require.NoError(t, db.Append(createEntry(1)))
require.NoError(t, db.Append(createEntry(2)))
require.NoError(t, db.Append(createEntry(3)))
require.EqualValues(t, 3, db.Size())
require.NoError(t, db.Truncate(-1))
require.EqualValues(t, 0, db.Size()) // All items are removed
_, err := db.Read(0)
require.ErrorIs(t, err, io.EOF)
})
t.Run("AppendAfterTruncate", func(t *testing.T) {
db := createEntryDB(t)
require.NoError(t, db.Append(createEntry(1)))
require.NoError(t, db.Append(createEntry(2)))
require.NoError(t, db.Append(createEntry(3)))
require.EqualValues(t, 3, db.Size())
require.NoError(t, db.Truncate(1))
require.EqualValues(t, 2, db.Size())
newEntry := createEntry(4)
require.NoError(t, db.Append(newEntry))
entry, err := db.Read(2)
require.NoError(t, err)
require.Equal(t, newEntry, entry)
})
}
// TestTruncateTrailingPartialEntries writes a file holding two complete entries followed by
// four trailing bytes, opens it with NewEntryDB and verifies that the database reports two
// entries and that the file on disk was truncated to exactly two entries.
func TestTruncateTrailingPartialEntries(t *testing.T) {
	logger := testlog.Logger(t, log.LvlInfo)
	file := filepath.Join(t.TempDir(), "entries.db")
	entry1 := createEntry(1)
	entry2 := createEntry(2)
	invalidData := make([]byte, len(entry1)+len(entry2)+4)
	copy(invalidData, entry1[:])
	copy(invalidData[EntrySize:], entry2[:])
	invalidData[len(invalidData)-1] = 3 // Some invalid trailing data
	require.NoError(t, os.WriteFile(file, invalidData, 0o644))

	db, err := NewEntryDB(logger, file)
	require.NoError(t, err)
	// Assert on the Close error, as createEntryDB does, rather than dropping it.
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})

	// Should automatically truncate the file to remove trailing partial entries
	require.EqualValues(t, 2, db.Size())
	stat, err := os.Stat(file)
	require.NoError(t, err)
	require.EqualValues(t, 2*EntrySize, stat.Size())
}
func TestWriteErrors(t *testing.T) {
expectedErr := errors.New("some error")
t.Run("TruncatePartiallyWrittenData", func(t *testing.T) {
db, stubData := createEntryDBWithStubData()
stubData.writeErr = expectedErr
stubData.writeErrAfterBytes = 3
err := db.Append(createEntry(1), createEntry(2))
require.ErrorIs(t, err, expectedErr)
require.EqualValues(t, 0, db.Size(), "should not consider entries written")
require.Len(t, stubData.data, 0, "should truncate written bytes")
})
t.Run("FailBeforeDataWritten", func(t *testing.T) {
db, stubData := createEntryDBWithStubData()
stubData.writeErr = expectedErr
stubData.writeErrAfterBytes = 0
err := db.Append(createEntry(1), createEntry(2))
require.ErrorIs(t, err, expectedErr)
require.EqualValues(t, 0, db.Size(), "should not consider entries written")
require.Len(t, stubData.data, 0, "no data written")
})
t.Run("PartialWriteAndTruncateFails", func(t *testing.T) {
db, stubData := createEntryDBWithStubData()
stubData.writeErr = expectedErr
stubData.writeErrAfterBytes = EntrySize + 2
stubData.truncateErr = errors.New("boom")
err := db.Append(createEntry(1), createEntry(2))
require.ErrorIs(t, err, expectedErr)
require.EqualValues(t, 0, db.Size(), "should not consider entries written")
require.Len(t, stubData.data, stubData.writeErrAfterBytes, "rollback failed")
_, err = db.Read(0)
require.ErrorIs(t, err, io.EOF, "should not have first entry")
_, err = db.Read(1)
require.ErrorIs(t, err, io.EOF, "should not have second entry")
// Should retry truncate on next write
stubData.writeErr = nil
stubData.truncateErr = nil
err = db.Append(createEntry(3))
require.NoError(t, err)
actual, err := db.Read(0)
require.NoError(t, err)
require.Equal(t, createEntry(3), actual)
})
} }
func requireRead(t *testing.T, db *EntryDB, idx int64, expected Entry) { func requireRead(t *testing.T, db *EntryDB, idx int64, expected Entry) {
...@@ -75,10 +188,51 @@ func createEntry(i byte) Entry { ...@@ -75,10 +188,51 @@ func createEntry(i byte) Entry {
} }
func createEntryDB(t *testing.T) *EntryDB { func createEntryDB(t *testing.T) *EntryDB {
db, err := NewEntryDB(filepath.Join(t.TempDir(), "entries.db")) logger := testlog.Logger(t, log.LvlInfo)
db, err := NewEntryDB(logger, filepath.Join(t.TempDir(), "entries.db"))
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, db.Close()) require.NoError(t, db.Close())
}) })
return db return db
} }
// createEntryDBWithStubData returns an empty EntryDB that reads and writes through an
// in-memory stubDataAccess, together with that stub.
func createEntryDBWithStubData() (*EntryDB, *stubDataAccess) {
	stub := new(stubDataAccess)
	return &EntryDB{data: stub, size: 0}, stub
}
// stubDataAccess is an in-memory data store for tests that can be told to fail writes
// (optionally after storing some of the bytes) and to fail truncation.
type stubDataAccess struct {
	// data is the current content of the fake file.
	data []byte
	// writeErr, when non-nil, is returned by every Write.
	writeErr error
	// writeErrAfterBytes is how many bytes a failing Write stores before returning writeErr.
	writeErrAfterBytes int
	// truncateErr, when non-nil, is returned by every Truncate without changing data.
	truncateErr error
}

// ReadAt reads from the in-memory data at offset off.
func (s *stubDataAccess) ReadAt(p []byte, off int64) (n int, err error) {
	return bytes.NewReader(s.data).ReadAt(p, off)
}

// Write appends p to the in-memory data. When writeErr is set it stores only the first
// writeErrAfterBytes bytes of p (clamped to the range [0, len(p)] so that a short p cannot
// cause an out of range panic) and returns writeErr with the number of bytes stored.
func (s *stubDataAccess) Write(p []byte) (n int, err error) {
	if s.writeErr != nil {
		written := s.writeErrAfterBytes
		if written < 0 {
			written = 0
		}
		if written > len(p) {
			written = len(p)
		}
		s.data = append(s.data, p[:written]...)
		return written, s.writeErr
	}
	s.data = append(s.data, p...)
	return len(p), nil
}

// Close does nothing and always succeeds.
func (s *stubDataAccess) Close() error {
	return nil
}

// Truncate resizes the in-memory data to size bytes, or returns truncateErr unchanged when
// it is set. Growing pads with zero bytes instead of re-slicing, which would either panic
// or expose stale bytes from the backing array. A negative size is reported as an error.
func (s *stubDataAccess) Truncate(size int64) error {
	if s.truncateErr != nil {
		return s.truncateErr
	}
	if size < 0 {
		return errors.New("negative truncate size")
	}
	if grow := int(size) - len(s.data); grow > 0 {
		s.data = append(s.data, make([]byte, grow)...)
		return nil
	}
	s.data = s.data[:size]
	return nil
}
var _ dataAccess = (*stubDataAccess)(nil)
package db package db
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
) )
...@@ -9,7 +10,8 @@ type iterator struct { ...@@ -9,7 +10,8 @@ type iterator struct {
db *DB db *DB
nextEntryIdx int64 nextEntryIdx int64
current logContext current logContext
hasExecMsg bool
entriesRead int64 entriesRead int64
} }
...@@ -24,6 +26,7 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedH ...@@ -24,6 +26,7 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedH
} }
i.nextEntryIdx++ i.nextEntryIdx++
i.entriesRead++ i.entriesRead++
i.hasExecMsg = false
switch entry[0] { switch entry[0] {
case typeSearchCheckpoint: case typeSearchCheckpoint:
current, err := newSearchCheckpointFromEntry(entry) current, err := newSearchCheckpointFromEntry(entry)
...@@ -33,8 +36,6 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedH ...@@ -33,8 +36,6 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedH
} }
i.current.blockNum = current.blockNum i.current.blockNum = current.blockNum
i.current.logIdx = current.logIdx i.current.logIdx = current.logIdx
case typeCanonicalHash:
// Skip
case typeInitiatingEvent: case typeInitiatingEvent:
evt, err := newInitiatingEventFromEntry(entry) evt, err := newInitiatingEventFromEntry(entry)
if err != nil { if err != nil {
...@@ -45,11 +46,11 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedH ...@@ -45,11 +46,11 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedH
blockNum = i.current.blockNum blockNum = i.current.blockNum
logIdx = i.current.logIdx logIdx = i.current.logIdx
evtHash = evt.logHash evtHash = evt.logHash
i.hasExecMsg = evt.hasExecMsg
return return
case typeExecutingCheck: case typeCanonicalHash: // Skip
// TODO(optimism#10857): Handle this properly case typeExecutingCheck: // Skip
case typeExecutingLink: case typeExecutingLink: // Skip
// TODO(optimism#10857): Handle this properly
default: default:
outErr = fmt.Errorf("unknown entry type at idx %v %v", entryIdx, entry[0]) outErr = fmt.Errorf("unknown entry type at idx %v %v", entryIdx, entry[0])
return return
...@@ -58,3 +59,41 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedH ...@@ -58,3 +59,41 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash TruncatedH
outErr = io.EOF outErr = io.EOF
return return
} }
// ExecMessage returns the executing message attached to the log most recently returned by
// NextLog. If that log has no executing message, a zero ExecutingMessage and nil error are
// returned. The message is read from the store only when this method is called.
func (i *iterator) ExecMessage() (ExecutingMessage, error) {
	var none ExecutingMessage
	if !i.hasExecMsg {
		return none, nil
	}

	// nextEntryIdx has already moved past the initiating event, so step back one entry.
	initIdx := i.nextEntryIdx - 1
	msg, err := i.readExecMessage(initIdx)
	if err != nil {
		return none, fmt.Errorf("failed to read exec message for initiating event at %v: %w", initIdx, err)
	}
	return msg, nil
}
// readExecMessage reads the executing link and executing check entries that follow the
// initiating event at initEntryIdx and combines them into an ExecutingMessage.
// A missing entry (io.EOF from the store) is reported as ErrDataCorruption.
func (i *iterator) readExecMessage(initEntryIdx int64) (ExecutingMessage, error) {
	linkPos := initEntryIdx + 1
	if linkPos%searchCheckpointFrequency == 0 {
		linkPos += 2 // skip the search checkpoint and canonical hash entries
	}
	link, err := i.db.store.Read(linkPos)
	switch {
	case errors.Is(err, io.EOF):
		return ExecutingMessage{}, fmt.Errorf("%w: missing expected executing link event at idx %v", ErrDataCorruption, linkPos)
	case err != nil:
		return ExecutingMessage{}, fmt.Errorf("failed to read executing link event at idx %v: %w", linkPos, err)
	}

	checkPos := linkPos + 1
	if checkPos%searchCheckpointFrequency == 0 {
		checkPos += 2 // skip the search checkpoint and canonical hash entries
	}
	check, err := i.db.store.Read(checkPos)
	switch {
	case errors.Is(err, io.EOF):
		return ExecutingMessage{}, fmt.Errorf("%w: missing expected executing check event at idx %v", ErrDataCorruption, checkPos)
	case err != nil:
		return ExecutingMessage{}, fmt.Errorf("failed to read executing check event at idx %v: %w", checkPos, err)
	}

	return newExecutingMessageFromEntries(link, check)
}
package db
import (
"errors"
"github.com/ethereum/go-ethereum/common"
)
// Sentinel errors for the log database. Match them with errors.Is, since they are
// returned wrapped with additional context.
var (
// ErrLogOutOfOrder presumably reports a log added out of sequence — TODO confirm against callers.
ErrLogOutOfOrder = errors.New("log out of order")
// ErrDataCorruption is wrapped when an entry has an unexpected type or an expected entry is missing.
ErrDataCorruption = errors.New("data corruption")
// ErrNotFound presumably reports that a requested item does not exist — TODO confirm against callers.
ErrNotFound = errors.New("not found")
)
// TruncatedHash is a hash shortened to its first 20 bytes.
type TruncatedHash [20]byte
// TruncateHash returns the first 20 bytes of hash as a TruncatedHash.
func TruncateHash(hash common.Hash) TruncatedHash {
	return TruncatedHash(hash[:20])
}
// ExecutingMessage is the executing message information recorded for a log.
// In the log db it is stored as two entries: Chain, BlockNum, LogIdx and Timestamp go in an
// executing link entry and Hash goes in an executing check entry.
type ExecutingMessage struct {
Chain uint32
BlockNum uint64
// LogIdx must be below 1<<24 to be representable in an executing link entry (3 bytes).
LogIdx uint32
Timestamp uint64
Hash TruncatedHash
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment