Commit 24de4fb3 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

op-node: Improve sequencer scheduler, improve timeliness of blocks (#4236)

* op-node: Reduce wait time

* Update op-node/rollup/driver/sequencer.go
Co-authored-by: default avatarprotolambda <proto@protolambda.com>

* op-node: fix remaining time

* op-node: handle when scheduling is bad

* op-node: improve sequencing scheduling to maximize building time, timeliness and tx inclusion

* op-node: estimate desired delay and sealing mode

* op-node: only re-schedule next sequencing action if head changes
Co-authored-by: default avatarprotolambda <proto@protolambda.com>
parent f2e88d9b
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
...@@ -27,7 +28,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, c ...@@ -27,7 +28,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, c
ver := NewL2Verifier(t, log, l1, eng, cfg) ver := NewL2Verifier(t, log, l1, eng, cfg)
return &L2Sequencer{ return &L2Sequencer{
L2Verifier: *ver, L2Verifier: *ver,
sequencer: driver.NewSequencer(log, cfg, l1, eng), sequencer: driver.NewSequencer(log, cfg, l1, eng, ver.derivation, metrics.NoopMetrics),
l1OriginSelector: driver.NewL1OriginSelector(log, cfg, l1, seqConfDepth), l1OriginSelector: driver.NewL1OriginSelector(log, cfg, l1, seqConfDepth),
seqOldOrigin: false, seqOldOrigin: false,
failL2GossipUnsafeBlock: nil, failL2GossipUnsafeBlock: nil,
...@@ -60,7 +61,7 @@ func (s *L2Sequencer) ActL2StartBlock(t Testing) { ...@@ -60,7 +61,7 @@ func (s *L2Sequencer) ActL2StartBlock(t Testing) {
origin = l1Origin origin = l1Origin
} }
err := s.sequencer.StartBuildingBlock(t.Ctx(), parent, s.derivation.SafeL2Head().ID(), s.derivation.Finalized().ID(), origin) err := s.sequencer.StartBuildingBlock(t.Ctx(), origin)
require.NoError(t, err, "failed to start block building") require.NoError(t, err, "failed to start block building")
s.l2Building = true s.l2Building = true
......
...@@ -57,6 +57,8 @@ type Metricer interface { ...@@ -57,6 +57,8 @@ type Metricer interface {
IncStreamCount() IncStreamCount()
DecStreamCount() DecStreamCount()
RecordBandwidth(ctx context.Context, bwc *libp2pmetrics.BandwidthCounter) RecordBandwidth(ctx context.Context, bwc *libp2pmetrics.BandwidthCounter)
RecordSequencerBuildingDiffTime(duration time.Duration)
RecordSequencerSealingTime(duration time.Duration)
} }
type Metrics struct { type Metrics struct {
...@@ -80,6 +82,12 @@ type Metrics struct { ...@@ -80,6 +82,12 @@ type Metrics struct {
SequencingErrors *EventMetrics SequencingErrors *EventMetrics
PublishingErrors *EventMetrics PublishingErrors *EventMetrics
SequencerBuildingDiffDurationSeconds prometheus.Histogram
SequencerBuildingDiffTotal prometheus.Counter
SequencerSealingDurationSeconds prometheus.Histogram
SequencerSealingTotal prometheus.Counter
UnsafePayloadsBufferLen prometheus.Gauge UnsafePayloadsBufferLen prometheus.Gauge
UnsafePayloadsBufferMemSize prometheus.Gauge UnsafePayloadsBufferMemSize prometheus.Gauge
...@@ -105,6 +113,8 @@ type Metrics struct { ...@@ -105,6 +113,8 @@ type Metrics struct {
registry *prometheus.Registry registry *prometheus.Registry
} }
var _ Metricer = (*Metrics)(nil)
func NewMetrics(procName string) *Metrics { func NewMetrics(procName string) *Metrics {
if procName == "" { if procName == "" {
procName = "default" procName = "default"
...@@ -281,6 +291,31 @@ func NewMetrics(procName string) *Metrics { ...@@ -281,6 +291,31 @@ func NewMetrics(procName string) *Metrics {
"direction", "direction",
}), }),
SequencerBuildingDiffDurationSeconds: promauto.With(registry).NewHistogram(prometheus.HistogramOpts{
Namespace: ns,
Name: "sequencer_building_diff_seconds",
Buckets: []float64{
-10, -5, -2.5, -1, -.5, -.25, -.1, -0.05, -0.025, -0.01, -0.005,
.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of Sequencer building time, minus block time",
}),
SequencerBuildingDiffTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "sequencer_building_diff_total",
Help: "Number of sequencer block building jobs",
}),
SequencerSealingDurationSeconds: promauto.With(registry).NewHistogram(prometheus.HistogramOpts{
Namespace: ns,
Name: "sequencer_sealing_seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of Sequencer block sealing time",
}),
SequencerSealingTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "sequencer_sealing_total",
Help: "Number of sequencer block sealing jobs",
}),
registry: registry, registry: registry,
} }
} }
...@@ -448,6 +483,21 @@ func (m *Metrics) RecordBandwidth(ctx context.Context, bwc *libp2pmetrics.Bandwi ...@@ -448,6 +483,21 @@ func (m *Metrics) RecordBandwidth(ctx context.Context, bwc *libp2pmetrics.Bandwi
} }
} }
// RecordSequencerBuildingDiffTime tracks the amount of time the sequencer was allowed between
// start to finish, incl. sealing, minus the block time.
// Ideally this is 0, realistically the sequencer scheduler may be busy with other jobs like syncing sometimes.
func (m *Metrics) RecordSequencerBuildingDiffTime(duration time.Duration) {
m.SequencerBuildingDiffTotal.Inc()
m.SequencerBuildingDiffDurationSeconds.Observe(float64(duration) / float64(time.Second))
}
// RecordSequencerSealingTime tracks the amount of time the sequencer took to finish sealing the block.
// Ideally this is 0, realistically it may take some time.
func (m *Metrics) RecordSequencerSealingTime(duration time.Duration) {
m.SequencerSealingTotal.Inc()
m.SequencerSealingDurationSeconds.Observe(float64(duration) / float64(time.Second))
}
// Serve starts the metrics server on the given hostname and port. // Serve starts the metrics server on the given hostname and port.
// The server will be closed when the passed-in context is cancelled. // The server will be closed when the passed-in context is cancelled.
func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error { func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error {
...@@ -467,7 +517,7 @@ func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error { ...@@ -467,7 +517,7 @@ func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error {
type noopMetricer struct{} type noopMetricer struct{}
var NoopMetrics = new(noopMetricer) var NoopMetrics Metricer = new(noopMetricer)
func (n *noopMetricer) RecordInfo(version string) { func (n *noopMetricer) RecordInfo(version string) {
} }
...@@ -539,3 +589,9 @@ func (n *noopMetricer) DecStreamCount() { ...@@ -539,3 +589,9 @@ func (n *noopMetricer) DecStreamCount() {
func (n *noopMetricer) RecordBandwidth(ctx context.Context, bwc *libp2pmetrics.BandwidthCounter) { func (n *noopMetricer) RecordBandwidth(ctx context.Context, bwc *libp2pmetrics.BandwidthCounter) {
} }
func (n *noopMetricer) RecordSequencerBuildingDiffTime(duration time.Duration) {
}
func (n *noopMetricer) RecordSequencerSealingTime(duration time.Duration) {
}
...@@ -7,17 +7,16 @@ import ( ...@@ -7,17 +7,16 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
) )
// TODO(inphi): add metrics
type rpcServer struct { type rpcServer struct {
endpoint string endpoint string
apis []rpc.API apis []rpc.API
...@@ -28,7 +27,7 @@ type rpcServer struct { ...@@ -28,7 +27,7 @@ type rpcServer struct {
sources.L2Client sources.L2Client
} }
func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Config, l2Client l2EthClient, dr driverClient, log log.Logger, appVersion string, m *metrics.Metrics) (*rpcServer, error) { func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Config, l2Client l2EthClient, dr driverClient, log log.Logger, appVersion string, m metrics.Metricer) (*rpcServer, error) {
api := NewNodeAPI(rollupCfg, l2Client, dr, log.New("rpc", "node"), m) api := NewNodeAPI(rollupCfg, l2Client, dr, log.New("rpc", "node"), m)
// TODO: extend RPC config with options for WS, IPC and HTTP RPC connections // TODO: extend RPC config with options for WS, IPC and HTTP RPC connections
endpoint := net.JoinHostPort(rpcCfg.ListenAddr, strconv.Itoa(rpcCfg.ListenPort)) endpoint := net.JoinHostPort(rpcCfg.ListenAddr, strconv.Itoa(rpcCfg.ListenPort))
......
...@@ -110,7 +110,7 @@ func TestOutputAtBlock(t *testing.T) { ...@@ -110,7 +110,7 @@ func TestOutputAtBlock(t *testing.T) {
status := randomSyncStatus(rand.New(rand.NewSource(123))) status := randomSyncStatus(rand.New(rand.NewSource(123)))
drClient.ExpectBlockRefWithStatus(0xdcdc89, ref, status, nil) drClient.ExpectBlockRefWithStatus(0xdcdc89, ref, status, nil)
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NewMetrics("")) server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, server.Start()) require.NoError(t, server.Start())
defer server.Stop() defer server.Stop()
...@@ -142,7 +142,7 @@ func TestVersion(t *testing.T) { ...@@ -142,7 +142,7 @@ func TestVersion(t *testing.T) {
rollupCfg := &rollup.Config{ rollupCfg := &rollup.Config{
// ignore other rollup config info in this test // ignore other rollup config info in this test
} }
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NewMetrics("")) server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NoopMetrics)
assert.NoError(t, err) assert.NoError(t, err)
assert.NoError(t, server.Start()) assert.NoError(t, server.Start())
defer server.Stop() defer server.Stop()
...@@ -184,7 +184,7 @@ func TestSyncStatus(t *testing.T) { ...@@ -184,7 +184,7 @@ func TestSyncStatus(t *testing.T) {
rollupCfg := &rollup.Config{ rollupCfg := &rollup.Config{
// ignore other rollup config info in this test // ignore other rollup config info in this test
} }
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NewMetrics("")) server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NoopMetrics)
assert.NoError(t, err) assert.NoError(t, err)
assert.NoError(t, server.Start()) assert.NoError(t, server.Start())
defer server.Stop() defer server.Stop()
......
...@@ -2,6 +2,7 @@ package driver ...@@ -2,6 +2,7 @@ package driver
import ( import (
"context" "context"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -28,6 +29,8 @@ type Metrics interface { ...@@ -28,6 +29,8 @@ type Metrics interface {
RecordL1ReorgDepth(d uint64) RecordL1ReorgDepth(d uint64)
CountSequencedTxs(count int) CountSequencedTxs(count int)
SequencerMetrics
} }
type L1Chain interface { type L1Chain interface {
...@@ -70,11 +73,9 @@ type L1OriginSelectorIface interface { ...@@ -70,11 +73,9 @@ type L1OriginSelectorIface interface {
} }
type SequencerIface interface { type SequencerIface interface {
StartBuildingBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) error StartBuildingBlock(ctx context.Context, l1Origin eth.L1BlockRef) error
CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPayload, error) CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPayload, error)
PlanNextSequencerAction(sequenceErr error) (delay time.Duration, seal bool, onto eth.BlockID)
// createNewBlock builds a new block based on the L2 Head, L1 Origin, and the current mempool.
CreateNewBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) (eth.L2BlockRef, *eth.ExecutionPayload, error)
} }
type Network interface { type Network interface {
...@@ -84,11 +85,11 @@ type Network interface { ...@@ -84,11 +85,11 @@ type Network interface {
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
sequencer := NewSequencer(log, cfg, l1, l2)
l1State := NewL1State(log, metrics) l1State := NewL1State(log, metrics)
findL1Origin := NewL1OriginSelector(log, cfg, l1, driverCfg.SequencerConfDepth) findL1Origin := NewL1OriginSelector(log, cfg, l1, driverCfg.SequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1) verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics) derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics)
sequencer := NewSequencer(log, cfg, l1, l2, derivationPipeline, metrics)
return &Driver{ return &Driver{
l1State: l1State, l1State: l1State,
......
...@@ -2,6 +2,7 @@ package driver ...@@ -2,6 +2,7 @@ package driver
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time" "time"
...@@ -19,33 +20,57 @@ type Downloader interface { ...@@ -19,33 +20,57 @@ type Downloader interface {
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
} }
type SequencerMetrics interface {
RecordSequencerBuildingDiffTime(duration time.Duration)
RecordSequencerSealingTime(duration time.Duration)
}
type EngineState interface {
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef
}
// Sequencer implements the sequencing interface of the driver: it starts and completes block building jobs. // Sequencer implements the sequencing interface of the driver: it starts and completes block building jobs.
type Sequencer struct { type Sequencer struct {
log log.Logger log log.Logger
config *rollup.Config config *rollup.Config
l1 Downloader l1 Downloader
l2 derive.Engine l2 derive.Engine
engineState EngineState
buildingOnto eth.ForkchoiceState buildingOnto eth.L2BlockRef
buildingID eth.PayloadID buildingID eth.PayloadID
buildingStartTime time.Time
metrics SequencerMetrics
} }
func NewSequencer(log log.Logger, cfg *rollup.Config, l1 Downloader, l2 derive.Engine) *Sequencer { func NewSequencer(log log.Logger, cfg *rollup.Config, l1 Downloader, l2 derive.Engine, engineState EngineState, metrics SequencerMetrics) *Sequencer {
return &Sequencer{ return &Sequencer{
log: log, log: log,
config: cfg, config: cfg,
l1: l1, l1: l1,
l2: l2, l2: l2,
metrics: metrics,
engineState: engineState,
} }
} }
// StartBuildingBlock initiates a block building job on top of the given L2 head, safe and finalized blocks, and using the provided l1Origin. // StartBuildingBlock initiates a block building job on top of the given L2 head, safe and finalized blocks, and using the provided l1Origin.
func (d *Sequencer) StartBuildingBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) error { func (d *Sequencer) StartBuildingBlock(ctx context.Context, l1Origin eth.L1BlockRef) error {
l2Head := d.engineState.UnsafeL2Head()
if !(l2Head.L1Origin.Hash == l1Origin.ParentHash || l2Head.L1Origin.Hash == l1Origin.Hash) {
return fmt.Errorf("cannot build new L2 block with L1 origin %s (parent L1 %s) on current L2 head %s with L1 origin %s", l1Origin, l1Origin.ParentHash, l2Head, l2Head.L1Origin)
}
d.log.Info("creating new block", "parent", l2Head, "l1Origin", l1Origin) d.log.Info("creating new block", "parent", l2Head, "l1Origin", l1Origin)
if d.buildingID != (eth.PayloadID{}) { // This may happen when we decide to build a different block in response to a reorg. Or when previous block building failed. if d.buildingID != (eth.PayloadID{}) { // This may happen when we decide to build a different block in response to a reorg. Or when previous block building failed.
d.log.Warn("did not finish previous block building, starting new building now", "prev_onto", d.buildingOnto.HeadBlockHash, "prev_payload_id", d.buildingID, "new_onto", l2Head) d.log.Warn("did not finish previous block building, starting new building now", "prev_onto", d.buildingOnto, "prev_payload_id", d.buildingID, "new_onto", l2Head)
} }
d.buildingStartTime = time.Now()
fetchCtx, cancel := context.WithTimeout(ctx, time.Second*20) fetchCtx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel() defer cancel()
...@@ -65,15 +90,15 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context, l2Head eth.L2BlockRe ...@@ -65,15 +90,15 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context, l2Head eth.L2BlockRe
// updated as a result of executing the block based on the attributes described above. // updated as a result of executing the block based on the attributes described above.
fc := eth.ForkchoiceState{ fc := eth.ForkchoiceState{
HeadBlockHash: l2Head.Hash, HeadBlockHash: l2Head.Hash,
SafeBlockHash: l2SafeHead.Hash, SafeBlockHash: d.engineState.SafeL2Head().Hash,
FinalizedBlockHash: l2Finalized.Hash, FinalizedBlockHash: d.engineState.Finalized().Hash,
} }
// Start a payload building process. // Start a payload building process.
id, errTyp, err := derive.StartPayload(ctx, d.l2, fc, attrs) id, errTyp, err := derive.StartPayload(ctx, d.l2, fc, attrs)
if err != nil { if err != nil {
return fmt.Errorf("failed to start building on top of L2 chain %s, error (%d): %w", l2Head, errTyp, err) return fmt.Errorf("failed to start building on top of L2 chain %s, error (%d): %w", l2Head, errTyp, err)
} }
d.buildingOnto = fc d.buildingOnto = l2Head
d.buildingID = id d.buildingID = id
return nil return nil
} }
...@@ -85,36 +110,72 @@ func (d *Sequencer) CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPa ...@@ -85,36 +110,72 @@ func (d *Sequencer) CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPa
if d.buildingID == (eth.PayloadID{}) { if d.buildingID == (eth.PayloadID{}) {
return nil, fmt.Errorf("cannot complete payload building: not currently building a payload") return nil, fmt.Errorf("cannot complete payload building: not currently building a payload")
} }
sealingStart := time.Now()
l2Head := d.engineState.UnsafeL2Head()
if d.buildingOnto.Hash != l2Head.Hash {
return nil, fmt.Errorf("engine reorged from %s to %s while building block", d.buildingOnto, l2Head)
}
fc := eth.ForkchoiceState{
HeadBlockHash: l2Head.Hash,
SafeBlockHash: d.engineState.SafeL2Head().Hash,
FinalizedBlockHash: d.engineState.Finalized().Hash,
}
// Actually execute the block and add it to the head of the chain. // Actually execute the block and add it to the head of the chain.
payload, errTyp, err := derive.ConfirmPayload(ctx, d.log, d.l2, d.buildingOnto, d.buildingID, false) payload, errTyp, err := derive.ConfirmPayload(ctx, d.log, d.l2, fc, d.buildingID, false)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to complete building on top of L2 chain %s, error (%d): %w", d.buildingOnto.HeadBlockHash, errTyp, err) return nil, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", d.buildingOnto, d.buildingID, errTyp, err)
} }
now := time.Now()
sealTime := now.Sub(sealingStart)
buildTime := now.Sub(d.buildingStartTime)
d.metrics.RecordSequencerSealingTime(sealTime)
d.metrics.RecordSequencerBuildingDiffTime(buildTime - time.Duration(d.config.BlockTime)*time.Second)
d.log.Debug("sequenced block", "seal_time", sealTime, "build_time", buildTime)
d.buildingID = eth.PayloadID{} d.buildingID = eth.PayloadID{}
return payload, nil return payload, nil
} }
// CreateNewBlock sequences a L2 block with immediate building and sealing. // PlanNextSequencerAction returns a desired delay till the next action, and if we should seal the block:
func (d *Sequencer) CreateNewBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) (eth.L2BlockRef, *eth.ExecutionPayload, error) { // - true whenever we need to complete a block
if err := d.StartBuildingBlock(ctx, l2Head, l2SafeHead, l2Finalized, l1Origin); err != nil { // - false whenever we need to start a block
return l2Head, nil, err func (d *Sequencer) PlanNextSequencerAction(sequenceErr error) (delay time.Duration, seal bool, onto eth.BlockID) {
blockTime := time.Duration(d.config.BlockTime) * time.Second
head := d.engineState.UnsafeL2Head()
// based on the build error, delay and start over again
if sequenceErr != nil {
if errors.Is(sequenceErr, UninitializedL1StateErr) {
// temporary errors are not so bad, just retry in 500ms
return 500 * time.Millisecond, false, head.ID()
} else {
// we just hit an unknown type of error, delay a re-attempt by as much as a block
return blockTime, false, head.ID()
}
} }
payloadTime := time.Unix(int64(l2Head.Time+d.config.BlockTime), 0) payloadTime := time.Unix(int64(head.Time+d.config.BlockTime), 0)
remaining := -time.Until(payloadTime) remainingTime := time.Until(payloadTime)
// TODO: allowing to breathe when remaining time is in the negative is very generous,
// we can reduce this if the block building timing gets better with PR 3818 // If we started building a block already, and if that work is still consistent,
d.log.Debug("using remaining time for better block production", "remaining_time", remaining) // then we would like to finish it by sealing the block.
time.Sleep(500 * time.Millisecond) if d.buildingID != (eth.PayloadID{}) && d.buildingOnto.Hash == head.Hash {
// if we started building already, then we will schedule the sealing.
payload, err := d.CompleteBuildingBlock(ctx) if remainingTime < sealingDuration {
if err != nil { return 0, true, head.ID() // if there's not enough time for sealing, don't wait.
return l2Head, nil, err } else {
// finish with margin of sealing duration before payloadTime
return remainingTime - sealingDuration, true, head.ID()
}
} else {
// if we did not yet start building, then we will schedule the start.
if remainingTime > blockTime {
// if we have too much time, then wait before starting the build
return remainingTime - blockTime, false, head.ID()
} else {
// otherwise start instantly
return 0, false, head.ID()
}
} }
// Generate an L2 block ref from the payload.
ref, err := derive.PayloadToBlockRef(payload, &d.config.Genesis)
return ref, payload, err
} }
...@@ -20,6 +20,11 @@ import ( ...@@ -20,6 +20,11 @@ import (
// Deprecated: use eth.SyncStatus instead. // Deprecated: use eth.SyncStatus instead.
type SyncStatus = eth.SyncStatus type SyncStatus = eth.SyncStatus
// sealingDuration defines the expected time it takes to seal the block
const sealingDuration = time.Millisecond * 50
var UninitializedL1StateErr = errors.New("the L1 Head in L1 State is not initialized yet")
type Driver struct { type Driver struct {
l1State L1StateIface l1State L1StateIface
...@@ -127,16 +132,13 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPa ...@@ -127,16 +132,13 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPa
} }
} }
// createNewL2Block builds a L2 block on top of the L2 Head (unsafe). Used by Sequencer nodes to // startNewL2Block starts sequencing a new L2 block on top of the unsafe L2 Head.
// construct new L2 blocks. Verifier nodes will use handleEpoch instead. func (s *Driver) startNewL2Block(ctx context.Context) error {
func (s *Driver) createNewL2Block(ctx context.Context) error {
l2Head := s.derivation.UnsafeL2Head() l2Head := s.derivation.UnsafeL2Head()
l2Safe := s.derivation.SafeL2Head()
l2Finalized := s.derivation.Finalized()
l1Head := s.l1State.L1Head() l1Head := s.l1State.L1Head()
if l1Head == (eth.L1BlockRef{}) { if l1Head == (eth.L1BlockRef{}) {
return derive.NewTemporaryError(errors.New("L1 Head in L1 State is not initizalited yet")) return UninitializedL1StateErr
} }
// Figure out which L1 origin block we're going to be building on top of. // Figure out which L1 origin block we're going to be building on top of.
...@@ -150,7 +152,7 @@ func (s *Driver) createNewL2Block(ctx context.Context) error { ...@@ -150,7 +152,7 @@ func (s *Driver) createNewL2Block(ctx context.Context) error {
// reached. Don't produce any blocks until we're at that genesis block. // reached. Don't produce any blocks until we're at that genesis block.
if l1Origin.Number < s.config.Genesis.L1.Number { if l1Origin.Number < s.config.Genesis.L1.Number {
s.log.Info("Skipping block production because the next L1 Origin is behind the L1 genesis", "next", l1Origin.ID(), "genesis", s.config.Genesis.L1) s.log.Info("Skipping block production because the next L1 Origin is behind the L1 genesis", "next", l1Origin.ID(), "genesis", s.config.Genesis.L1)
return nil return fmt.Errorf("the L1 origin %s cannot be before genesis at %s", l1Origin, s.config.Genesis.L1)
} }
// Should never happen. Sequencer will halt if we get into this situation somehow. // Should never happen. Sequencer will halt if we get into this situation somehow.
...@@ -162,13 +164,27 @@ func (s *Driver) createNewL2Block(ctx context.Context) error { ...@@ -162,13 +164,27 @@ func (s *Driver) createNewL2Block(ctx context.Context) error {
l2Head, nextL2Time, l1Origin, l1Origin.Time) l2Head, nextL2Time, l1Origin, l1Origin.Time)
} }
// Actually create the new block. // Start creating the new block.
newUnsafeL2Head, payload, err := s.sequencer.CreateNewBlock(ctx, l2Head, l2Safe.ID(), l2Finalized.ID(), l1Origin) return s.sequencer.StartBuildingBlock(ctx, l1Origin)
}
// completeNewBlock completes a previously started L2 block sequencing job.
func (s *Driver) completeNewBlock(ctx context.Context) error {
payload, err := s.sequencer.CompleteBuildingBlock(ctx)
if err != nil { if err != nil {
s.log.Error("Could not extend chain as sequencer", "err", err, "l2_parent", l2Head, "l1_origin", l1Origin) s.metrics.RecordSequencingError()
s.log.Error("Failed to seal block as sequencer", "err", err)
return err return err
} }
// Generate an L2 block ref from the payload.
newUnsafeL2Head, err := derive.PayloadToBlockRef(payload, &s.config.Genesis)
if err != nil {
s.metrics.RecordSequencingError()
s.log.Error("Sequenced payload cannot be transformed into valid L2 block reference", "err", err)
return fmt.Errorf("sequenced payload cannot be transformed into valid L2 block reference: %w", err)
}
// Update our L2 head block based on the new unsafe block we just generated. // Update our L2 head block based on the new unsafe block we just generated.
s.derivation.SetUnsafeHead(newUnsafeL2Head) s.derivation.SetUnsafeHead(newUnsafeL2Head)
...@@ -182,7 +198,6 @@ func (s *Driver) createNewL2Block(ctx context.Context) error { ...@@ -182,7 +198,6 @@ func (s *Driver) createNewL2Block(ctx context.Context) error {
// publishing of unsafe data via p2p is optional. Errors are not severe enough to change/halt sequencing but should be logged and metered. // publishing of unsafe data via p2p is optional. Errors are not severe enough to change/halt sequencing but should be logged and metered.
} }
} }
return nil return nil
} }
...@@ -194,32 +209,9 @@ func (s *Driver) eventLoop() { ...@@ -194,32 +209,9 @@ func (s *Driver) eventLoop() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// Start a ticker to produce L2 blocks at a constant rate. Ticker will only run if we're
// running in Sequencer mode.
var l2BlockCreationTickerCh <-chan time.Time
if s.driverConfig.SequencerEnabled {
l2BlockCreationTicker := time.NewTicker(time.Duration(s.config.BlockTime) * time.Second)
defer l2BlockCreationTicker.Stop()
l2BlockCreationTickerCh = l2BlockCreationTicker.C
}
// stepReqCh is used to request that the driver attempts to step forward by one L1 block. // stepReqCh is used to request that the driver attempts to step forward by one L1 block.
stepReqCh := make(chan struct{}, 1) stepReqCh := make(chan struct{}, 1)
// l2BlockCreationReqCh is used to request that the driver create a new L2 block. Only used if
// we're running in Sequencer mode, because otherwise we'll be deriving our blocks via the
// stepping process.
l2BlockCreationReqCh := make(chan struct{}, 1)
// reqL2BlockCreation requests that a block be created. Won't deadlock if the channel is full.
reqL2BlockCreation := func() {
select {
case l2BlockCreationReqCh <- struct{}{}:
// Don't deadlock if the channel is already full
default:
}
}
// channel, nil by default (not firing), but used to schedule re-attempts with delay // channel, nil by default (not firing), but used to schedule re-attempts with delay
var delayedStepReq <-chan time.Time var delayedStepReq <-chan time.Time
...@@ -257,39 +249,60 @@ func (s *Driver) eventLoop() { ...@@ -257,39 +249,60 @@ func (s *Driver) eventLoop() {
// L1 chain that we need to handle. // L1 chain that we need to handle.
reqStep() reqStep()
blockTime := time.Duration(s.config.BlockTime) * time.Second
var sequenceErr error
var sequenceErrTime time.Time
sequencerTimer := time.NewTimer(0)
var sequencerCh <-chan time.Time
var sequencingPlannedOnto eth.BlockID
var sequencerSealNext bool
planSequencerAction := func() {
delay, seal, onto := s.sequencer.PlanNextSequencerAction(sequenceErr)
if sequenceErr != nil && time.Since(sequenceErrTime) > delay {
sequenceErr = nil
}
sequencerCh = sequencerTimer.C
if len(sequencerCh) > 0 { // empty if not already drained before resetting
<-sequencerCh
}
sequencerTimer.Reset(delay)
sequencingPlannedOnto = onto
sequencerSealNext = seal
}
for { for {
select { // If we are sequencing, update the trigger for the next sequencer action.
case <-l2BlockCreationTickerCh: // This may adjust at any time based on fork-choice changes or previous errors.
s.log.Trace("L2 Creation Ticker") if s.driverConfig.SequencerEnabled {
s.snapshot("L2 Creation Ticker") // update sequencer time if the head changed
reqL2BlockCreation() if sequencingPlannedOnto != s.derivation.UnsafeL2Head().ID() {
planSequencerAction()
case <-l2BlockCreationReqCh:
s.snapshot("L2 Block Creation Request")
if !s.idleDerivation {
s.log.Warn("not creating block, node is deriving new l2 data", "head_l1", s.l1State.L1Head())
break
}
ctx, cancel := context.WithTimeout(ctx, 20*time.Minute)
err := s.createNewL2Block(ctx)
cancel()
if err != nil {
s.log.Error("Error creating new L2 block", "err", err)
s.metrics.RecordSequencingError()
break // if we fail, we wait for the next block creation trigger.
} }
} else {
sequencerCh = nil
}
// We need to catch up to the next origin as quickly as possible. We can do this by select {
// requesting a new block ASAP instead of waiting for the next tick. case <-sequencerCh:
// We don't request a block if the confirmation depth is not met. s.log.Info("sequencing now!", "seal", sequencerSealNext, "idle_derivation", s.idleDerivation)
l2Head := s.derivation.UnsafeL2Head() if sequencerSealNext {
if wallClock := uint64(time.Now().Unix()); l2Head.Time+s.config.BlockTime <= wallClock { // try to seal the current block task, and allow it to take up to 3 block times.
s.log.Trace("Building another L2 block asap to catch up with wallclock", // If this fails we will simply start a new block building job.
"l2_unsafe", l2Head, "l2_unsafe_time", l2Head.Time, "wallclock", wallClock) ctx, cancel := context.WithTimeout(ctx, 3*blockTime)
// But not too quickly to minimize busy-waiting for new blocks sequenceErr = s.completeNewBlock(ctx)
time.AfterFunc(time.Millisecond*10, reqL2BlockCreation) cancel()
} else {
// Start the block building, don't allow the starting of sequencing to get stuck for more the time of 1 block.
ctx, cancel := context.WithTimeout(ctx, blockTime)
sequenceErr = s.startNewL2Block(ctx)
cancel()
} }
if sequenceErr != nil {
s.log.Error("sequencing error", "err", sequenceErr)
sequenceErrTime = time.Now()
}
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
case payload := <-s.unsafeL2Payloads: case payload := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload") s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID()) s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
......
...@@ -4,49 +4,81 @@ package driver ...@@ -4,49 +4,81 @@ package driver
import ( import (
"context" "context"
"errors" "errors"
"math/big"
"math/rand" "math/rand"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
) )
type TestDummyOutputImpl struct { type TestDummyOutputImpl struct {
willError bool willError bool
SequencerIface
cfg *rollup.Config
l1Origin eth.L1BlockRef
l2Head eth.L2BlockRef
} }
func (d TestDummyOutputImpl) CreateNewBlock(ctx context.Context, l2Head eth.L2BlockRef, l2SafeHead eth.BlockID, l2Finalized eth.BlockID, l1Origin eth.L1BlockRef) (eth.L2BlockRef, *eth.ExecutionPayload, error) { func (d *TestDummyOutputImpl) PlanNextSequencerAction(sequenceErr error) (delay time.Duration, seal bool, onto eth.BlockID) {
return 0, d.l1Origin != (eth.L1BlockRef{}), d.l2Head.ParentID()
}
func (d *TestDummyOutputImpl) StartBuildingBlock(ctx context.Context, l1Origin eth.L1BlockRef) error {
d.l1Origin = l1Origin
return nil
}
func (d *TestDummyOutputImpl) CompleteBuildingBlock(ctx context.Context) (*eth.ExecutionPayload, error) {
// If we're meant to error, return one // If we're meant to error, return one
if d.willError { if d.willError {
return l2Head, nil, errors.New("the TestDummyOutputImpl.createNewBlock operation failed") return nil, errors.New("the TestDummyOutputImpl.createNewBlock operation failed")
}
info := &testutils.MockBlockInfo{
InfoHash: d.l1Origin.Hash,
InfoParentHash: d.l1Origin.ParentHash,
InfoCoinbase: common.Address{},
InfoRoot: common.Hash{},
InfoNum: d.l1Origin.Number,
InfoTime: d.l1Origin.Time,
InfoMixDigest: [32]byte{},
InfoBaseFee: big.NewInt(123),
InfoReceiptRoot: common.Hash{},
}
infoTx, err := derive.L1InfoDepositBytes(d.l2Head.SequenceNumber, info, eth.SystemConfig{})
if err != nil {
panic(err)
} }
payload := eth.ExecutionPayload{ payload := eth.ExecutionPayload{
ParentHash: common.Hash{}, ParentHash: d.l2Head.Hash,
FeeRecipient: common.Address{}, FeeRecipient: common.Address{},
StateRoot: eth.Bytes32{}, StateRoot: eth.Bytes32{},
ReceiptsRoot: eth.Bytes32{}, ReceiptsRoot: eth.Bytes32{},
LogsBloom: eth.Bytes256{}, LogsBloom: eth.Bytes256{},
PrevRandao: eth.Bytes32{}, PrevRandao: eth.Bytes32{},
BlockNumber: 0, BlockNumber: eth.Uint64Quantity(d.l2Head.Number + 1),
GasLimit: 0, GasLimit: 0,
GasUsed: 0, GasUsed: 0,
Timestamp: 0, Timestamp: eth.Uint64Quantity(d.l2Head.Time + d.cfg.BlockTime),
ExtraData: nil, ExtraData: nil,
BaseFeePerGas: eth.Uint256Quantity{}, BaseFeePerGas: eth.Uint256Quantity{},
BlockHash: common.Hash{}, BlockHash: common.Hash{123},
Transactions: []eth.Data{}, Transactions: []eth.Data{infoTx},
} }
return l2Head, &payload, nil return &payload, nil
} }
var _ SequencerIface = (*TestDummyOutputImpl)(nil)
type TestDummyDerivationPipeline struct { type TestDummyDerivationPipeline struct {
DerivationPipeline DerivationPipeline
l2Head eth.L2BlockRef l2Head eth.L2BlockRef
...@@ -104,27 +136,30 @@ func TestRejectCreateBlockBadTimestamp(t *testing.T) { ...@@ -104,27 +136,30 @@ func TestRejectCreateBlockBadTimestamp(t *testing.T) {
l2HeadRef.Time = l2l1OriginBlock.Time - (cfg.BlockTime * 2) l2HeadRef.Time = l2l1OriginBlock.Time - (cfg.BlockTime * 2)
// Create our outputter // Create our outputter
outputProvider := TestDummyOutputImpl{willError: false} outputProvider := &TestDummyOutputImpl{cfg: &cfg, l2Head: l2HeadRef, willError: false}
// Create our state // Create our state
s := Driver{ s := Driver{
l1State: &L1State{ l1State: &L1State{
l1Head: l1HeadRef, l1Head: l1HeadRef,
log: log.New(), log: log.New(),
metrics: &metrics.Metrics{TransactionsSequencedTotal: prometheus.NewCounter(prometheus.CounterOpts{})}, metrics: metrics.NoopMetrics,
}, },
log: log.New(), log: log.New(),
l1OriginSelector: TestDummyL1OriginSelector{retval: l1HeadRef}, l1OriginSelector: TestDummyL1OriginSelector{retval: l1HeadRef},
config: &cfg, config: &cfg,
sequencer: outputProvider, sequencer: outputProvider,
derivation: TestDummyDerivationPipeline{}, derivation: TestDummyDerivationPipeline{},
metrics: &metrics.Metrics{TransactionsSequencedTotal: prometheus.NewCounter(prometheus.CounterOpts{})}, metrics: metrics.NoopMetrics,
} }
// Create a new block // Create a new block
// - L2Head's L1Origin, its timestamp should be greater than L1 genesis. // - L2Head's L1Origin, its timestamp should be greater than L1 genesis.
// - L2Head timestamp + BlockTime should be greater than or equal to the L1 Time. // - L2Head timestamp + BlockTime should be greater than or equal to the L1 Time.
err := s.createNewL2Block(ctx) err := s.startNewL2Block(ctx)
if err == nil {
err = s.completeNewBlock(ctx)
}
// Verify the L1Origin's block number is greater than L1 genesis in our config. // Verify the L1Origin's block number is greater than L1 genesis in our config.
if l2l1OriginBlock.Number < s.config.Genesis.L1.Number { if l2l1OriginBlock.Number < s.config.Genesis.L1.Number {
...@@ -187,27 +222,30 @@ func FuzzRejectCreateBlockBadTimestamp(f *testing.F) { ...@@ -187,27 +222,30 @@ func FuzzRejectCreateBlockBadTimestamp(f *testing.F) {
l2HeadRef.Time = currentL2HeadTime l2HeadRef.Time = currentL2HeadTime
// Create our outputter // Create our outputter
outputProvider := TestDummyOutputImpl{willError: forceOutputFail} outputProvider := &TestDummyOutputImpl{cfg: &cfg, l2Head: l2HeadRef, willError: forceOutputFail}
// Create our state // Create our state
s := Driver{ s := Driver{
l1State: &L1State{ l1State: &L1State{
l1Head: l1HeadRef, l1Head: l1HeadRef,
log: log.New(), log: log.New(),
metrics: &metrics.Metrics{TransactionsSequencedTotal: prometheus.NewCounter(prometheus.CounterOpts{})}, metrics: metrics.NoopMetrics,
}, },
log: log.New(), log: log.New(),
l1OriginSelector: TestDummyL1OriginSelector{retval: l1HeadRef}, l1OriginSelector: TestDummyL1OriginSelector{retval: l1HeadRef},
config: &cfg, config: &cfg,
sequencer: outputProvider, sequencer: outputProvider,
derivation: TestDummyDerivationPipeline{}, derivation: TestDummyDerivationPipeline{},
metrics: &metrics.Metrics{TransactionsSequencedTotal: prometheus.NewCounter(prometheus.CounterOpts{})}, metrics: metrics.NoopMetrics,
} }
// Create a new block // Create a new block
// - L2Head's L1Origin, its timestamp should be greater than L1 genesis. // - L2Head's L1Origin, its timestamp should be greater than L1 genesis.
// - L2Head timestamp + BlockTime should be greater than or equal to the L1 Time. // - L2Head timestamp + BlockTime should be greater than or equal to the L1 Time.
err := s.createNewL2Block(ctx) err := s.startNewL2Block(ctx)
if err == nil {
err = s.completeNewBlock(ctx)
}
// Verify the L1Origin's timestamp is greater than L1 genesis in our config. // Verify the L1Origin's timestamp is greater than L1 genesis in our config.
if l2l1OriginBlock.Number < s.config.Genesis.L1.Number { if l2l1OriginBlock.Number < s.config.Genesis.L1.Number {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment