Commit 9eea16e3 authored by protolambda's avatar protolambda Committed by GitHub

op-node: poll for L1 safe and finalized data changes, deduplicate L2 head tracking (#3310)

parent 02303023
......@@ -284,7 +284,11 @@ mainLoop:
l.log.Warn("issue fetching L2 head", "err", err)
continue
}
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock)
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
l.log.Info("Rollup node has no L1 head info yet")
continue
}
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock, "l1_head", syncStatus.HeadL1)
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
l.log.Trace("No unsubmitted blocks from sequencer")
continue
......
......@@ -8,9 +8,8 @@ import (
"math/big"
"time"
"github.com/ethereum/go-ethereum"
rollupEth "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
......@@ -20,6 +19,7 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/node"
hdwallet "github.com/miguelmota/go-ethereum-hdwallet"
......@@ -102,7 +102,60 @@ func initL1Geth(cfg *SystemConfig, wallet *hdwallet.Wallet, genesis *core.Genesi
HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"},
}
return createGethNode(false, nodeConfig, ethConfig, []*ecdsa.PrivateKey{pk})
l1Node, l1Eth, err := createGethNode(false, nodeConfig, ethConfig, []*ecdsa.PrivateKey{pk})
if err != nil {
return nil, nil, err
}
// Clique does not have safe/finalized block info. But we do want to test the usage of that,
// since post-merge L1 has it (incl. Goerli testnet which is already upgraded). So we mock it on top of clique.
l1Node.RegisterLifecycle(&fakeSafeFinalizedL1{
eth: l1Eth,
// for testing purposes we make it really fast, otherwise we don't see it finalize in short tests
finalizedDistance: 8,
safeDistance: 4,
})
return l1Node, l1Eth, nil
}
type fakeSafeFinalizedL1 struct {
eth *eth.Ethereum
finalizedDistance uint64
safeDistance uint64
sub ethereum.Subscription
}
var _ node.Lifecycle = (*fakeSafeFinalizedL1)(nil)
func (f *fakeSafeFinalizedL1) Start() error {
headChanges := make(chan core.ChainHeadEvent, 10)
headsSub := f.eth.BlockChain().SubscribeChainHeadEvent(headChanges)
f.sub = event.NewSubscription(func(quit <-chan struct{}) error {
defer headsSub.Unsubscribe()
for {
select {
case head := <-headChanges:
num := head.Block.NumberU64()
if num > f.finalizedDistance {
toFinalize := f.eth.BlockChain().GetBlockByNumber(num - f.finalizedDistance)
f.eth.BlockChain().SetFinalized(toFinalize)
}
if num > f.safeDistance {
toSafe := f.eth.BlockChain().GetBlockByNumber(num - f.safeDistance)
f.eth.BlockChain().SetSafe(toSafe)
}
case <-quit:
return nil
}
}
})
return nil
}
func (f *fakeSafeFinalizedL1) Stop() error {
f.sub.Unsubscribe()
return nil
}
// init a geth node.
......
......@@ -110,6 +110,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig {
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
},
"sequencer": {
Driver: driver.Config{
......@@ -123,6 +124,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig {
ListenPort: 9093,
EnableAdmin: true,
},
L1EpochPollInterval: time.Second * 4,
},
},
Loggers: map[string]log.Logger{
......
......@@ -2,10 +2,12 @@ package eth
import (
"context"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
// HeadSignalFn is used as callback function to accept head-signals
......@@ -43,3 +45,40 @@ func WatchHeadChanges(ctx context.Context, src NewHeadSource, fn HeadSignalFn) (
}
}), nil
}
type L1BlockRefsSource interface {
L1BlockRefByLabel(ctx context.Context, label BlockLabel) (L1BlockRef, error)
}
// PollBlockChanges opens a polling loop to fetch the L1 block reference with the given label,
// on provided interval and with request timeout. Results are returned with provided callback fn,
// which may block to pause/back-pressure polling.
func PollBlockChanges(ctx context.Context, log log.Logger, src L1BlockRefsSource, fn HeadSignalFn,
label BlockLabel, interval time.Duration, timeout time.Duration) ethereum.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
if interval <= 0 {
log.Warn("polling of block is disabled", "interval", interval, "label", label)
<-quit
return nil
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
reqCtx, reqCancel := context.WithTimeout(ctx, timeout)
ref, err := src.L1BlockRefByLabel(reqCtx, label)
reqCancel()
if err != nil {
log.Warn("failed to poll L1 block", "label", label, "err", err)
} else {
fn(ctx, ref)
}
case <-ctx.Done():
return ctx.Err()
case <-quit:
return nil
}
}
})
}
......@@ -2,6 +2,7 @@ package flags
import (
"fmt"
"time"
"github.com/urfave/cli"
)
......@@ -81,6 +82,13 @@ var (
Required: false,
Value: 4,
}
L1EpochPollIntervalFlag = cli.DurationFlag{
Name: "l1.epoch-poll-interval",
Usage: "Poll interval for retrieving new L1 epoch updates such as safe and finalized block changes. Disabled if 0 or negative.",
EnvVar: prefixEnvVar("L1_EPOCH_POLL_INTERVAL"),
Required: false,
Value: time.Second * 12 * 32,
}
LogLevelFlag = cli.StringFlag{
Name: "log.level",
Usage: "The lowest log level that will be output",
......@@ -154,6 +162,7 @@ var optionalFlags = append([]cli.Flag{
VerifierL1Confs,
SequencerEnabledFlag,
SequencerL1Confs,
L1EpochPollIntervalFlag,
LogLevelFlag,
LogFormatFlag,
LogColorFlag,
......
......@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
......@@ -30,6 +31,9 @@ type Config struct {
Pprof PprofConfig
// Used to poll the L1 for new finalized or safe blocks
L1EpochPollInterval time.Duration
// Optional
Tracer Tracer
}
......
......@@ -25,14 +25,18 @@ type OpNode struct {
log log.Logger
appVersion string
metrics *metrics.Metrics
l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error)
l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging
l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error)
l1SafeSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
l1FinalizedSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging
// some resources cannot be stopped directly, like the p2p gossipsub router (not our design),
// and depend on this ctx to be closed.
......@@ -129,6 +133,13 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
}
n.log.Error("l1 heads subscription error", "err", err)
}()
// Poll for the safe L1 block and finalized block,
// which only change once per epoch at most and may be delayed.
n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Safe, eth.Safe,
cfg.L1EpochPollInterval, time.Second*10)
n.l1FinalizedSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Finalized, eth.Finalized,
cfg.L1EpochPollInterval, time.Second*10)
return nil
}
......@@ -233,7 +244,24 @@ func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
if err := n.l2Driver.OnL1Head(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 head change", "err", err)
}
}
func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) {
// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
if err := n.l2Driver.OnL1Safe(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 safe block change", "err", err)
}
}
func (n *OpNode) OnNewL1Finalized(ctx context.Context, sig eth.L1BlockRef) {
// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
if err := n.l2Driver.OnL1Finalized(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 finalized block change", "err", err)
}
}
func (n *OpNode) PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error {
......
......@@ -47,13 +47,15 @@ type EngineQueue struct {
unsafePayloads []*eth.ExecutionPayload
engine Engine
metrics Metrics
}
var _ AttributesQueueOutput = (*EngineQueue)(nil)
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine) *EngineQueue {
return &EngineQueue{log: log, cfg: cfg, engine: engine}
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue {
return &EngineQueue{log: log, cfg: cfg, engine: engine, metrics: metrics}
}
func (eq *EngineQueue) Progress() Progress {
......@@ -62,6 +64,7 @@ func (eq *EngineQueue) Progress() Progress {
func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
eq.unsafeHead = head
eq.metrics.RecordL2Ref("l2_unsafe", head)
}
func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) {
......@@ -130,6 +133,17 @@ func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
// return nil
//}
func (eq *EngineQueue) logSyncProgress(reason string) {
eq.log.Info("Sync progress",
"reason", reason,
"l2_finalized", eq.finalized,
"l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead,
"l2_time", eq.unsafeHead.Time,
"l1_derived", eq.progress.Origin,
)
}
func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
first := eq.unsafePayloads[0]
......@@ -192,7 +206,9 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
}
eq.unsafeHead = ref
eq.unsafePayloads = eq.unsafePayloads[1:]
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("unsafe payload from sequencer")
return nil
}
......@@ -206,6 +222,7 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error {
// For some reason the unsafe head is behind the safe head. Log it, and correct it.
eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead)
eq.unsafeHead = eq.safeHead
eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead)
return nil
}
}
......@@ -233,7 +250,7 @@ func (eq *EngineQueue) consolidateNextSafeAttributes(ctx context.Context) error
eq.safeHead = ref
// unsafe head stays the same, we did not reorg the chain.
eq.safeAttributes = eq.safeAttributes[1:]
eq.log.Trace("Reconciled safe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("reconciled with L1")
return nil
}
......@@ -283,8 +300,10 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
}
eq.safeHead = ref
eq.unsafeHead = ref
eq.metrics.RecordL2Ref("l2_safe", ref)
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.safeAttributes = eq.safeAttributes[1:]
eq.log.Trace("Inserted safe block", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("processed safe block derived from L1")
return nil
}
......@@ -316,5 +335,9 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
Origin: l1Origin,
Closed: false,
}
eq.metrics.RecordL2Ref("l2_finalized", eq.finalized) // todo(proto): finalized L2 block updates
eq.metrics.RecordL2Ref("l2_safe", safe)
eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
eq.logSyncProgress("reset derivation work")
return io.EOF
}
......@@ -10,6 +10,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type Metrics interface {
RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef)
}
type L1Fetcher interface {
L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error)
L1BlockRefByNumberFetcher
......@@ -71,11 +76,13 @@ type DerivationPipeline struct {
stages []Stage
eng EngineQueueStage
metrics Metrics
}
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine) *DerivationPipeline {
eng := NewEngineQueue(log, cfg, engine)
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline {
eng := NewEngineQueue(log, cfg, engine, metrics)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng)
batchQueue := NewBatchQueue(log, cfg, attributesQueue)
chInReader := NewChannelInReader(log, batchQueue)
......@@ -93,6 +100,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
active: 0,
stages: stages,
eng: eng,
metrics: metrics,
}
}
......@@ -137,6 +145,8 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
// An error is expected when the underlying source closes.
// When Step returns nil, it should be called again, to continue the derivation process.
func (dp *DerivationPipeline) Step(ctx context.Context) error {
defer dp.metrics.RecordL1Ref("l1_derived", dp.Progress().Origin)
// if any stages need to be reset, do that first.
if dp.resetting < len(dp.stages) {
if err := dp.stages[dp.resetting].ResetStep(ctx, dp.l1Fetcher); err == io.EOF {
......
......@@ -27,7 +27,7 @@ func NewConfDepth(depth uint64, l1Head func() eth.L1BlockRef, fetcher derive.L1F
// Any block numbers that are within confirmation depth of the L1 head are mocked to be "not found",
// effectively hiding the uncertain part of the L1 chain.
func (c *confDepth) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) {
// TODO: performance optimization: buffer the l1Head, invalidate any reorged previous buffer content,
// TODO: performance optimization: buffer the l1Unsafe, invalidate any reorged previous buffer content,
// and instantly return the origin by number from the buffer if we can.
if num == 0 || c.depth == 0 || num+c.depth <= c.l1Head().Number {
......
......@@ -79,7 +79,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne
var state *state
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, func() eth.L1BlockRef { return state.l1Head }, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics)
state = NewState(driverCfg, log, snapshotLog, cfg, l1, l2, output, derivationPipeline, network, metrics)
return &Driver{s: state}
}
......@@ -88,6 +88,14 @@ func (d *Driver) OnL1Head(ctx context.Context, head eth.L1BlockRef) error {
return d.s.OnL1Head(ctx, head)
}
func (d *Driver) OnL1Safe(ctx context.Context, safe eth.L1BlockRef) error {
return d.s.OnL1Safe(ctx, safe)
}
func (d *Driver) OnL1Finalized(ctx context.Context, finalized eth.L1BlockRef) error {
return d.s.OnL1Finalized(ctx, finalized)
}
func (d *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error {
return d.s.OnUnsafeL2Payload(ctx, payload)
}
......
This diff is collapsed.
......@@ -78,8 +78,9 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
ListenAddr: ctx.GlobalString(flags.PprofAddrFlag.Name),
ListenPort: ctx.GlobalString(flags.PprofPortFlag.Name),
},
P2P: p2pConfig,
P2PSigner: p2pSignerSetup,
P2P: p2pConfig,
P2PSigner: p2pSignerSetup,
L1EpochPollInterval: ctx.GlobalDuration(flags.L1EpochPollIntervalFlag.Name),
}
if err := cfg.Check(); err != nil {
return nil, err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment