Commit fe519340 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #6105 from ethereum-optimism/aj/persist-synchronizer-state

op-node: Persist synchroniser state
parents 4d7798f7 189ae1e1
...@@ -170,6 +170,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { ...@@ -170,6 +170,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
EnableAdmin: true, EnableAdmin: true,
}, },
L1EpochPollInterval: time.Second * 4, L1EpochPollInterval: time.Second * 4,
ConfigPersistence: &rollupNode.DisabledConfigPersistence{},
}, },
"verifier": { "verifier": {
Driver: driver.Config{ Driver: driver.Config{
...@@ -178,6 +179,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { ...@@ -178,6 +179,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
SequencerEnabled: false, SequencerEnabled: false,
}, },
L1EpochPollInterval: time.Second * 4, L1EpochPollInterval: time.Second * 4,
ConfigPersistence: &rollupNode.DisabledConfigPersistence{},
}, },
}, },
Loggers: map[string]log.Logger{ Loggers: map[string]log.Logger{
...@@ -546,6 +548,9 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -546,6 +548,9 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
nodeConfig := cfg.Nodes[name] nodeConfig := cfg.Nodes[name]
c := *nodeConfig // copy c := *nodeConfig // copy
c.Rollup = makeRollupConfig() c.Rollup = makeRollupConfig()
if err := c.LoadPersisted(cfg.Loggers[name]); err != nil {
return nil, err
}
if p, ok := p2pNodes[name]; ok { if p, ok := p2pNodes[name]; ok {
c.P2P = p c.P2P = p
......
package op_e2e
import (
"context"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
)
func TestStopStartSequencer(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
rollupNode := sys.RollupNodes["sequencer"]
nodeRPC, err := rpc.DialContext(context.Background(), rollupNode.HTTPEndpoint())
require.Nil(t, err, "Error dialing node")
blockBefore := latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter := latestBlock(t, l2Seq)
require.Greaterf(t, blockAfter, blockBefore, "Chain did not advance")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
blockHash := common.Hash{}
err = nodeRPC.CallContext(ctx, &blockHash, "admin_stopSequencer")
require.Nil(t, err, "Error stopping sequencer")
blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Equal(t, blockAfter, blockBefore, "Chain advanced after stopping sequencer")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = nodeRPC.CallContext(ctx, nil, "admin_startSequencer", blockHash)
require.Nil(t, err, "Error starting sequencer")
blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Greater(t, blockAfter, blockBefore, "Chain did not advance after starting sequencer")
}
func TestPersistSequencerStateWhenChanged(t *testing.T) {
InitParallel(t)
ctx := context.Background()
dir := t.TempDir()
stateFile := dir + "/state.json"
cfg := DefaultSystemConfig(t)
// We don't need a verifier - just the sequencer is enough
delete(cfg.Nodes, "verifier")
cfg.Nodes["sequencer"].ConfigPersistence = node.NewConfigPersistence(stateFile)
sys, err := cfg.Start()
require.NoError(t, err)
defer sys.Close()
assertPersistedSequencerState(t, stateFile, node.StateStarted)
rollupRPCClient, err := rpc.DialContext(ctx, sys.RollupNodes["sequencer"].HTTPEndpoint())
require.Nil(t, err)
rollupClient := sources.NewRollupClient(client.NewBaseRPCClient(rollupRPCClient))
err = rollupClient.StartSequencer(ctx, common.Hash{0xaa})
require.ErrorContains(t, err, "sequencer already running")
head, err := rollupClient.StopSequencer(ctx)
require.NoError(t, err)
require.NotEqual(t, common.Hash{}, head)
assertPersistedSequencerState(t, stateFile, node.StateStopped)
}
func TestLoadSequencerStateOnStarted_Stopped(t *testing.T) {
InitParallel(t)
ctx := context.Background()
dir := t.TempDir()
stateFile := dir + "/state.json"
// Prepare the persisted state file with sequencer stopped
configReader := node.NewConfigPersistence(stateFile)
require.NoError(t, configReader.SequencerStopped())
cfg := DefaultSystemConfig(t)
// We don't need a verifier - just the sequencer is enough
delete(cfg.Nodes, "verifier")
seqCfg := cfg.Nodes["sequencer"]
seqCfg.ConfigPersistence = node.NewConfigPersistence(stateFile)
sys, err := cfg.Start()
require.NoError(t, err)
defer sys.Close()
rollupRPCClient, err := rpc.DialContext(ctx, sys.RollupNodes["sequencer"].HTTPEndpoint())
require.Nil(t, err)
rollupClient := sources.NewRollupClient(client.NewBaseRPCClient(rollupRPCClient))
// Still persisted as stopped after startup
assertPersistedSequencerState(t, stateFile, node.StateStopped)
// Sequencer is really stopped
_, err = rollupClient.StopSequencer(ctx)
require.ErrorContains(t, err, "sequencer not running")
assertPersistedSequencerState(t, stateFile, node.StateStopped)
}
func TestLoadSequencerStateOnStarted_Started(t *testing.T) {
InitParallel(t)
ctx := context.Background()
dir := t.TempDir()
stateFile := dir + "/state.json"
// Prepare the persisted state file with sequencer stopped
configReader := node.NewConfigPersistence(stateFile)
require.NoError(t, configReader.SequencerStarted())
cfg := DefaultSystemConfig(t)
// We don't need a verifier - just the sequencer is enough
delete(cfg.Nodes, "verifier")
seqCfg := cfg.Nodes["sequencer"]
seqCfg.Driver.SequencerStopped = true
seqCfg.ConfigPersistence = node.NewConfigPersistence(stateFile)
sys, err := cfg.Start()
require.NoError(t, err)
defer sys.Close()
rollupRPCClient, err := rpc.DialContext(ctx, sys.RollupNodes["sequencer"].HTTPEndpoint())
require.Nil(t, err)
rollupClient := sources.NewRollupClient(client.NewBaseRPCClient(rollupRPCClient))
// Still persisted as stopped after startup
assertPersistedSequencerState(t, stateFile, node.StateStarted)
// Sequencer is really stopped
err = rollupClient.StartSequencer(ctx, common.Hash{})
require.ErrorContains(t, err, "sequencer already running")
assertPersistedSequencerState(t, stateFile, node.StateStarted)
}
func assertPersistedSequencerState(t *testing.T, stateFile string, expected node.RunningState) {
configReader := node.NewConfigPersistence(stateFile)
state, err := configReader.SequencerState()
require.NoError(t, err)
require.Equalf(t, expected, state, "expected sequencer state %v but was %v", expected, state)
}
...@@ -1245,47 +1245,6 @@ func TestFees(t *testing.T) { ...@@ -1245,47 +1245,6 @@ func TestFees(t *testing.T) {
require.Equal(t, balanceDiff, totalFee, "balances should add up") require.Equal(t, balanceDiff, totalFee, "balances should add up")
} }
func TestStopStartSequencer(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
rollupNode := sys.RollupNodes["sequencer"]
nodeRPC, err := rpc.DialContext(context.Background(), rollupNode.HTTPEndpoint())
require.Nil(t, err, "Error dialing node")
blockBefore := latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter := latestBlock(t, l2Seq)
require.Greaterf(t, blockAfter, blockBefore, "Chain did not advance")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
blockHash := common.Hash{}
err = nodeRPC.CallContext(ctx, &blockHash, "admin_stopSequencer")
require.Nil(t, err, "Error stopping sequencer")
blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Equal(t, blockAfter, blockBefore, "Chain advanced after stopping sequencer")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = nodeRPC.CallContext(ctx, nil, "admin_startSequencer", blockHash)
require.Nil(t, err, "Error starting sequencer")
blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Greater(t, blockAfter, blockBefore, "Chain did not advance after starting sequencer")
}
func TestStopStartBatcher(t *testing.T) { func TestStopStartBatcher(t *testing.T) {
InitParallel(t) InitParallel(t)
......
...@@ -54,13 +54,18 @@ var ( ...@@ -54,13 +54,18 @@ var (
Usage: "RPC listening port", Usage: "RPC listening port",
EnvVars: prefixEnvVars("RPC_PORT"), EnvVars: prefixEnvVars("RPC_PORT"),
} }
/* Optional Flags */
RPCEnableAdmin = &cli.BoolFlag{ RPCEnableAdmin = &cli.BoolFlag{
Name: "rpc.enable-admin", Name: "rpc.enable-admin",
Usage: "Enable the admin API (experimental)", Usage: "Enable the admin API (experimental)",
EnvVars: prefixEnvVars("RPC_ENABLE_ADMIN"), EnvVars: prefixEnvVars("RPC_ENABLE_ADMIN"),
} }
RPCAdminPersistence = &cli.StringFlag{
/* Optional Flags */ Name: "rpc.admin-state",
Usage: "File path used to persist state changes made via the admin API so they persist across restarts. Disabled if not set.",
EnvVars: prefixEnvVars("RPC_ADMIN_STATE"),
}
L1TrustRPC = &cli.BoolFlag{ L1TrustRPC = &cli.BoolFlag{
Name: "l1.trustrpc", Name: "l1.trustrpc",
Usage: "Trust the L1 RPC, sync faster at risk of malicious/buggy RPC providing bad or inconsistent L1 data", Usage: "Trust the L1 RPC, sync faster at risk of malicious/buggy RPC providing bad or inconsistent L1 data",
...@@ -233,6 +238,7 @@ var optionalFlags = []cli.Flag{ ...@@ -233,6 +238,7 @@ var optionalFlags = []cli.Flag{
SequencerL1Confs, SequencerL1Confs,
L1EpochPollIntervalFlag, L1EpochPollIntervalFlag,
RPCEnableAdmin, RPCEnableAdmin,
RPCAdminPersistence,
MetricsEnabledFlag, MetricsEnabledFlag,
MetricsAddrFlag, MetricsAddrFlag,
MetricsPortFlag, MetricsPortFlag,
......
...@@ -6,10 +6,12 @@ import ( ...@@ -6,10 +6,12 @@ import (
"math" "math"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum/go-ethereum/log"
) )
type Config struct { type Config struct {
...@@ -36,6 +38,8 @@ type Config struct { ...@@ -36,6 +38,8 @@ type Config struct {
// Used to poll the L1 for new finalized or safe blocks // Used to poll the L1 for new finalized or safe blocks
L1EpochPollInterval time.Duration L1EpochPollInterval time.Duration
ConfigPersistence ConfigPersistence
// Optional // Optional
Tracer Tracer Tracer Tracer
Heartbeat HeartbeatConfig Heartbeat HeartbeatConfig
...@@ -75,6 +79,24 @@ type HeartbeatConfig struct { ...@@ -75,6 +79,24 @@ type HeartbeatConfig struct {
URL string URL string
} }
func (cfg *Config) LoadPersisted(log log.Logger) error {
if !cfg.Driver.SequencerEnabled {
return nil
}
if state, err := cfg.ConfigPersistence.SequencerState(); err != nil {
return err
} else if state != StateUnset {
stopped := state == StateStopped
if stopped != cfg.Driver.SequencerStopped {
log.Warn(fmt.Sprintf("Overriding %v with persisted state", flags.SequencerStoppedFlag.Name), "stopped", stopped)
}
cfg.Driver.SequencerStopped = stopped
} else {
log.Info("No persisted sequencer state loaded")
}
return nil
}
// Check verifies that the given configuration makes sense // Check verifies that the given configuration makes sense
func (cfg *Config) Check() error { func (cfg *Config) Check() error {
if err := cfg.L2.Check(); err != nil { if err := cfg.L2.Check(); err != nil {
......
package node
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
)
type RunningState int
const (
StateUnset RunningState = iota
StateStarted
StateStopped
)
type persistedState struct {
SequencerStarted *bool `json:"sequencerStarted,omitempty"`
}
type ConfigPersistence interface {
SequencerStarted() error
SequencerStopped() error
SequencerState() (RunningState, error)
}
var _ ConfigPersistence = (*ActiveConfigPersistence)(nil)
var _ ConfigPersistence = DisabledConfigPersistence{}
type ActiveConfigPersistence struct {
lock sync.Mutex
file string
}
func NewConfigPersistence(file string) *ActiveConfigPersistence {
return &ActiveConfigPersistence{file: file}
}
func (p *ActiveConfigPersistence) SequencerStarted() error {
return p.persist(true)
}
func (p *ActiveConfigPersistence) SequencerStopped() error {
return p.persist(false)
}
// persist writes the new config state to the file as safely as possible.
// It uses sync to ensure the data is actually persisted to disk and initially writes to a temp file
// before renaming it into place. On UNIX systems this rename is typically atomic, ensuring the
// actual file isn't corrupted if IO errors occur during writing.
func (p *ActiveConfigPersistence) persist(sequencerStarted bool) error {
p.lock.Lock()
defer p.lock.Unlock()
data, err := json.Marshal(persistedState{SequencerStarted: &sequencerStarted})
if err != nil {
return fmt.Errorf("marshall new config: %w", err)
}
dir := filepath.Dir(p.file)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("create config dir (%v): %w", p.file, err)
}
// Write the new content to a temp file first, then rename into place
// Avoids corrupting the content if the disk is full or there are IO errors
tmpFile := p.file + ".tmp"
file, err := os.OpenFile(tmpFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("open file (%v) for writing: %w", tmpFile, err)
}
defer file.Close() // Ensure file is closed even if write or sync fails
if _, err = file.Write(data); err != nil {
return fmt.Errorf("write new config to temp file (%v): %w", tmpFile, err)
}
if err := file.Sync(); err != nil {
return fmt.Errorf("sync new config temp file (%v): %w", tmpFile, err)
}
if err := file.Close(); err != nil {
return fmt.Errorf("close new config temp file (%v): %w", tmpFile, err)
}
// Rename to replace the previous file
if err := os.Rename(tmpFile, p.file); err != nil {
return fmt.Errorf("rename temp config file to final destination: %w", err)
}
return nil
}
func (p *ActiveConfigPersistence) SequencerState() (RunningState, error) {
config, err := p.read()
if err != nil {
return StateUnset, err
}
if config.SequencerStarted == nil {
return StateUnset, nil
} else if *config.SequencerStarted {
return StateStarted, nil
} else {
return StateStopped, nil
}
}
func (p *ActiveConfigPersistence) read() (persistedState, error) {
p.lock.Lock()
defer p.lock.Unlock()
data, err := os.ReadFile(p.file)
if errors.Is(err, os.ErrNotExist) {
// persistedState.SequencerStarted == nil: SequencerState() will return StateUnset if no state is found
return persistedState{}, nil
} else if err != nil {
return persistedState{}, fmt.Errorf("read config file (%v): %w", p.file, err)
}
var config persistedState
dec := json.NewDecoder(bytes.NewReader(data))
dec.DisallowUnknownFields()
if err = dec.Decode(&config); err != nil {
return persistedState{}, fmt.Errorf("invalid config file (%v): %w", p.file, err)
}
if config.SequencerStarted == nil {
return persistedState{}, fmt.Errorf("missing sequencerStarted value in config file (%v)", p.file)
}
return config, nil
}
// DisabledConfigPersistence provides an implementation of config persistence
// that does not persist anything and reports unset for all values
type DisabledConfigPersistence struct {
}
func (d DisabledConfigPersistence) SequencerState() (RunningState, error) {
return StateUnset, nil
}
func (d DisabledConfigPersistence) SequencerStarted() error {
return nil
}
func (d DisabledConfigPersistence) SequencerStopped() error {
return nil
}
package node
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestActive(t *testing.T) {
create := func() *ActiveConfigPersistence {
dir := t.TempDir()
config := NewConfigPersistence(dir + "/state")
return config
}
t.Run("SequencerStateUnsetWhenFileDoesNotExist", func(t *testing.T) {
config := create()
state, err := config.SequencerState()
require.NoError(t, err)
require.Equal(t, StateUnset, state)
})
t.Run("PersistSequencerStarted", func(t *testing.T) {
config1 := create()
require.NoError(t, config1.SequencerStarted())
state, err := config1.SequencerState()
require.NoError(t, err)
require.Equal(t, StateStarted, state)
config2 := NewConfigPersistence(config1.file)
state, err = config2.SequencerState()
require.NoError(t, err)
require.Equal(t, StateStarted, state)
})
t.Run("PersistSequencerStopped", func(t *testing.T) {
config1 := create()
require.NoError(t, config1.SequencerStopped())
state, err := config1.SequencerState()
require.NoError(t, err)
require.Equal(t, StateStopped, state)
config2 := NewConfigPersistence(config1.file)
state, err = config2.SequencerState()
require.NoError(t, err)
require.Equal(t, StateStopped, state)
})
t.Run("PersistMultipleChanges", func(t *testing.T) {
config := create()
require.NoError(t, config.SequencerStarted())
state, err := config.SequencerState()
require.NoError(t, err)
require.Equal(t, StateStarted, state)
require.NoError(t, config.SequencerStopped())
state, err = config.SequencerState()
require.NoError(t, err)
require.Equal(t, StateStopped, state)
})
t.Run("CreateParentDirs", func(t *testing.T) {
dir := t.TempDir()
config := NewConfigPersistence(dir + "/some/dir/state")
// Should be unset before file exists
state, err := config.SequencerState()
require.NoError(t, err)
require.Equal(t, StateUnset, state)
require.NoFileExists(t, config.file)
// Should create directories when updating
require.NoError(t, config.SequencerStarted())
require.FileExists(t, config.file)
state, err = config.SequencerState()
require.NoError(t, err)
require.Equal(t, StateStarted, state)
})
}
func TestDisabledConfigPersistence_AlwaysUnset(t *testing.T) {
config := DisabledConfigPersistence{}
state, err := config.SequencerState()
require.NoError(t, err)
require.Equal(t, StateUnset, state)
require.NoError(t, config.SequencerStarted())
state, err = config.SequencerState()
require.NoError(t, err)
require.Equal(t, StateUnset, state)
require.NoError(t, config.SequencerStopped())
state, err = config.SequencerState()
require.NoError(t, err)
require.Equal(t, StateUnset, state)
}
...@@ -199,7 +199,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -199,7 +199,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err return err
} }
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics) n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence)
return nil return nil
} }
......
...@@ -102,8 +102,13 @@ type AltSync interface { ...@@ -102,8 +102,13 @@ type AltSync interface {
RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error
} }
type SequencerStateListener interface {
SequencerStarted() error
SequencerStopped() error
}
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener) *Driver {
l1 = NewMeteredL1Fetcher(l1, metrics) l1 = NewMeteredL1Fetcher(l1, metrics)
l1State := NewL1State(log, metrics) l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
...@@ -122,6 +127,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, al ...@@ -122,6 +127,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, al
forceReset: make(chan chan struct{}, 10), forceReset: make(chan chan struct{}, 10),
startSequencer: make(chan hashAndErrorChannel, 10), startSequencer: make(chan hashAndErrorChannel, 10),
stopSequencer: make(chan chan hashAndError, 10), stopSequencer: make(chan chan hashAndError, 10),
sequencerNotifs: sequencerStateListener,
config: cfg, config: cfg,
driverConfig: driverCfg, driverConfig: driverCfg,
done: make(chan struct{}), done: make(chan struct{}),
......
...@@ -47,6 +47,9 @@ type Driver struct { ...@@ -47,6 +47,9 @@ type Driver struct {
// It tells the caller that the sequencer stopped by returning the latest sequenced L2 block hash. // It tells the caller that the sequencer stopped by returning the latest sequenced L2 block hash.
stopSequencer chan chan hashAndError stopSequencer chan chan hashAndError
// sequencerNotifs is notified when the sequencer is started or stopped
sequencerNotifs SequencerStateListener
// Rollup config: rollup chain configuration // Rollup config: rollup chain configuration
config *rollup.Config config *rollup.Config
...@@ -88,6 +91,21 @@ type Driver struct { ...@@ -88,6 +91,21 @@ type Driver struct {
func (s *Driver) Start() error { func (s *Driver) Start() error {
s.derivation.Reset() s.derivation.Reset()
log.Info("Starting driver", "sequencerEnabled", s.driverConfig.SequencerEnabled, "sequencerStopped", s.driverConfig.SequencerStopped)
if s.driverConfig.SequencerEnabled {
// Notify the initial sequencer state
// This ensures persistence can write the state correctly and that the state file exists
var err error
if s.driverConfig.SequencerStopped {
err = s.sequencerNotifs.SequencerStopped()
} else {
err = s.sequencerNotifs.SequencerStarted()
}
if err != nil {
return fmt.Errorf("persist initial sequencer state: %w", err)
}
}
s.wg.Add(1) s.wg.Add(1)
go s.eventLoop() go s.eventLoop()
...@@ -334,6 +352,10 @@ func (s *Driver) eventLoop() { ...@@ -334,6 +352,10 @@ func (s *Driver) eventLoop() {
} else if !bytes.Equal(unsafeHead[:], resp.hash[:]) { } else if !bytes.Equal(unsafeHead[:], resp.hash[:]) {
resp.err <- fmt.Errorf("block hash does not match: head %s, received %s", unsafeHead.String(), resp.hash.String()) resp.err <- fmt.Errorf("block hash does not match: head %s, received %s", unsafeHead.String(), resp.hash.String())
} else { } else {
if err := s.sequencerNotifs.SequencerStarted(); err != nil {
resp.err <- fmt.Errorf("sequencer start notification: %w", err)
continue
}
s.log.Info("Sequencer has been started") s.log.Info("Sequencer has been started")
s.driverConfig.SequencerStopped = false s.driverConfig.SequencerStopped = false
close(resp.err) close(resp.err)
...@@ -343,6 +365,10 @@ func (s *Driver) eventLoop() { ...@@ -343,6 +365,10 @@ func (s *Driver) eventLoop() {
if s.driverConfig.SequencerStopped { if s.driverConfig.SequencerStopped {
respCh <- hashAndError{err: errors.New("sequencer not running")} respCh <- hashAndError{err: errors.New("sequencer not running")}
} else { } else {
if err := s.sequencerNotifs.SequencerStopped(); err != nil {
respCh <- hashAndError{err: fmt.Errorf("sequencer start notification: %w", err)}
continue
}
s.log.Warn("Sequencer has been stopped") s.log.Warn("Sequencer has been stopped")
s.driverConfig.SequencerStopped = true s.driverConfig.SequencerStopped = true
respCh <- hashAndError{hash: s.derivation.UnsafeL2Head().Hash} respCh <- hashAndError{hash: s.derivation.UnsafeL2Head().Hash}
......
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -36,6 +35,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -36,6 +35,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, err return nil, err
} }
configPersistence := NewConfigPersistence(ctx)
driverConfig := NewDriverConfig(ctx) driverConfig := NewDriverConfig(ctx)
p2pSignerSetup, err := p2pcli.LoadSignerSetup(ctx) p2pSignerSetup, err := p2pcli.LoadSignerSetup(ctx)
...@@ -86,7 +87,13 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -86,7 +87,13 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
Moniker: ctx.String(flags.HeartbeatMonikerFlag.Name), Moniker: ctx.String(flags.HeartbeatMonikerFlag.Name),
URL: ctx.String(flags.HeartbeatURLFlag.Name), URL: ctx.String(flags.HeartbeatURLFlag.Name),
}, },
ConfigPersistence: configPersistence,
}
if err := cfg.LoadPersisted(log); err != nil {
return nil, fmt.Errorf("failed to load driver config: %w", err)
} }
if err := cfg.Check(); err != nil { if err := cfg.Check(); err != nil {
return nil, err return nil, err
} }
...@@ -143,6 +150,14 @@ func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig { ...@@ -143,6 +150,14 @@ func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig {
} }
} }
func NewConfigPersistence(ctx *cli.Context) node.ConfigPersistence {
stateFile := ctx.String(flags.RPCAdminPersistence.Name)
if stateFile == "" {
return node.DisabledConfigPersistence{}
}
return node.NewConfigPersistence(stateFile)
}
func NewDriverConfig(ctx *cli.Context) *driver.Config { func NewDriverConfig(ctx *cli.Context) *driver.Config {
return &driver.Config{ return &driver.Config{
VerifierConfDepth: ctx.Uint64(flags.VerifierL1Confs.Name), VerifierConfDepth: ctx.Uint64(flags.VerifierL1Confs.Name),
......
...@@ -3,6 +3,7 @@ package sources ...@@ -3,6 +3,7 @@ package sources
import ( import (
"context" "context"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
...@@ -41,3 +42,13 @@ func (r *RollupClient) Version(ctx context.Context) (string, error) { ...@@ -41,3 +42,13 @@ func (r *RollupClient) Version(ctx context.Context) (string, error) {
err := r.rpc.CallContext(ctx, &output, "optimism_version") err := r.rpc.CallContext(ctx, &output, "optimism_version")
return output, err return output, err
} }
func (r *RollupClient) StartSequencer(ctx context.Context, unsafeHead common.Hash) error {
return r.rpc.CallContext(ctx, nil, "admin_startSequencer", unsafeHead)
}
func (r *RollupClient) StopSequencer(ctx context.Context) (common.Hash, error) {
var result common.Hash
err := r.rpc.CallContext(ctx, &result, "admin_stopSequencer")
return result, err
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment