Commit 80a122ac authored by Adrian Sutton, committed by GitHub

op-node: Add option to enable safe head history database (#9575)

* op-node: Add option to enable safe head history database.

Currently it just logs the information and has no actual database.

* op-node: Introduce pebble db to store safe head updates

* op-node: Reset the pipeline if safe head updates fail to be recorded

* go mod tidy

* op-node: Truncate when L1 head is reduced.

* op-node: Record accurate safe head data when restarting

* op-node: Ensure the latest safe head update is reset on a pipeline reset.

* op-node: Improve thread safety of safedb

* op-node: Add L2 block number to stored data

* op-node: Add API method to retrieve safe head at an L1 block number

* op-node: Tidy up key handling in safedb.

* op-node: Use an explicit reset event to clear entries invalidated by a pipeline reset

Add action test to confirm reorgs are correctly handled.

* op-node: Undo changes to start. We always step the safe head back at least one block.

* op-node: Simplify error message when requested record is prior to start of history

* op-node: Improve log message

* op-node: Tidy up

* op-node: Include L1 block number in response.

* op-node: Add missing AssertExpectations

* op-node: Verify key prefix
parent 134f6883
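
For context when reviewing (this example is not part of the diff): once op-node is started with the new --safedb.path flag pointing at a writable directory, the recorded safe head history is exposed through the new optimism_safeHeadAtL1Block RPC method added below. A minimal client sketch, assuming a rollup RPC listening at http://localhost:9545 (placeholder) and an arbitrary L1 block number:

package main

import (
	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	ctx := context.Background()
	// Placeholder endpoint; point this at the op-node rollup RPC.
	client, err := rpc.DialContext(ctx, "http://localhost:9545")
	if err != nil {
		panic(err)
	}
	defer client.Close()

	var resp eth.SafeHeadResponse
	// Returns the L2 block that was safe as of the given L1 block number (example value),
	// or a "not found" error if the requested block predates the recorded history.
	if err := client.CallContext(ctx, &resp, "optimism_safeHeadAtL1Block", hexutil.Uint64(1000000)); err != nil {
		panic(err)
	}
	fmt.Printf("L2 safe head %v as of L1 block %v\n", resp.SafeHead, resp.L1Block)
}
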
......@@ -6,6 +6,7 @@ require (
github.com/BurntSushi/toml v1.3.2
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4
github.com/consensys/gnark-crypto v0.12.1
github.com/crate-crypto/go-kzg-4844 v0.7.0
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0
......@@ -68,7 +69,6 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/consensys/bavard v0.1.13 // indirect
......
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
......@@ -44,7 +45,7 @@ type L2Sequencer struct {
func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc derive.L1BlobsFetcher,
eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer {
ver := NewL2Verifier(t, log, l1, blobSrc, eng, cfg, &sync.Config{})
ver := NewL2Verifier(t, log, l1, blobSrc, eng, cfg, &sync.Config{}, safedb.Disabled)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng)
seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1)
l1OriginSelector := &MockL1OriginSelector{
......
......@@ -58,10 +58,15 @@ type L2API interface {
OutputV0AtBlock(ctx context.Context, blockHash common.Hash) (*eth.OutputV0, error)
}
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier {
type safeDB interface {
derive.SafeHeadListener
node.SafeDBReader
}
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{}
engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, nil, eng, engine, metrics, syncCfg)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, nil, eng, engine, metrics, syncCfg, safeHeadListener)
pipeline.Reset()
rollupNode := &L2Verifier{
......@@ -84,7 +89,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
apis := []rpc.API{
{
Namespace: "optimism",
Service: node.NewNodeAPI(cfg, eng, backend, log, m),
Service: node.NewNodeAPI(cfg, eng, backend, safeHeadListener, log, m),
Public: true,
Authenticated: false,
},
......
......@@ -3,6 +3,7 @@ package actions
import (
"testing"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
......@@ -13,11 +14,33 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher, blobSrc derive.L1BlobsFetcher, syncCfg *sync.Config) (*L2Engine, *L2Verifier) {
type verifierCfg struct {
safeHeadListener safeDB
}
type VerifierOpt func(opts *verifierCfg)
func WithSafeHeadListener(l safeDB) VerifierOpt {
return func(opts *verifierCfg) {
opts.safeHeadListener = l
}
}
func defaultVerifierCfg() *verifierCfg {
return &verifierCfg{
safeHeadListener: safedb.Disabled,
}
}
func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher, blobSrc derive.L1BlobsFetcher, syncCfg *sync.Config, opts ...VerifierOpt) (*L2Engine, *L2Verifier) {
cfg := defaultVerifierCfg()
for _, opt := range opts {
opt(cfg)
}
jwtPath := e2eutils.WriteDefaultJWT(t)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, EngineWithP2P())
engCl := engine.EngineClient(t, sd.RollupCfg)
verifier := NewL2Verifier(t, log, l1F, blobSrc, engCl, sd.RollupCfg, syncCfg)
verifier := NewL2Verifier(t, log, l1F, blobSrc, engCl, sd.RollupCfg, syncCfg, cfg.safeHeadListener)
return engine, verifier
}
......
package actions
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestRecordSafeHeadUpdates(gt *testing.T) {
t := NewDefaultTesting(gt)
sd, miner, sequencer, verifier, verifierEng, batcher := setupSafeDBTest(t, defaultRollupTestParams)
verifEngClient := verifierEng.EngineClient(t, sd.RollupCfg)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)
// build empty L1 block
miner.ActEmptyBlock(t)
// Create L2 blocks, and reference the L1 head as origin
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1Head(t)
// submit all new L2 blocks
batcher.ActSubmitAll(t)
// new L1 block with L2 batch
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(sd.RollupCfg.Genesis.SystemConfig.BatcherAddr)(t)
batchTx := miner.l1Transactions[0]
miner.ActL1EndBlock(t)
// verifier picks up the L2 chain that was submitted
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Safe(), sequencer.L2Unsafe(), "verifier syncs from sequencer via L1")
require.NotEqual(t, sequencer.L2Safe(), sequencer.L2Unsafe(), "sequencer has not processed L1 yet")
// Verify the safe head is recorded
l1Head := miner.l1Chain.CurrentBlock()
firstSafeHeadUpdateL1Block := l1Head.Number.Uint64()
response, err := verifier.RollupClient().SafeHeadAtL1Block(context.Background(), firstSafeHeadUpdateL1Block)
require.NoError(t, err)
require.Equal(t, eth.HeaderBlockID(l1Head), response.L1Block)
require.Equal(t, verifier.L2Unsafe().ID(), response.SafeHead)
// Should get the same result for anything after that L1 block too
response, err = verifier.RollupClient().SafeHeadAtL1Block(context.Background(), firstSafeHeadUpdateL1Block+1)
require.NoError(t, err)
require.Equal(t, eth.HeaderBlockID(l1Head), response.L1Block)
require.Equal(t, verifier.L2Unsafe().ID(), response.SafeHead)
// Should get a not-found error before the L1 block because we have no earlier safe head recorded
_, err = verifier.RollupClient().SafeHeadAtL1Block(context.Background(), firstSafeHeadUpdateL1Block-1)
require.ErrorContains(t, err, safedb.ErrNotFound.Error())
// orphan the L1 block that included the batch tx, and build a new different L1 block
miner.ActL1RewindToParent(t)
miner.ActL1SetFeeRecipient(common.Address{'B'})
miner.ActEmptyBlock(t)
miner.ActEmptyBlock(t) // needs to be a longer chain for reorg to be applied.
// sync verifier again. The L1 reorg excluded the batch, so now the previous L2 chain should be unsafe again.
// However, the L2 chain can still be canonical later, since it did not reference the reorged L1 block
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Safe(), sequencer.L2Safe(), "verifier rewinds safe when L1 reorgs out batch")
ref, err := verifEngClient.L2BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err)
require.Equal(t, verifier.L2Safe(), ref, "verifier engine matches rollup client")
// The safe head has been reorged so the record should have been deleted
response, err = verifier.RollupClient().SafeHeadAtL1Block(context.Background(), firstSafeHeadUpdateL1Block)
require.ErrorContainsf(t, err, safedb.ErrNotFound.Error(), "Expected error but got %v", response)
// Now replay the batch tx in a new L1 block
miner.ActL1StartBlock(12)(t)
miner.ActL1SetFeeRecipient(common.Address{'C'})
// note: the geth tx pool reorgLoop is too slow (responds to chain head events, but async),
// and there's no way to manually trigger runReorg, so we re-insert it ourselves.
require.NoError(t, miner.eth.TxPool().Add([]*types.Transaction{batchTx}, true, true)[0])
// need to re-insert previously included tx into the block
miner.ActL1IncludeTx(sd.RollupCfg.Genesis.SystemConfig.BatcherAddr)(t)
miner.ActL1EndBlock(t)
// sync the verifier again: now it should be safe again
verifier.ActL1HeadSignal(t)
verifier.ActL2PipelineFull(t)
require.Equal(t, verifier.L2Safe(), sequencer.L2Unsafe(), "verifier syncs from sequencer via replayed batch on L1")
ref, err = verifEngClient.L2BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err)
require.Equal(t, verifier.L2Safe(), ref, "verifier engine matches rollup client")
// Verify the safe head is recorded again
l1Head = miner.l1Chain.CurrentBlock()
firstSafeHeadUpdateL1Block = l1Head.Number.Uint64()
response, err = verifier.RollupClient().SafeHeadAtL1Block(context.Background(), firstSafeHeadUpdateL1Block)
require.NoError(t, err)
require.Equal(t, eth.HeaderBlockID(l1Head), response.L1Block)
require.Equal(t, verifier.L2Unsafe().ID(), response.SafeHead)
}
func setupSafeDBTest(t Testing, config *e2eutils.TestParams) (*e2eutils.SetupData, *L1Miner, *L2Sequencer, *L2Verifier, *L2Engine, *L2Batcher) {
dp := e2eutils.MakeDeployParams(t, config)
sd := e2eutils.Setup(t, dp, defaultAlloc)
logger := testlog.Logger(t, log.LevelDebug)
return setupSafeDBTestActors(t, dp, sd, logger)
}
func setupSafeDBTestActors(t Testing, dp *e2eutils.DeployParams, sd *e2eutils.SetupData, log log.Logger) (*e2eutils.SetupData, *L1Miner, *L2Sequencer, *L2Verifier, *L2Engine, *L2Batcher) {
dir := t.TempDir()
db, err := safedb.NewSafeDB(log, dir)
require.NoError(t, err)
t.Cleanup(func() {
_ = db.Close()
})
miner, seqEngine, sequencer := setupSequencerTest(t, sd, log)
miner.ActL1SetFeeRecipient(common.Address{'A'})
sequencer.ActL2PipelineFull(t)
verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), miner.BlobStore(), &sync.Config{}, WithSafeHeadListener(db))
rollupSeqCl := sequencer.RollupClient()
batcher := NewL2Batcher(log, sd.RollupCfg, DefaultBatcherCfg(dp),
rollupSeqCl, miner.EthClient(), seqEngine.EthClient(), seqEngine.EngineClient(t, sd.RollupCfg))
return sd, miner, sequencer, verifier, verifEngine, batcher
}
......@@ -291,6 +291,12 @@ var (
EnvVars: prefixEnvVars("ROLLUP_LOAD_PROTOCOL_VERSIONS"),
Category: RollupCategory,
}
SafeDBPath = &cli.StringFlag{
Name: "safedb.path",
Usage: "File path used to persist safe head update data. Disabled if not set.",
EnvVars: prefixEnvVars("SAFEDB_PATH"),
Hidden: true,
}
/* Deprecated Flags */
L2EngineSyncEnabled = &cli.BoolFlag{
Name: "l2.engine-sync",
......@@ -392,6 +398,7 @@ var optionalFlags = []cli.Flag{
ConductorEnabledFlag,
ConductorRpcFlag,
ConductorRpcTimeoutFlag,
SafeDBPath,
}
var DeprecatedFlags = []cli.Flag{
......
......@@ -2,8 +2,10 @@ package node
import (
"context"
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
......@@ -33,6 +35,10 @@ type driverClient interface {
OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
}
type SafeDBReader interface {
SafeHeadAtL1(ctx context.Context, l1BlockNum uint64) (l1 eth.BlockID, l2 eth.BlockID, err error)
}
type adminAPI struct {
*rpc.CommonAdminAPI
dr driverClient
......@@ -89,15 +95,17 @@ type nodeAPI struct {
config *rollup.Config
client l2EthClient
dr driverClient
safeDB SafeDBReader
log log.Logger
m metrics.RPCMetricer
}
func NewNodeAPI(config *rollup.Config, l2Client l2EthClient, dr driverClient, log log.Logger, m metrics.RPCMetricer) *nodeAPI {
func NewNodeAPI(config *rollup.Config, l2Client l2EthClient, dr driverClient, safeDB SafeDBReader, log log.Logger, m metrics.RPCMetricer) *nodeAPI {
return &nodeAPI{
config: config,
client: l2Client,
dr: dr,
safeDB: safeDB,
log: log,
m: m,
}
......@@ -126,6 +134,21 @@ func (n *nodeAPI) OutputAtBlock(ctx context.Context, number hexutil.Uint64) (*et
}, nil
}
func (n *nodeAPI) SafeHeadAtL1Block(ctx context.Context, number hexutil.Uint64) (*eth.SafeHeadResponse, error) {
recordDur := n.m.RecordRPCServerRequest("optimism_safeHeadAtL1Block")
defer recordDur()
l1Block, safeHead, err := n.safeDB.SafeHeadAtL1(ctx, uint64(number))
if errors.Is(err, safedb.ErrNotFound) {
return nil, err
} else if err != nil {
return nil, fmt.Errorf("failed to get safe head at l1 block %s: %w", number, err)
}
return &eth.SafeHeadResponse{
L1Block: l1Block,
SafeHead: safeHead,
}, nil
}
func (n *nodeAPI) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
recordDur := n.m.RecordRPCServerRequest("optimism_syncStatus")
defer recordDur()
......
......@@ -44,6 +44,9 @@ type Config struct {
ConfigPersistence ConfigPersistence
// Path to store safe head database. Disabled when set to empty string
SafeDBPath string
// RuntimeConfigReloadInterval defines the interval between runtime config reloads.
// Disabled if <= 0.
// Runtime config changes should be picked up from log-events,
......
......@@ -4,9 +4,12 @@ import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/httputil"
......@@ -33,6 +36,12 @@ import (
var ErrAlreadyClosed = errors.New("node is already closed")
type closableSafeDB interface {
derive.SafeHeadListener
SafeDBReader
io.Closer
}
type OpNode struct {
log log.Logger
appVersion string
......@@ -51,6 +60,8 @@ type OpNode struct {
tracer Tracer // tracer to get events for testing/debugging
runCfg *RuntimeConfig // runtime configurables
safeDB closableSafeDB
rollupHalt string // when to halt the rollup, disabled if empty
pprofService *oppprof.Service
......@@ -130,7 +141,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
return fmt.Errorf("failed to init the P2P stack: %w", err)
}
// Only expose the server at the end, ensuring all RPC backend components are initialized.
if err := n.initRPCServer(ctx, cfg); err != nil {
if err := n.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to init the RPC server: %w", err)
}
if err := n.initMetricsServer(cfg); err != nil {
......@@ -379,13 +390,22 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
if cfg.Plasma.Enabled {
n.log.Info("Plasma DA enabled", "da_server", cfg.Plasma.DAServerURL)
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.beacon, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync, sequencerConductor, plasmaDA)
if cfg.SafeDBPath != "" {
n.log.Info("Safe head database enabled", "path", cfg.SafeDBPath)
safeDB, err := safedb.NewSafeDB(n.log, cfg.SafeDBPath)
if err != nil {
return fmt.Errorf("failed to create safe head database at %v: %w", cfg.SafeDBPath, err)
}
n.safeDB = safeDB
} else {
n.safeDB = safedb.Disabled
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.beacon, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA)
return nil
}
func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
server, err := newRPCServer(ctx, &cfg.RPC, &cfg.Rollup, n.l2Source.L2Client, n.l2Driver, n.log, n.appVersion, n.metrics)
func (n *OpNode) initRPCServer(cfg *Config) error {
server, err := newRPCServer(&cfg.RPC, &cfg.Rollup, n.l2Source.L2Client, n.l2Driver, n.safeDB, n.log, n.appVersion, n.metrics)
if err != nil {
return err
}
......@@ -648,6 +668,12 @@ func (n *OpNode) Stop(ctx context.Context) error {
}
}
if n.safeDB != nil {
if err := n.safeDB.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close safe head db: %w", err))
}
}
// Wait for the runtime config loader to be done using the data sources before closing them
if n.runtimeConfigReloaderDone != nil {
<-n.runtimeConfigReloaderDone
......
package safedb
import (
"context"
"errors"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type DisabledDB struct{}
var (
Disabled = &DisabledDB{}
ErrNotEnabled = errors.New("safe head database not enabled")
)
func (d *DisabledDB) SafeHeadUpdated(_ eth.L2BlockRef, _ eth.BlockID) error {
return nil
}
func (d *DisabledDB) SafeHeadAtL1(_ context.Context, _ uint64) (l1 eth.BlockID, safeHead eth.BlockID, err error) {
err = ErrNotEnabled
return
}
func (d *DisabledDB) SafeHeadReset(_ eth.L2BlockRef) error {
return nil
}
func (d *DisabledDB) Close() error {
return nil
}
package safedb
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math"
"slices"
"sync"
"github.com/cockroachdb/pebble"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
var (
ErrNotFound = errors.New("not found")
ErrInvalidEntry = errors.New("invalid db entry")
)
const (
// Keys are prefixed with a constant byte to allow us to differentiate different "columns" within the data
keyPrefixSafeByL1BlockNum byte = 0
)
var (
SafeByL1BlockNumKey = uint64Key{prefix: keyPrefixSafeByL1BlockNum}
)
type uint64Key struct {
prefix byte
}
func (c uint64Key) Of(num uint64) []byte {
key := make([]byte, 0, 9)
key = append(key, c.prefix)
key = binary.BigEndian.AppendUint64(key, num)
return key
}
func (c uint64Key) Max() []byte {
return c.Of(math.MaxUint64)
}
func (c uint64Key) IterRange() *pebble.IterOptions {
return &pebble.IterOptions{
LowerBound: c.Of(0),
UpperBound: c.Max(),
}
}
type SafeDB struct {
// m ensures all read iterators are closed before closing the database by preventing concurrent read and write
// operations (with close considered a write operation).
m sync.RWMutex
log log.Logger
db *pebble.DB
writeOpts *pebble.WriteOptions
closed bool
}
func SafeByL1BlockNumValue(l1 eth.BlockID, l2 eth.BlockID) []byte {
val := make([]byte, 0, 72)
val = append(val, l1.Hash.Bytes()...)
val = append(val, l2.Hash.Bytes()...)
val = binary.BigEndian.AppendUint64(val, l2.Number)
return val
}
func DecodeSafeByL1BlockNum(key []byte, val []byte) (l1 eth.BlockID, l2 eth.BlockID, err error) {
if len(key) != 9 || len(val) != 72 || key[0] != keyPrefixSafeByL1BlockNum {
err = ErrInvalidEntry
return
}
copy(l1.Hash[:], val[:32])
l1.Number = binary.BigEndian.Uint64(key[1:])
copy(l2.Hash[:], val[32:64])
l2.Number = binary.BigEndian.Uint64(val[64:])
return
}
func NewSafeDB(logger log.Logger, path string) (*SafeDB, error) {
db, err := pebble.Open(path, &pebble.Options{})
if err != nil {
return nil, err
}
return &SafeDB{
log: logger,
db: db,
writeOpts: &pebble.WriteOptions{Sync: true},
}, nil
}
func (d *SafeDB) SafeHeadUpdated(safeHead eth.L2BlockRef, l1Head eth.BlockID) error {
d.m.Lock()
defer d.m.Unlock()
d.log.Info("Record safe head", "l2", safeHead.ID(), "l1", l1Head)
batch := d.db.NewBatch()
defer batch.Close()
if err := batch.Set(SafeByL1BlockNumKey.Of(l1Head.Number), SafeByL1BlockNumValue(l1Head, safeHead.ID()), d.writeOpts); err != nil {
return fmt.Errorf("failed to record safe head update: %w", err)
}
if err := batch.Commit(d.writeOpts); err != nil {
return fmt.Errorf("failed to commit safe head update: %w", err)
}
return nil
}
func (d *SafeDB) SafeHeadReset(safeHead eth.L2BlockRef) error {
d.m.Lock()
defer d.m.Unlock()
iter, err := d.db.NewIter(SafeByL1BlockNumKey.IterRange())
if err != nil {
return fmt.Errorf("reset failed to create iterator: %w", err)
}
defer iter.Close()
if valid := iter.SeekGE(SafeByL1BlockNumKey.Of(safeHead.L1Origin.Number)); !valid {
// Reached end of column without finding any entries to delete
return nil
}
for {
val, err := iter.ValueAndErr()
if err != nil {
return fmt.Errorf("reset failed to read entry: %w", err)
}
l1Block, l2Block, err := DecodeSafeByL1BlockNum(iter.Key(), val)
if err != nil {
return fmt.Errorf("reset encountered invalid entry: %w", err)
}
if l2Block.Number >= safeHead.Number {
// Keep a copy of this key - it may be modified when calling Prev()
l1HeadKey := slices.Clone(iter.Key())
hasPrevEntry := iter.Prev()
// Found the first entry that made the new safe head safe.
batch := d.db.NewBatch()
if err := batch.DeleteRange(l1HeadKey, SafeByL1BlockNumKey.Max(), d.writeOpts); err != nil {
return fmt.Errorf("reset failed to delete entries after %v: %w", l1HeadKey, err)
}
// If we reset to a safe head before the first entry, we don't know if the new safe head actually became
// safe in that L1 block or if it was just before our records start, so don't record it as safe at the
// specified L1 block.
if hasPrevEntry {
if err := batch.Set(l1HeadKey, SafeByL1BlockNumValue(l1Block, safeHead.ID()), d.writeOpts); err != nil {
return fmt.Errorf("reset failed to record safe head update: %w", err)
}
}
if err := batch.Commit(d.writeOpts); err != nil {
return fmt.Errorf("reset failed to commit batch: %w", err)
}
return nil
}
if valid := iter.Next(); !valid {
// Reached end of column without finding any entries to delete
return nil
}
}
}
func (d *SafeDB) SafeHeadAtL1(ctx context.Context, l1BlockNum uint64) (l1Block eth.BlockID, safeHead eth.BlockID, err error) {
d.m.RLock()
defer d.m.RUnlock()
iter, err := d.db.NewIterWithContext(ctx, SafeByL1BlockNumKey.IterRange())
if err != nil {
return
}
defer iter.Close()
if valid := iter.SeekLT(SafeByL1BlockNumKey.Of(l1BlockNum + 1)); !valid {
err = ErrNotFound
return
}
// Found an entry at or before the requested L1 block
val, err := iter.ValueAndErr()
if err != nil {
return
}
l1Block, safeHead, err = DecodeSafeByL1BlockNum(iter.Key(), val)
return
}
func (d *SafeDB) Close() error {
d.m.Lock()
defer d.m.Unlock()
if d.closed {
// Already closed
return nil
}
d.closed = true
return d.db.Close()
}
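
An illustrative sketch (not part of the diff) of the on-disk layout used by the helpers above: each key is 9 bytes (one prefix byte plus the big-endian L1 block number, so keys sort in natural numeric order), and each value is 72 bytes (L1 hash, then L2 hash, then the big-endian L2 block number). The block IDs below are made up for the example:

package safedb

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum/go-ethereum/common"
)

func ExampleSafeByL1BlockNumValue() {
	l1 := eth.BlockID{Hash: common.Hash{0x01}, Number: 100}
	l2 := eth.BlockID{Hash: common.Hash{0x02}, Number: 20}
	key := SafeByL1BlockNumKey.Of(l1.Number) // prefix 0x00, then 100 as a big-endian uint64
	val := SafeByL1BlockNumValue(l1, l2)     // l1 hash | l2 hash | l2 number (big-endian)
	gotL1, gotL2, err := DecodeSafeByL1BlockNum(key, val)
	fmt.Println(len(key), len(val), gotL1 == l1, gotL2 == l2, err)
	// Output: 9 72 true true <nil>
}
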
package safedb
import (
"context"
"math"
"slices"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestStoreSafeHeads(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
dir := t.TempDir()
db, err := NewSafeDB(logger, dir)
require.NoError(t, err)
defer db.Close()
l2a := eth.L2BlockRef{
Hash: common.Hash{0x02, 0xaa},
Number: 20,
}
l2b := eth.L2BlockRef{
Hash: common.Hash{0x02, 0xbb},
Number: 25,
}
l1a := eth.BlockID{
Hash: common.Hash{0x01, 0xaa},
Number: 100,
}
l1b := eth.BlockID{
Hash: common.Hash{0x01, 0xbb},
Number: 150,
}
require.NoError(t, db.SafeHeadUpdated(l2a, l1a))
require.NoError(t, db.SafeHeadUpdated(l2b, l1b))
verifySafeHeads := func(db *SafeDB) {
_, _, err = db.SafeHeadAtL1(context.Background(), l1a.Number-1)
require.ErrorIs(t, err, ErrNotFound)
actualL1, actualL2, err := db.SafeHeadAtL1(context.Background(), l1a.Number)
require.NoError(t, err)
require.Equal(t, l1a, actualL1)
require.Equal(t, l2a.ID(), actualL2)
actualL1, actualL2, err = db.SafeHeadAtL1(context.Background(), l1a.Number+1)
require.NoError(t, err)
require.Equal(t, l1a, actualL1)
require.Equal(t, l2a.ID(), actualL2)
actualL1, actualL2, err = db.SafeHeadAtL1(context.Background(), l1b.Number)
require.NoError(t, err)
require.Equal(t, l1b, actualL1)
require.Equal(t, l2b.ID(), actualL2)
actualL1, actualL2, err = db.SafeHeadAtL1(context.Background(), l1b.Number+1)
require.NoError(t, err)
require.Equal(t, l1b, actualL1)
require.Equal(t, l2b.ID(), actualL2)
}
// Verify loading the safe heads with the already open DB
verifySafeHeads(db)
// Close the DB and open a new instance
require.NoError(t, db.Close())
newDB, err := NewSafeDB(logger, dir)
require.NoError(t, err)
// Verify the data is reloaded correctly
verifySafeHeads(newDB)
}
func TestSafeHeadAtL1_EmptyDatabase(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
dir := t.TempDir()
db, err := NewSafeDB(logger, dir)
require.NoError(t, err)
defer db.Close()
_, _, err = db.SafeHeadAtL1(context.Background(), 100)
require.ErrorIs(t, err, ErrNotFound)
}
func TestTruncateOnSafeHeadReset(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
dir := t.TempDir()
db, err := NewSafeDB(logger, dir)
require.NoError(t, err)
defer db.Close()
l2a := eth.L2BlockRef{
Hash: common.Hash{0x02, 0xaa},
Number: 20,
L1Origin: eth.BlockID{
Number: 60,
},
}
l2b := eth.L2BlockRef{
Hash: common.Hash{0x02, 0xbb},
Number: 22,
L1Origin: eth.BlockID{
Number: 90,
},
}
l2c := eth.L2BlockRef{
Hash: common.Hash{0x02, 0xcc},
Number: 25,
L1Origin: eth.BlockID{
Number: 110,
},
}
l2d := eth.L2BlockRef{
Hash: common.Hash{0x02, 0xcc},
Number: 30,
L1Origin: eth.BlockID{
Number: 120,
},
}
l1a := eth.BlockID{
Hash: common.Hash{0x01, 0xaa},
Number: 100,
}
l1b := eth.BlockID{
Hash: common.Hash{0x01, 0xbb},
Number: 150,
}
l1c := eth.BlockID{
Hash: common.Hash{0x01, 0xcc},
Number: 160,
}
// Add some entries
require.NoError(t, db.SafeHeadUpdated(l2a, l1a))
require.NoError(t, db.SafeHeadUpdated(l2c, l1b))
require.NoError(t, db.SafeHeadUpdated(l2d, l1c))
// Then reset to between the two existing entries
require.NoError(t, db.SafeHeadReset(l2b))
// Only the reset safe head is now safe at the previous L1 block number
actualL1, actualL2, err := db.SafeHeadAtL1(context.Background(), l1b.Number)
require.NoError(t, err)
require.Equal(t, l1b, actualL1)
require.Equal(t, l2b.ID(), actualL2)
actualL1, actualL2, err = db.SafeHeadAtL1(context.Background(), l1c.Number)
require.NoError(t, err)
require.Equal(t, l1b, actualL1)
require.Equal(t, l2b.ID(), actualL2)
// l2a is still safe from its original update
actualL1, actualL2, err = db.SafeHeadAtL1(context.Background(), l1a.Number)
require.NoError(t, err)
require.Equal(t, l1a, actualL1)
require.Equal(t, l2a.ID(), actualL2)
}
func TestTruncateOnSafeHeadReset_BeforeFirstEntry(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
dir := t.TempDir()
db, err := NewSafeDB(logger, dir)
require.NoError(t, err)
defer db.Close()
l2b := eth.L2BlockRef{
Hash: common.Hash{0x02, 0xbb},
Number: 22,
L1Origin: eth.BlockID{
Number: 90,
},
}
l2c := eth.L2BlockRef{
Hash: common.Hash{0x02, 0xcc},
Number: 25,
L1Origin: eth.BlockID{
Number: 110,
},
}
l2d := eth.L2BlockRef{
Hash: common.Hash{0x02, 0xcc},
Number: 30,
L1Origin: eth.BlockID{
Number: 120,
},
}
l1a := eth.BlockID{
Hash: common.Hash{0x01, 0xaa},
Number: 100,
}
l1b := eth.BlockID{
Hash: common.Hash{0x01, 0xbb},
Number: 150,
}
l1c := eth.BlockID{
Hash: common.Hash{0x01, 0xcc},
Number: 160,
}
// Add some entries
require.NoError(t, db.SafeHeadUpdated(l2c, l1b))
require.NoError(t, db.SafeHeadUpdated(l2d, l1c))
// Then reset to a safe head before the first recorded entry
require.NoError(t, db.SafeHeadReset(l2b))
// All entries got removed
_, _, err = db.SafeHeadAtL1(context.Background(), l1a.Number)
require.ErrorIs(t, err, ErrNotFound)
_, _, err = db.SafeHeadAtL1(context.Background(), l1b.Number)
require.ErrorIs(t, err, ErrNotFound)
_, _, err = db.SafeHeadAtL1(context.Background(), l1c.Number)
require.ErrorIs(t, err, ErrNotFound)
}
func TestKeysFollowNaturalByteOrdering(t *testing.T) {
vals := []uint64{0, 1, math.MaxUint32 - 1, math.MaxUint32, math.MaxUint32 + 1, math.MaxUint64 - 1, math.MaxUint64}
for i := 1; i < len(vals); i++ {
prev := SafeByL1BlockNumKey.Of(vals[i-1])
cur := SafeByL1BlockNumKey.Of(vals[i])
require.True(t, slices.Compare(prev, cur) < 0, "Expected %v key %x to be less than %v key %x", vals[i-1], prev, vals[i], cur)
}
}
......@@ -27,8 +27,8 @@ type rpcServer struct {
sources.L2Client
}
func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Config, l2Client l2EthClient, dr driverClient, log log.Logger, appVersion string, m metrics.Metricer) (*rpcServer, error) {
api := NewNodeAPI(rollupCfg, l2Client, dr, log.New("rpc", "node"), m)
func newRPCServer(rpcCfg *RPCConfig, rollupCfg *rollup.Config, l2Client l2EthClient, dr driverClient, safedb SafeDBReader, log log.Logger, appVersion string, m metrics.Metricer) (*rpcServer, error) {
api := NewNodeAPI(rollupCfg, l2Client, dr, safedb, log.New("rpc", "node"), m)
// TODO: extend RPC config with options for WS, IPC and HTTP RPC connections
endpoint := net.JoinHostPort(rpcCfg.ListenAddr, strconv.Itoa(rpcCfg.ListenPort))
r := &rpcServer{
......
......@@ -7,6 +7,7 @@ import (
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/assert"
......@@ -97,10 +98,11 @@ func TestOutputAtBlock(t *testing.T) {
l2Client.ExpectOutputV0AtBlock(common.HexToHash("0x8512bee03061475e4b069171f7b406097184f16b22c3f5c97c0abfc49591c524"), output, nil)
drClient := &mockDriverClient{}
safeReader := &mockSafeDBReader{}
status := randomSyncStatus(rand.New(rand.NewSource(123)))
drClient.ExpectBlockRefWithStatus(0xdcdc89, ref, status, nil)
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NoopMetrics)
server, err := newRPCServer(rpcCfg, rollupCfg, l2Client, drClient, safeReader, log, "0.0", metrics.NoopMetrics)
require.NoError(t, err)
require.NoError(t, server.Start())
defer func() {
......@@ -121,12 +123,14 @@ func TestOutputAtBlock(t *testing.T) {
require.Equal(t, *status, *out.Status)
l2Client.Mock.AssertExpectations(t)
drClient.Mock.AssertExpectations(t)
safeReader.Mock.AssertExpectations(t)
}
func TestVersion(t *testing.T) {
log := testlog.Logger(t, log.LevelError)
l2Client := &testutils.MockL2Client{}
drClient := &mockDriverClient{}
safeReader := &mockSafeDBReader{}
rpcCfg := &RPCConfig{
ListenAddr: "localhost",
ListenPort: 0,
......@@ -134,7 +138,7 @@ func TestVersion(t *testing.T) {
rollupCfg := &rollup.Config{
// ignore other rollup config info in this test
}
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NoopMetrics)
server, err := newRPCServer(rpcCfg, rollupCfg, l2Client, drClient, safeReader, log, "0.0", metrics.NoopMetrics)
assert.NoError(t, err)
assert.NoError(t, server.Start())
defer func() {
......@@ -168,6 +172,7 @@ func TestSyncStatus(t *testing.T) {
log := testlog.Logger(t, log.LevelError)
l2Client := &testutils.MockL2Client{}
drClient := &mockDriverClient{}
safeReader := &mockSafeDBReader{}
rng := rand.New(rand.NewSource(1234))
status := randomSyncStatus(rng)
drClient.On("SyncStatus").Return(status)
......@@ -179,7 +184,7 @@ func TestSyncStatus(t *testing.T) {
rollupCfg := &rollup.Config{
// ignore other rollup config info in this test
}
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NoopMetrics)
server, err := newRPCServer(rpcCfg, rollupCfg, l2Client, drClient, safeReader, log, "0.0", metrics.NoopMetrics)
assert.NoError(t, err)
assert.NoError(t, server.Start())
defer func() {
......@@ -195,6 +200,52 @@ func TestSyncStatus(t *testing.T) {
assert.Equal(t, status, out)
}
func TestSafeHeadAtL1Block(t *testing.T) {
log := testlog.Logger(t, log.LevelError)
l2Client := &testutils.MockL2Client{}
drClient := &mockDriverClient{}
safeReader := &mockSafeDBReader{}
l1BlockNum := uint64(5223)
expectedL1 := eth.BlockID{
Hash: common.Hash{0xdd},
Number: l1BlockNum - 2,
}
expectedSafeHead := eth.BlockID{
Hash: common.Hash{0xee},
Number: 223,
}
expected := &eth.SafeHeadResponse{
L1Block: expectedL1,
SafeHead: expectedSafeHead,
}
safeReader.ExpectSafeHeadAtL1(l1BlockNum, expectedL1, expectedSafeHead, nil)
rpcCfg := &RPCConfig{
ListenAddr: "localhost",
ListenPort: 0,
}
rollupCfg := &rollup.Config{
// ignore other rollup config info in this test
}
server, err := newRPCServer(rpcCfg, rollupCfg, l2Client, drClient, safeReader, log, "0.0", metrics.NoopMetrics)
require.NoError(t, err)
require.NoError(t, server.Start())
defer func() {
require.NoError(t, server.Stop(context.Background()))
}()
client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3))
require.NoError(t, err)
var out *eth.SafeHeadResponse
err = client.CallContext(context.Background(), &out, "optimism_safeHeadAtL1Block", hexutil.Uint64(l1BlockNum).String())
require.NoError(t, err)
require.Equal(t, expected, out)
l2Client.Mock.AssertExpectations(t)
drClient.Mock.AssertExpectations(t)
safeReader.Mock.AssertExpectations(t)
}
type mockDriverClient struct {
mock.Mock
}
......@@ -231,3 +282,16 @@ func (c *mockDriverClient) SequencerActive(ctx context.Context) (bool, error) {
func (c *mockDriverClient) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return c.Mock.MethodCalled("OnUnsafeL2Payload").Get(0).(error)
}
type mockSafeDBReader struct {
mock.Mock
}
func (m *mockSafeDBReader) SafeHeadAtL1(ctx context.Context, l1BlockNum uint64) (l1Hash eth.BlockID, l2Hash eth.BlockID, err error) {
r := m.Mock.MethodCalled("SafeHeadAtL1", l1BlockNum)
return r[0].(eth.BlockID), r[1].(eth.BlockID), *r[2].(*error)
}
func (m *mockSafeDBReader) ExpectSafeHeadAtL1(l1BlockNum uint64, l1 eth.BlockID, safeHead eth.BlockID, err error) {
m.Mock.On("SafeHeadAtL1", l1BlockNum).Return(l1, safeHead, &err)
}
......@@ -92,6 +92,19 @@ type LocalEngineControl interface {
SetPendingSafeL2Head(eth.L2BlockRef)
}
// SafeHeadListener is called when the safe head is updated.
// The safe head may advance by more than one block in a single update
// The l1Block specified is the first L1 block that includes sufficient information to derive the new safe head
type SafeHeadListener interface {
// SafeHeadUpdated indicates that the safe head has been updated in response to processing batch data
// The l1Block specified is the first L1 block containing all required batch data to derive newSafeHead
SafeHeadUpdated(newSafeHead eth.L2BlockRef, l1Block eth.BlockID) error
// SafeHeadReset indicates that the derivation pipeline reset back to the specified safe head
// The L1 block that made the new safe head safe is unknown.
SafeHeadReset(resetSafeHead eth.L2BlockRef) error
}
// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024
......@@ -153,10 +166,13 @@ type EngineQueue struct {
l1Fetcher L1Fetcher
syncCfg *sync.Config
safeHeadNotifs SafeHeadListener // notified when safe head is updated
lastNotifiedSafeHead eth.L2BlockRef
}
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config, safeHeadNotifs SafeHeadListener) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
......@@ -168,6 +184,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engin
prev: prev,
l1Fetcher: l1Fetcher,
syncCfg: syncCfg,
safeHeadNotifs: safeHeadNotifs,
}
}
......@@ -271,7 +288,10 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
return err
}
eq.origin = newOrigin
eq.postProcessSafeL2() // make sure we track the last L2 safe head for every new L1 block
// make sure we track the last L2 safe head for every new L1 block
if err := eq.postProcessSafeL2(); err != nil {
return err
}
// try to finalize the L2 blocks we have synced so far (no-op if L1 finality is behind)
if err := eq.tryFinalizePastL2Blocks(ctx); err != nil {
return err
......@@ -379,7 +399,10 @@ func (eq *EngineQueue) tryFinalizeL2() {
// postProcessSafeL2 buffers the L1 block the safe head was fully derived from,
// to finalize it once the L1 block, or later, finalizes.
func (eq *EngineQueue) postProcessSafeL2() {
func (eq *EngineQueue) postProcessSafeL2() error {
if err := eq.notifyNewSafeHead(eq.ec.SafeL2Head()); err != nil {
return err
}
// prune finality data if necessary
if len(eq.finalityData) >= finalityLookback {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...)
......@@ -401,6 +424,23 @@ func (eq *EngineQueue) postProcessSafeL2() {
eq.log.Debug("updated finality-data", "last_l1", last.L1Block, "last_l2", last.L2Block)
}
}
return nil
}
// notifyNewSafeHead calls the safe head listener with the current safe head and l1 origin information.
func (eq *EngineQueue) notifyNewSafeHead(safeHead eth.L2BlockRef) error {
if eq.lastNotifiedSafeHead == safeHead {
// No change, no need to notify
return nil
}
if err := eq.safeHeadNotifs.SafeHeadUpdated(safeHead, eq.origin.ID()); err != nil {
// At this point we are in a potentially inconsistent state: the safe head has been updated
// in the execution client but we failed to post-process it. Reset the pipeline so the safe head rolls back
// a little (it always rolls back at least 1 block) and then it will retry storing the entry.
return NewResetError(fmt.Errorf("safe head notifications failed: %w", err))
}
eq.lastNotifiedSafeHead = safeHead
return nil
}
func (eq *EngineQueue) logSyncProgress(reason string) {
......@@ -520,7 +560,9 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
eq.ec.SetPendingSafeL2Head(ref)
if eq.safeAttributes.isLastInSpan {
eq.ec.SetSafeHead(ref)
eq.postProcessSafeL2()
if err := eq.postProcessSafeL2(); err != nil {
return err
}
}
// unsafe head stays the same, we did not reorg the chain.
eq.safeAttributes = nil
......@@ -581,7 +623,9 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
eq.safeAttributes = nil
eq.logSyncProgress("processed safe block derived from L1")
if lastInSpan {
eq.postProcessSafeL2()
if err := eq.postProcessSafeL2(); err != nil {
return err
}
}
return nil
......@@ -656,6 +700,10 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
// note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.origin = pipelineOrigin
eq.sysCfg = l1Cfg
eq.lastNotifiedSafeHead = safe
if err := eq.safeHeadNotifs.SafeHeadReset(safe); err != nil {
return err
}
eq.logSyncProgress("reset derivation work")
return io.EOF
}
......
......@@ -8,6 +8,7 @@ import (
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require"
......@@ -251,7 +252,7 @@ func TestEngineQueue_Finalize(t *testing.T) {
prev := &fakeAttributesQueue{}
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled)
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
......@@ -262,13 +263,13 @@ func TestEngineQueue_Finalize(t *testing.T) {
eq.origin = refD
prev.origin = refD
eq.ec.SetSafeHead(refC1)
eq.postProcessSafeL2()
require.NoError(t, eq.postProcessSafeL2())
// now say D0 was included in E and became the new safe head
eq.origin = refE
prev.origin = refE
eq.ec.SetSafeHead(refD0)
eq.postProcessSafeL2()
require.NoError(t, eq.postProcessSafeL2())
// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
eq.Finalize(refD)
......@@ -487,7 +488,7 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
prev := &fakeAttributesQueue{origin: refE}
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled)
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
......@@ -817,7 +818,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
prev := &fakeAttributesQueue{origin: refE}
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled)
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
......@@ -914,7 +915,7 @@ func TestBlockBuildingRace(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{}, safedb.Disabled)
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
id := eth.PayloadID{0xff}
......@@ -1086,7 +1087,7 @@ func TestResetLoop(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}, safedb.Disabled)
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA1)
eq.ec.SetFinalizedHead(refA0)
......@@ -1192,7 +1193,7 @@ func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA}
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{}, safedb.Disabled)
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA0)
eq.ec.SetFinalizedHead(refA0)
......
......@@ -68,7 +68,7 @@ type DerivationPipeline struct {
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, plasmaInputs PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline {
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, plasmaInputs PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, syncCfg *sync.Config, safeHeadListener SafeHeadListener) *DerivationPipeline {
// Pull stages
l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
......@@ -82,7 +82,7 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchQueue)
// Step stages
eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue, l1Fetcher, syncCfg)
eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue, l1Fetcher, syncCfg, safeHeadListener)
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other.
......
......@@ -114,14 +114,30 @@ type SequencerStateListener interface {
}
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1Blobs derive.L1BlobsFetcher, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener, syncCfg *sync.Config, sequencerConductor conductor.SequencerConductor, plasma derive.PlasmaInputFetcher) *Driver {
func NewDriver(
driverCfg *Config,
cfg *rollup.Config,
l2 L2Chain,
l1 L1Chain,
l1Blobs derive.L1BlobsFetcher,
altSync AltSync,
network Network,
log log.Logger,
snapshotLog log.Logger,
metrics Metrics,
sequencerStateListener SequencerStateListener,
safeHeadListener derive.SafeHeadListener,
syncCfg *sync.Config,
sequencerConductor conductor.SequencerConductor,
plasma derive.PlasmaInputFetcher,
) *Driver {
l1 = NewMeteredL1Fetcher(l1, metrics)
l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
engine := derive.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, engine, metrics, syncCfg)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, engine, metrics, syncCfg, safeHeadListener)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
......
......@@ -107,6 +107,23 @@ func TestFindSyncStart(t *testing.T) {
SafeL2Head: 'A',
ExpectedErr: nil,
},
{
Name: "already synced with safe head after genesis",
GenesisL1Num: 0,
L1: "abcdefghijkj",
L2: "ABCDEFGHIJKJ",
NewL1: "abcdefghijkj",
PreFinalizedL2: 'B',
PreSafeL2: 'D',
GenesisL1: 'a',
GenesisL2: 'A',
UnsafeL2Head: 'J',
SeqWindowSize: 2,
// Important: this steps back at least one safe block so the safedb is sent the latest safe head
// again - we may be resetting because the safedb failed to write the previous entry
SafeL2Head: 'C',
ExpectedErr: nil,
},
{
Name: "small reorg long chain",
GenesisL1Num: 0,
......
......@@ -101,6 +101,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
URL: ctx.String(flags.HeartbeatURLFlag.Name),
},
ConfigPersistence: configPersistence,
SafeDBPath: ctx.String(flags.SafeDBPath.Name),
Sync: *syncConfig,
RollupHalt: haltOption,
RethDBPath: ctx.String(flags.L1RethDBPath.Name),
......
......@@ -7,6 +7,7 @@ import (
"io"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
......@@ -39,7 +40,7 @@ type Driver struct {
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, nil, l2Source, engine, metrics.NoopMetrics, &sync.Config{})
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, nil, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled)
pipeline.Reset()
return &Driver{
logger: logger,
......
......@@ -16,6 +16,11 @@ type OutputResponse struct {
Status *SyncStatus `json:"syncStatus"`
}
type SafeHeadResponse struct {
L1Block BlockID `json:"l1Block"`
SafeHead BlockID `json:"safeHead"`
}
var (
ErrInvalidOutput = errors.New("invalid output")
ErrInvalidOutputVersion = errors.New("invalid output version")
......
......@@ -27,6 +27,12 @@ func (r *RollupClient) OutputAtBlock(ctx context.Context, blockNum uint64) (*eth
return output, err
}
func (r *RollupClient) SafeHeadAtL1Block(ctx context.Context, blockNum uint64) (*eth.SafeHeadResponse, error) {
var output *eth.SafeHeadResponse
err := r.rpc.CallContext(ctx, &output, "optimism_safeHeadAtL1Block", hexutil.Uint64(blockNum))
return output, err
}
func (r *RollupClient) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
var output *eth.SyncStatus
err := r.rpc.CallContext(ctx, &output, "optimism_syncStatus")
......