Commit f1cd6c8b authored by Michael de Hoog's avatar Michael de Hoog

Add support for stopping / starting the batcher

parent 0f1692af
...@@ -3,6 +3,7 @@ package batcher ...@@ -3,6 +3,7 @@ package batcher
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
...@@ -15,6 +16,7 @@ import ( ...@@ -15,6 +16,7 @@ import (
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
gethrpc "github.com/ethereum/go-ethereum/rpc"
) )
const ( const (
...@@ -44,9 +46,11 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -44,9 +46,11 @@ func Main(version string, cliCtx *cli.Context) error {
l.Info("Starting Batch Submitter") l.Info("Starting Batch Submitter")
if err := batchSubmitter.Start(); err != nil { if !cfg.Stopped {
l.Error("Unable to start Batch Submitter", "error", err) if err := batchSubmitter.Start(); err != nil {
return err l.Error("Unable to start Batch Submitter", "error", err)
return err
}
} }
defer batchSubmitter.Stop() defer batchSubmitter.Stop()
...@@ -81,6 +85,12 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -81,6 +85,12 @@ func Main(version string, cliCtx *cli.Context) error {
rpcCfg.ListenPort, rpcCfg.ListenPort,
version, version,
) )
if rpcCfg.EnableAdmin {
server.AddAPI(gethrpc.API{
Namespace: "admin",
Service: rpc.NewAdminAPI(batchSubmitter),
})
}
if err := server.Start(); err != nil { if err := server.Start(); err != nil {
cancel() cancel()
return fmt.Errorf("error starting RPC server: %w", err) return fmt.Errorf("error starting RPC server: %w", err)
...@@ -97,5 +107,4 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -97,5 +107,4 @@ func Main(version string, cliCtx *cli.Context) error {
cancel() cancel()
_ = server.Stop() _ = server.Stop()
return nil return nil
} }
package batcher package batcher
import ( import (
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -14,7 +15,6 @@ import ( ...@@ -14,7 +15,6 @@ import (
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
opsigner "github.com/ethereum-optimism/optimism/op-signer/client" opsigner "github.com/ethereum-optimism/optimism/op-signer/client"
) )
...@@ -81,7 +81,7 @@ type CLIConfig struct { ...@@ -81,7 +81,7 @@ type CLIConfig struct {
// PrivateKey is the private key used to submit sequencer transactions. // PrivateKey is the private key used to submit sequencer transactions.
PrivateKey string PrivateKey string
RPCConfig oprpc.CLIConfig RPCConfig rpc.CLIConfig
/* Optional Params */ /* Optional Params */
...@@ -98,6 +98,8 @@ type CLIConfig struct { ...@@ -98,6 +98,8 @@ type CLIConfig struct {
// compression algorithm. // compression algorithm.
ApproxComprRatio float64 ApproxComprRatio float64
Stopped bool
LogConfig oplog.CLIConfig LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig MetricsConfig opmetrics.CLIConfig
...@@ -145,10 +147,11 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -145,10 +147,11 @@ func NewConfig(ctx *cli.Context) CLIConfig {
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name), TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name), TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name), ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name), Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name), SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name), PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
RPCConfig: oprpc.ReadCLIConfig(ctx), RPCConfig: rpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx), PprofConfig: oppprof.ReadCLIConfig(ctx),
......
...@@ -29,6 +29,9 @@ type BatchSubmitter struct { ...@@ -29,6 +29,9 @@ type BatchSubmitter struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
mutex sync.Mutex
running bool
// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head. // lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
lastStoredBlock eth.BlockID lastStoredBlock eth.BlockID
...@@ -95,17 +98,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte ...@@ -95,17 +98,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte
}, },
} }
return NewBatchSubmitter(batcherCfg, l) return NewBatchSubmitter(ctx, batcherCfg, l)
} }
// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources // NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
// that will be needed during operation. // that will be needed during operation.
func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) { func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger) (*BatchSubmitter, error) {
ctx, cancel := context.WithCancel(context.Background())
balance, err := cfg.L1Client.BalanceAt(ctx, cfg.From, nil) balance, err := cfg.L1Client.BalanceAt(ctx, cfg.From, nil)
if err != nil { if err != nil {
cancel()
return nil, err return nil, err
} }
...@@ -117,26 +117,45 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) { ...@@ -117,26 +117,45 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
txMgr: NewTransactionManager(l, txMgr: NewTransactionManager(l,
cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID, cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID,
cfg.From, cfg.L1Client), cfg.From, cfg.L1Client),
done: make(chan struct{}), state: NewChannelManager(l, cfg.Channel),
// TODO: this context only exists because the event loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
ctx: ctx,
cancel: cancel,
state: NewChannelManager(l, cfg.Channel),
}, nil }, nil
} }
func (l *BatchSubmitter) Start() error { func (l *BatchSubmitter) Start() error {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.running {
return errors.New("batcher is already running")
}
l.running = true
l.done = make(chan struct{})
// TODO: this context only exists because the event loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
l.ctx, l.cancel = context.WithCancel(context.Background())
l.state.Clear()
l.lastStoredBlock = eth.BlockID{}
l.wg.Add(1) l.wg.Add(1)
go l.loop() go l.loop()
return nil return nil
} }
func (l *BatchSubmitter) Stop() { func (l *BatchSubmitter) Stop() error {
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.running {
return errors.New("batcher is not running")
}
l.running = false
l.cancel() l.cancel()
close(l.done) close(l.done)
l.wg.Wait() l.wg.Wait()
return nil
} }
// loadBlocksIntoState loads all blocks since the previous stored block // loadBlocksIntoState loads all blocks since the previous stored block
...@@ -199,7 +218,7 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. ...@@ -199,7 +218,7 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
} }
// Check last stored to see if it needs to be set on startup OR set if is lagged behind. // Check last stored to see if it needs to be set on startup OR set if is lagged behind.
// It lagging implies that the op-node processed some batches that where submitted prior to the current instance of the batcher being alive. // It lagging implies that the op-node processed some batches that were submitted prior to the current instance of the batcher being alive.
if l.lastStoredBlock == (eth.BlockID{}) { if l.lastStoredBlock == (eth.BlockID{}) {
l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2) l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.lastStoredBlock = syncStatus.SafeL2.ID() l.lastStoredBlock = syncStatus.SafeL2.ID()
...@@ -263,7 +282,7 @@ func (l *BatchSubmitter) loop() { ...@@ -263,7 +282,7 @@ func (l *BatchSubmitter) loop() {
// hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending // hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending
// from the channel manager rather than sending the channel in a loop. This stalls b/c if the // from the channel manager rather than sending the channel in a loop. This stalls b/c if the
// context is cancelled while sending, it will never fuilly clearing the pending txns. // context is cancelled while sending, it will never fully clear the pending txns.
select { select {
case <-l.ctx.Done(): case <-l.ctx.Done():
break blockLoop break blockLoop
......
package flags package flags
import ( import (
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
"github.com/urfave/cli" "github.com/urfave/cli"
opservice "github.com/ethereum-optimism/optimism/op-service" opservice "github.com/ethereum-optimism/optimism/op-service"
...@@ -98,6 +99,11 @@ var ( ...@@ -98,6 +99,11 @@ var (
Value: 1.0, Value: 1.0,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "APPROX_COMPR_RATIO"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "APPROX_COMPR_RATIO"),
} }
StoppedFlag = cli.BoolFlag{
Name: "stopped",
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "STOPPED"),
}
MnemonicFlag = cli.StringFlag{ MnemonicFlag = cli.StringFlag{
Name: "mnemonic", Name: "mnemonic",
Usage: "The mnemonic used to derive the wallets for either the " + Usage: "The mnemonic used to derive the wallets for either the " +
...@@ -133,6 +139,7 @@ var optionalFlags = []cli.Flag{ ...@@ -133,6 +139,7 @@ var optionalFlags = []cli.Flag{
TargetL1TxSizeBytesFlag, TargetL1TxSizeBytesFlag,
TargetNumFramesFlag, TargetNumFramesFlag,
ApproxComprRatioFlag, ApproxComprRatioFlag,
StoppedFlag,
MnemonicFlag, MnemonicFlag,
SequencerHDPathFlag, SequencerHDPathFlag,
PrivateKeyFlag, PrivateKeyFlag,
...@@ -145,6 +152,7 @@ func init() { ...@@ -145,6 +152,7 @@ func init() {
optionalFlags = append(optionalFlags, opmetrics.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, opmetrics.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, oppprof.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, oppprof.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, opsigner.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, opsigner.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, rpc.CLIFlags(envVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...) Flags = append(requiredFlags, optionalFlags...)
} }
......
package rpc
import (
"context"
)
type batcherClient interface {
Start() error
Stop() error
}
type adminAPI struct {
b batcherClient
}
func NewAdminAPI(dr batcherClient) *adminAPI {
return &adminAPI{
b: dr,
}
}
func (a *adminAPI) StartBatcher(_ context.Context) error {
return a.b.Start()
}
func (a *adminAPI) StopBatcher(_ context.Context) error {
return a.b.Stop()
}
package rpc
import (
opservice "github.com/ethereum-optimism/optimism/op-service"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/urfave/cli"
)
const (
EnableAdminFlagName = "rpc.enable-admin"
)
func CLIFlags(envPrefix string) []cli.Flag {
return []cli.Flag{
cli.BoolFlag{
Name: EnableAdminFlagName,
Usage: "Enable the admin API (experimental)",
EnvVar: opservice.PrefixEnvVar(envPrefix, "RPC_ENABLE_ADMIN"),
},
}
}
type CLIConfig struct {
oprpc.CLIConfig
EnableAdmin bool
}
func ReadCLIConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
CLIConfig: oprpc.ReadCLIConfig(ctx),
EnableAdmin: ctx.GlobalBool(EnableAdminFlagName),
}
}
...@@ -115,6 +115,7 @@ services: ...@@ -115,6 +115,7 @@ services:
ports: ports:
- "6061:6060" - "6061:6060"
- "7301:7300" - "7301:7300"
- "6545:8545"
environment: environment:
OP_BATCHER_L1_ETH_RPC: http://l1:8545 OP_BATCHER_L1_ETH_RPC: http://l1:8545
OP_BATCHER_L2_ETH_RPC: http://l2:8545 OP_BATCHER_L2_ETH_RPC: http://l2:8545
...@@ -130,10 +131,10 @@ services: ...@@ -130,10 +131,10 @@ services:
OP_BATCHER_RESUBMISSION_TIMEOUT: 30s OP_BATCHER_RESUBMISSION_TIMEOUT: 30s
OP_BATCHER_MNEMONIC: test test test test test test test test test test test junk OP_BATCHER_MNEMONIC: test test test test test test test test test test test junk
OP_BATCHER_SEQUENCER_HD_PATH: "m/44'/60'/0'/0/2" OP_BATCHER_SEQUENCER_HD_PATH: "m/44'/60'/0'/0/2"
OP_BATCHER_SEQUENCER_BATCH_INBOX_ADDRESS: "${SEQUENCER_BATCH_INBOX_ADDRESS}"
OP_BATCHER_LOG_TERMINAL: "true" OP_BATCHER_LOG_TERMINAL: "true"
OP_BATCHER_PPROF_ENABLED: "true" OP_BATCHER_PPROF_ENABLED: "true"
OP_BATCHER_METRICS_ENABLED: "true" OP_BATCHER_METRICS_ENABLED: "true"
OP_BATCHER_RPC_ENABLE_ADMIN: "true"
stateviz: stateviz:
build: build:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment