Commit e81c5c93 authored by protolambda's avatar protolambda Committed by GitHub

op-node: split driver state/model into L1 state, L1 origin selector, L2...

op-node: split driver state/model into L1 state, L1 origin selector, L2 sequencer, L2 derivation (#3647)

* op-node: split driver state/model into l1 state, l1 origin selector, l2 sequencer, l2 derivation

* op-node: implement driver refactor review feedback
Co-authored-by: default avatarmergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent d47369f8
...@@ -8,15 +8,16 @@ import ( ...@@ -8,15 +8,16 @@ import (
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
) )
type OpNode struct { type OpNode struct {
...@@ -220,11 +221,8 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error { ...@@ -220,11 +221,8 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
func (n *OpNode) Start(ctx context.Context) error { func (n *OpNode) Start(ctx context.Context) error {
n.log.Info("Starting execution engine driver") n.log.Info("Starting execution engine driver")
// Request initial head update, default to genesis otherwise
reqCtx, reqCancel := context.WithTimeout(ctx, time.Second*10)
// start driving engine: sync blocks by deriving them from L1 and driving them into the engine // start driving engine: sync blocks by deriving them from L1 and driving them into the engine
err := n.l2Driver.Start(reqCtx) err := n.l2Driver.Start()
reqCancel()
if err != nil { if err != nil {
n.log.Error("Could not start a rollup node", "err", err) n.log.Error("Could not start a rollup node", "err", err)
return err return err
......
...@@ -3,18 +3,14 @@ package driver ...@@ -3,18 +3,14 @@ package driver
import ( import (
"context" "context"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
) )
type Driver struct {
s *state
}
type Metrics interface { type Metrics interface {
RecordPipelineReset() RecordPipelineReset()
RecordSequencingError() RecordSequencingError()
...@@ -34,11 +30,6 @@ type Metrics interface { ...@@ -34,11 +30,6 @@ type Metrics interface {
CountSequencedTxs(count int) CountSequencedTxs(count int)
} }
type Downloader interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error)
Fetch(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, eth.ReceiptsFetcher, error)
}
type L1Chain interface { type L1Chain interface {
derive.L1Fetcher derive.L1Fetcher
L1BlockRefByLabel(context.Context, eth.BlockLabel) (eth.L1BlockRef, error) L1BlockRefByLabel(context.Context, eth.BlockLabel) (eth.L1BlockRef, error)
...@@ -62,58 +53,61 @@ type DerivationPipeline interface { ...@@ -62,58 +53,61 @@ type DerivationPipeline interface {
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
} }
type outputInterface interface { type L1StateIface interface {
// createNewBlock builds a new block based on the L2 Head, L1 Origin, and the current mempool. HandleNewL1HeadBlock(head eth.L1BlockRef)
createNewBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) (eth.L2BlockRef, *eth.ExecutionPayload, error) HandleNewL1SafeBlock(safe eth.L1BlockRef)
} HandleNewL1FinalizedBlock(finalized eth.L1BlockRef)
type Network interface {
// PublishL2Payload is called by the driver whenever there is a new payload to publish, synchronously with the driver main loop.
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error
}
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
output := &outputImpl{
Config: cfg,
dl: l1,
l2: l2,
log: log,
}
var state *state
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, func() eth.L1BlockRef { return state.l1Head }, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics)
state = NewState(driverCfg, log, snapshotLog, cfg, l1, l2, output, derivationPipeline, network, metrics)
return &Driver{s: state}
}
func (d *Driver) OnL1Head(ctx context.Context, head eth.L1BlockRef) error { L1Head() eth.L1BlockRef
return d.s.OnL1Head(ctx, head) L1Safe() eth.L1BlockRef
L1Finalized() eth.L1BlockRef
} }
func (d *Driver) OnL1Safe(ctx context.Context, safe eth.L1BlockRef) error { type L1OriginSelectorIface interface {
return d.s.OnL1Safe(ctx, safe) FindL1Origin(ctx context.Context, l1Head eth.L1BlockRef, l2Head eth.L2BlockRef) (eth.L1BlockRef, error)
} }
func (d *Driver) OnL1Finalized(ctx context.Context, finalized eth.L1BlockRef) error { type SequencerIface interface {
return d.s.OnL1Finalized(ctx, finalized) StartBuildingBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) error
} CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPayload, error)
func (d *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error { // createNewBlock builds a new block based on the L2 Head, L1 Origin, and the current mempool.
return d.s.OnUnsafeL2Payload(ctx, payload) CreateNewBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) (eth.L2BlockRef, *eth.ExecutionPayload, error)
} }
func (d *Driver) ResetDerivationPipeline(ctx context.Context) error { type Network interface {
return d.s.ResetDerivationPipeline(ctx) // PublishL2Payload is called by the driver whenever there is a new payload to publish, synchronously with the driver main loop.
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error
} }
func (d *Driver) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) { // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
return d.s.SyncStatus(ctx) func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
} sequencer := NewSequencer(log, cfg, l1, l2)
l1State := NewL1State(log, metrics)
findL1Origin := NewL1OriginSelector(log, cfg, l1, driverCfg.SequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics)
func (d *Driver) Start(ctx context.Context) error { return &Driver{
return d.s.Start(ctx) l1State: l1State,
} derivation: derivationPipeline,
func (d *Driver) Close() error { idleDerivation: false,
return d.s.Close() syncStatusReq: make(chan chan eth.SyncStatus, 10),
forceReset: make(chan chan struct{}, 10),
config: cfg,
driverConfig: driverCfg,
done: make(chan struct{}),
log: log,
snapshotLog: snapshotLog,
l1: l1,
l2: l2,
l1OriginSelector: findL1Origin,
sequencer: sequencer,
network: network,
metrics: metrics,
l1HeadSig: make(chan eth.L1BlockRef, 10),
l1SafeSig: make(chan eth.L1BlockRef, 10),
l1FinalizedSig: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10),
}
} }
package driver
import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/eth"
)
type L1Metrics interface {
RecordL1ReorgDepth(d uint64)
RecordL1Ref(name string, ref eth.L1BlockRef)
}
// L1State tracks L1 head, safe and finalized blocks. It is not safe to write and read concurrently.
type L1State struct {
log log.Logger
metrics L1Metrics
// Latest recorded head, safe block and finalized block of the L1 Chain, independent of derivation work
l1Head eth.L1BlockRef
l1Safe eth.L1BlockRef
l1Finalized eth.L1BlockRef
}
func NewL1State(log log.Logger, metrics L1Metrics) *L1State {
return &L1State{
log: log,
metrics: metrics,
}
}
func (s *L1State) HandleNewL1HeadBlock(head eth.L1BlockRef) {
// We don't need to do anything if the head hasn't changed.
if s.l1Head == (eth.L1BlockRef{}) {
s.log.Info("Received first L1 head signal", "l1_head", head)
} else if s.l1Head.Hash == head.Hash {
s.log.Trace("Received L1 head signal that is the same as the current head", "l1_head", head)
} else if s.l1Head.Hash == head.ParentHash {
// We got a new L1 block whose parent hash is the same as the current L1 head. Means we're
// dealing with a linear extension (new block is the immediate child of the old one).
s.log.Debug("L1 head moved forward", "l1_head", head)
} else {
if s.l1Head.Number >= head.Number {
s.metrics.RecordL1ReorgDepth(s.l1Head.Number - head.Number)
}
// New L1 block is not the same as the current head or a single step linear extension.
// This could either be a long L1 extension, or a reorg, or we simply missed a head update.
s.log.Warn("L1 head signal indicates a possible L1 re-org", "old_l1_head", s.l1Head, "new_l1_head_parent", head.ParentHash, "new_l1_head", head)
}
s.metrics.RecordL1Ref("l1_head", head)
s.l1Head = head
}
func (s *L1State) HandleNewL1SafeBlock(safe eth.L1BlockRef) {
s.log.Info("New L1 safe block", "l1_safe", safe)
s.metrics.RecordL1Ref("l1_safe", safe)
s.l1Safe = safe
}
func (s *L1State) HandleNewL1FinalizedBlock(finalized eth.L1BlockRef) {
s.log.Info("New L1 finalized block", "l1_finalized", finalized)
s.metrics.RecordL1Ref("l1_finalized", finalized)
s.l1Finalized = finalized
}
func (s *L1State) L1Head() eth.L1BlockRef {
return s.l1Head
}
func (s *L1State) L1Safe() eth.L1BlockRef {
return s.l1Safe
}
func (s *L1State) L1Finalized() eth.L1BlockRef {
return s.l1Finalized
}
package driver
import (
"context"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type L1Blocks interface {
derive.L1BlockRefByHashFetcher
derive.L1BlockRefByNumberFetcher
}
type L1OriginSelector struct {
log log.Logger
cfg *rollup.Config
l1 L1Blocks
sequencingConfDepth uint64
}
func NewL1OriginSelector(log log.Logger, cfg *rollup.Config, l1 L1Blocks, sequencingConfDepth uint64) *L1OriginSelector {
return &L1OriginSelector{
log: log,
cfg: cfg,
l1: l1,
sequencingConfDepth: sequencingConfDepth,
}
}
// FindL1Origin determines what the next L1 Origin should be.
// The L1 Origin is either the L2 Head's Origin, or the following L1 block
// if the next L2 block's time is greater than or equal to the L2 Head's Origin.
func (los *L1OriginSelector) FindL1Origin(ctx context.Context, l1Head eth.L1BlockRef, l2Head eth.L2BlockRef) (eth.L1BlockRef, error) {
// If we are at the head block, don't do a lookup.
if l2Head.L1Origin.Hash == l1Head.Hash {
return l1Head, nil
}
// Grab a reference to the current L1 origin block.
currentOrigin, err := los.l1.L1BlockRefByHash(ctx, l2Head.L1Origin.Hash)
if err != nil {
return eth.L1BlockRef{}, err
}
if currentOrigin.Number+1+los.sequencingConfDepth > l1Head.Number {
// TODO: we can decide to ignore confirmation depth if we would be forced
// to make an empty block (only deposits) by staying on the current origin.
log.Info("sequencing with old origin to preserve conf depth",
"current", currentOrigin, "current_time", currentOrigin.Time,
"l1_head", l1Head, "l1_head_time", l1Head.Time,
"l2_head", l2Head, "l2_head_time", l2Head.Time,
"depth", los.sequencingConfDepth)
return currentOrigin, nil
}
// Attempt to find the next L1 origin block, where the next origin is the immediate child of
// the current origin block.
nextOrigin, err := los.l1.L1BlockRefByNumber(ctx, currentOrigin.Number+1)
if err != nil {
log.Error("Failed to get next origin. Falling back to current origin", "err", err)
return currentOrigin, nil
}
// If the next L2 block time is greater than the next origin block's time, we can choose to
// start building on top of the next origin. Sequencer implementation has some leeway here and
// could decide to continue to build on top of the previous origin until the Sequencer runs out
// of slack. For simplicity, we implement our Sequencer to always start building on the latest
// L1 block when we can.
if l2Head.Time+los.cfg.BlockTime >= nextOrigin.Time {
return nextOrigin, nil
}
return currentOrigin, nil
}
package driver
import (
"context"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type Downloader interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error)
Fetch(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, eth.ReceiptsFetcher, error)
}
// Sequencer implements the sequencing interface of the driver: it starts and completes block building jobs.
type Sequencer struct {
log log.Logger
config *rollup.Config
l1 Downloader
l2 derive.Engine
buildingOnto eth.ForkchoiceState
buildingID eth.PayloadID
}
func NewSequencer(log log.Logger, cfg *rollup.Config, l1 Downloader, l2 derive.Engine) *Sequencer {
return &Sequencer{
log: log,
config: cfg,
l1: l1,
l2: l2,
}
}
// StartBuildingBlock initiates a block building job on top of the given L2 head, safe and finalized blocks, and using the provided l1Origin.
func (d *Sequencer) StartBuildingBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) error {
d.log.Info("creating new block", "parent", l2Head, "l1Origin", l1Origin)
if d.buildingID != (eth.PayloadID{}) { // This may happen when we decide to build a different block in response to a reorg. Or when previous block building failed.
d.log.Warn("did not finish previous block building, starting new building now", "prev_onto", d.buildingOnto.HeadBlockHash, "prev_payload_id", d.buildingID, "new_onto", l2Head)
}
fetchCtx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
attrs, err := derive.PreparePayloadAttributes(fetchCtx, d.config, d.l1, l2Head, l2Head.Time+d.config.BlockTime, l1Origin.ID())
if err != nil {
return err
}
// If our next L2 block timestamp is beyond the Sequencer drift threshold, then we must produce
// empty blocks (other than the L1 info deposit and any user deposits). We handle this by
// setting NoTxPool to true, which will cause the Sequencer to not include any transactions
// from the transaction pool.
attrs.NoTxPool = uint64(attrs.Timestamp) >= l1Origin.Time+d.config.MaxSequencerDrift
// And construct our fork choice state. This is our current fork choice state and will be
// updated as a result of executing the block based on the attributes described above.
fc := eth.ForkchoiceState{
HeadBlockHash: l2Head.Hash,
SafeBlockHash: l2SafeHead.Hash,
FinalizedBlockHash: l2Finalized.Hash,
}
// Start a payload building process.
id, errTyp, err := derive.StartPayload(ctx, d.l2, fc, attrs)
if err != nil {
return fmt.Errorf("failed to start building on top of L2 chain %s, error (%d): %w", l2Head, errTyp, err)
}
d.buildingOnto = fc
d.buildingID = id
return nil
}
// CompleteBuildingBlock takes the current block that is being built, and asks the engine to complete the building, seal the block, and persist it as canonical.
// Warning: the safe and finalized L2 blocks as viewed during the initiation of the block building are reused for completion of the block building.
// The Execution engine should not change the safe and finalized blocks between start and completion of block building.
func (d *Sequencer) CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPayload, error) {
if d.buildingID == (eth.PayloadID{}) {
return nil, fmt.Errorf("cannot complete payload building: not currently building a payload")
}
// Actually execute the block and add it to the head of the chain.
payload, errTyp, err := derive.ConfirmPayload(ctx, d.log, d.l2, d.buildingOnto, d.buildingID, false)
if err != nil {
return nil, fmt.Errorf("failed to complete building on top of L2 chain %s, error (%d): %w", d.buildingOnto.HeadBlockHash, errTyp, err)
}
return payload, nil
}
// CreateNewBlock sequences a L2 block with immediate building and sealing.
func (d *Sequencer) CreateNewBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) (eth.L2BlockRef, *eth.ExecutionPayload, error) {
if err := d.StartBuildingBlock(ctx, l2Head, l2SafeHead, l2Finalized, l1Origin); err != nil {
return l2Head, nil, err
}
payload, err := d.CompleteBuildingBlock(ctx)
if err != nil {
return l2Head, nil, err
}
d.buildingID = eth.PayloadID{}
// Generate an L2 block ref from the payload.
ref, err := derive.PayloadToBlockRef(payload, &d.config.Genesis)
return ref, payload, err
}
This diff is collapsed.
package driver
import (
"context"
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/log"
)
type outputImpl struct {
dl Downloader
l2 derive.Engine
log log.Logger
Config *rollup.Config
}
func (d *outputImpl) createNewBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) (eth.L2BlockRef, *eth.ExecutionPayload, error) {
d.log.Info("creating new block", "parent", l2Head, "l1Origin", l1Origin)
fetchCtx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
attrs, err := derive.PreparePayloadAttributes(fetchCtx, d.Config, d.dl, l2Head, l2Head.Time+d.Config.BlockTime, l1Origin.ID())
if err != nil {
return l2Head, nil, err
}
// If our next L2 block timestamp is beyond the Sequencer drift threshold, then we must produce
// empty blocks (other than the L1 info deposit and any user deposits). We handle this by
// setting NoTxPool to true, which will cause the Sequencer to not include any transactions
// from the transaction pool.
attrs.NoTxPool = uint64(attrs.Timestamp) >= l1Origin.Time+d.Config.MaxSequencerDrift
// And construct our fork choice state. This is our current fork choice state and will be
// updated as a result of executing the block based on the attributes described above.
fc := eth.ForkchoiceState{
HeadBlockHash: l2Head.Hash,
SafeBlockHash: l2SafeHead.Hash,
FinalizedBlockHash: l2Finalized.Hash,
}
// Actually execute the block and add it to the head of the chain.
payload, errType, err := derive.InsertHeadBlock(ctx, d.log, d.l2, fc, attrs, false)
if err != nil {
return l2Head, nil, fmt.Errorf("failed to extend L2 chain, error (%d): %w", errType, err)
}
// Generate an L2 block ref from the payload.
ref, err := derive.PayloadToBlockRef(payload, &d.Config.Genesis)
return ref, payload, err
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment