Commit 8f7f94d5 authored by protolambda

op-batcher: service lifecycle cleanup

parent 848ae875
@@ -3,110 +3,32 @@ package batcher
import (
"context"
"fmt"
_ "net/http/pprof"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
)
// Main is the entrypoint into the Batch Submitter. This method returns a
// closure that executes the service and blocks until the service exits. The use
// of a closure allows the parameters bound to the top-level main package, e.g.
// GitVersion, to be captured and used once the function is executed.
func Main(version string, cliCtx *cli.Context) error {
// Main is the entrypoint into the Batch Submitter.
// It returns a cliapp.LifecycleAction, used to create a batch-submitter that is managed by the op-service CLI lifecycle.
func Main(version string) cliapp.LifecycleAction {
return func(cliCtx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
if err := flags.CheckRequired(cliCtx); err != nil {
return err
return nil, err
}
cfg := NewConfig(cliCtx)
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid CLI flags: %w", err)
return nil, fmt.Errorf("invalid CLI flags: %w", err)
}
l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig)
oplog.SetGlobalLogHandler(l.GetHandler())
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)
procName := "default"
m := metrics.NewMetrics(procName)
l.Info("Initializing Batch Submitter")
batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l, m)
if err != nil {
l.Error("Unable to create Batch Submitter", "error", err)
return err
}
if !cfg.Stopped {
if err := batchSubmitter.Start(); err != nil {
l.Error("Unable to start Batch Submitter", "error", err)
return err
}
}
defer batchSubmitter.StopIfRunning(context.Background())
pprofConfig := cfg.PprofConfig
if pprofConfig.Enabled {
l.Debug("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofConfig.ListenAddr, pprofConfig.ListenPort)
if err != nil {
l.Error("failed to start pprof server", "err", err)
return err
}
l.Info("started pprof server", "addr", pprofSrv.Addr())
defer func() {
if err := pprofSrv.Stop(context.Background()); err != nil {
l.Error("failed to stop pprof server", "err", err)
}
}()
}
metricsCfg := cfg.MetricsConfig
if metricsCfg.Enabled {
l.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := m.Start(metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
l.Info("started metrics server", "addr", metricsSrv.Addr())
defer func() {
if err := metricsSrv.Stop(context.Background()); err != nil {
l.Error("failed to stop pprof server", "err", err)
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.TxManager.From())
}
server := oprpc.NewServer(
cfg.RPCFlag.ListenAddr,
cfg.RPCFlag.ListenPort,
version,
oprpc.WithLogger(l),
)
if cfg.RPCFlag.EnableAdmin {
adminAPI := rpc.NewAdminAPI(batchSubmitter, &m.RPCMetrics, l)
server.AddAPI(rpc.GetAdminAPI(adminAPI))
l.Info("Admin RPC enabled")
}
if err := server.Start(); err != nil {
return fmt.Errorf("error starting RPC server: %w", err)
}
m.RecordInfo(version)
m.RecordUp()
opio.BlockOnInterrupts()
if err := server.Stop(); err != nil {
l.Error("Error shutting down http server: %w", err)
l.Info("Initializing Batch Submitter")
return BatcherServiceFromCLIConfig(cliCtx.Context, version, cfg, l)
}
return nil
}
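The returned cliapp.LifecycleAction is what cliapp.LifecycleCmd consumes (see the op-batcher main-package change further down in this commit): it constructs the Lifecycle, calls Start, blocks until interrupted, then calls Stop. A minimal sketch of that wiring, assuming a build-time Version variable like the one in the existing main package:

package main

import (
	"os"

	"github.com/urfave/cli/v2"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-batcher/batcher"
	"github.com/ethereum-optimism/optimism/op-batcher/flags"
	"github.com/ethereum-optimism/optimism/op-service/cliapp"
)

var Version = "v0.0.0" // assumed to be overridden at build time

func main() {
	app := cli.NewApp()
	app.Name = "op-batcher"
	app.Flags = flags.Flags
	// LifecycleCmd adapts the LifecycleAction into a plain cli action.
	app.Action = cliapp.LifecycleCmd(batcher.Main(Version))
	if err := app.Run(os.Args); err != nil {
		log.Crit("Application failed", "message", err)
	}
}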
@@ -3,52 +3,17 @@ package batcher
import (
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)
type Config struct {
log log.Logger
metr metrics.Metricer
L1Client *ethclient.Client
L2Client *ethclient.Client
RollupNode *sources.RollupClient
TxManager txmgr.TxManager
NetworkTimeout time.Duration
PollInterval time.Duration
MaxPendingTransactions uint64
// RollupConfig is queried at startup
Rollup *rollup.Config
// Channel builder parameters
Channel ChannelConfig
}
// Check ensures that the [Config] is valid.
func (c *Config) Check() error {
if err := c.Rollup.Check(); err != nil {
return err
}
if err := c.Channel.Check(); err != nil {
return err
}
return nil
}
type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string
@@ -92,7 +57,7 @@ type CLIConfig struct {
MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig
CompressorConfig compressor.CLIConfig
RPCFlag oprpc.CLIConfig
RPC oprpc.CLIConfig
}
func (c CLIConfig) Check() error {
@@ -107,15 +72,15 @@ func (c CLIConfig) Check() error {
if err := c.TxMgrConfig.Check(); err != nil {
return err
}
if err := c.RPCFlag.Check(); err != nil {
if err := c.RPC.Check(); err != nil {
return err
}
return nil
}
// NewConfig parses the Config from the provided flags or environment variables.
func NewConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
func NewConfig(ctx *cli.Context) *CLIConfig {
return &CLIConfig{
/* Required Flags */
L1EthRpc: ctx.String(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.String(flags.L2EthRpcFlag.Name),
@@ -133,6 +98,6 @@ func NewConfig(ctx *cli.Context) CLIConfig {
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
RPCFlag: oprpc.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
}
}
@@ -10,22 +10,49 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
var ErrBatcherNotRunning = errors.New("batcher is not running")
type L1Client interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
}
type L2Client interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
}
type RollupClient interface {
SyncStatus(ctx context.Context) (*eth.SyncStatus, error)
}
// DriverSetup is the collection of input/output interfaces and configuration that the driver operates on.
type DriverSetup struct {
Log log.Logger
Metr metrics.Metricer
RollupCfg *rollup.Config
Cfg BatcherConfig
Txmgr txmgr.TxManager
L1Client L1Client
L2Client L2Client
RollupClient RollupClient
Channel ChannelConfig
}
// BatchSubmitter encapsulates a service responsible for submitting L2 tx
// batches to L1 for availability.
type BatchSubmitter struct {
Config // directly embed the config + sources
DriverSetup
txMgr txmgr.TxManager
wg sync.WaitGroup
shutdownCtx context.Context
@@ -43,88 +70,16 @@ type BatchSubmitter struct {
state *channelManager
}
// NewBatchSubmitterFromCLIConfig initializes the BatchSubmitter, gathering any resources
// that will be needed during operation.
func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) {
ctx := context.Background()
// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, err := dial.DialEthClientWithTimeout(dial.DefaultDialTimeout, l, cfg.L1EthRpc)
if err != nil {
return nil, err
}
l2Client, err := dial.DialEthClientWithTimeout(dial.DefaultDialTimeout, l, cfg.L2EthRpc)
if err != nil {
return nil, err
}
rollupClient, err := dial.DialRollupClientWithTimeout(dial.DefaultDialTimeout, l, cfg.RollupRpc)
if err != nil {
return nil, err
}
rcfg, err := rollupClient.RollupConfig(ctx)
if err != nil {
return nil, fmt.Errorf("querying rollup config: %w", err)
}
txManager, err := txmgr.NewSimpleTxManager("batcher", l, m, cfg.TxMgrConfig)
if err != nil {
return nil, err
}
batcherCfg := Config{
L1Client: l1Client,
L2Client: l2Client,
RollupNode: rollupClient,
PollInterval: cfg.PollInterval,
MaxPendingTransactions: cfg.MaxPendingTransactions,
NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout,
TxManager: txManager,
Rollup: rcfg,
Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout,
MaxChannelDuration: cfg.MaxChannelDuration,
SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
CompressorConfig: cfg.CompressorConfig.Config(),
},
}
// Validate the batcher config
if err := batcherCfg.Check(); err != nil {
return nil, err
}
return NewBatchSubmitter(ctx, batcherCfg, l, m)
}
// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
// that will be needed during operation.
func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) {
balance, err := cfg.L1Client.BalanceAt(ctx, cfg.TxManager.From(), nil)
if err != nil {
return nil, err
}
cfg.log = l
cfg.log.Info("creating batch submitter", "submitter_addr", cfg.TxManager.From(), "submitter_bal", balance)
cfg.metr = m
// NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup
func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
return &BatchSubmitter{
Config: cfg,
txMgr: cfg.TxManager,
state: NewChannelManager(l, m, cfg.Channel),
}, nil
DriverSetup: setup,
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel),
}
}
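Because the driver now receives all of its dependencies through DriverSetup instead of dialing them itself, it can be constructed directly with test doubles. A hedged sketch; mockL1, mockL2, mockRollup, rollupCfg, txMgr and channelCfg are assumed fixtures, not part of this diff:

// In a _test.go file within package batcher.
func newTestDriver() *BatchSubmitter {
	return NewBatchSubmitter(DriverSetup{
		Log:       log.New(),
		Metr:      metrics.NoopMetrics,
		RollupCfg: rollupCfg, // assumed *rollup.Config fixture
		Cfg: BatcherConfig{
			NetworkTimeout:         10 * time.Second,
			PollInterval:           time.Second,
			MaxPendingTransactions: 1,
		},
		Txmgr:        txMgr,      // assumed txmgr.TxManager stub
		L1Client:     mockL1,     // implements the L1Client interface above
		L2Client:     mockL2,     // implements the L2Client interface above
		RollupClient: mockRollup, // implements the RollupClient interface above
		Channel:      channelCfg, // assumed ChannelConfig fixture
	})
}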
func (l *BatchSubmitter) Start() error {
l.log.Info("Starting Batch Submitter")
func (l *BatchSubmitter) StartBatchSubmitting() error {
l.Log.Info("Starting Batch Submitter")
l.mutex.Lock()
defer l.mutex.Unlock()
@@ -142,23 +97,27 @@ func (l *BatchSubmitter) Start() error {
l.wg.Add(1)
go l.loop()
l.log.Info("Batch Submitter started")
l.Log.Info("Batch Submitter started")
return nil
}
func (l *BatchSubmitter) StopIfRunning(ctx context.Context) {
_ = l.Stop(ctx)
func (l *BatchSubmitter) StopBatchSubmittingIfRunning(ctx context.Context) error {
err := l.StopBatchSubmitting(ctx)
if errors.Is(err, ErrBatcherNotRunning) {
return nil
}
return err
}
func (l *BatchSubmitter) Stop(ctx context.Context) error {
l.log.Info("Stopping Batch Submitter")
// StopBatchSubmitting stops the batch-submitter loop, and force-kills if the provided ctx is done.
func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error {
l.Log.Info("Stopping Batch Submitter")
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.running {
return errors.New("batcher is not running")
return ErrBatcherNotRunning
}
l.running = false
@@ -175,8 +134,7 @@ func (l *BatchSubmitter) Stop(ctx context.Context) error {
l.wg.Wait()
l.cancelKillCtx()
l.log.Info("Batch Submitter stopped")
l.Log.Info("Batch Submitter stopped")
return nil
}
@@ -191,7 +149,7 @@ func (l *BatchSubmitter) Stop(ctx context.Context) error {
func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
start, end, err := l.calculateL2BlockRangeToStore(ctx)
if err != nil {
l.log.Warn("Error calculating L2 block range", "err", err)
l.Log.Warn("Error calculating L2 block range", "err", err)
return err
} else if start.Number >= end.Number {
return errors.New("start number is >= end number")
@@ -202,30 +160,30 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
for i := start.Number + 1; i < end.Number+1; i++ {
block, err := l.loadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) {
l.log.Warn("Found L2 reorg", "block_number", i)
l.Log.Warn("Found L2 reorg", "block_number", i)
l.lastStoredBlock = eth.BlockID{}
return err
} else if err != nil {
l.log.Warn("failed to load block into state", "err", err)
l.Log.Warn("failed to load block into state", "err", err)
return err
}
l.lastStoredBlock = eth.ToBlockID(block)
latestBlock = block
}
l2ref, err := derive.L2BlockToBlockRef(latestBlock, &l.Rollup.Genesis)
l2ref, err := derive.L2BlockToBlockRef(latestBlock, &l.RollupCfg.Genesis)
if err != nil {
l.log.Warn("Invalid L2 block loaded into state", "err", err)
l.Log.Warn("Invalid L2 block loaded into state", "err", err)
return err
}
l.metr.RecordL2BlocksLoaded(l2ref)
l.Metr.RecordL2BlocksLoaded(l2ref)
return nil
}
// loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
ctx, cancel := context.WithTimeout(ctx, l.NetworkTimeout)
ctx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
if err != nil {
@@ -236,16 +194,16 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
return nil, fmt.Errorf("adding L2 block to state: %w", err)
}
l.log.Info("added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time())
l.Log.Info("added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time())
return block, nil
}
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
ctx, cancel := context.WithTimeout(ctx, l.NetworkTimeout)
ctx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
syncStatus, err := l.RollupNode.SyncStatus(ctx)
syncStatus, err := l.RollupClient.SyncStatus(ctx)
// Ensure that we have the sync status
if err != nil {
return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("failed to get sync status: %w", err)
@@ -257,10 +215,10 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
// Check the last stored block to see if it needs to be set on startup, or reset if it lags behind the safe head.
// Lagging implies that the op-node processed batches that were submitted before the current batcher instance came up.
if l.lastStoredBlock == (eth.BlockID{}) {
l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.Log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.lastStoredBlock = syncStatus.SafeL2.ID()
} else if l.lastStoredBlock.Number < syncStatus.SafeL2.Number {
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastStoredBlock, "safe", syncStatus.SafeL2)
l.Log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastStoredBlock, "safe", syncStatus.SafeL2)
l.lastStoredBlock = syncStatus.SafeL2.ID()
}
@@ -286,11 +244,11 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
ticker := time.NewTicker(l.PollInterval)
ticker := time.NewTicker(l.Cfg.PollInterval)
defer ticker.Stop()
receiptsCh := make(chan txmgr.TxReceipt[txData])
queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)
queue := txmgr.NewQueue[txData](l.killCtx, l.Txmgr, l.Cfg.MaxPendingTransactions)
for {
select {
@@ -298,7 +256,7 @@
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
}
l.publishStateToL1(queue, receiptsCh, true)
l.state.Clear()
@@ -310,7 +268,7 @@
case <-l.shutdownCtx.Done():
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
l.Log.Error("error closing the channel manager", "err", err)
}
l.publishStateToL1(queue, receiptsCh, true)
return
@@ -335,7 +293,7 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil {
if drain && err != io.EOF {
l.log.Error("error sending tx while draining state", "err", err)
l.Log.Error("error sending tx while draining state", "err", err)
}
return
}
@@ -357,7 +315,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
l.Log.Error("Failed to query L1 tip", "error", err)
return err
}
l.recordL1Tip(l1tip)
@@ -365,10 +323,10 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
// Collect next transaction data
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
l.Log.Trace("no transaction data available")
return err
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
l.Log.Error("unable to get tx data", "err", err)
return err
}
@@ -384,12 +342,12 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txDat
data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
l.log.Error("Failed to calculate intrinsic gas", "error", err)
l.Log.Error("Failed to calculate intrinsic gas", "error", err)
return
}
candidate := txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress,
To: &l.RollupCfg.BatchInboxAddress,
TxData: data,
GasLimit: intrinsicGas,
}
@@ -399,10 +357,10 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txDat
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// Record TX Status
if r.Err != nil {
l.log.Warn("unable to publish tx", "err", r.Err, "data_size", r.ID.Len())
l.Log.Warn("unable to publish tx", "err", r.Err, "data_size", r.ID.Len())
l.recordFailedTx(r.ID.ID(), r.Err)
} else {
l.log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", r.ID.Len())
l.Log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", r.ID.Len())
l.recordConfirmedTx(r.ID.ID(), r.Receipt)
}
}
@@ -412,16 +370,16 @@ func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
return
}
l.lastL1Tip = l1tip
l.metr.RecordLatestL1Block(l1tip)
l.Metr.RecordLatestL1Block(l1tip)
}
func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.log.Warn("Failed to send transaction", "err", err)
l.Log.Warn("Failed to send transaction", "err", err)
l.state.TxFailed(id)
}
func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
l.log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
l.Log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
l1block := eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash}
l.state.TxConfirmed(id, l1block)
}
@@ -429,7 +387,7 @@ func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
// l1Tip gets the current L1 tip as an L1BlockRef. The passed context is assumed
// to be a lifetime context, so it is internally wrapped with a network timeout.
func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
tctx, cancel := context.WithTimeout(ctx, l.NetworkTimeout)
tctx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
head, err := l.L1Client.HeaderByNumber(tctx, nil)
if err != nil {
......
package batcher
import (
"context"
"errors"
"fmt"
"io"
"net"
_ "net/http/pprof"
"strconv"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)
type BatcherConfig struct {
NetworkTimeout time.Duration
PollInterval time.Duration
MaxPendingTransactions uint64
}
// BatcherService represents a full batch-submitter instance and its resources,
// and conforms to the op-service CLI Lifecycle interface.
type BatcherService struct {
Log log.Logger
Metrics metrics.Metricer
L1Client *ethclient.Client
L2Client *ethclient.Client
RollupNode *sources.RollupClient
TxManager txmgr.TxManager
BatcherConfig
RollupConfig *rollup.Config
// Channel builder parameters
Channel ChannelConfig
driver *BatchSubmitter
Version string
pprofSrv *httputil.HTTPServer
metricsSrv *httputil.HTTPServer
rpcServer *oprpc.Server
balanceMetricer io.Closer
stopped atomic.Bool
NotSubmittingOnStart bool
}
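For reference, the op-service cliapp.Lifecycle contract that this struct is asserted to satisfy near the bottom of this file presumably looks as follows, judging from the Start, Stop and Stopped methods implemented below:

// Inferred shape of the cliapp.Lifecycle interface (an assumption
// based on the methods BatcherService implements in this file).
type Lifecycle interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Stopped() bool
}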
// BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig.
// The service components are fully started, except for the driver,
// which does not begin submitting batches (even when configured to) until the Start part of the lifecycle.
func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) (*BatcherService, error) {
var bs BatcherService
if err := bs.initFromCLIConfig(ctx, version, cfg, log); err != nil {
return nil, errors.Join(err, bs.Stop(ctx)) // try to clean up our failed initialization attempt
}
return &bs, nil
}
func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) error {
bs.Version = version
bs.Log = log
bs.NotSubmittingOnStart = cfg.Stopped
bs.initMetrics(cfg)
bs.PollInterval = cfg.PollInterval
bs.MaxPendingTransactions = cfg.MaxPendingTransactions
bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout
if err := bs.initRPCClients(ctx, cfg); err != nil {
return err
}
if err := bs.initRollupCfg(ctx); err != nil {
return fmt.Errorf("failed to load rollup config: %w", err)
}
if err := bs.initChannelConfig(cfg); err != nil {
return fmt.Errorf("failed to init channel config: %w", err)
}
if err := bs.initTxManager(cfg); err != nil {
return fmt.Errorf("failed to init Tx manager: %w", err)
}
bs.initBalanceMonitor(cfg)
if err := bs.initMetricsServer(cfg); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
if err := bs.initPProf(cfg); err != nil {
return fmt.Errorf("failed to start pprof server: %w", err)
}
bs.initDriver()
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
// Validate the setup
if err := bs.Check(); err != nil {
return fmt.Errorf("failed post-initialization check: %w", err)
}
bs.Metrics.RecordInfo(bs.Version)
bs.Metrics.RecordUp()
return nil
}
func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) error {
l1Client, err := dial.DialEthClientWithTimeout(ctx, dial.DefaultDialTimeout, bs.Log, cfg.L1EthRpc)
if err != nil {
return fmt.Errorf("failed to dial L1 RPC: %w", err)
}
bs.L1Client = l1Client
l2Client, err := dial.DialEthClientWithTimeout(ctx, dial.DefaultDialTimeout, bs.Log, cfg.L2EthRpc)
if err != nil {
return fmt.Errorf("failed to dial L2 engine RPC: %w", err)
}
bs.L2Client = l2Client
rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, bs.Log, cfg.RollupRpc)
if err != nil {
return fmt.Errorf("failed to dial L2 rollup-client RPC: %w", err)
}
bs.RollupNode = rollupClient
return nil
}
func (bs *BatcherService) initMetrics(cfg *CLIConfig) {
if cfg.MetricsConfig.Enabled {
procName := "default"
bs.Metrics = metrics.NewMetrics(procName)
} else {
bs.Metrics = metrics.NoopMetrics
}
}
// initBalanceMonitor depends on Metrics, L1Client and TxManager to start background-monitoring of the batcher balance.
func (bs *BatcherService) initBalanceMonitor(cfg *CLIConfig) {
if cfg.MetricsConfig.Enabled {
bs.balanceMetricer = bs.Metrics.StartBalanceMetrics(bs.Log, bs.L1Client, bs.TxManager.From())
}
}
func (bs *BatcherService) initRollupCfg(ctx context.Context) error {
rollupCfg, err := bs.RollupNode.RollupConfig(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve rollup config: %w", err)
}
bs.RollupConfig = rollupCfg
if err := bs.RollupConfig.Check(); err != nil {
return fmt.Errorf("invalid rollup config: %w", err)
}
return nil
}
func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
bs.Channel = ChannelConfig{
SeqWindowSize: bs.RollupConfig.SeqWindowSize,
ChannelTimeout: bs.RollupConfig.ChannelTimeout,
MaxChannelDuration: cfg.MaxChannelDuration,
SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
CompressorConfig: cfg.CompressorConfig.Config(),
}
if err := bs.Channel.Check(); err != nil {
return fmt.Errorf("invalid channel configuration: %w", err)
}
return nil
}
func (bs *BatcherService) initTxManager(cfg *CLIConfig) error {
txManager, err := txmgr.NewSimpleTxManager("batcher", bs.Log, bs.Metrics, cfg.TxMgrConfig)
if err != nil {
return err
}
bs.TxManager = txManager
return nil
}
func (bs *BatcherService) initPProf(cfg *CLIConfig) error {
if !cfg.PprofConfig.Enabled {
return nil
}
log.Debug("starting pprof server", "addr", net.JoinHostPort(cfg.PprofConfig.ListenAddr, strconv.Itoa(cfg.PprofConfig.ListenPort)))
srv, err := oppprof.StartServer(cfg.PprofConfig.ListenAddr, cfg.PprofConfig.ListenPort)
if err != nil {
return err
}
bs.pprofSrv = srv
log.Info("started pprof server", "addr", srv.Addr())
return nil
}
func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
if !cfg.MetricsConfig.Enabled {
bs.Log.Info("metrics disabled")
return nil
}
m, ok := bs.Metrics.(opmetrics.RegistryMetricer)
if !ok {
return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", bs.Metrics)
}
bs.Log.Debug("starting metrics server", "addr", cfg.MetricsConfig.ListenAddr, "port", cfg.MetricsConfig.ListenPort)
metricsSrv, err := opmetrics.StartServer(m.Registry(), cfg.MetricsConfig.ListenAddr, cfg.MetricsConfig.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
bs.Log.Info("started metrics server", "addr", metricsSrv.Addr())
bs.metricsSrv = metricsSrv
return nil
}
func (bs *BatcherService) initDriver() {
bs.driver = NewBatchSubmitter(DriverSetup{
Log: bs.Log,
Metr: bs.Metrics,
RollupCfg: bs.RollupConfig,
Cfg: bs.BatcherConfig,
Txmgr: bs.TxManager,
L1Client: bs.L1Client,
L2Client: bs.L2Client,
RollupClient: bs.RollupNode,
Channel: bs.Channel,
})
}
func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error {
server := oprpc.NewServer(
cfg.RPC.ListenAddr,
cfg.RPC.ListenPort,
bs.Version,
oprpc.WithLogger(bs.Log),
)
if cfg.RPC.EnableAdmin {
adminAPI := rpc.NewAdminAPI(bs.driver, bs.Metrics, bs.Log)
server.AddAPI(rpc.GetAdminAPI(adminAPI))
bs.Log.Info("Admin RPC enabled")
}
bs.Log.Info("Starting JSON-RPC server")
if err := server.Start(); err != nil {
return fmt.Errorf("unable to start RPC server: %w", err)
}
bs.rpcServer = server
return nil
}
// Check ensures that the [BatcherService] is valid
func (bs *BatcherService) Check() error {
return nil
}
// Start runs once upon start of the batcher lifecycle,
// and starts batch-submission work if the batcher is configured to start submitting data on startup.
func (bs *BatcherService) Start(_ context.Context) error {
bs.driver.Log.Info("Starting batcher", "notSubmittingOnStart", bs.NotSubmittingOnStart)
if !bs.NotSubmittingOnStart {
return bs.driver.StartBatchSubmitting()
}
return nil
}
// Stopped returns whether the service as a whole is stopped.
func (bs *BatcherService) Stopped() bool {
return bs.stopped.Load()
}
// Kill is a convenience method to forcefully, non-gracefully, stop the BatcherService.
func (bs *BatcherService) Kill() error {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return bs.Stop(ctx)
}
// Stop fully stops the batch-submitter and all its resources gracefully. After stopping, it cannot be restarted.
// See driver.StopBatchSubmitting to temporarily stop the batch submitter.
// If the provided ctx is cancelled, the stopping is forced, i.e. the batching work is killed non-gracefully.
func (bs *BatcherService) Stop(ctx context.Context) error {
if bs.stopped.Load() {
return errors.New("already stopped")
}
bs.Log.Info("Stopping batcher")
var result error
if err := bs.driver.StopBatchSubmittingIfRunning(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop batch submitting: %w", err))
}
if bs.rpcServer != nil {
// TODO(7685): the op-service RPC server is not built on top of op-service httputil Server, and has poor shutdown
if err := bs.rpcServer.Stop(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop RPC server: %w", err))
}
}
if bs.pprofSrv != nil {
if err := bs.pprofSrv.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop PProf server: %w", err))
}
}
if bs.balanceMetricer != nil {
if err := bs.balanceMetricer.Close(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close balance metricer: %w", err))
}
}
if bs.metricsSrv != nil {
if err := bs.metricsSrv.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
}
}
if bs.L1Client != nil {
bs.L1Client.Close()
}
if bs.L2Client != nil {
bs.L2Client.Close()
}
if bs.RollupNode != nil {
bs.RollupNode.Close()
}
if result == nil {
bs.stopped.Store(true)
bs.driver.Log.Info("Batch Submitter stopped")
}
return result
}
var _ cliapp.Lifecycle = (*BatcherService)(nil)
// Driver returns a handle on the batch-submitter driver element,
// to start/stop/restart the batch-submission work, for use in testing.
func (bs *BatcherService) Driver() rpc.BatcherDriver {
return bs.driver
}
@@ -30,7 +30,7 @@ func main() {
app.Name = "op-batcher"
app.Usage = "Batch Submitter Service"
app.Description = "Service for generating and submitting L2 tx batches to L1"
app.Action = curryMain(Version)
app.Action = cliapp.LifecycleCmd(batcher.Main(Version))
app.Commands = []*cli.Command{
{
Name: "doc",
@@ -43,11 +43,3 @@
log.Crit("Application failed", "message", err)
}
}
// curryMain transforms the batcher.Main function into an app.Action
// This is done to capture the Version of the batcher.
func curryMain(version string) func(ctx *cli.Context) error {
return func(ctx *cli.Context) error {
return batcher.Main(version, ctx)
}
}
package metrics
import (
"context"
"io"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
)
@@ -28,6 +28,10 @@ type Metricer interface {
// Record Tx metrics
txmetrics.TxMetricer
opmetrics.RPCMetricer
StartBalanceMetrics(l log.Logger, client *ethclient.Client, account common.Address) io.Closer
RecordLatestL1Block(l1ref eth.L1BlockRef)
RecordL2BlocksLoaded(l2ref eth.L2BlockRef)
RecordChannelOpened(id derive.ChannelID, numPendingBlocks int)
@@ -79,6 +83,9 @@ type Metrics struct {
var _ Metricer = (*Metrics)(nil)
// implements the Registry getter, for metrics HTTP server to hook into
var _ opmetrics.RegistryMetricer = (*Metrics)(nil)
func NewMetrics(procName string) *Metrics {
if procName == "" {
procName = "default"
@@ -179,17 +186,16 @@ func NewMetrics(procName string) *Metrics {
}
}
func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) {
return opmetrics.StartServer(m.registry, host, port)
func (m *Metrics) Registry() *prometheus.Registry {
return m.registry
}
func (m *Metrics) Document() []opmetrics.DocumentedMetric {
return m.factory.Document()
}
func (m *Metrics) StartBalanceMetrics(ctx context.Context,
l log.Logger, client *ethclient.Client, account common.Address) {
opmetrics.LaunchBalanceMetrics(ctx, l, m.registry, m.ns, client, account)
func (m *Metrics) StartBalanceMetrics(l log.Logger, client *ethclient.Client, account common.Address) io.Closer {
return opmetrics.LaunchBalanceMetrics(l, m.registry, m.ns, client, account)
}
// RecordInfo sets a pseudo-metric that contains versioning and
......
package metrics
import (
"io"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
"github.com/ethereum/go-ethereum/core/types"
)
type noopMetrics struct {
opmetrics.NoopRefMetrics
txmetrics.NoopTxMetrics
opmetrics.NoopRPCMetrics
}
var NoopMetrics Metricer = new(noopMetrics)
@@ -35,3 +42,6 @@ func (*noopMetrics) RecordChannelTimedOut(derive.ChannelID) {}
func (*noopMetrics) RecordBatchTxSubmitted() {}
func (*noopMetrics) RecordBatchTxSuccess() {}
func (*noopMetrics) RecordBatchTxFailed() {}
func (*noopMetrics) StartBalanceMetrics(log.Logger, *ethclient.Client, common.Address) io.Closer {
return nil
}
@@ -10,17 +10,17 @@ import (
"github.com/ethereum-optimism/optimism/op-service/rpc"
)
type batcherClient interface {
Start() error
Stop(ctx context.Context) error
type BatcherDriver interface {
StartBatchSubmitting() error
StopBatchSubmitting(ctx context.Context) error
}
type adminAPI struct {
*rpc.CommonAdminAPI
b batcherClient
b BatcherDriver
}
func NewAdminAPI(dr batcherClient, m metrics.RPCMetricer, log log.Logger) *adminAPI {
func NewAdminAPI(dr BatcherDriver, m metrics.RPCMetricer, log log.Logger) *adminAPI {
return &adminAPI{
CommonAdminAPI: rpc.NewCommonAdminAPI(m, log),
b: dr,
@@ -35,9 +35,9 @@ func GetAdminAPI(api *adminAPI) gethrpc.API {
}
func (a *adminAPI) StartBatcher(_ context.Context) error {
return a.b.Start()
return a.b.StartBatchSubmitting()
}
func (a *adminAPI) StopBatcher(ctx context.Context) error {
return a.b.Stop(ctx)
return a.b.StopBatchSubmitting(ctx)
}
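With the admin API enabled, the batch-submission loop can be toggled remotely. A hedged sketch of a client-side call, assuming GetAdminAPI registers the API under the geth-style "admin" namespace (so the methods surface as admin_startBatcher and admin_stopBatcher) and an illustrative listen address:

// gethrpc is "github.com/ethereum/go-ethereum/rpc", as aliased above.
client, err := gethrpc.Dial("http://127.0.0.1:8548") // address is an assumption
if err != nil {
	log.Crit("failed to dial batcher RPC", "err", err)
}
defer client.Close()
if err := client.CallContext(context.Background(), nil, "admin_stopBatcher"); err != nil {
	log.Crit("failed to stop batcher", "err", err)
}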
@@ -34,7 +34,7 @@ type OutputTraceProvider struct {
}
func NewTraceProvider(ctx context.Context, logger log.Logger, rollupRpc string, gameDepth, prestateBlock, poststateBlock uint64) (*OutputTraceProvider, error) {
rollupClient, err := dial.DialRollupClientWithTimeout(dial.DefaultDialTimeout, logger, rollupRpc)
rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, logger, rollupRpc)
if err != nil {
return nil, err
}
......
@@ -55,7 +55,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
return nil, fmt.Errorf("failed to create the transaction manager: %w", err)
}
l1Client, err := dial.DialEthClientWithTimeout(dial.DefaultDialTimeout, logger, cfg.L1EthRpc)
l1Client, err := dial.DialEthClientWithTimeout(ctx, dial.DefaultDialTimeout, logger, cfg.L1EthRpc)
if err != nil {
return nil, fmt.Errorf("failed to dial L1: %w", err)
}
......
@@ -133,7 +133,12 @@ func (m *Metrics) StartBalanceMetrics(
client *ethclient.Client,
account common.Address,
) {
opmetrics.LaunchBalanceMetrics(ctx, l, m.registry, m.ns, client, account)
// TODO(7684): util was refactored to close, but ctx is still being used by caller for shutdown
balanceMetric := opmetrics.LaunchBalanceMetrics(l, m.registry, m.ns, client, account)
go func() {
<-ctx.Done()
_ = balanceMetric.Close()
}()
}
// RecordInfo sets a pseudo-metric that contains versioning and
......
@@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"crypto/rand"
"errors"
"fmt"
"math/big"
"net"
@@ -37,7 +38,6 @@ import (
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
"github.com/ethereum-optimism/optimism/op-e2e/config"
@@ -252,7 +252,7 @@ type System struct {
RawClients map[string]*rpc.Client
RollupNodes map[string]*rollupNode.OpNode
L2OutputSubmitter *l2os.L2OutputSubmitter
BatchSubmitter *bss.BatchSubmitter
BatchSubmitter *bss.BatcherService
Mocknet mocknet.Mocknet
// TimeTravelClock is nil unless SystemConfig.SupportL1TimeTravel was set to true
@@ -268,18 +268,16 @@ func (sys *System) NodeEndpoint(name string) string {
}
func (sys *System) Close() {
postCtx, postCancel := context.WithCancel(context.Background())
postCancel() // immediate shutdown, no allowance for idling
if sys.L2OutputSubmitter != nil {
sys.L2OutputSubmitter.Stop()
}
if sys.BatchSubmitter != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
sys.BatchSubmitter.StopIfRunning(ctx)
_ = sys.BatchSubmitter.Kill()
}
postCtx, postCancel := context.WithCancel(context.Background())
postCancel() // immediate shutdown, no allowance for idling
for _, node := range sys.RollupNodes {
_ = node.Stop(postCtx)
}
@@ -678,8 +676,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
return nil, fmt.Errorf("unable to start l2 output submitter: %w", err)
}
// Batch Submitter
sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{
batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
@@ -698,17 +695,17 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Level: log.LvlInfo,
Format: oplog.FormatText,
},
}, sys.cfg.Loggers["batcher"], batchermetrics.NoopMetrics)
Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.cfg.Loggers["batcher"])
if err != nil {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
}
// Batcher may be enabled later
if !sys.cfg.DisableBatcher {
if err := sys.BatchSubmitter.Start(); err != nil {
return nil, fmt.Errorf("unable to start batch submitter: %w", err)
}
if err := batcher.Start(context.Background()); err != nil {
return nil, errors.Join(fmt.Errorf("failed to start batch submitter: %w", err), batcher.Stop(context.Background()))
}
sys.BatchSubmitter = batcher
return sys, nil
}
......
@@ -93,7 +93,7 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool) {
l2OutputRoot := agreedL2Output.OutputRoot
t.Log("=====Stopping batch submitter=====")
err = sys.BatchSubmitter.Stop(ctx)
err = sys.BatchSubmitter.Driver().StopBatchSubmitting(context.Background())
require.NoError(t, err, "could not stop batch submitter")
// Wait for the sequencer to catch up with the current L1 head so we know all submitted batches are processed
@@ -121,7 +121,7 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool) {
l2Claim := l2Output.OutputRoot
t.Log("=====Restarting batch submitter=====")
err = sys.BatchSubmitter.Start()
err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
require.NoError(t, err, "could not start batch submitter")
t.Log("Add a transaction to the next batch after sequence of empty blocks")
@@ -258,7 +258,7 @@ func testFaultProofProgramScenario(t *testing.T, ctx context.Context, sys *Syste
t.Log("Shutting down network")
// Shutdown the nodes from the actual chain. Should now be able to run using only the pre-fetched data.
sys.BatchSubmitter.StopIfRunning(context.Background())
require.NoError(t, sys.BatchSubmitter.Kill())
sys.L2OutputSubmitter.Stop()
sys.L2OutputSubmitter = nil
for _, node := range sys.EthInstances {
......
@@ -1262,7 +1262,7 @@ func TestStopStartBatcher(t *testing.T) {
require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance")
// stop the batch submission
err = sys.BatchSubmitter.Stop(context.Background())
err = sys.BatchSubmitter.Driver().StopBatchSubmitting(context.Background())
require.Nil(t, err)
// wait for any old safe blocks being submitted / derived
@@ -1282,7 +1282,7 @@
require.Equal(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain advanced while batcher was stopped")
// start the batch submission
err = sys.BatchSubmitter.Start()
err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
require.Nil(t, err)
time.Sleep(safeBlockInclusionDuration)
@@ -1321,7 +1321,7 @@ func TestBatcherMultiTx(t *testing.T) {
require.Nil(t, err)
// start batch submission
err = sys.BatchSubmitter.Start()
err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
require.Nil(t, err)
totalTxCount := 0
......
@@ -604,7 +604,9 @@ func (m *Metrics) ReportProtocolVersions(local, engine, recommended, required pa
m.ProtocolVersions.WithLabelValues(local.String(), engine.String(), recommended.String(), required.String()).Set(1)
}
type noopMetricer struct{}
type noopMetricer struct {
metrics.NoopRPCMetrics
}
var NoopMetrics Metricer = new(noopMetricer)
@@ -614,17 +616,6 @@ func (n *noopMetricer) RecordInfo(version string) {
func (n *noopMetricer) RecordUp() {
}
func (n *noopMetricer) RecordRPCServerRequest(method string) func() {
return func() {}
}
func (n *noopMetricer) RecordRPCClientRequest(method string) func(err error) {
return func(err error) {}
}
func (n *noopMetricer) RecordRPCClientResponse(method string, err error) {
}
func (n *noopMetricer) SetDerivationIdle(status bool) {
}
......
@@ -84,7 +84,12 @@ func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) {
func (m *Metrics) StartBalanceMetrics(ctx context.Context,
l log.Logger, client *ethclient.Client, account common.Address) {
opmetrics.LaunchBalanceMetrics(ctx, l, m.registry, m.ns, client, account)
// TODO(7684): util was refactored to close, but ctx is still being used by caller for shutdown
balanceMetric := opmetrics.LaunchBalanceMetrics(l, m.registry, m.ns, client, account)
go func() {
<-ctx.Done()
_ = balanceMetric.Close()
}()
}
// RecordInfo sets a pseudo-metric that contains versioning and
......
@@ -172,12 +172,12 @@ func NewL2OutputSubmitterConfigFromCLIConfig(cfg CLIConfig, l log.Logger, m metr
}
// Connect to L1 and L2 providers. Perform these last since they are the most expensive.
l1Client, err := dial.DialEthClientWithTimeout(dial.DefaultDialTimeout, l, cfg.L1EthRpc)
l1Client, err := dial.DialEthClientWithTimeout(context.Background(), dial.DefaultDialTimeout, l, cfg.L1EthRpc)
if err != nil {
return nil, err
}
rollupClient, err := dial.DialRollupClientWithTimeout(dial.DefaultDialTimeout, l, cfg.RollupRpc)
rollupClient, err := dial.DialRollupClientWithTimeout(context.Background(), dial.DefaultDialTimeout, l, cfg.RollupRpc)
if err != nil {
return nil, err
}
......
package clock
import (
"context"
"sync"
"time"
)
// LoopFn is a simple ticker-loop with io.Closer support.
// Note that ticks adapt; slow function calls may result in lost ticks.
type LoopFn struct {
ctx context.Context
cancel context.CancelFunc
ticker Ticker
fn func(ctx context.Context)
onClose func() error
wg sync.WaitGroup
}
// Close cancels the context of the ongoing function call, waits for the call to complete, and cancels further calls.
// Close is safe to call again or concurrently. The onClose callback will be called for each Close call.
func (lf *LoopFn) Close() error {
lf.cancel() // stop any ongoing function call, and close the main loop
lf.wg.Wait() // wait for completion
if lf.onClose != nil {
return lf.onClose() // optional: user can specify function to close resources with
}
return nil
}
func (lf *LoopFn) work() {
defer lf.wg.Done()
defer lf.ticker.Stop() // clean up the timer
for {
select {
case <-lf.ctx.Done():
return
case <-lf.ticker.Ch():
ctx, cancel := context.WithCancel(lf.ctx)
lf.fn(ctx)
cancel()
}
}
}
// NewLoopFn creates a periodic function call, which can be closed,
// with an optional onClose callback to clean up resources.
func NewLoopFn(clock Clock, fn func(ctx context.Context), onClose func() error, interval time.Duration) *LoopFn {
ctx, cancel := context.WithCancel(context.Background())
lf := &LoopFn{
ctx: ctx,
cancel: cancel,
fn: fn,
ticker: clock.NewTicker(interval),
onClose: onClose,
}
lf.wg.Add(1)
go lf.work()
return lf
}
package clock
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestLoopFn(t *testing.T) {
cl := NewDeterministicClock(time.Now())
calls := make(chan struct{}, 10)
testErr := errors.New("test close error")
loopFn := NewLoopFn(cl, func(ctx context.Context) {
calls <- struct{}{}
}, func() error {
close(calls)
return testErr
}, time.Second*10)
cl.AdvanceTime(time.Second * 15)
<-calls
cl.AdvanceTime(time.Second * 10)
<-calls
require.ErrorIs(t, loopFn.Close(), testErr)
}
@@ -21,8 +21,8 @@ const defaultRetryTime = 2 * time.Second
// DialEthClientWithTimeout attempts to dial the L1 provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func DialEthClientWithTimeout(timeout time.Duration, log log.Logger, url string) (*ethclient.Client, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
func DialEthClientWithTimeout(ctx context.Context, timeout time.Duration, log log.Logger, url string) (*ethclient.Client, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
c, err := dialRPCClientWithBackoff(ctx, log, url)
@@ -35,8 +35,8 @@ func DialEthClientWithTimeout(timeout time.Duration, log log.Logger, url string)
// DialRollupClientWithTimeout attempts to dial the RPC provider using the provided URL.
// If the dial doesn't complete within timeout seconds, this method will return an error.
func DialRollupClientWithTimeout(timeout time.Duration, log log.Logger, url string) (*sources.RollupClient, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
func DialRollupClientWithTimeout(ctx context.Context, timeout time.Duration, log log.Logger, url string) (*sources.RollupClient, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
rpcCl, err := dialRPCClientWithBackoff(ctx, log, url)
......
@@ -5,12 +5,15 @@ import (
"math/big"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
// weiToEther divides the wei value by 10^18 to get a number in ether as a float64
@@ -22,38 +25,27 @@ func weiToEther(wei *big.Int) float64 {
return f
}
// LaunchBalanceMetrics fires off a go rountine that queries the balance of the supplied account & periodically records it
// to the balance metric of the namespace. The balance of the account is recorded in Ether (not Wei).
// LaunchBalanceMetrics starts a periodic query of the balance of the supplied account and records it
// to the "balance" metric of the namespace. The balance of the account is recorded in Ether (not Wei).
// Cancel the supplied context to shut down the go routine
func LaunchBalanceMetrics(ctx context.Context, log log.Logger, r *prometheus.Registry, ns string, client *ethclient.Client, account common.Address) {
go func() {
func LaunchBalanceMetrics(log log.Logger, r *prometheus.Registry, ns string, client *ethclient.Client, account common.Address) *clock.LoopFn {
balanceGauge := promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "balance",
Help: "balance (in ether) of account " + account.String(),
})
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
return clock.NewLoopFn(clock.SystemClock, func(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
bigBal, err := client.BalanceAt(ctx, account, nil)
if err != nil {
log.Warn("failed to get balance of account", "err", err, "address", account)
cancel()
continue
return
}
bal := weiToEther(bigBal)
balanceGauge.Set(bal)
cancel()
case <-ctx.Done():
}, func() error {
log.Info("balance metrics shutting down")
return
}
}
}()
return nil
}, 10*time.Second)
}
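With the context parameter gone, the caller owns shutdown via the returned *clock.LoopFn (an io.Closer), as the batcher service earlier in this commit does. A minimal usage sketch; logger, registry, l1Client and account are assumed inputs:

balanceMetricer := LaunchBalanceMetrics(logger, registry, "batcher", l1Client, account)
defer func() {
	// Stops the ticker loop and runs the onClose callback above.
	_ = balanceMetricer.Close()
}()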
@@ -11,3 +11,7 @@ func NewRegistry() *prometheus.Registry {
registry.MustRegister(collectors.NewGoCollector())
return registry
}
type RegistryMetricer interface {
Registry() *prometheus.Registry
}
@@ -125,3 +125,17 @@ func (m *RPCMetrics) RecordRPCClientResponse(method string, err error) {
}
m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}
type NoopRPCMetrics struct{}
func (n *NoopRPCMetrics) RecordRPCServerRequest(method string) func() {
return func() {}
}
func (n *NoopRPCMetrics) RecordRPCClientRequest(method string) func(err error) {
return func(err error) {}
}
func (n *NoopRPCMetrics) RecordRPCClientResponse(method string, err error) {
}
var _ RPCMetricer = (*NoopRPCMetrics)(nil)
@@ -63,3 +63,7 @@ func (r *RollupClient) SequencerActive(ctx context.Context) (bool, error) {
func (r *RollupClient) SetLogLevel(ctx context.Context, lvl log.Lvl) error {
return r.rpc.CallContext(ctx, nil, "admin_setLogLevel", lvl.String())
}
func (r *RollupClient) Close() {
r.rpc.Close()
}