Commit 98e1cf26 authored by Adrian Sutton, committed by GitHub

op-supervisor: Wire in head tracking (#11139)

* op-supervisor: Introduce head tracker

* op-supervisor: Move log db to subpackage of db.

* op-supervisor: Route all updates through a common db that can track heads

* op-supervisor: Remove unused error.

* op-supervisor: Remove operations - it fits into a later PR.

* op-supervisor: Fix semgrep

* op-supervisor: Move resuming databases into ChainsDB so it can later update the chain heads too.
parent 89f75545
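The net effect of the refactor: instead of each chain monitor holding its own LogStore, everything now flows through a single db.ChainsDB that owns the per-chain log databases and the shared head tracker. The sketch below illustrates that routing; it is a minimal stand-in inferred from the call sites visible in this diff (NewChainsDB, Resume, AddLog(chain, ...), Rewind(chain, ...), LatestBlockNum(chainID)), since the committed ChainsDB itself is in a collapsed diff further down, so all names and details here are assumptions, not the actual code.

package db

import "fmt"

// ChainID stands in for types.ChainID and LogStorage for the per-chain log
// database interface; both are simplified assumptions for this sketch.
type ChainID uint64

type LogStorage interface {
	LatestBlockNum() uint64
	Rewind(headBlockNum uint64) error
}

// ChainsDB routes every update to the log database for the right chain, so the
// rest of the supervisor only ever holds one database handle.
type ChainsDB struct {
	logDBs map[ChainID]LogStorage
}

func NewChainsDB(logDBs map[ChainID]LogStorage) *ChainsDB {
	return &ChainsDB{logDBs: logDBs}
}

// LatestBlockNum reports the head block recorded for one chain.
func (db *ChainsDB) LatestBlockNum(chain ChainID) uint64 {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return 0
	}
	return logDB.LatestBlockNum()
}

// Rewind forwards a rewind request to the database for the given chain.
func (db *ChainsDB) Rewind(chain ChainID, headBlockNum uint64) error {
	logDB, ok := db.logDBs[chain]
	if !ok {
		return fmt.Errorf("unknown chain: %v", chain)
	}
	return logDB.Rewind(headBlockNum)
}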
@@ -43,8 +43,10 @@ func WriteJSON[X any](outputPath string, value X, perm os.FileMode) error {
 	if err != nil {
 		return fmt.Errorf("failed to open output file: %w", err)
 	}
-	// Ensure we close the stream even if failures occur.
-	defer f.Close()
+	// Ensure we close the stream without renaming even if failures occur.
+	defer func() {
+		_ = f.Abort()
+	}()
 	out = f
 	// Closing the file causes it to be renamed to the final destination
 	// so make sure we handle any errors it returns
...
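Context for the hunk above: the handle here is an atomic temp-file writer, where Close() renames the temp file to its final destination and Abort() discards it, so deferring Abort instead of Close guarantees a partially written file is never renamed into place. A minimal sketch of that pattern, assuming a plain os-based implementation rather than the actual op-service helper:

package atomicfile

import (
	"os"
	"path/filepath"
)

// AtomicFile writes to a temp file and only renames it to the destination on a
// successful Close. Illustrative stand-in, not the op-service implementation.
type AtomicFile struct {
	tmp  *os.File
	path string
	done bool
}

func Create(path string) (*AtomicFile, error) {
	// Create the temp file in the destination directory so the final rename
	// stays on one filesystem and is atomic.
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return nil, err
	}
	return &AtomicFile{tmp: tmp, path: path}, nil
}

func (f *AtomicFile) Write(p []byte) (int, error) {
	return f.tmp.Write(p)
}

// Close finalizes the write: close the temp file, then rename it into place.
func (f *AtomicFile) Close() error {
	if f.done {
		return nil
	}
	f.done = true
	if err := f.tmp.Close(); err != nil {
		return err
	}
	return os.Rename(f.tmp.Name(), f.path)
}

// Abort closes and deletes the temp file without renaming; a no-op after Close,
// which is what makes the deferred call in the hunk above safe.
func (f *AtomicFile) Abort() error {
	if f.done {
		return nil
	}
	f.done = true
	_ = f.tmp.Close()
	return os.Remove(f.tmp.Name())
}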
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"path/filepath"
 	"sync/atomic"
 	"time"
@@ -12,8 +13,9 @@ import (
 	"github.com/ethereum-optimism/optimism/op-service/dial"
 	"github.com/ethereum-optimism/optimism/op-supervisor/config"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
-	backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
 	"github.com/ethereum/go-ethereum/common"
@@ -21,18 +23,12 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 )

-type LogStore interface {
-	io.Closer
-	ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error)
-	Rewind(headBlockNum uint64) error
-}
-
 type SupervisorBackend struct {
 	started       atomic.Bool
 	logger        log.Logger
 	chainMonitors []*source.ChainMonitor
-	logDBs        []LogStore
+	db            *db.ChainsDB
 }

 var _ frontend.Backend = (*SupervisorBackend)(nil)
@@ -40,9 +36,17 @@ var _ frontend.Backend = (*SupervisorBackend)(nil)
 var _ io.Closer = (*SupervisorBackend)(nil)

 func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
-	chainMonitors := make([]*source.ChainMonitor, len(cfg.L2RPCs))
-	logDBs := make([]LogStore, len(cfg.L2RPCs))
-	for i, rpc := range cfg.L2RPCs {
+	if err := prepDataDir(cfg.Datadir); err != nil {
+		return nil, err
+	}
+	headTracker, err := heads.NewHeadTracker(filepath.Join(cfg.Datadir, "heads.json"))
+	if err != nil {
+		return nil, fmt.Errorf("failed to load existing heads: %w", err)
+	}
+	logDBs := make(map[types.ChainID]db.LogStorage)
+	chainRPCs := make(map[types.ChainID]string)
+	chainClients := make(map[types.ChainID]client.RPC)
+	for _, rpc := range cfg.L2RPCs {
 		rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
 		if err != nil {
 			return nil, err
@@ -52,26 +56,32 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
 		if err != nil {
 			return nil, fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
 		}
-		logDB, err := db.NewFromFile(logger, cm, path)
+		logDB, err := logs.NewFromFile(logger, cm, path)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
 		}
-		logDBs[i] = logDB
-
-		block, err := Resume(logDB)
-		if err != nil {
-			return nil, err
-		}
-		monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, logDB, block)
+		logDBs[chainID] = logDB
+		chainRPCs[chainID] = rpc
+		chainClients[chainID] = rpcClient
+	}
+	chainsDB := db.NewChainsDB(logDBs, headTracker)
+	if err := chainsDB.Resume(); err != nil {
+		return nil, fmt.Errorf("failed to resume chains db: %w", err)
+	}
+
+	chainMonitors := make([]*source.ChainMonitor, 0, len(cfg.L2RPCs))
+	for chainID, rpc := range chainRPCs {
+		cm := newChainMetrics(chainID, m)
+		monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, chainClients[chainID], chainsDB)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
 		}
-		chainMonitors[i] = monitor
+		chainMonitors = append(chainMonitors, monitor)
 	}
 	return &SupervisorBackend{
 		logger:        logger,
 		chainMonitors: chainMonitors,
-		logDBs:        logDBs,
+		db:            chainsDB,
 	}, nil
 }
@@ -109,10 +119,8 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
 			errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
 		}
 	}
-	for _, logDB := range su.logDBs {
-		if err := logDB.Close(); err != nil {
-			errs = errors.Join(errs, fmt.Errorf("failed to close logdb: %w", err))
-		}
+	if err := su.db.Close(); err != nil {
+		errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err))
 	}
 	return errs
 }
...
@@ -2,7 +2,7 @@ package backend

 import (
 	"github.com/ethereum-optimism/optimism/op-service/sources/caching"
-	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
 )
@@ -45,4 +45,4 @@ func (c *chainMetrics) RecordDBSearchEntriesRead(count int64) {
 }

 var _ caching.Metrics = (*chainMetrics)(nil)
-var _ db.Metrics = (*chainMetrics)(nil)
+var _ logs.Metrics = (*chainMetrics)(nil)
This diff is collapsed.
package heads

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"sync"

	"github.com/ethereum-optimism/optimism/op-service/jsonutil"
)

// HeadTracker records the current chain head pointers for each chain.
type HeadTracker struct {
	rwLock  sync.RWMutex
	path    string
	current *Heads
}

// NewHeadTracker loads the heads file at path, or starts with empty heads if it does not exist yet.
func NewHeadTracker(path string) (*HeadTracker, error) {
	current := NewHeads()
	if data, err := os.ReadFile(path); errors.Is(err, os.ErrNotExist) {
		// No existing file, just use empty heads
	} else if err != nil {
		return nil, fmt.Errorf("failed to read existing heads from %v: %w", path, err)
	} else {
		if err := json.Unmarshal(data, current); err != nil {
			return nil, fmt.Errorf("invalid existing heads file %v: %w", path, err)
		}
	}
	return &HeadTracker{
		path:    path,
		current: current,
	}, nil
}

// Apply runs op against a copy of the current heads, persists the result to disk,
// and only then makes it the new current value. If the operation or the write
// fails, both the in-memory and on-disk heads are left unchanged.
func (t *HeadTracker) Apply(op Operation) error {
	t.rwLock.Lock()
	defer t.rwLock.Unlock()
	// Store a copy of the heads prior to changing so we can roll back if needed.
	modified := t.current.Copy()
	if err := op.Apply(modified); err != nil {
		return fmt.Errorf("operation failed: %w", err)
	}
	if err := t.write(modified); err != nil {
		return fmt.Errorf("failed to store updated heads: %w", err)
	}
	t.current = modified
	return nil
}

// Current returns a copy of the current heads that the caller may read without further locking.
func (t *HeadTracker) Current() *Heads {
	t.rwLock.RLock()
	defer t.rwLock.RUnlock()
	return t.current.Copy()
}

func (t *HeadTracker) write(heads *Heads) error {
	if err := jsonutil.WriteJSON(t.path, heads, 0o644); err != nil {
		return fmt.Errorf("failed to write new heads: %w", err)
	}
	return nil
}

func (t *HeadTracker) Close() error {
	return nil
}
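For illustration, an Operation can mutate its copy freely and abort the whole update just by returning an error. A hypothetical helper (not part of this commit, assuming the types and entrydb imports used elsewhere in this package) that advances one chain's unsafe head through the Apply path:

// advanceUnsafeHead is a hypothetical example: it bumps one chain's unsafe head
// via the copy-on-write Apply path. The update is persisted to disk before it
// becomes visible to Current().
func advanceUnsafeHead(t *HeadTracker, chain types.ChainID, idx entrydb.EntryIdx) error {
	return t.Apply(OperationFn(func(heads *Heads) error {
		h := heads.Get(chain)
		h.Unsafe = idx
		heads.Put(chain, h)
		return nil // returning an error would leave memory and disk untouched
	}))
}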
package heads

import (
	"errors"
	"os"
	"path/filepath"
	"testing"

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
	"github.com/stretchr/testify/require"
)

func TestHeads_SaveAndReload(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "heads.json")
	chainA := types.ChainIDFromUInt64(3)
	chainAHeads := ChainHeads{
		Unsafe:         1,
		CrossUnsafe:    2,
		LocalSafe:      3,
		CrossSafe:      4,
		LocalFinalized: 5,
		CrossFinalized: 6,
	}
	chainB := types.ChainIDFromUInt64(5)
	chainBHeads := ChainHeads{
		Unsafe:         11,
		CrossUnsafe:    12,
		LocalSafe:      13,
		CrossSafe:      14,
		LocalFinalized: 15,
		CrossFinalized: 16,
	}

	orig, err := NewHeadTracker(path)
	require.NoError(t, err)
	err = orig.Apply(OperationFn(func(heads *Heads) error {
		heads.Put(chainA, chainAHeads)
		heads.Put(chainB, chainBHeads)
		return nil
	}))
	require.NoError(t, err)
	require.Equal(t, orig.Current().Get(chainA), chainAHeads)
	require.Equal(t, orig.Current().Get(chainB), chainBHeads)

	loaded, err := NewHeadTracker(path)
	require.NoError(t, err)
	require.EqualValues(t, loaded.Current(), orig.Current())
}

func TestHeads_NoChangesMadeIfOperationFails(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "heads.json")
	chainA := types.ChainIDFromUInt64(3)
	chainAHeads := ChainHeads{
		Unsafe:         1,
		CrossUnsafe:    2,
		LocalSafe:      3,
		CrossSafe:      4,
		LocalFinalized: 5,
		CrossFinalized: 6,
	}

	orig, err := NewHeadTracker(path)
	require.NoError(t, err)
	boom := errors.New("boom")
	err = orig.Apply(OperationFn(func(heads *Heads) error {
		heads.Put(chainA, chainAHeads)
		return boom
	}))
	require.ErrorIs(t, err, boom)
	require.Equal(t, ChainHeads{}, orig.Current().Get(chainA))

	// Should be able to load from disk too
	loaded, err := NewHeadTracker(path)
	require.NoError(t, err)
	require.EqualValues(t, loaded.Current(), orig.Current())
}

func TestHeads_NoChangesMadeIfWriteFails(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "invalid/heads.json")
	chainA := types.ChainIDFromUInt64(3)
	chainAHeads := ChainHeads{
		Unsafe:         1,
		CrossUnsafe:    2,
		LocalSafe:      3,
		CrossSafe:      4,
		LocalFinalized: 5,
		CrossFinalized: 6,
	}

	orig, err := NewHeadTracker(path)
	require.NoError(t, err)
	err = orig.Apply(OperationFn(func(heads *Heads) error {
		heads.Put(chainA, chainAHeads)
		return nil
	}))
	require.ErrorIs(t, err, os.ErrNotExist)
	require.Equal(t, ChainHeads{}, orig.Current().Get(chainA))
}
package heads

import (
	"encoding/json"

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
	"github.com/ethereum/go-ethereum/common/hexutil"
)

// ChainHeads provides the serialization format for the current chain heads.
// The values here could be block numbers or just the index of entries in the log db.
// If they're log db entry indices, we can't detect when things changed because of a reorg
// (e.g. if the log db write succeeded but the head update failed).
// So we probably need to store actual block IDs here... but then we don't have the
// block hash for every block in the log db, and only jumping the head forward on
// checkpoint blocks doesn't work either...
type ChainHeads struct {
	Unsafe         entrydb.EntryIdx `json:"localUnsafe"`
	CrossUnsafe    entrydb.EntryIdx `json:"crossUnsafe"`
	LocalSafe      entrydb.EntryIdx `json:"localSafe"`
	CrossSafe      entrydb.EntryIdx `json:"crossSafe"`
	LocalFinalized entrydb.EntryIdx `json:"localFinalized"`
	CrossFinalized entrydb.EntryIdx `json:"crossFinalized"`
}

// Heads tracks the current heads for each chain.
type Heads struct {
	Chains map[types.ChainID]ChainHeads
}

func NewHeads() *Heads {
	return &Heads{Chains: make(map[types.ChainID]ChainHeads)}
}

// Get returns the heads for the given chain, or zero-valued heads if the chain is unknown.
func (h *Heads) Get(id types.ChainID) ChainHeads {
	chain, ok := h.Chains[id]
	if !ok {
		return ChainHeads{}
	}
	return chain
}

func (h *Heads) Put(id types.ChainID, head ChainHeads) {
	h.Chains[id] = head
}

// Copy returns a copy that can be modified independently of the original.
func (h *Heads) Copy() *Heads {
	c := &Heads{Chains: make(map[types.ChainID]ChainHeads)}
	for id, heads := range h.Chains {
		c.Chains[id] = heads
	}
	return c
}

func (h Heads) MarshalJSON() ([]byte, error) {
	data := make(map[hexutil.U256]ChainHeads)
	for id, heads := range h.Chains {
		data[hexutil.U256(id)] = heads
	}
	return json.Marshal(data)
}

func (h *Heads) UnmarshalJSON(data []byte) error {
	hexData := make(map[hexutil.U256]ChainHeads)
	if err := json.Unmarshal(data, &hexData); err != nil {
		return err
	}
	h.Chains = make(map[types.ChainID]ChainHeads)
	for id, heads := range hexData {
		h.Put(types.ChainID(id), heads)
	}
	return nil
}

// Operation represents an atomic update to apply against a copy of the heads.
type Operation interface {
	Apply(head *Heads) error
}

// OperationFn adapts a plain function to the Operation interface.
type OperationFn func(heads *Heads) error

func (f OperationFn) Apply(heads *Heads) error {
	return f(heads)
}
package heads

import (
	"encoding/json"
	"fmt"
	"testing"

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
	"github.com/stretchr/testify/require"
)

func TestHeads(t *testing.T) {
	t.Run("RoundTripViaJson", func(t *testing.T) {
		heads := NewHeads()
		heads.Put(types.ChainIDFromUInt64(3), ChainHeads{
			Unsafe:         10,
			CrossUnsafe:    9,
			LocalSafe:      8,
			CrossSafe:      7,
			LocalFinalized: 6,
			CrossFinalized: 5,
		})
		heads.Put(types.ChainIDFromUInt64(9), ChainHeads{
			Unsafe:         90,
			CrossUnsafe:    80,
			LocalSafe:      70,
			CrossSafe:      60,
			LocalFinalized: 50,
			CrossFinalized: 40,
		})
		heads.Put(types.ChainIDFromUInt64(4892497242424), ChainHeads{
			Unsafe:         1000,
			CrossUnsafe:    900,
			LocalSafe:      800,
			CrossSafe:      700,
			LocalFinalized: 600,
			CrossFinalized: 400,
		})
		j, err := json.Marshal(heads)
		require.NoError(t, err)
		fmt.Println(string(j))
		var result Heads
		err = json.Unmarshal(j, &result)
		require.NoError(t, err)
		require.Equal(t, heads.Chains, result.Chains)
	})

	t.Run("Copy", func(t *testing.T) {
		chainA := types.ChainIDFromUInt64(3)
		chainB := types.ChainIDFromUInt64(4)
		chainAOrigHeads := ChainHeads{
			Unsafe: 1,
		}
		chainAModifiedHeads1 := ChainHeads{
			Unsafe: 2,
		}
		chainAModifiedHeads2 := ChainHeads{
			Unsafe: 4,
		}
		chainBModifiedHeads := ChainHeads{
			Unsafe: 2,
		}

		heads := NewHeads()
		heads.Put(chainA, chainAOrigHeads)

		otherHeads := heads.Copy()
		otherHeads.Put(chainA, chainAModifiedHeads1)
		otherHeads.Put(chainB, chainBModifiedHeads)

		require.Equal(t, heads.Get(chainA), chainAOrigHeads)
		require.Equal(t, heads.Get(chainB), ChainHeads{})

		heads.Put(chainA, chainAModifiedHeads2)
		require.Equal(t, heads.Get(chainA), chainAModifiedHeads2)
		require.Equal(t, otherHeads.Get(chainA), chainAModifiedHeads1)
		require.Equal(t, otherHeads.Get(chainB), chainBModifiedHeads)
	})
}
-package backend
+package db

 import (
 	"errors"
@@ -10,7 +10,7 @@ import (
 // Resume prepares the given LogStore to resume recording events.
 // It returns the block number of the last block that is guaranteed to have been fully recorded to the database
 // and rewinds the database to ensure it can resume recording from the first log of the next block.
-func Resume(logDB LogStore) (uint64, error) {
+func Resume(logDB LogStorage) error {
 	// Get the last checkpoint that was written then Rewind the db
 	// to the block prior to that block and start from there.
 	// Guarantees we will always roll back at least one block
@@ -18,17 +18,17 @@ func Resume(logDB LogStore) (uint64, error) {
 	checkPointBlock, _, err := logDB.ClosestBlockInfo(math.MaxUint64)
 	if errors.Is(err, io.EOF) {
 		// No blocks recorded in the database, start from genesis
-		return 0, nil
+		return nil
 	} else if err != nil {
-		return 0, fmt.Errorf("failed to get block from checkpoint: %w", err)
+		return fmt.Errorf("failed to get block from checkpoint: %w", err)
 	}
 	if checkPointBlock == 0 {
-		return 0, nil
+		return nil
 	}
 	block := checkPointBlock - 1
 	err = logDB.Rewind(block)
 	if err != nil {
-		return 0, fmt.Errorf("failed to 'Rewind' the database: %w", err)
+		return fmt.Errorf("failed to rewind the database: %w", err)
 	}
-	return block, nil
+	return nil
 }
-package backend
+package db

 import (
 	"fmt"
 	"io"
 	"testing"

+	"github.com/ethereum-optimism/optimism/op-service/eth"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
 	"github.com/stretchr/testify/require"
 )

 func TestRecover(t *testing.T) {
 	tests := []struct {
 		name            string
 		stubDB          *stubLogStore
-		expectedBlockNum uint64
 		expectRewoundTo uint64
 	}{
 		{
 			name:            "emptydb",
 			stubDB:          &stubLogStore{closestBlockErr: fmt.Errorf("no entries: %w", io.EOF)},
-			expectedBlockNum: 0,
 			expectRewoundTo: 0,
 		},
 		{
 			name:            "genesis",
 			stubDB:          &stubLogStore{},
-			expectedBlockNum: 0,
 			expectRewoundTo: 0,
 		},
 		{
 			name:            "with_blocks",
 			stubDB:          &stubLogStore{closestBlockNumber: 15},
-			expectedBlockNum: 14,
 			expectRewoundTo: 14,
 		},
 	}
 	for _, test := range tests {
 		test := test
 		t.Run(test.name, func(t *testing.T) {
-			block, err := Resume(test.stubDB)
+			err := Resume(test.stubDB)
 			require.NoError(t, err)
-			require.Equal(t, test.expectedBlockNum, block)
 			require.Equal(t, test.expectRewoundTo, test.stubDB.rewoundTo)
 		})
 	}
@@ -52,10 +48,6 @@ type stubLogStore struct {
 	rewoundTo uint64
 }

-func (s *stubLogStore) Close() error {
-	return nil
-}
-
 func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, error) {
 	if s.closestBlockErr != nil {
 		return 0, types.TruncatedHash{}, s.closestBlockErr
@@ -67,3 +59,15 @@ func (s *stubLogStore) Rewind(headBlockNum uint64) error {
 	s.rewoundTo = headBlockNum
 	return nil
 }
+
+func (s *stubLogStore) AddLog(logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error {
+	panic("not supported")
+}
+
+func (s *stubLogStore) LatestBlockNum() uint64 {
+	panic("not supported")
+}
+
+func (s *stubLogStore) Close() error {
+	return nil
+}
This diff is collapsed.
This diff is collapsed.
-package db
+package logs

 import (
 	"encoding/binary"
...
@@ -23,3 +23,10 @@ func prepChainDir(chainID types.ChainID, datadir string) (string, error) {
 	}
 	return dir, nil
 }
+
+func prepDataDir(datadir string) error {
+	if err := os.MkdirAll(datadir, 0755); err != nil {
+		return fmt.Errorf("failed to create data directory %v: %w", datadir, err)
+	}
+	return nil
+}
@@ -23,9 +23,10 @@ type Metrics interface {
 	caching.Metrics
 }

-type LogDB interface {
+type Storage interface {
 	LogStorage
 	DatabaseRewinder
+	LatestBlockNum(chainID types.ChainID) uint64
 }

 // ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
@@ -35,7 +36,7 @@ type ChainMonitor struct {
 	headMonitor *HeadMonitor
 }

-func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store LogDB, block uint64) (*ChainMonitor, error) {
+func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store Storage) (*ChainMonitor, error) {
 	logger = logger.New("chainID", chainID)
 	cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind)
 	if err != nil {
@@ -43,12 +44,12 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID
 	}
 	startingHead := eth.L1BlockRef{
-		Number: block,
+		Number: store.LatestBlockNum(chainID),
 	}

-	processLogs := newLogProcessor(store)
+	processLogs := newLogProcessor(chainID, store)
 	fetchReceipts := newLogFetcher(cl, processLogs)
-	unsafeBlockProcessor := NewChainProcessor(logger, cl, startingHead, fetchReceipts, store)
+	unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, startingHead, fetchReceipts, store)

 	unsafeProcessors := []HeadProcessor{unsafeBlockProcessor}
 	callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil)
...
@@ -4,6 +4,7 @@ import (
 	"context"

 	"github.com/ethereum-optimism/optimism/op-service/eth"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
 	"github.com/ethereum/go-ethereum/log"
 )
@@ -16,7 +17,7 @@ type BlockProcessor interface {
 }

 type DatabaseRewinder interface {
-	Rewind(headBlockNum uint64) error
+	Rewind(chain types.ChainID, headBlockNum uint64) error
 }

 type BlockProcessorFn func(ctx context.Context, block eth.L1BlockRef) error
@@ -30,15 +31,17 @@ func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.L1BlockRe
 type ChainProcessor struct {
 	log       log.Logger
 	client    BlockByNumberSource
+	chain     types.ChainID
 	lastBlock eth.L1BlockRef
 	processor BlockProcessor
 	rewinder  DatabaseRewinder
 }

-func NewChainProcessor(log log.Logger, client BlockByNumberSource, startingHead eth.L1BlockRef, processor BlockProcessor, rewinder DatabaseRewinder) *ChainProcessor {
+func NewChainProcessor(log log.Logger, client BlockByNumberSource, chain types.ChainID, startingHead eth.L1BlockRef, processor BlockProcessor, rewinder DatabaseRewinder) *ChainProcessor {
 	return &ChainProcessor{
 		log:       log,
 		client:    client,
+		chain:     chain,
 		lastBlock: startingHead,
 		processor: processor,
 		rewinder:  rewinder,
@@ -68,7 +71,7 @@ func (s *ChainProcessor) processBlock(ctx context.Context, block eth.L1BlockRef)
 	if err := s.processor.ProcessBlock(ctx, block); err != nil {
 		s.log.Error("Failed to process block", "block", block, "err", err)
 		// Try to rewind the database to the previous block to remove any logs from this block that were written
-		if err := s.rewinder.Rewind(s.lastBlock.Number); err != nil {
+		if err := s.rewinder.Rewind(s.chain, s.lastBlock.Number); err != nil {
 			// If any logs were written, our next attempt to write will fail and we'll retry this rewind.
 			// If no logs were written successfully then the rewind wouldn't have done anything anyway.
 			s.log.Error("Failed to rewind after error processing block", "block", block, "err", err)
...
@@ -3,22 +3,26 @@ package source

 import (
 	"context"
 	"errors"
+	"fmt"
 	"testing"

 	"github.com/ethereum-optimism/optimism/op-service/eth"
 	"github.com/ethereum-optimism/optimism/op-service/testlog"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/stretchr/testify/require"
 )

+var processorChainID = types.ChainIDFromUInt64(4)
+
 func TestUnsafeBlocksStage(t *testing.T) {
 	t.Run("IgnoreEventsAtOrPriorToStartingHead", func(t *testing.T) {
 		ctx := context.Background()
 		logger := testlog.Logger(t, log.LvlInfo)
 		client := &stubBlockByNumberSource{}
 		processor := &stubBlockProcessor{}
-		stage := NewChainProcessor(logger, client, eth.L1BlockRef{Number: 100}, processor, &stubRewinder{})
+		stage := NewChainProcessor(logger, client, processorChainID, eth.L1BlockRef{Number: 100}, processor, &stubRewinder{})

 		stage.OnNewHead(ctx, eth.L1BlockRef{Number: 100})
 		stage.OnNewHead(ctx, eth.L1BlockRef{Number: 99})
@@ -35,7 +39,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
 		block2 := eth.L1BlockRef{Number: 102}
 		block3 := eth.L1BlockRef{Number: 103}
 		processor := &stubBlockProcessor{}
-		stage := NewChainProcessor(logger, client, block0, processor, &stubRewinder{})
+		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, &stubRewinder{})

 		stage.OnNewHead(ctx, block1)
 		require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
 		stage.OnNewHead(ctx, block2)
@@ -53,7 +57,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
 		block0 := eth.L1BlockRef{Number: 100}
 		block1 := eth.L1BlockRef{Number: 101}
 		processor := &stubBlockProcessor{}
-		stage := NewChainProcessor(logger, client, block0, processor, &stubRewinder{})
+		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, &stubRewinder{})

 		stage.OnNewHead(ctx, block1)
 		require.NotEmpty(t, processor.processed)
 		require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
@@ -72,7 +76,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
 		block0 := eth.L1BlockRef{Number: 100}
 		block3 := eth.L1BlockRef{Number: 103}
 		processor := &stubBlockProcessor{}
-		stage := NewChainProcessor(logger, client, block0, processor, &stubRewinder{})
+		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, &stubRewinder{})

 		stage.OnNewHead(ctx, block3)
 		require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
@@ -88,7 +92,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
 		block3 := eth.L1BlockRef{Number: 103}
 		processor := &stubBlockProcessor{}
 		rewinder := &stubRewinder{}
-		stage := NewChainProcessor(logger, client, block0, processor, rewinder)
+		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, rewinder)

 		stage.OnNewHead(ctx, block3)
 		require.Empty(t, processor.processed, "should not update any blocks because backfill failed")
@@ -107,7 +111,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
 		block3 := eth.L1BlockRef{Number: 103}
 		processor := &stubBlockProcessor{err: errors.New("boom")}
 		rewinder := &stubRewinder{}
-		stage := NewChainProcessor(logger, client, block0, processor, rewinder)
+		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, rewinder)

 		stage.OnNewHead(ctx, block3)
 		require.Equal(t, []eth.L1BlockRef{makeBlockRef(101)}, processor.processed, "Attempted to process block 101")
@@ -127,7 +131,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
 		block1 := eth.L1BlockRef{Number: 101}
 		processor := &stubBlockProcessor{err: errors.New("boom")}
 		rewinder := &stubRewinder{}
-		stage := NewChainProcessor(logger, client, block0, processor, rewinder)
+		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, rewinder)

 		// No skipped blocks
 		stage.OnNewHead(ctx, block1)
@@ -173,7 +177,10 @@ type stubRewinder struct {
 	rewindCalled bool
 }

-func (s *stubRewinder) Rewind(headBlockNum uint64) error {
+func (s *stubRewinder) Rewind(chainID types.ChainID, headBlockNum uint64) error {
+	if chainID != processorChainID {
+		return fmt.Errorf("chainID mismatch, expected %v but was %v", processorChainID, chainID)
+	}
 	s.rewoundTo = headBlockNum
 	s.rewindCalled = true
 	return nil
...
@@ -6,28 +6,30 @@ import (
 	"github.com/ethereum-optimism/optimism/op-service/eth"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
+	supTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
 	"github.com/ethereum/go-ethereum/common"
 	ethTypes "github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 )

 type LogStorage interface {
-	AddLog(logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error
+	AddLog(chain supTypes.ChainID, logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error
 }

 type logProcessor struct {
+	chain    supTypes.ChainID
 	logStore LogStorage
 }

-func newLogProcessor(logStore LogStorage) *logProcessor {
-	return &logProcessor{logStore}
+func newLogProcessor(chain supTypes.ChainID, logStore LogStorage) *logProcessor {
+	return &logProcessor{chain: chain, logStore: logStore}
 }

 func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpts ethTypes.Receipts) error {
 	for _, rcpt := range rcpts {
 		for _, l := range rcpt.Logs {
 			logHash := logToHash(l)
-			err := p.logStore.AddLog(logHash, block.ID(), block.Time, uint32(l.Index), nil)
+			err := p.logStore.AddLog(p.chain, logHash, block.ID(), block.Time, uint32(l.Index), nil)
 			if err != nil {
 				return fmt.Errorf("failed to add log %d from block %v: %w", l.Index, block.ID(), err)
 			}
...
@@ -2,21 +2,25 @@ package source

 import (
 	"context"
+	"fmt"
 	"testing"

 	"github.com/ethereum-optimism/optimism/op-service/eth"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
+	supTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
 	"github.com/ethereum/go-ethereum/common"
 	ethTypes "github.com/ethereum/go-ethereum/core/types"
 	"github.com/stretchr/testify/require"
 )

+var logProcessorChainID = supTypes.ChainIDFromUInt64(4)
+
 func TestLogProcessor(t *testing.T) {
 	ctx := context.Background()
 	block1 := eth.L1BlockRef{Number: 100, Hash: common.Hash{0x11}, Time: 1111}

 	t.Run("NoOutputWhenLogsAreEmpty", func(t *testing.T) {
 		store := &stubLogStorage{}
-		processor := newLogProcessor(store)
+		processor := newLogProcessor(logProcessorChainID, store)

 		err := processor.ProcessLogs(ctx, block1, ethTypes.Receipts{})
 		require.NoError(t, err)
@@ -50,7 +54,7 @@ func TestLogProcessor(t *testing.T) {
 		},
 	}
 	store := &stubLogStorage{}
-	processor := newLogProcessor(store)
+	processor := newLogProcessor(logProcessorChainID, store)

 	err := processor.ProcessLogs(ctx, block1, rcpts)
 	require.NoError(t, err)
@@ -141,7 +145,10 @@ type stubLogStorage struct {
 	logs []storedLog
 }

-func (s *stubLogStorage) AddLog(logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error {
+func (s *stubLogStorage) AddLog(chainID supTypes.ChainID, logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error {
+	if logProcessorChainID != chainID {
+		return fmt.Errorf("chain id mismatch, expected %v but got %v", logProcessorChainID, chainID)
+	}
 	s.logs = append(s.logs, storedLog{
 		block:     block,
 		timestamp: timestamp,
...