Commit 7f403ea5 authored by protolambda's avatar protolambda Committed by GitHub

op-node: maintain sync-status through events, remove legacy snapshot-log (#11008)

* op-node: maintain sync-status through events, remove legacy snapshot-log

* op-service: fix typo
Co-authored-by: default avatarAdrian Sutton <adrian@oplabs.co>

* op-node: clarify hidden snapshot log flag

* op-node: make CurrentL1 SyncStatus update more frequently

---------
Co-authored-by: default avatarAdrian Sutton <adrian@oplabs.co>
parent 036a14a0
......@@ -6,7 +6,6 @@ import (
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
......@@ -17,7 +16,6 @@ import (
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -242,15 +240,6 @@ func L2Finalization(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
engBlock, err := engCl.L2BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err)
require.Equal(t, heightToSubmit, engBlock.Number, "engine finalizes what rollup node finalizes")
// Now try to finalize block 4, but with a bad/malicious alternative hash.
// If we get this false signal, we shouldn't finalize the L2 chain.
altBlock4 := sequencer.SyncStatus().SafeL1
altBlock4.Hash = common.HexToHash("0xdead")
sequencer.synchronousEvents.Emit(finality.FinalizeL1Event{FinalizedL1: altBlock4})
sequencer.ActL2PipelineFull(t)
require.Equal(t, uint64(3), sequencer.SyncStatus().FinalizedL1.Number)
require.Equal(t, heightToSubmit, sequencer.SyncStatus().FinalizedL2.Number, "unknown/bad finalized L1 blocks are ignored")
}
// L2FinalizationWithSparseL1 tests that safe L2 blocks can be finalized even if we do not regularly get a L1 finalization signal
......
......@@ -4,16 +4,18 @@ import (
"context"
"errors"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -47,7 +49,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc deri
plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer {
ver := NewL2Verifier(t, log, l1, blobSrc, plasmaSrc, eng, cfg, &sync.Config{}, safedb.Disabled)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng)
seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1)
seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.syncStatus.L1Head, l1)
l1OriginSelector := &MockL1OriginSelector{
actual: driver.NewL1OriginSelector(log, cfg, seqConfDepthL1),
}
......@@ -103,7 +105,16 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) {
// For advanced tests we can catch those and print a warning instead.
require.NoError(t, err)
// TODO: action-test publishing of payload on p2p
// After having built a L2 block, make sure to get an engine update processed.
// This will ensure the sync-status and such reflect the latest changes.
s.synchronousEvents.Emit(engine.TryUpdateEngineEvent{})
s.synchronousEvents.Emit(engine.ForkchoiceRequestEvent{})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev rollup.Event) bool {
x, ok := ev.(engine.ForkchoiceUpdateEvent)
return ok && x.UnsafeL2Head == s.engine.UnsafeL2Head()
}, false))
require.Equal(t, s.engine.UnsafeL2Head(), s.syncStatus.SyncStatus().UnsafeL2,
"sync status must be accurate after block building")
}
// ActL2KeepL1Origin makes the sequencer use the current L1 origin, even if the next origin is available.
......@@ -117,7 +128,7 @@ func (s *L2Sequencer) ActL2KeepL1Origin(t Testing) {
// ActBuildToL1Head builds empty blocks until (incl.) the L1 head becomes the L2 origin
func (s *L2Sequencer) ActBuildToL1Head(t Testing) {
for s.engine.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
for s.engine.UnsafeL2Head().L1Origin.Number < s.syncStatus.L1Head().Number {
s.ActL2PipelineFull(t)
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
......@@ -126,7 +137,7 @@ func (s *L2Sequencer) ActBuildToL1Head(t Testing) {
// ActBuildToL1HeadUnsafe builds empty blocks until (incl.) the L1 head becomes the L1 origin of the L2 head
func (s *L2Sequencer) ActBuildToL1HeadUnsafe(t Testing) {
for s.engine.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
for s.engine.UnsafeL2Head().L1Origin.Number < s.syncStatus.L1Head().Number {
// Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain.
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
......@@ -139,7 +150,7 @@ func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) {
s.ActL2PipelineFull(t)
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.engine.UnsafeL2Head())
require.NoError(t, err)
if nextOrigin.Number >= s.l1State.L1Head().Number {
if nextOrigin.Number >= s.syncStatus.L1Head().Number {
break
}
s.ActL2StartBlock(t)
......@@ -153,7 +164,7 @@ func (s *L2Sequencer) ActBuildToL1HeadExclUnsafe(t Testing) {
// Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain.
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.engine.UnsafeL2Head())
require.NoError(t, err)
if nextOrigin.Number >= s.l1State.L1Head().Number {
if nextOrigin.Number >= s.syncStatus.L1Head().Number {
break
}
s.ActL2StartBlock(t)
......
......@@ -20,6 +20,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/status"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -38,6 +39,8 @@ type L2Verifier struct {
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
}
syncStatus driver.SyncStatusTracker
synchronousEvents *rollup.SynchronousEvents
syncDeriver *driver.SyncDeriver
......@@ -52,7 +55,6 @@ type L2Verifier struct {
syncCfg *sync.Config
l1 derive.L1Fetcher
l1State *driver.L1State
l2PipelineIdle bool
l2Building bool
......@@ -107,6 +109,8 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)
pipelineDeriver := derive.NewPipelineDeriver(ctx, pipeline, synchronousEvents)
syncStatusTracker := status.NewStatusTracker(log, metrics)
syncDeriver := &driver.SyncDeriver{
Derivation: pipeline,
Finalizer: finalizer,
......@@ -136,7 +140,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
syncCfg: syncCfg,
syncDeriver: syncDeriver,
l1: l1,
l1State: driver.NewL1State(log, metrics),
syncStatus: syncStatusTracker,
l2PipelineIdle: true,
l2Building: false,
rollupCfg: cfg,
......@@ -145,6 +149,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
}
*rootDeriver = rollup.SynchronousDerivers{
syncStatusTracker,
syncDeriver,
engineResetDeriver,
engDeriv,
......@@ -238,17 +243,7 @@ func (s *L2Verifier) L2BackupUnsafe() eth.L2BlockRef {
}
func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(),
CurrentL1Finalized: s.finalizer.FinalizedL1(),
HeadL1: s.l1State.L1Head(),
SafeL1: s.l1State.L1Safe(),
FinalizedL1: s.l1State.L1Finalized(),
UnsafeL2: s.L2Unsafe(),
SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(),
PendingSafeL2: s.L2PendingSafe(),
}
return s.syncStatus.SyncStatus()
}
func (s *L2Verifier) RollupClient() *sources.RollupClient {
......@@ -279,20 +274,34 @@ func (s *L2Verifier) ActRPCFail(t Testing) {
func (s *L2Verifier) ActL1HeadSignal(t Testing) {
head, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
s.l1State.HandleNewL1HeadBlock(head)
s.synchronousEvents.Emit(status.L1UnsafeEvent{L1Unsafe: head})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev rollup.Event) bool {
x, ok := ev.(status.L1UnsafeEvent)
return ok && x.L1Unsafe == head
}, false))
require.Equal(t, head, s.syncStatus.SyncStatus().HeadL1)
}
func (s *L2Verifier) ActL1SafeSignal(t Testing) {
safe, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err)
s.l1State.HandleNewL1SafeBlock(safe)
s.synchronousEvents.Emit(status.L1SafeEvent{L1Safe: safe})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev rollup.Event) bool {
x, ok := ev.(status.L1SafeEvent)
return ok && x.L1Safe == safe
}, false))
require.Equal(t, safe, s.syncStatus.SyncStatus().SafeL1)
}
func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
finalized, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err)
s.l1State.HandleNewL1FinalizedBlock(finalized)
s.synchronousEvents.Emit(finality.FinalizeL1Event{FinalizedL1: finalized})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev rollup.Event) bool {
x, ok := ev.(finality.FinalizeL1Event)
return ok && x.FinalizedL1 == finalized
}, false))
require.Equal(t, finalized, s.syncStatus.SyncStatus().FinalizedL1)
}
func (s *L2Verifier) OnEvent(ev rollup.Event) {
......
......@@ -825,7 +825,7 @@ func SyncAfterReorg(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
miner.ActL1SetFeeRecipient(common.Address{'A', 0})
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.syncStatus.L1Head().Number {
// build L2 blocks until the L1 origin is the current L1 head(A0)
sequencer.ActL2PipelineFull(t)
sequencer.ActL2StartBlock(t)
......
......@@ -515,7 +515,7 @@ func TestSpanBatchLowThroughputChain(gt *testing.T) {
totalTxCount := 0
// Make 600 L2 blocks (L1BlockTime / L2BlockTime * 50) including 1~3 txs
for i := 0; i < 50; i++ {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.syncStatus.L1Head().Number {
sequencer.ActL2StartBlock(t)
// fill the block with random number of L2 txs
for j := 0; j < rand.Intn(3); j++ {
......@@ -654,7 +654,7 @@ func TestBatchEquivalence(gt *testing.T) {
sequencer.ActL2PipelineFull(t)
totalTxCount := 0
// Build random blocks
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.syncStatus.L1Head().Number {
sequencer.ActL2StartBlock(t)
// fill the block with random number of L2 txs
for j := 0; j < rand.Intn(3); j++ {
......
......@@ -140,7 +140,8 @@ func FinalizeWhileSyncing(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
verifier.ActL2PipelineFull(t)
// Verify the verifier finalized something new
require.Less(t, verifierStartStatus.FinalizedL2.Number, verifier.SyncStatus().FinalizedL2.Number, "verifier finalized L2 blocks during sync")
result := verifier.SyncStatus()
require.Less(t, verifierStartStatus.FinalizedL2.Number, result.FinalizedL2.Number, "verifier finalized L2 blocks during sync")
}
// TestUnsafeSync tests that a verifier properly imports unsafe blocks via gossip.
......
......@@ -689,9 +689,6 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
}
}
// Don't log state snapshots in test output
snapLog := log.NewLogger(log.DiscardHandler())
// Rollup nodes
// Ensure we are looping through the nodes in alphabetical order
......@@ -732,7 +729,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
l.Warn("closed op-node!")
}()
}
node, err := rollupNode.New(context.Background(), &c, l, snapLog, "", metrics.NewMetrics(""))
node, err := rollupNode.New(context.Background(), &c, l, "", metrics.NewMetrics(""))
if err != nil {
return nil, err
}
......
......@@ -765,7 +765,6 @@ func TestSystemP2PAltSync(t *testing.T) {
// set up our syncer node, connect it to alice/bob
cfg.Loggers["syncer"] = testlog.Logger(t, log.LevelInfo).New("role", "syncer")
snapLog := log.NewLogger(log.DiscardHandler())
// Create a peer, and hook up alice and bob
h, err := sys.newMockNetPeer()
......@@ -803,7 +802,7 @@ func TestSystemP2PAltSync(t *testing.T) {
configureL2(syncNodeCfg, syncerL2Engine, cfg.JWTSecret)
syncerNode, err := rollupNode.New(ctx, syncNodeCfg, cfg.Loggers["syncer"], snapLog, "", metrics.NewMetrics(""))
syncerNode, err := rollupNode.New(ctx, syncNodeCfg, cfg.Loggers["syncer"], "", metrics.NewMetrics(""))
require.NoError(t, err)
err = syncerNode.Start(ctx)
require.NoError(t, err)
......
......@@ -85,11 +85,6 @@ func RollupNodeMain(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.
}
cfg.Cancel = closeApp
snapshotLog, err := opnode.NewSnapshotLogger(ctx)
if err != nil {
return nil, fmt.Errorf("unable to create snapshot root logger: %w", err)
}
// Only pretty-print the banner if it is a terminal log. Other log it as key-value pairs.
if logCfg.Format == "terminal" {
log.Info("rollup config:\n" + cfg.Rollup.Description(chaincfg.L2ChainIDToNetworkDisplayName))
......@@ -97,7 +92,7 @@ func RollupNodeMain(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.
cfg.Rollup.LogDescription(log, chaincfg.L2ChainIDToNetworkDisplayName)
}
n, err := node.New(ctx.Context, cfg, log, snapshotLog, VersionWithMeta, m)
n, err := node.New(ctx.Context, cfg, log, VersionWithMeta, m)
if err != nil {
return nil, fmt.Errorf("unable to create the rollup node: %w", err)
}
......
......@@ -260,9 +260,10 @@ var (
}
SnapshotLog = &cli.StringFlag{
Name: "snapshotlog.file",
Usage: "Path to the snapshot log file",
Usage: "Deprecated. This flag is ignored, but here for compatibility.",
EnvVars: prefixEnvVars("SNAPSHOT_LOG"),
Category: OperationsCategory,
Hidden: true, // non-critical function, removed, flag is no-op to avoid breaking setups.
}
HeartbeatEnabledFlag = &cli.BoolFlag{
Name: "heartbeat.enabled",
......
......@@ -89,7 +89,7 @@ var _ p2p.GossipIn = (*OpNode)(nil)
// New creates a new OpNode instance.
// The provided ctx argument is for the span of initialization only;
// the node will immediately Stop(ctx) before finishing initialization if the context is canceled during initialization.
func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logger, appVersion string, m *metrics.Metrics) (*OpNode, error) {
func New(ctx context.Context, cfg *Config, log log.Logger, appVersion string, m *metrics.Metrics) (*OpNode, error) {
if err := cfg.Check(); err != nil {
return nil, err
}
......@@ -104,7 +104,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
// not a context leak, gossipsub is closed with a context.
n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())
err := n.init(ctx, cfg, snapshotLog)
err := n.init(ctx, cfg)
if err != nil {
log.Error("Error initializing the rollup node", "err", err)
// ensure we always close the node resources if we fail to initialize the node.
......@@ -116,7 +116,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
return n, nil
}
func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
func (n *OpNode) init(ctx context.Context, cfg *Config) error {
n.log.Info("Initializing rollup node", "version", n.appVersion)
if err := n.initTracer(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the trace: %w", err)
......@@ -127,7 +127,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL1BeaconAPI(ctx, cfg); err != nil {
return err
}
if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
if err := n.initL2(ctx, cfg); err != nil {
return fmt.Errorf("failed to init L2: %w", err)
}
if err := n.initRuntimeConfig(ctx, cfg); err != nil { // depends on L2, to signal initial runtime values to
......@@ -363,7 +363,7 @@ func (n *OpNode) initL1BeaconAPI(ctx context.Context, cfg *Config) error {
}
}
func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client: %w", err)
......@@ -401,7 +401,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
} else {
n.safeDB = safedb.Disabled
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.beacon, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA)
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, plasmaDA)
return nil
}
......
......@@ -17,6 +17,14 @@ func (d DeriverIdleEvent) String() string {
return "derivation-idle"
}
type DeriverL1StatusEvent struct {
Origin eth.L1BlockRef
}
func (d DeriverL1StatusEvent) String() string {
return "deriver-l1-status"
}
type DeriverMoreEvent struct{}
func (d DeriverMoreEvent) String() string {
......@@ -83,7 +91,12 @@ func (d *PipelineDeriver) OnEvent(ev rollup.Event) {
return
}
d.pipeline.log.Trace("Derivation pipeline step", "onto_origin", d.pipeline.Origin())
preOrigin := d.pipeline.Origin()
attrib, err := d.pipeline.Step(d.ctx, x.PendingSafe)
postOrigin := d.pipeline.Origin()
if preOrigin != postOrigin {
d.emitter.Emit(DeriverL1StatusEvent{Origin: postOrigin})
}
if err == io.EOF {
d.pipeline.log.Debug("Derivation process went idle", "progress", d.pipeline.Origin(), "err", err)
d.emitter.Emit(DeriverIdleEvent{Origin: d.pipeline.Origin()})
......
......@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/status"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -104,14 +105,10 @@ type PlasmaIface interface {
derive.PlasmaInputFetcher
}
type L1StateIface interface {
HandleNewL1HeadBlock(head eth.L1BlockRef)
HandleNewL1SafeBlock(safe eth.L1BlockRef)
HandleNewL1FinalizedBlock(finalized eth.L1BlockRef)
type SyncStatusTracker interface {
rollup.Deriver
SyncStatus() *eth.SyncStatus
L1Head() eth.L1BlockRef
L1Safe() eth.L1BlockRef
L1Finalized() eth.L1BlockRef
}
type SequencerIface interface {
......@@ -162,7 +159,6 @@ func NewDriver(
altSync AltSync,
network Network,
log log.Logger,
snapshotLog log.Logger,
metrics Metrics,
sequencerStateListener SequencerStateListener,
safeHeadListener rollup.SafeHeadListener,
......@@ -174,11 +170,12 @@ func NewDriver(
rootDeriver := &rollup.SynchronousDerivers{}
synchronousEvents := rollup.NewSynchronousEvents(log, driverCtx, rootDeriver)
statusTracker := status.NewStatusTracker(log, metrics)
l1 = NewMeteredL1Fetcher(l1, metrics)
l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, statusTracker.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, statusTracker.L1Head, l1)
ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)
engineResetDeriver := engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg, synchronousEvents)
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)
......@@ -217,7 +214,7 @@ func NewDriver(
schedDeriv := NewStepSchedulingDeriver(log, synchronousEvents)
driver := &Driver{
l1State: l1State,
statusTracker: statusTracker,
SyncDeriver: syncDeriver,
sched: schedDeriv,
synchronousEvents: synchronousEvents,
......@@ -231,7 +228,6 @@ func NewDriver(
driverCtx: driverCtx,
driverCancel: driverCancel,
log: log,
snapshotLog: snapshotLog,
sequencer: sequencer,
network: network,
metrics: metrics,
......@@ -254,6 +250,7 @@ func NewDriver(
pipelineDeriver,
attributesHandler,
finalizer,
statusTracker,
}
return driver
......
package driver
import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type L1Metrics interface {
RecordL1ReorgDepth(d uint64)
RecordL1Ref(name string, ref eth.L1BlockRef)
}
// L1State tracks L1 head, safe and finalized blocks. It is not safe to write and read concurrently.
type L1State struct {
log log.Logger
metrics L1Metrics
// Latest recorded head, safe block and finalized block of the L1 Chain, independent of derivation work
l1Head eth.L1BlockRef
l1Safe eth.L1BlockRef
l1Finalized eth.L1BlockRef
}
func NewL1State(log log.Logger, metrics L1Metrics) *L1State {
return &L1State{
log: log,
metrics: metrics,
}
}
func (s *L1State) HandleNewL1HeadBlock(head eth.L1BlockRef) {
// We don't need to do anything if the head hasn't changed.
if s.l1Head == (eth.L1BlockRef{}) {
s.log.Info("Received first L1 head signal", "l1_head", head)
} else if s.l1Head.Hash == head.Hash {
s.log.Trace("Received L1 head signal that is the same as the current head", "l1_head", head)
} else if s.l1Head.Hash == head.ParentHash {
// We got a new L1 block whose parent hash is the same as the current L1 head. Means we're
// dealing with a linear extension (new block is the immediate child of the old one).
s.log.Debug("L1 head moved forward", "l1_head", head)
} else {
if s.l1Head.Number >= head.Number {
s.metrics.RecordL1ReorgDepth(s.l1Head.Number - head.Number)
}
// New L1 block is not the same as the current head or a single step linear extension.
// This could either be a long L1 extension, or a reorg, or we simply missed a head update.
s.log.Warn("L1 head signal indicates a possible L1 re-org", "old_l1_head", s.l1Head, "new_l1_head_parent", head.ParentHash, "new_l1_head", head)
}
s.metrics.RecordL1Ref("l1_head", head)
s.l1Head = head
}
func (s *L1State) HandleNewL1SafeBlock(safe eth.L1BlockRef) {
s.log.Info("New L1 safe block", "l1_safe", safe)
s.metrics.RecordL1Ref("l1_safe", safe)
s.l1Safe = safe
}
func (s *L1State) HandleNewL1FinalizedBlock(finalized eth.L1BlockRef) {
s.log.Info("New L1 finalized block", "l1_finalized", finalized)
s.metrics.RecordL1Ref("l1_finalized", finalized)
s.l1Finalized = finalized
}
// L1Head returns either the stored L1 head or an empty block reference
// if the L1 Head has not been initialized yet.
func (s *L1State) L1Head() eth.L1BlockRef {
return s.l1Head
}
// L1Safe returns either the stored L1 safe block or an empty block reference
// if the L1 safe block has not been initialized yet.
func (s *L1State) L1Safe() eth.L1BlockRef {
return s.l1Safe
}
// L1Finalized returns either the stored L1 finalized block or an empty block reference
// if the L1 finalized block has not been initialized yet.
func (s *L1State) L1Finalized() eth.L1BlockRef {
return s.l1Finalized
}
......@@ -3,7 +3,6 @@ package driver
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
gosync "sync"
......@@ -19,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/status"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -35,7 +35,7 @@ type SyncStatus = eth.SyncStatus
const sealingDuration = time.Millisecond * 50
type Driver struct {
l1State L1StateIface
statusTracker SyncStatusTracker
*SyncDeriver
......@@ -97,7 +97,6 @@ type Driver struct {
metrics Metrics
log log.Logger
snapshotLog log.Logger
wg gosync.WaitGroup
......@@ -233,7 +232,7 @@ func (s *Driver) eventLoop() {
// This may adjust at any time based on fork-choice changes or previous errors.
// And avoid sequencing if the derivation pipeline indicates the engine is not ready.
if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped &&
s.l1State.L1Head() != (eth.L1BlockRef{}) && s.Derivation.DerivationReady() {
s.statusTracker.L1Head() != (eth.L1BlockRef{}) && s.Derivation.DerivationReady() {
if s.driverConfig.SequencerMaxSafeLag > 0 && s.Engine.SafeL2Head().Number+s.driverConfig.SequencerMaxSafeLag <= s.Engine.UnsafeL2Head().Number {
// If the safe head has fallen behind by a significant number of blocks, delay creating new blocks
// until the safe lag is below SequencerMaxSafeLag.
......@@ -284,7 +283,6 @@ func (s *Driver) eventLoop() {
s.log.Warn("failed to check for unsafe L2 blocks to sync", "err", err)
}
case envelope := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload")
// If we are doing CL sync or done with engine syncing, fallback to the unsafe payload queue & CL P2P sync.
if s.SyncCfg.SyncMode == sync.CLSync || !s.Engine.IsEngineSyncing() {
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", envelope.ExecutionPayload.ID())
......@@ -306,13 +304,12 @@ func (s *Driver) eventLoop() {
}
}
case newL1Head := <-s.l1HeadSig:
s.l1State.HandleNewL1HeadBlock(newL1Head)
s.Emitter.Emit(status.L1UnsafeEvent{L1Unsafe: newL1Head})
reqStep() // a new L1 head may mean we have the data to not get an EOF again.
case newL1Safe := <-s.l1SafeSig:
s.l1State.HandleNewL1SafeBlock(newL1Safe)
s.Emitter.Emit(status.L1SafeEvent{L1Safe: newL1Safe})
// no step, justified L1 information does not do anything for L2 derivation or status
case newL1Finalized := <-s.l1FinalizedSig:
s.l1State.HandleNewL1FinalizedBlock(newL1Finalized)
s.Emit(finality.FinalizeL1Event{FinalizedL1: newL1Finalized})
reqStep() // we may be able to mark more L2 data as finalized now
case <-s.sched.NextDelayedStep():
......@@ -640,34 +637,9 @@ func (s *Driver) OverrideLeader(ctx context.Context) error {
return s.sequencerConductor.OverrideLeader(ctx)
}
// syncStatus returns the current sync status, and should only be called synchronously with
// the driver event loop to avoid retrieval of an inconsistent status.
func (s *Driver) syncStatus() *eth.SyncStatus {
return &eth.SyncStatus{
CurrentL1: s.Derivation.Origin(),
CurrentL1Finalized: s.Finalizer.FinalizedL1(),
HeadL1: s.l1State.L1Head(),
SafeL1: s.l1State.L1Safe(),
FinalizedL1: s.l1State.L1Finalized(),
UnsafeL2: s.Engine.UnsafeL2Head(),
SafeL2: s.Engine.SafeL2Head(),
FinalizedL2: s.Engine.Finalized(),
PendingSafeL2: s.Engine.PendingSafeL2Head(),
}
}
// SyncStatus blocks the driver event loop and captures the syncing status.
// If the event loop is too busy and the context expires, a context error is returned.
func (s *Driver) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
wait := make(chan struct{})
select {
case s.stateReq <- wait:
resp := s.syncStatus()
<-wait
return resp, nil
case <-ctx.Done():
return nil, ctx.Err()
}
return s.statusTracker.SyncStatus(), nil
}
// BlockRefWithStatus blocks the driver event loop and captures the syncing status,
......@@ -677,7 +649,7 @@ func (s *Driver) BlockRefWithStatus(ctx context.Context, num uint64) (eth.L2Bloc
wait := make(chan struct{})
select {
case s.stateReq <- wait:
resp := s.syncStatus()
resp := s.statusTracker.SyncStatus()
ref, err := s.L2.L2BlockRefByNumber(ctx, num)
<-wait
return ref, resp, err
......@@ -686,25 +658,6 @@ func (s *Driver) BlockRefWithStatus(ctx context.Context, num uint64) (eth.L2Bloc
}
}
// deferJSONString helps avoid a JSON-encoding performance hit if the snapshot logger does not run
type deferJSONString struct {
x any
}
func (v deferJSONString) String() string {
out, _ := json.Marshal(v.x)
return string(out)
}
func (s *Driver) snapshot(event string) {
s.snapshotLog.Info("Rollup State Snapshot",
"event", event,
"l1Head", deferJSONString{s.l1State.L1Head()},
"l2Head", deferJSONString{s.Engine.UnsafeL2Head()},
"l2Safe", deferJSONString{s.Engine.SafeL2Head()},
"l2FinalizedHead", deferJSONString{s.Engine.Finalized()})
}
type hashAndError struct {
hash common.Hash
err error
......
......@@ -265,6 +265,8 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) {
return
}
d.ec.SetFinalizedHead(x.Ref)
// Try to apply the forkchoice changes
d.emitter.Emit(TryUpdateEngineEvent{})
}
}
......
package status
import (
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type L1UnsafeEvent struct {
L1Unsafe eth.L1BlockRef
}
func (ev L1UnsafeEvent) String() string {
return "l1-unsafe"
}
type L1SafeEvent struct {
L1Safe eth.L1BlockRef
}
func (ev L1SafeEvent) String() string {
return "l1-safe"
}
type Metrics interface {
RecordL1ReorgDepth(d uint64)
RecordL1Ref(name string, ref eth.L1BlockRef)
}
type StatusTracker struct {
data eth.SyncStatus
published atomic.Pointer[eth.SyncStatus]
log log.Logger
metrics Metrics
mu sync.RWMutex
}
func NewStatusTracker(log log.Logger, metrics Metrics) *StatusTracker {
st := &StatusTracker{
log: log,
metrics: metrics,
}
st.data = eth.SyncStatus{}
st.published.Store(&eth.SyncStatus{})
return st
}
func (st *StatusTracker) OnEvent(ev rollup.Event) {
st.mu.Lock()
defer st.mu.Unlock()
switch x := ev.(type) {
case engine.ForkchoiceUpdateEvent:
st.data.UnsafeL2 = x.UnsafeL2Head
st.data.SafeL2 = x.SafeL2Head
st.data.FinalizedL2 = x.FinalizedL2Head
case engine.PendingSafeUpdateEvent:
st.data.UnsafeL2 = x.Unsafe
st.data.PendingSafeL2 = x.PendingSafe
case derive.DeriverL1StatusEvent:
st.data.CurrentL1 = x.Origin
case L1UnsafeEvent:
st.metrics.RecordL1Ref("l1_head", x.L1Unsafe)
// We don't need to do anything if the head hasn't changed.
if st.data.HeadL1 == (eth.L1BlockRef{}) {
st.log.Info("Received first L1 head signal", "l1_head", x.L1Unsafe)
} else if st.data.HeadL1.Hash == x.L1Unsafe.Hash {
st.log.Trace("Received L1 head signal that is the same as the current head", "l1_head", x.L1Unsafe)
} else if st.data.HeadL1.Hash == x.L1Unsafe.ParentHash {
// We got a new L1 block whose parent hash is the same as the current L1 head. Means we're
// dealing with a linear extension (new block is the immediate child of the old one).
st.log.Debug("L1 head moved forward", "l1_head", x.L1Unsafe)
} else {
if st.data.HeadL1.Number >= x.L1Unsafe.Number {
st.metrics.RecordL1ReorgDepth(st.data.HeadL1.Number - x.L1Unsafe.Number)
}
// New L1 block is not the same as the current head or a single step linear extension.
// This could either be a long L1 extension, or a reorg, or we simply missed a head update.
st.log.Warn("L1 head signal indicates a possible L1 re-org",
"old_l1_head", st.data.HeadL1, "new_l1_head_parent", x.L1Unsafe.ParentHash, "new_l1_head", x.L1Unsafe)
}
st.data.HeadL1 = x.L1Unsafe
case L1SafeEvent:
st.log.Info("New L1 safe block", "l1_safe", x.L1Safe)
st.metrics.RecordL1Ref("l1_safe", x.L1Safe)
st.data.SafeL1 = x.L1Safe
case finality.FinalizeL1Event:
st.log.Info("New L1 finalized block", "l1_finalized", x.FinalizedL1)
st.metrics.RecordL1Ref("l1_finalized", x.FinalizedL1)
st.data.FinalizedL1 = x.FinalizedL1
st.data.CurrentL1Finalized = x.FinalizedL1
case rollup.ResetEvent:
st.data.UnsafeL2 = eth.L2BlockRef{}
st.data.SafeL2 = eth.L2BlockRef{}
st.data.CurrentL1 = eth.L1BlockRef{}
case engine.EngineResetConfirmedEvent:
st.data.UnsafeL2 = x.Unsafe
st.data.SafeL2 = x.Safe
st.data.FinalizedL2 = x.Finalized
default: // other events do not affect the sync status
return
}
// If anything changes, then copy the state to the published SyncStatus
// @dev: If this becomes a performance bottleneck during sync (because mem copies onto heap, and 1KB comparisons),
// we can rate-limit updates of the published data.
published := *st.published.Load()
if st.data != published {
published = st.data
st.published.Store(&published)
}
}
// SyncStatus is thread safe, and reads the latest view of L1 and L2 block labels
func (st *StatusTracker) SyncStatus() *eth.SyncStatus {
return st.published.Load()
}
// L1Head is a helper function; the L1 head is closely monitored for confirmation-distance logic.
func (st *StatusTracker) L1Head() eth.L1BlockRef {
return st.SyncStatus().HeadL1
}
......@@ -259,20 +259,6 @@ func applyOverrides(ctx *cli.Context, rollupConfig *rollup.Config) {
}
}
func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) {
snapshotFile := ctx.String(flags.SnapshotLog.Name)
if snapshotFile == "" {
return log.NewLogger(log.DiscardHandler()), nil
}
sf, err := os.OpenFile(snapshotFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
handler := log.JSONHandler(sf)
return log.NewLogger(handler), nil
}
func NewSyncConfig(ctx *cli.Context, log log.Logger) (*sync.Config, error) {
if ctx.IsSet(flags.L2EngineSyncEnabled.Name) && ctx.IsSet(flags.SyncModeFlag.Name) {
return nil, errors.New("cannot set both --l2.engine-sync and --syncmode at the same time")
......
......@@ -3,17 +3,14 @@ package eth
// SyncStatus is a snapshot of the driver.
// Values may be zeroed if not yet initialized.
type SyncStatus struct {
// CurrentL1 is the L1 block that the derivation process is currently at in the inner-most stage.
// CurrentL1 is the L1 block that the derivation process is last idled at.
// This may not be fully derived into L2 data yet.
// The safe L2 blocks were produced/included fully from the L1 chain up to and including this L1 block.
// If the node is synced, this matches the HeadL1, minus the verifier confirmation distance.
CurrentL1 L1BlockRef `json:"current_l1"`
// CurrentL1Finalized is the L1 block that the derivation process is currently accepting as finalized
// in the inner-most stage,
// This may not be fully derived into L2 data yet.
// The finalized L2 blocks were produced/included fully from the L1 chain up to and including this L1 block.
// This may lag behind the FinalizedL1 when the FinalizedL1 could not yet be verified
// to be canonical w.r.t. the currently derived L2 chain. It may be zeroed if no block could be verified yet.
// CurrentL1Finalized is a legacy sync-status attribute. This is deprecated.
// A previous version of the L1 finalization-signal was updated only after the block was retrieved by number.
// This attribute just matches FinalizedL1 now.
CurrentL1Finalized L1BlockRef `json:"current_l1_finalized"`
// HeadL1 is the perceived head of the L1 chain, no confirmation distance.
// The head is not guaranteed to build on the other L1 sync status fields,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment