Commit 75352aee authored by Mark Tyneway, committed by GitHub

Merge pull request #4719 from mdehoog/sequencer-stop-start

Add support for stopping / starting sequencer from admin API
parents e7b66fd7 600b1f1c
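
In short: the node gains two admin RPCs, admin_stopSequencer (halts block production and returns the hash of the last sequenced block) and admin_startSequencer (resumes production on top of a given hash). A minimal client-side sketch of the intended flow, assuming an op-node RPC reachable on localhost:7545 (the port mapped in the docker-compose change at the bottom of this diff); this mirrors what the e2e test below does:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Illustrative endpoint; requires --rpc.enable-admin on the node.
	client, err := rpc.DialContext(context.Background(), "http://localhost:7545")
	if err != nil {
		panic(err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Stop the sequencer; the node replies with the last sequenced block hash.
	var head common.Hash
	if err := client.CallContext(ctx, &head, "admin_stopSequencer"); err != nil {
		panic(err)
	}
	fmt.Println("sequencer stopped at", head)

	// Restart on top of the same head; the hash must match the current
	// unsafe head or the node rejects the request.
	if err := client.CallContext(ctx, nil, "admin_startSequencer", head); err != nil {
		panic(err)
	}
}
```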
@@ -113,6 +113,14 @@ func (s *l2VerifierBackend) ResetDerivationPipeline(ctx context.Context) error {
return nil
}
func (s *l2VerifierBackend) StartSequencer(ctx context.Context, blockHash common.Hash) error {
return nil
}
func (s *l2VerifierBackend) StopSequencer(ctx context.Context) (common.Hash, error) {
return common.Hash{}, errors.New("stopping the L2Verifier sequencer is not supported")
}
func (s *L2Verifier) L2Finalized() eth.L2BlockRef {
return s.derivation.Finalized()
}
@@ -1129,6 +1129,58 @@ func TestFees(t *testing.T) {
require.Equal(t, balanceDiff, totalFee, "balances should add up")
}
func TestStopStartSequencer(t *testing.T) {
parallel(t)
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
}
cfg := DefaultSystemConfig(t)
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
rollupNode := sys.RollupNodes["sequencer"]
nodeRPC, err := rpc.DialContext(context.Background(), rollupNode.HTTPEndpoint())
require.Nil(t, err, "Error dialing node")
blockBefore := latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter := latestBlock(t, l2Seq)
require.Greaterf(t, blockAfter, blockBefore, "Chain did not advance")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
blockHash := common.Hash{}
err = nodeRPC.CallContext(ctx, &blockHash, "admin_stopSequencer")
require.Nil(t, err, "Error stopping sequencer")
blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Equal(t, blockAfter, blockBefore, "Chain advanced after stopping sequencer")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = nodeRPC.CallContext(ctx, nil, "admin_startSequencer", blockHash)
require.Nil(t, err, "Error starting sequencer")
blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Greater(t, blockAfter, blockBefore, "Chain did not advance after starting sequencer")
}
// safeAddBig returns a + b in a new big.Int, leaving both operands unmodified.
func safeAddBig(a *big.Int, b *big.Int) *big.Int {
return new(big.Int).Add(a, b)
}
// latestBlock fetches the current block number from the given client, failing the test on error.
func latestBlock(t *testing.T, client *ethclient.Client) uint64 {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
blockAfter, err := client.BlockNumber(ctx)
require.Nil(t, err, "Error getting latest block")
return blockAfter
}
@@ -94,6 +94,11 @@ var (
Usage: "Enable sequencing of new L2 blocks. A separate batch submitter has to be deployed to publish the data for verifiers.",
EnvVar: prefixEnvVar("SEQUENCER_ENABLED"),
}
SequencerStoppedFlag = cli.BoolFlag{
Name: "sequencer.stopped",
Usage: "Initialize the sequencer in a stopped state. The sequencer can be started using the admin_startSequencer RPC",
EnvVar: prefixEnvVar("SEQUENCER_STOPPED"),
}
SequencerL1Confs = cli.Uint64Flag{
Name: "sequencer.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head as a sequencer for picking an L1 origin.",
@@ -197,6 +202,7 @@ var optionalFlags = append([]cli.Flag{
L2EngineJWTSecret,
VerifierL1Confs,
SequencerEnabledFlag,
SequencerStoppedFlag,
SequencerL1Confs,
L1EpochPollIntervalFlag,
LogLevelFlag,
@@ -26,6 +26,8 @@ type driverClient interface {
SyncStatus(ctx context.Context) (*eth.SyncStatus, error)
BlockRefWithStatus(ctx context.Context, num uint64) (eth.L2BlockRef, *eth.SyncStatus, error)
ResetDerivationPipeline(context.Context) error
StartSequencer(ctx context.Context, blockHash common.Hash) error
StopSequencer(context.Context) (common.Hash, error)
}
type rpcMetrics interface {
@@ -51,6 +53,18 @@ func (n *adminAPI) ResetDerivationPipeline(ctx context.Context) error {
return n.dr.ResetDerivationPipeline(ctx)
}
func (n *adminAPI) StartSequencer(ctx context.Context, blockHash common.Hash) error {
recordDur := n.m.RecordRPCServerRequest("admin_startSequencer")
defer recordDur()
return n.dr.StartSequencer(ctx, blockHash)
}
func (n *adminAPI) StopSequencer(ctx context.Context) (common.Hash, error) {
recordDur := n.m.RecordRPCServerRequest("admin_stopSequencer")
defer recordDur()
return n.dr.StopSequencer(ctx)
}
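
Both handlers follow the file's existing pattern: RecordRPCServerRequest is invoked on entry and returns a callback that is deferred to record the request duration. A hypothetical sketch of that decorator shape, assuming a Prometheus histogram (this is not op-node's actual metrics implementation):

```go
package metrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Metrics is a hypothetical stand-in for the rpcMetrics interface above.
type Metrics struct {
	rpcDuration *prometheus.HistogramVec
}

func NewMetrics() *Metrics {
	return &Metrics{rpcDuration: prometheus.NewHistogramVec(
		prometheus.HistogramOpts{Name: "rpc_server_request_seconds"},
		[]string{"method"},
	)}
}

// RecordRPCServerRequest records that a request started and returns a
// callback that, when deferred, records the request duration on completion.
func (m *Metrics) RecordRPCServerRequest(method string) func() {
	start := time.Now()
	return func() {
		m.rpcDuration.WithLabelValues(method).Observe(time.Since(start).Seconds())
	}
}
```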
type nodeAPI struct {
config *rollup.Config
client l2EthClient
@@ -218,3 +218,11 @@ func (c *mockDriverClient) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
func (c *mockDriverClient) ResetDerivationPipeline(ctx context.Context) error {
return c.Mock.MethodCalled("ResetDerivationPipeline").Get(0).(error)
}
func (c *mockDriverClient) StartSequencer(ctx context.Context, blockHash common.Hash) error {
return c.Mock.MethodCalled("StartSequencer").Get(0).(error)
}
func (c *mockDriverClient) StopSequencer(ctx context.Context) (common.Hash, error) {
return c.Mock.MethodCalled("StopSequencer").Get(0).(common.Hash), nil
}
@@ -13,4 +13,7 @@ type Config struct {
// SequencerEnabled is true when the driver should sequence new blocks.
SequencerEnabled bool `json:"sequencer_enabled"`
// SequencerStopped is true when the driver should start in a stopped state and not sequence new blocks until started via the admin_startSequencer RPC.
SequencerStopped bool `json:"sequencer_stopped"`
}
@@ -97,6 +97,8 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne
idleDerivation: false,
stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10),
startSequencer: make(chan hashAndErrorChannel, 10),
stopSequencer: make(chan chan hashAndError, 10),
config: cfg,
driverConfig: driverCfg,
done: make(chan struct{}),
package driver
import (
"bytes"
"context"
"encoding/json"
"errors"
@@ -9,6 +10,7 @@ import (
gosync "sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/eth"
@@ -42,6 +44,14 @@ type Driver struct {
// It tells the caller that the reset occurred by closing the passed in channel.
forceReset chan chan struct{}
// Upon receiving a hash in this channel, the sequencer is started at the given hash.
// It tells the caller that the sequencer started by closing the passed in channel (or returning an error).
startSequencer chan hashAndErrorChannel
// Upon receiving a response channel on this channel, the sequencer is stopped.
// It tells the caller that the sequencer stopped by returning the latest sequenced L2 block hash.
stopSequencer chan chan hashAndError
// Rollup config: rollup chain configuration
config *rollup.Config
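
These two fields follow the driver's request/response-channel idiom: callers never touch sequencer state directly, they hand the event loop a reply channel and block until it answers. A standalone sketch of the same handshake, reduced to its essentials (all names here are illustrative, not op-node code):

```go
package main

import "fmt"

// stopReq is a reply channel: the event loop answers with its current head.
type stopReq chan string

// eventLoop owns all sequencer state; callers interact only via channels.
func eventLoop(stop chan stopReq, done chan struct{}) {
	head := "block-42" // state owned exclusively by this goroutine
	for {
		select {
		case resp := <-stop:
			resp <- head // hand the state back on the caller's channel
		case <-done:
			return
		}
	}
}

func main() {
	stop := make(chan stopReq, 10) // buffered, like startSequencer/stopSequencer
	done := make(chan struct{})
	go eventLoop(stop, done)

	resp := make(stopReq, 1)
	stop <- resp                      // send the request...
	fmt.Println("stopped at", <-resp) // ...and wait for the reply
	close(done)
}
```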
@@ -274,7 +284,7 @@ func (s *Driver) eventLoop() {
for {
// If we are sequencing, update the trigger for the next sequencer action.
// This may adjust at any time based on fork-choice changes or previous errors.
if s.driverConfig.SequencerEnabled {
if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped {
// update sequencer time if the head changed
if sequencingPlannedOnto != s.derivation.UnsafeL2Head().ID() {
planSequencerAction()
@@ -367,6 +377,26 @@
s.derivation.Reset()
s.metrics.RecordPipelineReset()
close(respCh)
case resp := <-s.startSequencer:
unsafeHead := s.derivation.UnsafeL2Head().Hash
if !s.driverConfig.SequencerStopped {
resp.err <- errors.New("sequencer already running")
} else if !bytes.Equal(unsafeHead[:], resp.hash[:]) {
resp.err <- fmt.Errorf("block hash does not match: head %s, received %s", unsafeHead.String(), resp.hash.String())
} else {
s.log.Info("Sequencer has been started")
s.driverConfig.SequencerStopped = false
sequencingPlannedOnto = eth.BlockID{}
close(resp.err)
}
case respCh := <-s.stopSequencer:
if s.driverConfig.SequencerStopped {
respCh <- hashAndError{err: errors.New("sequencer not running")}
} else {
s.log.Warn("Sequencer has been stopped")
s.driverConfig.SequencerStopped = true
respCh <- hashAndError{hash: s.derivation.UnsafeL2Head().Hash}
}
case <-s.done:
return
}
@@ -391,6 +421,45 @@ func (s *Driver) ResetDerivationPipeline(ctx context.Context) error {
}
}
func (s *Driver) StartSequencer(ctx context.Context, blockHash common.Hash) error {
if !s.driverConfig.SequencerEnabled {
return errors.New("sequencer is not enabled")
}
h := hashAndErrorChannel{
hash: blockHash,
err: make(chan error, 1),
}
select {
case <-ctx.Done():
return ctx.Err()
case s.startSequencer <- h:
select {
case <-ctx.Done():
return ctx.Err()
case e := <-h.err:
return e
}
}
}
func (s *Driver) StopSequencer(ctx context.Context) (common.Hash, error) {
if !s.driverConfig.SequencerEnabled {
return common.Hash{}, errors.New("sequencer is not enabled")
}
respCh := make(chan hashAndError, 1)
select {
case <-ctx.Done():
return common.Hash{}, ctx.Err()
case s.stopSequencer <- respCh:
select {
case <-ctx.Done():
return common.Hash{}, ctx.Err()
case he := <-respCh:
return he.hash, he.err
}
}
}
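
Note the nested selects: the caller's context can abort both the send of the request and the wait on the reply, so a timed-out RPC never blocks on the event loop. From the caller's side, the intended stop-then-start handoff looks roughly like this (a sketch assuming a running *Driver named drv; restartAtHead is a hypothetical helper, not part of this PR):

```go
// restartAtHead stops the sequencer and restarts it at the returned head.
func restartAtHead(ctx context.Context, drv *Driver) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	head, err := drv.StopSequencer(ctx) // last sequenced block hash
	if err != nil {
		return err
	}
	// StartSequencer rejects any hash that does not match the current unsafe
	// head, guarding against resuming on top of unexpected state.
	return drv.StartSequencer(ctx, head)
}
```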
// syncStatus returns the current sync status, and should only be called synchronously with
// the driver event loop to avoid retrieval of an inconsistent status.
func (s *Driver) syncStatus() *eth.SyncStatus {
@@ -455,3 +524,13 @@ func (s *Driver) snapshot(event string) {
"l2Safe", deferJSONString{s.derivation.SafeL2Head()},
"l2FinalizedHead", deferJSONString{s.derivation.Finalized()})
}
type hashAndError struct {
hash common.Hash
err error
}
type hashAndErrorChannel struct {
hash common.Hash
err chan error
}
@@ -138,6 +138,7 @@ func NewDriverConfig(ctx *cli.Context) (*driver.Config, error) {
VerifierConfDepth: ctx.GlobalUint64(flags.VerifierL1Confs.Name),
SequencerConfDepth: ctx.GlobalUint64(flags.SequencerL1Confs.Name),
SequencerEnabled: ctx.GlobalBool(flags.SequencerEnabledFlag.Name),
SequencerStopped: ctx.GlobalBool(flags.SequencerStoppedFlag.Name),
}, nil
}
@@ -65,6 +65,7 @@ services:
--metrics.addr=0.0.0.0
--metrics.port=7300
--pprof.enabled
--rpc.enable-admin
ports:
- "7545:8545"
- "9003:9003"