Commit 7ec23bb8 authored by Zach Howard's avatar Zach Howard Committed by GitHub

op-node: adds pre unsafe-publish hook to check current leader sequencer (#8894)

parent d27f4695
...@@ -43,6 +43,11 @@ func (c *APIClient) AddServerAsVoter(ctx context.Context, id string, addr string ...@@ -43,6 +43,11 @@ func (c *APIClient) AddServerAsVoter(ctx context.Context, id string, addr string
return c.c.CallContext(ctx, nil, prefixRPC("addServerAsVoter"), id, addr) return c.c.CallContext(ctx, nil, prefixRPC("addServerAsVoter"), id, addr)
} }
// Close closes the underlying RPC client.
func (c *APIClient) Close() {
c.c.Close()
}
// CommitUnsafePayload implements API. // CommitUnsafePayload implements API.
func (c *APIClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error { func (c *APIClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return c.c.CallContext(ctx, nil, prefixRPC("commitUnsafePayload"), payload) return c.c.CallContext(ctx, nil, prefixRPC("commitUnsafePayload"), payload)
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
...@@ -108,7 +109,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) { ...@@ -108,7 +109,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) {
} }
s.l2Building = false s.l2Building = false
_, err := s.sequencer.CompleteBuildingBlock(t.Ctx(), async.NoOpGossiper{}) _, err := s.sequencer.CompleteBuildingBlock(t.Ctx(), async.NoOpGossiper{}, &conductor.NoOpConductor{})
// TODO: there may be legitimate temporary errors here, if we mock engine API RPC-failure. // TODO: there may be legitimate temporary errors here, if we mock engine API RPC-failure.
// For advanced tests we can catch those and print a warning instead. // For advanced tests we can catch those and print a warning instead.
require.NoError(t, err) require.NoError(t, err)
......
...@@ -256,6 +256,24 @@ var ( ...@@ -256,6 +256,24 @@ var (
EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"), EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
Hidden: true, Hidden: true,
} }
ConductorEnabledFlag = &cli.BoolFlag{
Name: "conductor.enabled",
Usage: "Enable the conductor service",
EnvVars: prefixEnvVars("CONDUCTOR_ENABLED"),
Value: false,
}
ConductorRpcFlag = &cli.StringFlag{
Name: "conductor.rpc",
Usage: "Conductor service rpc endpoint",
EnvVars: prefixEnvVars("CONDUCTOR_RPC"),
Value: "http://127.0.0.1:8547",
}
ConductorRpcTimeoutFlag = &cli.DurationFlag{
Name: "conductor.rpc-timeout",
Usage: "Conductor service rpc timeout",
EnvVars: prefixEnvVars("CONDUCTOR_RPC_TIMEOUT"),
Value: time.Second * 1,
}
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -295,6 +313,9 @@ var optionalFlags = []cli.Flag{ ...@@ -295,6 +313,9 @@ var optionalFlags = []cli.Flag{
RollupHalt, RollupHalt,
RollupLoadProtocolVersions, RollupLoadProtocolVersions,
L1RethDBPath, L1RethDBPath,
ConductorEnabledFlag,
ConductorRpcFlag,
ConductorRpcTimeoutFlag,
} }
var DeprecatedFlags = []cli.Flag{ var DeprecatedFlags = []cli.Flag{
......
package node
import (
"context"
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/log"
conductorRpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
)
// ConductorClient is a client for the op-conductor RPC service.
type ConductorClient struct {
cfg *Config
metrics *metrics.Metrics
log log.Logger
apiClient *conductorRpc.APIClient
}
var _ conductor.SequencerConductor = &ConductorClient{}
// NewConductorClient returns a new conductor client for the op-conductor RPC service.
func NewConductorClient(cfg *Config, log log.Logger, metrics *metrics.Metrics) *ConductorClient {
return &ConductorClient{cfg: cfg, metrics: metrics, log: log}
}
// Initialize initializes the conductor client.
func (c *ConductorClient) initialize() error {
if c.apiClient != nil {
return nil
}
conductorRpcClient, err := dial.DialRPCClientWithTimeout(context.Background(), time.Minute*1, c.log, c.cfg.ConductorRpc)
if err != nil {
return fmt.Errorf("failed to dial conductor RPC: %w", err)
}
c.apiClient = conductorRpc.NewAPIClient(conductorRpcClient)
return nil
}
// Leader returns true if this node is the leader sequencer.
func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
if err := c.initialize(); err != nil {
return false, err
}
ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout)
defer cancel()
isLeader, err := retry.Do(ctx, 2, retry.Fixed(50*time.Millisecond), func() (bool, error) {
record := c.metrics.RecordRPCClientRequest("conductor_leader")
result, err := c.apiClient.Leader(ctx)
record(err)
return result, err
})
return isLeader, err
}
// CommitUnsafePayload commits an unsafe payload to the conductor log.
func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
if err := c.initialize(); err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout)
defer cancel()
// extra bool return value is required for the generic, can be ignored.
_, err := retry.Do(ctx, 2, retry.Fixed(50*time.Millisecond), func() (bool, error) {
record := c.metrics.RecordRPCClientRequest("conductor_commitUnsafePayload")
err := c.apiClient.CommitUnsafePayload(ctx, payload)
record(err)
return true, err
})
return err
}
func (c *ConductorClient) Close() {
if c.apiClient == nil {
return
}
c.apiClient.Close()
c.apiClient = nil
}
...@@ -64,6 +64,11 @@ type Config struct { ...@@ -64,6 +64,11 @@ type Config struct {
// [OPTIONAL] The reth DB path to read receipts from // [OPTIONAL] The reth DB path to read receipts from
RethDBPath string RethDBPath string
// Conductor is used to determine this node is the leader sequencer.
ConductorEnabled bool
ConductorRpc string
ConductorRpcTimeout time.Duration
} }
type RPCConfig struct { type RPCConfig struct {
...@@ -151,5 +156,13 @@ func (cfg *Config) Check() error { ...@@ -151,5 +156,13 @@ func (cfg *Config) Check() error {
if !(cfg.RollupHalt == "" || cfg.RollupHalt == "major" || cfg.RollupHalt == "minor" || cfg.RollupHalt == "patch") { if !(cfg.RollupHalt == "" || cfg.RollupHalt == "major" || cfg.RollupHalt == "minor" || cfg.RollupHalt == "patch") {
return fmt.Errorf("invalid rollup halting option: %q", cfg.RollupHalt) return fmt.Errorf("invalid rollup halting option: %q", cfg.RollupHalt)
} }
if cfg.ConductorEnabled {
if state, _ := cfg.ConfigPersistence.SequencerState(); state != StateUnset {
return fmt.Errorf("config persistence must be disabled when conductor is enabled")
}
if !cfg.Driver.SequencerEnabled {
return fmt.Errorf("sequencer must be enabled when conductor is enabled")
}
}
return nil return nil
} }
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/heartbeat" "github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/version" "github.com/ethereum-optimism/optimism/op-node/version"
...@@ -367,7 +368,11 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -367,7 +368,11 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err return err
} }
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.beacon, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync) var sequencerConductor conductor.SequencerConductor = &conductor.NoOpConductor{}
if cfg.ConductorEnabled {
sequencerConductor = NewConductorClient(cfg, n.log, n.metrics)
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.beacon, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync, sequencerConductor)
return nil return nil
} }
......
package conductor
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// SequencerConductor is an interface for the driver to communicate with the sequencer conductor.
// It is used to determine if the current node is the active sequencer, and to commit unsafe payloads to the conductor log.
type SequencerConductor interface {
Leader(ctx context.Context) (bool, error)
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
Close()
}
// NoOpConductor is a no-op conductor that assumes this node is the leader sequencer.
type NoOpConductor struct{}
// Leader returns true if this node is the leader sequencer. NoOpConductor always returns true.
func (c *NoOpConductor) Leader(ctx context.Context) (bool, error) {
return true, nil
}
// CommitUnsafePayload commits an unsafe payload to the conductor log.
func (c *NoOpConductor) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return nil
}
// Close closes the conductor client.
func (c *NoOpConductor) Close() {}
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -170,7 +171,7 @@ func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockR ...@@ -170,7 +171,7 @@ func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockR
return BlockInsertOK, nil return BlockInsertOK, nil
} }
func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) { func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
if e.buildingID == (eth.PayloadID{}) { if e.buildingID == (eth.PayloadID{}) {
return nil, BlockInsertPrestateErr, fmt.Errorf("cannot complete payload building: not currently building a payload") return nil, BlockInsertPrestateErr, fmt.Errorf("cannot complete payload building: not currently building a payload")
} }
...@@ -184,7 +185,7 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy ...@@ -184,7 +185,7 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
} }
// Update the safe head if the payload is built with the last attributes in the batch. // Update the safe head if the payload is built with the last attributes in the batch.
updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.isLastInSpan updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.isLastInSpan
envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingID, updateSafe, agossip) envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingID, updateSafe, agossip, sequencerConductor)
if err != nil { if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingID, errTyp, err) return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingID, errTyp, err)
} }
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -68,7 +69,7 @@ type EngineControl interface { ...@@ -68,7 +69,7 @@ type EngineControl interface {
// If updateSafe, the resulting block will be marked as a safe block. // If updateSafe, the resulting block will be marked as a safe block.
StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error)
// ConfirmPayload requests the engine to complete the current block. If no block is being built, or if it fails, an error is returned. // ConfirmPayload requests the engine to complete the current block. If no block is being built, or if it fails, an error is returned.
ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error)
// CancelPayload requests the engine to stop building the current block without making it canonical. // CancelPayload requests the engine to stop building the current block without making it canonical.
// This is optional, as the engine expires building jobs that are left uncompleted, but can still save resources. // This is optional, as the engine expires building jobs that are left uncompleted, but can still save resources.
CancelPayload(ctx context.Context, force bool) error CancelPayload(ctx context.Context, force bool) error
...@@ -537,7 +538,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { ...@@ -537,7 +538,7 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
lastInSpan := eq.safeAttributes.isLastInSpan lastInSpan := eq.safeAttributes.isLastInSpan
errType, err := eq.StartPayload(ctx, eq.ec.PendingSafeL2Head(), eq.safeAttributes, true) errType, err := eq.StartPayload(ctx, eq.ec.PendingSafeL2Head(), eq.safeAttributes, true)
if err == nil { if err == nil {
_, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}) _, errType, err = eq.ec.ConfirmPayload(ctx, async.NoOpGossiper{}, &conductor.NoOpConductor{})
} }
if err != nil { if err != nil {
switch errType { switch errType {
...@@ -590,8 +591,8 @@ func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, ...@@ -590,8 +591,8 @@ func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef,
return eq.ec.StartPayload(ctx, parent, attrs, updateSafe) return eq.ec.StartPayload(ctx, parent, attrs, updateSafe)
} }
func (eq *EngineQueue) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) { func (eq *EngineQueue) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
return eq.ec.ConfirmPayload(ctx, agossip) return eq.ec.ConfirmPayload(ctx, agossip, sequencerConductor)
} }
func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error { func (eq *EngineQueue) CancelPayload(ctx context.Context, force bool) error {
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
...@@ -1003,7 +1004,7 @@ func TestBlockBuildingRace(t *testing.T) { ...@@ -1003,7 +1004,7 @@ func TestBlockBuildingRace(t *testing.T) {
eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil) eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil)
// Now complete the job, as external user of the engine // Now complete the job, as external user of the engine
_, _, err = eq.ConfirmPayload(context.Background(), async.NoOpGossiper{}) _, _, err = eq.ConfirmPayload(context.Background(), async.NoOpGossiper{}, &conductor.NoOpConductor{})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, refA1, ec.SafeL2Head(), "safe head should have changed") require.Equal(t, refA1, ec.SafeL2Head(), "safe head should have changed")
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -126,6 +127,7 @@ func confirmPayload( ...@@ -126,6 +127,7 @@ func confirmPayload(
id eth.PayloadID, id eth.PayloadID,
updateSafe bool, updateSafe bool,
agossip async.AsyncGossiper, agossip async.AsyncGossiper,
sequencerConductor conductor.SequencerConductor,
) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) { ) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
var envelope *eth.ExecutionPayloadEnvelope var envelope *eth.ExecutionPayloadEnvelope
// if the payload is available from the async gossiper, it means it was not yet imported, so we reuse it // if the payload is available from the async gossiper, it means it was not yet imported, so we reuse it
...@@ -148,6 +150,9 @@ func confirmPayload( ...@@ -148,6 +150,9 @@ func confirmPayload(
if err := sanityCheckPayload(payload); err != nil { if err := sanityCheckPayload(payload); err != nil {
return nil, BlockInsertPayloadErr, err return nil, BlockInsertPayloadErr, err
} }
if err := sequencerConductor.CommitUnsafePayload(ctx, envelope); err != nil {
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to commit unsafe payload to conductor: %w", err)
}
// begin gossiping as soon as possible // begin gossiping as soon as possible
// agossip.Clear() will be called later if an non-temporary error is found, or if the payload is successfully inserted // agossip.Clear() will be called later if an non-temporary error is found, or if the payload is successfully inserted
agossip.Gossip(envelope) agossip.Gossip(envelope)
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -76,9 +77,9 @@ type L1StateIface interface { ...@@ -76,9 +77,9 @@ type L1StateIface interface {
type SequencerIface interface { type SequencerIface interface {
StartBuildingBlock(ctx context.Context) error StartBuildingBlock(ctx context.Context) error
CompleteBuildingBlock(ctx context.Context, agossip async.AsyncGossiper) (*eth.ExecutionPayloadEnvelope, error) CompleteBuildingBlock(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (*eth.ExecutionPayloadEnvelope, error)
PlanNextSequencerAction() time.Duration PlanNextSequencerAction() time.Duration
RunNextSequencerAction(ctx context.Context, agossip async.AsyncGossiper) (*eth.ExecutionPayloadEnvelope, error) RunNextSequencerAction(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (*eth.ExecutionPayloadEnvelope, error)
BuildingOnto() eth.L2BlockRef BuildingOnto() eth.L2BlockRef
CancelBuildingBlock(ctx context.Context) CancelBuildingBlock(ctx context.Context)
} }
...@@ -113,7 +114,7 @@ type SequencerStateListener interface { ...@@ -113,7 +114,7 @@ type SequencerStateListener interface {
} }
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1Blobs derive.L1BlobsFetcher, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener, syncCfg *sync.Config) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1Blobs derive.L1BlobsFetcher, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener, syncCfg *sync.Config, sequencerConductor conductor.SequencerConductor) *Driver {
l1 = NewMeteredL1Fetcher(l1, metrics) l1 = NewMeteredL1Fetcher(l1, metrics)
l1State := NewL1State(log, metrics) l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
...@@ -127,32 +128,33 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1 ...@@ -127,32 +128,33 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1
driverCtx, driverCancel := context.WithCancel(context.Background()) driverCtx, driverCancel := context.WithCancel(context.Background())
asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics) asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
return &Driver{ return &Driver{
l1State: l1State, l1State: l1State,
derivation: derivationPipeline, derivation: derivationPipeline,
engineController: engine, engineController: engine,
stateReq: make(chan chan struct{}), stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10), forceReset: make(chan chan struct{}, 10),
startSequencer: make(chan hashAndErrorChannel, 10), startSequencer: make(chan hashAndErrorChannel, 10),
stopSequencer: make(chan chan hashAndError, 10), stopSequencer: make(chan chan hashAndError, 10),
sequencerActive: make(chan chan bool, 10), sequencerActive: make(chan chan bool, 10),
sequencerNotifs: sequencerStateListener, sequencerNotifs: sequencerStateListener,
config: cfg, config: cfg,
syncCfg: syncCfg, syncCfg: syncCfg,
driverConfig: driverCfg, driverConfig: driverCfg,
driverCtx: driverCtx, driverCtx: driverCtx,
driverCancel: driverCancel, driverCancel: driverCancel,
log: log, log: log,
snapshotLog: snapshotLog, snapshotLog: snapshotLog,
l1: l1, l1: l1,
l2: l2, l2: l2,
sequencer: sequencer, sequencer: sequencer,
network: network, network: network,
metrics: metrics, metrics: metrics,
l1HeadSig: make(chan eth.L1BlockRef, 10), l1HeadSig: make(chan eth.L1BlockRef, 10),
l1SafeSig: make(chan eth.L1BlockRef, 10), l1SafeSig: make(chan eth.L1BlockRef, 10),
l1FinalizedSig: make(chan eth.L1BlockRef, 10), l1FinalizedSig: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayloadEnvelope, 10), unsafeL2Payloads: make(chan *eth.ExecutionPayloadEnvelope, 10),
altSync: altSync, altSync: altSync,
asyncGossiper: asyncGossiper, asyncGossiper: asyncGossiper,
sequencerConductor: sequencerConductor,
} }
} }
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -61,10 +62,10 @@ func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef, ...@@ -61,10 +62,10 @@ func (m *MeteredEngine) StartPayload(ctx context.Context, parent eth.L2BlockRef,
return errType, err return errType, err
} }
func (m *MeteredEngine) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) { func (m *MeteredEngine) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) {
sealingStart := time.Now() sealingStart := time.Now()
// Actually execute the block and add it to the head of the chain. // Actually execute the block and add it to the head of the chain.
payload, errType, err := m.inner.ConfirmPayload(ctx, agossip) payload, errType, err := m.inner.ConfirmPayload(ctx, agossip, sequencerConductor)
if err != nil { if err != nil {
m.metrics.RecordSequencingError() m.metrics.RecordSequencingError()
return payload, errType, err return payload, errType, err
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -114,8 +115,8 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error { ...@@ -114,8 +115,8 @@ func (d *Sequencer) StartBuildingBlock(ctx context.Context) error {
// CompleteBuildingBlock takes the current block that is being built, and asks the engine to complete the building, seal the block, and persist it as canonical. // CompleteBuildingBlock takes the current block that is being built, and asks the engine to complete the building, seal the block, and persist it as canonical.
// Warning: the safe and finalized L2 blocks as viewed during the initiation of the block building are reused for completion of the block building. // Warning: the safe and finalized L2 blocks as viewed during the initiation of the block building are reused for completion of the block building.
// The Execution engine should not change the safe and finalized blocks between start and completion of block building. // The Execution engine should not change the safe and finalized blocks between start and completion of block building.
func (d *Sequencer) CompleteBuildingBlock(ctx context.Context, agossip async.AsyncGossiper) (*eth.ExecutionPayloadEnvelope, error) { func (d *Sequencer) CompleteBuildingBlock(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (*eth.ExecutionPayloadEnvelope, error) {
envelope, errTyp, err := d.engine.ConfirmPayload(ctx, agossip) envelope, errTyp, err := d.engine.ConfirmPayload(ctx, agossip, sequencerConductor)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to complete building block: error (%d): %w", errTyp, err) return nil, fmt.Errorf("failed to complete building block: error (%d): %w", errTyp, err)
} }
...@@ -204,7 +205,7 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef { ...@@ -204,7 +205,7 @@ func (d *Sequencer) BuildingOnto() eth.L2BlockRef {
// If the derivation pipeline does force a conflicting block, then an ongoing sequencer task might still finish, // If the derivation pipeline does force a conflicting block, then an ongoing sequencer task might still finish,
// but the derivation can continue to reset until the chain is correct. // but the derivation can continue to reset until the chain is correct.
// If the engine is currently building safe blocks, then that building is not interrupted, and sequencing is delayed. // If the engine is currently building safe blocks, then that building is not interrupted, and sequencing is delayed.
func (d *Sequencer) RunNextSequencerAction(ctx context.Context, agossip async.AsyncGossiper) (*eth.ExecutionPayloadEnvelope, error) { func (d *Sequencer) RunNextSequencerAction(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (*eth.ExecutionPayloadEnvelope, error) {
// if the engine returns a non-empty payload, OR if the async gossiper already has a payload, we can CompleteBuildingBlock // if the engine returns a non-empty payload, OR if the async gossiper already has a payload, we can CompleteBuildingBlock
if onto, buildingID, safe := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) || agossip.Get() != nil { if onto, buildingID, safe := d.engine.BuildingPayload(); buildingID != (eth.PayloadID{}) || agossip.Get() != nil {
if safe { if safe {
...@@ -213,7 +214,7 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context, agossip async.As ...@@ -213,7 +214,7 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context, agossip async.As
d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.rollupCfg.BlockTime)) d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.rollupCfg.BlockTime))
return nil, nil return nil, nil
} }
envelope, err := d.CompleteBuildingBlock(ctx, agossip) envelope, err := d.CompleteBuildingBlock(ctx, agossip, sequencerConductor)
if err != nil { if err != nil {
if errors.Is(err, derive.ErrCritical) { if errors.Is(err, derive.ErrCritical) {
return nil, err // bubble up critical errors. return nil, err // bubble up critical errors.
......
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
...@@ -74,7 +75,7 @@ func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2Block ...@@ -74,7 +75,7 @@ func (m *FakeEngineControl) StartPayload(ctx context.Context, parent eth.L2Block
return derive.BlockInsertOK, nil return derive.BlockInsertOK, nil
} }
func (m *FakeEngineControl) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) { func (m *FakeEngineControl) ConfirmPayload(ctx context.Context, agossip async.AsyncGossiper, sequencerConductor conductor.SequencerConductor) (out *eth.ExecutionPayloadEnvelope, errTyp derive.BlockInsertionErrType, err error) {
if m.err != nil { if m.err != nil {
return nil, m.errTyp, m.err return nil, m.errTyp, m.err
} }
...@@ -345,7 +346,7 @@ func TestSequencerChaosMonkey(t *testing.T) { ...@@ -345,7 +346,7 @@ func TestSequencerChaosMonkey(t *testing.T) {
default: default:
// no error // no error
} }
payload, err := seq.RunNextSequencerAction(context.Background(), async.NoOpGossiper{}) payload, err := seq.RunNextSequencerAction(context.Background(), async.NoOpGossiper{}, &conductor.NoOpConductor{})
// RunNextSequencerAction passes ErrReset & ErrCritical through. // RunNextSequencerAction passes ErrReset & ErrCritical through.
// Only suppress ErrReset, not ErrCritical // Only suppress ErrReset, not ErrCritical
if !errors.Is(err, derive.ErrReset) { if !errors.Is(err, derive.ErrReset) {
......
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -64,6 +65,8 @@ type Driver struct { ...@@ -64,6 +65,8 @@ type Driver struct {
// Rollup config: rollup chain configuration // Rollup config: rollup chain configuration
config *rollup.Config config *rollup.Config
sequencerConductor conductor.SequencerConductor
// Driver config: verifier and sequencer settings // Driver config: verifier and sequencer settings
driverConfig *Config driverConfig *Config
...@@ -138,6 +141,7 @@ func (s *Driver) Close() error { ...@@ -138,6 +141,7 @@ func (s *Driver) Close() error {
s.driverCancel() s.driverCancel()
s.wg.Wait() s.wg.Wait()
s.asyncGossiper.Stop() s.asyncGossiper.Stop()
s.sequencerConductor.Close()
return nil return nil
} }
...@@ -302,7 +306,7 @@ func (s *Driver) eventLoop() { ...@@ -302,7 +306,7 @@ func (s *Driver) eventLoop() {
case <-sequencerCh: case <-sequencerCh:
// the payload publishing is handled by the async gossiper, which will begin gossiping as soon as available // the payload publishing is handled by the async gossiper, which will begin gossiping as soon as available
// so, we don't need to receive the payload here // so, we don't need to receive the payload here
_, err := s.sequencer.RunNextSequencerAction(s.driverCtx, s.asyncGossiper) _, err := s.sequencer.RunNextSequencerAction(s.driverCtx, s.asyncGossiper, s.sequencerConductor)
if errors.Is(err, derive.ErrReset) { if errors.Is(err, derive.ErrReset) {
s.derivation.Reset() s.derivation.Reset()
} else if err != nil { } else if err != nil {
...@@ -466,6 +470,11 @@ func (s *Driver) StartSequencer(ctx context.Context, blockHash common.Hash) erro ...@@ -466,6 +470,11 @@ func (s *Driver) StartSequencer(ctx context.Context, blockHash common.Hash) erro
if !s.driverConfig.SequencerEnabled { if !s.driverConfig.SequencerEnabled {
return errors.New("sequencer is not enabled") return errors.New("sequencer is not enabled")
} }
if isLeader, err := s.sequencerConductor.Leader(ctx); err != nil {
return fmt.Errorf("sequencer leader check failed: %w", err)
} else if !isLeader {
return errors.New("sequencer is not the leader, aborting.")
}
h := hashAndErrorChannel{ h := hashAndErrorChannel{
hash: blockHash, hash: blockHash,
err: make(chan error, 1), err: make(chan error, 1),
......
...@@ -103,12 +103,21 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -103,12 +103,21 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
Sync: *syncConfig, Sync: *syncConfig,
RollupHalt: haltOption, RollupHalt: haltOption,
RethDBPath: ctx.String(flags.L1RethDBPath.Name), RethDBPath: ctx.String(flags.L1RethDBPath.Name),
ConductorEnabled: ctx.Bool(flags.ConductorEnabledFlag.Name),
ConductorRpc: ctx.String(flags.ConductorRpcFlag.Name),
ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name),
} }
if err := cfg.LoadPersisted(log); err != nil { if err := cfg.LoadPersisted(log); err != nil {
return nil, fmt.Errorf("failed to load driver config: %w", err) return nil, fmt.Errorf("failed to load driver config: %w", err)
} }
// conductor controls the sequencer state
if cfg.ConductorEnabled {
cfg.Driver.SequencerStopped = true
}
if err := cfg.Check(); err != nil { if err := cfg.Check(); err != nil {
return nil, err return nil, err
} }
......
...@@ -47,6 +47,15 @@ func DialRollupClientWithTimeout(ctx context.Context, timeout time.Duration, log ...@@ -47,6 +47,15 @@ func DialRollupClientWithTimeout(ctx context.Context, timeout time.Duration, log
return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
} }
// DialRPCClientWithTimeout attempts to dial the RPC provider using the provided URL.
// If the dial doesn't complete within timeout seconds, this method will return an error.
func DialRPCClientWithTimeout(ctx context.Context, timeout time.Duration, log log.Logger, url string) (*rpc.Client, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return dialRPCClientWithBackoff(ctx, log, url)
}
// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional. // Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) { func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) {
bOff := retry.Fixed(defaultRetryTime) bOff := retry.Fixed(defaultRetryTime)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment