Commit b1391700 authored by joohhnnn's avatar joohhnnn

op-proposer: service lifecycle cleanup

parent 3c10d0c7
...@@ -59,18 +59,22 @@ func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt, ...@@ -59,18 +59,22 @@ func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt,
} }
func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Client, rollupCl *sources.RollupClient) *L2Proposer { func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Client, rollupCl *sources.RollupClient) *L2Proposer {
proposerCfg := proposer.Config{ proposerConfig := proposer.ProposerConfig{
L2OutputOracleAddr: cfg.OutputOracleAddr,
PollInterval: time.Second, PollInterval: time.Second,
NetworkTimeout: time.Second, NetworkTimeout: time.Second,
L1Client: l1, L2OutputOracleAddr: cfg.OutputOracleAddr,
RollupClient: rollupCl,
AllowNonFinalized: cfg.AllowNonFinalized, AllowNonFinalized: cfg.AllowNonFinalized,
// We use custom signing here instead of using the transaction manager. }
TxManager: fakeTxMgr{from: crypto.PubkeyToAddress(cfg.ProposerKey.PublicKey)}, driverSetup := proposer.DriverSetup{
Log: log,
Metr: metrics.NoopMetrics,
Cfg: proposerConfig,
Txmgr: fakeTxMgr{from: crypto.PubkeyToAddress(cfg.ProposerKey.PublicKey)},
L1Client: l1,
RollupClient: rollupCl,
} }
dr, err := proposer.NewL2OutputSubmitter(proposerCfg, log, metrics.NoopMetrics) dr, err := proposer.NewL2OutputSubmitter(driverSetup)
require.NoError(t, err) require.NoError(t, err)
contract, err := bindings.NewL2OutputOracleCaller(cfg.OutputOracleAddr, l1) contract, err := bindings.NewL2OutputOracleCaller(cfg.OutputOracleAddr, l1)
require.NoError(t, err) require.NoError(t, err)
......
...@@ -532,7 +532,7 @@ func setupDisputeGameForInvalidOutputRoot(t *testing.T, outputRoot common.Hash) ...@@ -532,7 +532,7 @@ func setupDisputeGameForInvalidOutputRoot(t *testing.T, outputRoot common.Hash)
l2oo.WaitForProposals(ctx, 1) l2oo.WaitForProposals(ctx, 1)
// Stop the honest output submitter so we can publish invalid outputs // Stop the honest output submitter so we can publish invalid outputs
sys.L2OutputSubmitter.Stop() sys.L2OutputSubmitter.Driver().StopL2OutputSubmitting()
sys.L2OutputSubmitter = nil sys.L2OutputSubmitter = nil
// Submit an invalid output root // Submit an invalid output root
......
...@@ -51,7 +51,6 @@ import ( ...@@ -51,7 +51,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/clock"
...@@ -257,7 +256,7 @@ type System struct { ...@@ -257,7 +256,7 @@ type System struct {
Clients map[string]*ethclient.Client Clients map[string]*ethclient.Client
RawClients map[string]*rpc.Client RawClients map[string]*rpc.Client
RollupNodes map[string]*rollupNode.OpNode RollupNodes map[string]*rollupNode.OpNode
L2OutputSubmitter *l2os.L2OutputSubmitter L2OutputSubmitter *l2os.ProposerService
BatchSubmitter *bss.BatcherService BatchSubmitter *bss.BatcherService
Mocknet mocknet.Mocknet Mocknet mocknet.Mocknet
...@@ -278,7 +277,7 @@ func (sys *System) Close() { ...@@ -278,7 +277,7 @@ func (sys *System) Close() {
postCancel() // immediate shutdown, no allowance for idling postCancel() // immediate shutdown, no allowance for idling
if sys.L2OutputSubmitter != nil { if sys.L2OutputSubmitter != nil {
sys.L2OutputSubmitter.Stop() _ = sys.L2OutputSubmitter.Kill()
} }
if sys.BatchSubmitter != nil { if sys.BatchSubmitter != nil {
_ = sys.BatchSubmitter.Kill() _ = sys.BatchSubmitter.Kill()
...@@ -663,7 +662,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste ...@@ -663,7 +662,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
} }
// L2Output Submitter // L2Output Submitter
sys.L2OutputSubmitter, err = l2os.NewL2OutputSubmitterFromCLIConfig(l2os.CLIConfig{ proposerCLIConfig := &l2os.CLIConfig{
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(), L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(), RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
L2OOAddress: config.L1Deployments.L2OutputOracleProxy.Hex(), L2OOAddress: config.L1Deployments.L2OutputOracleProxy.Hex(),
...@@ -674,15 +673,18 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste ...@@ -674,15 +673,18 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Level: log.LvlInfo, Level: log.LvlInfo,
Format: oplog.FormatText, Format: oplog.FormatText,
}, },
}, sys.cfg.Loggers["proposer"], proposermetrics.NoopMetrics) }
proposer, err := l2os.ProposerServiceFromCLIConfig(context.Background(), "0.0.1", proposerCLIConfig, sys.cfg.Loggers["proposer"])
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to setup l2 output submitter: %w", err) return nil, fmt.Errorf("unable to setup l2 output submitter: %w", err)
} }
if err := sys.L2OutputSubmitter.Start(); err != nil { if err := proposer.Start(context.Background()); err != nil {
return nil, fmt.Errorf("unable to start l2 output submitter: %w", err) return nil, fmt.Errorf("unable to start l2 output submitter: %w", err)
} }
sys.L2OutputSubmitter = proposer
batchType := derive.SingularBatchType batchType := derive.SingularBatchType
if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" { if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" {
batchType = derive.SpanBatchType batchType = derive.SpanBatchType
......
...@@ -259,7 +259,7 @@ func testFaultProofProgramScenario(t *testing.T, ctx context.Context, sys *Syste ...@@ -259,7 +259,7 @@ func testFaultProofProgramScenario(t *testing.T, ctx context.Context, sys *Syste
t.Log("Shutting down network") t.Log("Shutting down network")
// Shutdown the nodes from the actual chain. Should now be able to run using only the pre-fetched data. // Shutdown the nodes from the actual chain. Should now be able to run using only the pre-fetched data.
require.NoError(t, sys.BatchSubmitter.Kill()) require.NoError(t, sys.BatchSubmitter.Kill())
sys.L2OutputSubmitter.Stop() sys.L2OutputSubmitter.Driver().StopL2OutputSubmitting()
sys.L2OutputSubmitter = nil sys.L2OutputSubmitter = nil
for _, node := range sys.EthInstances { for _, node := range sys.EthInstances {
node.Close() node.Close()
......
...@@ -30,7 +30,7 @@ func main() { ...@@ -30,7 +30,7 @@ func main() {
app.Name = "op-proposer" app.Name = "op-proposer"
app.Usage = "L2Output Submitter" app.Usage = "L2Output Submitter"
app.Description = "Service for generating and submitting L2 Output checkpoints to the L2OutputOracle contract" app.Description = "Service for generating and submitting L2 Output checkpoints to the L2OutputOracle contract"
app.Action = curryMain(Version) app.Action = cliapp.LifecycleCmd(proposer.Main(Version))
app.Commands = []*cli.Command{ app.Commands = []*cli.Command{
{ {
Name: "doc", Name: "doc",
...@@ -44,10 +44,3 @@ func main() { ...@@ -44,10 +44,3 @@ func main() {
} }
} }
// curryMain transforms the proposer.Main function into an app.Action
// This is done to capture the Version of the proposer.
func curryMain(version string) func(ctx *cli.Context) error {
return func(ctx *cli.Context) error {
return proposer.Main(version, ctx)
}
}
package metrics package metrics
import ( import (
"context" "io"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
...@@ -10,13 +10,15 @@ import ( ...@@ -10,13 +10,15 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics" txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
) )
const Namespace = "op_proposer" const Namespace = "op_proposer"
// implements the Registry getter, for metrics HTTP server to hook into
var _ opmetrics.RegistryMetricer = (*Metrics)(nil)
type Metricer interface { type Metricer interface {
RecordInfo(version string) RecordInfo(version string)
RecordUp() RecordUp()
...@@ -27,6 +29,10 @@ type Metricer interface { ...@@ -27,6 +29,10 @@ type Metricer interface {
// Record Tx metrics // Record Tx metrics
txmetrics.TxMetricer txmetrics.TxMetricer
opmetrics.RPCMetricer
StartBalanceMetrics(l log.Logger, client *ethclient.Client, account common.Address) io.Closer
RecordL2BlocksProposed(l2ref eth.L2BlockRef) RecordL2BlocksProposed(l2ref eth.L2BlockRef)
} }
...@@ -78,18 +84,12 @@ func NewMetrics(procName string) *Metrics { ...@@ -78,18 +84,12 @@ func NewMetrics(procName string) *Metrics {
} }
} }
func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) { func (m *Metrics) Registry() *prometheus.Registry {
return opmetrics.StartServer(m.registry, host, port) return m.registry
} }
func (m *Metrics) StartBalanceMetrics(ctx context.Context, func (m *Metrics) StartBalanceMetrics(l log.Logger, client *ethclient.Client, account common.Address) io.Closer {
l log.Logger, client *ethclient.Client, account common.Address) { return opmetrics.LaunchBalanceMetrics(l, m.registry, m.ns, client, account)
// TODO(7684): util was refactored to close, but ctx is still being used by caller for shutdown
balanceMetric := opmetrics.LaunchBalanceMetrics(l, m.registry, m.ns, client, account)
go func() {
<-ctx.Done()
_ = balanceMetric.Close()
}()
} }
// RecordInfo sets a pseudo-metric that contains versioning and // RecordInfo sets a pseudo-metric that contains versioning and
......
package metrics package metrics
import ( import (
"io"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics" txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
...@@ -9,6 +15,7 @@ import ( ...@@ -9,6 +15,7 @@ import (
type noopMetrics struct { type noopMetrics struct {
opmetrics.NoopRefMetrics opmetrics.NoopRefMetrics
txmetrics.NoopTxMetrics txmetrics.NoopTxMetrics
opmetrics.NoopRPCMetrics
} }
var NoopMetrics Metricer = new(noopMetrics) var NoopMetrics Metricer = new(noopMetrics)
...@@ -17,3 +24,7 @@ func (*noopMetrics) RecordInfo(version string) {} ...@@ -17,3 +24,7 @@ func (*noopMetrics) RecordInfo(version string) {}
func (*noopMetrics) RecordUp() {} func (*noopMetrics) RecordUp() {}
func (*noopMetrics) RecordL2BlocksProposed(l2ref eth.L2BlockRef) {} func (*noopMetrics) RecordL2BlocksProposed(l2ref eth.L2BlockRef) {}
func (*noopMetrics) StartBalanceMetrics(log.Logger, *ethclient.Client, common.Address) io.Closer {
return nil
}
\ No newline at end of file
...@@ -3,13 +3,10 @@ package proposer ...@@ -3,13 +3,10 @@ package proposer
import ( import (
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-proposer/flags" "github.com/ethereum-optimism/optimism/op-proposer/flags"
"github.com/ethereum-optimism/optimism/op-service/sources"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
...@@ -17,18 +14,6 @@ import ( ...@@ -17,18 +14,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
) )
// Config contains the well typed fields that are used to initialize the output submitter.
// It is intended for programmatic use.
type Config struct {
L2OutputOracleAddr common.Address
PollInterval time.Duration
NetworkTimeout time.Duration
TxManager txmgr.TxManager
L1Client *ethclient.Client
RollupClient *sources.RollupClient
AllowNonFinalized bool
}
// CLIConfig is a well typed config that is parsed from the CLI params. // CLIConfig is a well typed config that is parsed from the CLI params.
// This also contains config options for auxiliary services. // This also contains config options for auxiliary services.
// It is transformed into a `Config` before the L2 output submitter is started. // It is transformed into a `Config` before the L2 output submitter is started.
...@@ -63,7 +48,7 @@ type CLIConfig struct { ...@@ -63,7 +48,7 @@ type CLIConfig struct {
PprofConfig oppprof.CLIConfig PprofConfig oppprof.CLIConfig
} }
func (c CLIConfig) Check() error { func (c *CLIConfig) Check() error {
if err := c.RPCConfig.Check(); err != nil { if err := c.RPCConfig.Check(); err != nil {
return err return err
} }
...@@ -80,8 +65,8 @@ func (c CLIConfig) Check() error { ...@@ -80,8 +65,8 @@ func (c CLIConfig) Check() error {
} }
// NewConfig parses the Config from the provided flags or environment variables. // NewConfig parses the Config from the provided flags or environment variables.
func NewConfig(ctx *cli.Context) CLIConfig { func NewConfig(ctx *cli.Context) *CLIConfig {
return CLIConfig{ return &CLIConfig{
// Required Flags // Required Flags
L1EthRpc: ctx.String(flags.L1EthRpcFlag.Name), L1EthRpc: ctx.String(flags.L1EthRpcFlag.Name),
RollupRpc: ctx.String(flags.RollupRpcFlag.Name), RollupRpc: ctx.String(flags.RollupRpcFlag.Name),
......
package proposer
import (
"context"
"errors"
"fmt"
"math/big"
_ "net/http/pprof"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-proposer/metrics"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)
var supportedL2OutputVersion = eth.Bytes32{}
var ErrProposerNotRunning = errors.New("proposer is not running")
type L1Client interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
// CodeAt returns the code of the given account. This is needed to differentiate
// between contract internal errors and the local chain being out of sync.
CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error)
// CallContract executes an Ethereum contract call with the specified data as the
// input.
CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error)
}
type RollupClient interface {
SyncStatus(ctx context.Context) (*eth.SyncStatus, error)
OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error)
}
type DriverSetup struct {
Log log.Logger
Metr metrics.Metricer
Cfg ProposerConfig //what need to be contented
Txmgr txmgr.TxManager
L1Client L1Client
// RollupClient is used to retrieve output roots from
RollupClient RollupClient
}
// L2OutputSubmitter is responsible for proposing outputs
type L2OutputSubmitter struct {
DriverSetup
wg sync.WaitGroup
done chan struct{}
ctx context.Context
cancel context.CancelFunc
mutex sync.Mutex
running bool
l2ooContract *bindings.L2OutputOracleCaller
l2ooABI *abi.ABI
}
// NewL2OutputSubmitter creates a new L2 Output Submitter
func NewL2OutputSubmitter(setup DriverSetup) (*L2OutputSubmitter, error) {
ctx, cancel := context.WithCancel(context.Background())
l2ooContract, err := bindings.NewL2OutputOracleCaller(setup.Cfg.L2OutputOracleAddr, setup.L1Client)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create L2OO at address %s: %w", setup.Cfg.L2OutputOracleAddr, err)
}
cCtx, cCancel := context.WithTimeout(ctx, setup.Cfg.NetworkTimeout)
defer cCancel()
version, err := l2ooContract.Version(&bind.CallOpts{Context: cCtx})
if err != nil {
cancel()
return nil, err
}
log.Info("Connected to L2OutputOracle", "address", setup.Cfg.L2OutputOracleAddr, "version", version)
parsed, err := bindings.L2OutputOracleMetaData.GetAbi()
if err != nil {
cancel()
return nil, err
}
return &L2OutputSubmitter{
DriverSetup: setup,
done: make(chan struct{}),
ctx: ctx,
cancel: cancel,
l2ooContract: l2ooContract,
l2ooABI: parsed,
}, nil
}
func (l *L2OutputSubmitter) StartL2OutputSubmitting() error {
l.Log.Info("Starting Proposer")
l.mutex.Lock()
defer l.mutex.Unlock()
if l.running {
return errors.New("Proposer is already running")
}
l.running = true
l.wg.Add(1)
go l.loop()
l.Log.Info("Proposer started")
return nil
}
func (l *L2OutputSubmitter) StopL2OutputSubmittingIfRunning() error {
err := l.StopL2OutputSubmitting()
if errors.Is(err, ErrProposerNotRunning) {
return nil
}
return err
}
func (l *L2OutputSubmitter) StopL2OutputSubmitting() error{
l.Log.Info("Stopping Proposer")
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.running {
return ErrProposerNotRunning
}
l.running = false
l.cancel()
close(l.done)
l.wg.Wait()
l.Log.Info("Proposer stopped")
return nil
}
// FetchNextOutputInfo gets the block number of the next proposal.
// It returns: the next block number, if the proposal should be made, error
func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.OutputResponse, bool, error) {
cCtx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
callOpts := &bind.CallOpts{
From: l.Txmgr.From(),
Context: cCtx,
}
nextCheckpointBlock, err := l.l2ooContract.NextBlockNumber(callOpts)
if err != nil {
l.Log.Error("proposer unable to get next block number", "err", err)
return nil, false, err
}
// Fetch the current L2 heads
cCtx, cancel = context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
status, err := l.RollupClient.SyncStatus(cCtx)
if err != nil {
l.Log.Error("proposer unable to get sync status", "err", err)
return nil, false, err
}
// Use either the finalized or safe head depending on the config. Finalized head is default & safer.
var currentBlockNumber *big.Int
if l.Cfg.AllowNonFinalized {
currentBlockNumber = new(big.Int).SetUint64(status.SafeL2.Number)
} else {
currentBlockNumber = new(big.Int).SetUint64(status.FinalizedL2.Number)
}
// Ensure that we do not submit a block in the future
if currentBlockNumber.Cmp(nextCheckpointBlock) < 0 {
l.Log.Debug("proposer submission interval has not elapsed", "currentBlockNumber", currentBlockNumber, "nextBlockNumber", nextCheckpointBlock)
return nil, false, nil
}
return l.fetchOutput(ctx, nextCheckpointBlock)
}
func (l *L2OutputSubmitter) fetchOutput(ctx context.Context, block *big.Int) (*eth.OutputResponse, bool, error) {
ctx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
output, err := l.RollupClient.OutputAtBlock(ctx, block.Uint64())
if err != nil {
l.Log.Error("failed to fetch output at block %d: %w", block, err)
return nil, false, err
}
if output.Version != supportedL2OutputVersion {
l.Log.Error("unsupported l2 output version: %s", output.Version)
return nil, false, errors.New("unsupported l2 output version")
}
if output.BlockRef.Number != block.Uint64() { // sanity check, e.g. in case of bad RPC caching
l.Log.Error("invalid blockNumber: next blockNumber is %v, blockNumber of block is %v", block, output.BlockRef.Number)
return nil, false, errors.New("invalid blockNumber")
}
// Always propose if it's part of the Finalized L2 chain. Or if allowed, if it's part of the safe L2 chain.
if !(output.BlockRef.Number <= output.Status.FinalizedL2.Number || (l.Cfg.AllowNonFinalized && output.BlockRef.Number <= output.Status.SafeL2.Number)) {
l.Log.Debug("not proposing yet, L2 block is not ready for proposal",
"l2_proposal", output.BlockRef,
"l2_safe", output.Status.SafeL2,
"l2_finalized", output.Status.FinalizedL2,
"allow_non_finalized", l.Cfg.AllowNonFinalized)
return nil, false, nil
}
return output, true, nil
}
// ProposeL2OutputTxData creates the transaction data for the ProposeL2Output function
func (l *L2OutputSubmitter) ProposeL2OutputTxData(output *eth.OutputResponse) ([]byte, error) {
return proposeL2OutputTxData(l.l2ooABI, output)
}
// proposeL2OutputTxData creates the transaction data for the ProposeL2Output function
func proposeL2OutputTxData(abi *abi.ABI, output *eth.OutputResponse) ([]byte, error) {
return abi.Pack(
"proposeL2Output",
output.OutputRoot,
new(big.Int).SetUint64(output.BlockRef.Number),
output.Status.CurrentL1.Hash,
new(big.Int).SetUint64(output.Status.CurrentL1.Number))
}
// We wait until l1head advances beyond blocknum. This is used to make sure proposal tx won't
// immediately fail when checking the l1 blockhash. Note that EstimateGas uses "latest" state to
// execute the transaction by default, meaning inside the call, the head block is considered
// "pending" instead of committed. In the case l1blocknum == l1head then, blockhash(l1blocknum)
// will produce a value of 0 within EstimateGas, and the call will fail when the contract checks
// that l1blockhash matches blockhash(l1blocknum).
func (l *L2OutputSubmitter) waitForL1Head(ctx context.Context, blockNum uint64) error {
ticker := time.NewTicker(l.Cfg.PollInterval)
defer ticker.Stop()
l1head, err := l.Txmgr.BlockNumber(ctx)
if err != nil {
return err
}
for l1head <= blockNum {
l.Log.Debug("waiting for l1 head > l1blocknum1+1", "l1head", l1head, "l1blocknum", blockNum)
select {
case <-ticker.C:
l1head, err = l.Txmgr.BlockNumber(ctx)
if err != nil {
return err
}
break
case <-l.done:
return fmt.Errorf("L2OutputSubmitter is done()")
}
}
return nil
}
// sendTransaction creates & sends transactions through the underlying transaction manager.
func (l *L2OutputSubmitter) sendTransaction(ctx context.Context, output *eth.OutputResponse) error {
err := l.waitForL1Head(ctx, output.Status.HeadL1.Number+1)
if err != nil {
return err
}
data, err := l.ProposeL2OutputTxData(output)
if err != nil {
return err
}
receipt, err := l.Txmgr.Send(ctx, txmgr.TxCandidate{
TxData: data,
To: &l.Cfg.L2OutputOracleAddr,
GasLimit: 0,
})
if err != nil {
return err
}
if receipt.Status == types.ReceiptStatusFailed {
l.Log.Error("proposer tx successfully published but reverted", "tx_hash", receipt.TxHash)
} else {
l.Log.Info("proposer tx successfully published",
"tx_hash", receipt.TxHash,
"l1blocknum", output.Status.CurrentL1.Number,
"l1blockhash", output.Status.CurrentL1.Hash)
}
return nil
}
// loop is responsible for creating & submitting the next outputs
func (l *L2OutputSubmitter) loop() {
defer l.wg.Done()
ctx := l.ctx
ticker := time.NewTicker(l.Cfg.PollInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
output, shouldPropose, err := l.FetchNextOutputInfo(ctx)
if err != nil {
break
}
if !shouldPropose {
break
}
cCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
if err := l.sendTransaction(cCtx, output); err != nil {
l.Log.Error("Failed to send proposal transaction",
"err", err,
"l1blocknum", output.Status.CurrentL1.Number,
"l1blockhash", output.Status.CurrentL1.Hash,
"l1head", output.Status.HeadL1.Number)
cancel()
break
}
l.Metr.RecordL2BlocksProposed(output.BlockRef)
cancel()
case <-l.done:
return
}
}
}
...@@ -2,429 +2,33 @@ package proposer ...@@ -2,429 +2,33 @@ package proposer
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"math/big"
_ "net/http/pprof"
"sync"
"time"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-proposer/flags" "github.com/ethereum-optimism/optimism/op-proposer/flags"
"github.com/ethereum-optimism/optimism/op-proposer/metrics"
opservice "github.com/ethereum-optimism/optimism/op-service" opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/eth"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
) )
var supportedL2OutputVersion = eth.Bytes32{} // Main is the entrypoint into the L2OutputSubmitter.
// This method returns a cliapp.LifecycleAction, to create an op-service CLI-lifecycle-managed L2Output-submitter
// Main is the entrypoint into the L2 Output Submitter. This method executes the func Main(version string) cliapp.LifecycleAction {
// service and blocks until the service exits. return func(cliCtx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
func Main(version string, cliCtx *cli.Context) error { if err := flags.CheckRequired(cliCtx); err != nil {
if err := flags.CheckRequired(cliCtx); err != nil { return nil, err
return err
}
cfg := NewConfig(cliCtx)
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid CLI flags: %w", err)
}
l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig)
oplog.SetGlobalLogHandler(l.GetHandler())
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)
m := metrics.NewMetrics("default")
l.Info("Initializing L2 Output Submitter")
proposerConfig, err := NewL2OutputSubmitterConfigFromCLIConfig(cfg, l, m)
if err != nil {
l.Error("Unable to create the L2 Output Submitter", "error", err)
return err
}
l2OutputSubmitter, err := NewL2OutputSubmitter(*proposerConfig, l, m)
if err != nil {
l.Error("Unable to create the L2 Output Submitter", "error", err)
return err
}
l.Info("Starting L2 Output Submitter")
if err := l2OutputSubmitter.Start(); err != nil {
l.Error("Unable to start L2 Output Submitter", "error", err)
return err
}
defer l2OutputSubmitter.Stop()
l.Info("L2 Output Submitter started")
pprofConfig := cfg.PprofConfig
if pprofConfig.Enabled {
l.Debug("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofConfig.ListenAddr, pprofConfig.ListenPort)
if err != nil {
l.Error("failed to start pprof server", "err", err)
return err
} }
l.Info("started pprof server", "addr", pprofSrv.Addr()) cfg := NewConfig(cliCtx)
defer func() { if err := cfg.Check(); err != nil {
if err := pprofSrv.Stop(context.Background()); err != nil { return nil, fmt.Errorf("invalid CLI flags: %w", err)
l.Error("failed to stop pprof server", "err", err)
}
}()
}
metricsCfg := cfg.MetricsConfig
if metricsCfg.Enabled {
l.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := m.Start(metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
l.Info("started metrics server", "addr", metricsSrv.Addr())
defer func() {
if err := metricsSrv.Stop(context.Background()); err != nil {
l.Error("failed to stop metrics server", "err", err)
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m.StartBalanceMetrics(ctx, l, proposerConfig.L1Client, proposerConfig.TxManager.From())
}
rpcCfg := cfg.RPCConfig
server := oprpc.NewServer(rpcCfg.ListenAddr, rpcCfg.ListenPort, version, oprpc.WithLogger(l))
if rpcCfg.EnableAdmin {
server.AddAPI(oprpc.ToGethAdminAPI(oprpc.NewCommonAdminAPI(&m.RPCMetrics, l)))
l.Info("Admin RPC enabled")
}
if err := server.Start(); err != nil {
return fmt.Errorf("error starting RPC server: %w", err)
}
m.RecordInfo(version)
m.RecordUp()
opio.BlockOnInterrupts()
return nil
}
// L2OutputSubmitter is responsible for proposing outputs
type L2OutputSubmitter struct {
txMgr txmgr.TxManager
wg sync.WaitGroup
done chan struct{}
log log.Logger
metr metrics.Metricer
ctx context.Context
cancel context.CancelFunc
// RollupClient is used to retrieve output roots from
rollupClient *sources.RollupClient
l2ooContract *bindings.L2OutputOracleCaller
l2ooContractAddr common.Address
l2ooABI *abi.ABI
// AllowNonFinalized enables the proposal of safe, but non-finalized L2 blocks.
// The L1 block-hash embedded in the proposal TX is checked and should ensure the proposal
// is never valid on an alternative L1 chain that would produce different L2 data.
// This option is not necessary when higher proposal latency is acceptable and L1 is healthy.
allowNonFinalized bool
// How frequently to poll L2 for new finalized outputs
pollInterval time.Duration
networkTimeout time.Duration
}
// NewL2OutputSubmitterFromCLIConfig creates a new L2 Output Submitter given the CLI Config
func NewL2OutputSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metricer) (*L2OutputSubmitter, error) {
proposerConfig, err := NewL2OutputSubmitterConfigFromCLIConfig(cfg, l, m)
if err != nil {
return nil, err
}
return NewL2OutputSubmitter(*proposerConfig, l, m)
}
// NewL2OutputSubmitterConfigFromCLIConfig creates the proposer config from the CLI config.
func NewL2OutputSubmitterConfigFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metricer) (*Config, error) {
l2ooAddress, err := opservice.ParseAddress(cfg.L2OOAddress)
if err != nil {
return nil, err
}
txManager, err := txmgr.NewSimpleTxManager("proposer", l, m, cfg.TxMgrConfig)
if err != nil {
return nil, err
}
// Connect to L1 and L2 providers. Perform these last since they are the most expensive.
l1Client, err := dial.DialEthClientWithTimeout(context.Background(), dial.DefaultDialTimeout, l, cfg.L1EthRpc)
if err != nil {
return nil, err
}
rollupClient, err := dial.DialRollupClientWithTimeout(context.Background(), dial.DefaultDialTimeout, l, cfg.RollupRpc)
if err != nil {
return nil, err
}
return &Config{
L2OutputOracleAddr: l2ooAddress,
PollInterval: cfg.PollInterval,
NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout,
L1Client: l1Client,
RollupClient: rollupClient,
AllowNonFinalized: cfg.AllowNonFinalized,
TxManager: txManager,
}, nil
}
// NewL2OutputSubmitter creates a new L2 Output Submitter
func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2OutputSubmitter, error) {
ctx, cancel := context.WithCancel(context.Background())
l2ooContract, err := bindings.NewL2OutputOracleCaller(cfg.L2OutputOracleAddr, cfg.L1Client)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create L2OO at address %s: %w", cfg.L2OutputOracleAddr, err)
}
cCtx, cCancel := context.WithTimeout(ctx, cfg.NetworkTimeout)
defer cCancel()
version, err := l2ooContract.Version(&bind.CallOpts{Context: cCtx})
if err != nil {
cancel()
return nil, err
}
log.Info("Connected to L2OutputOracle", "address", cfg.L2OutputOracleAddr, "version", version)
parsed, err := bindings.L2OutputOracleMetaData.GetAbi()
if err != nil {
cancel()
return nil, err
}
return &L2OutputSubmitter{
txMgr: cfg.TxManager,
done: make(chan struct{}),
log: l,
ctx: ctx,
cancel: cancel,
metr: m,
rollupClient: cfg.RollupClient,
l2ooContract: l2ooContract,
l2ooContractAddr: cfg.L2OutputOracleAddr,
l2ooABI: parsed,
allowNonFinalized: cfg.AllowNonFinalized,
pollInterval: cfg.PollInterval,
networkTimeout: cfg.NetworkTimeout,
}, nil
}
func (l *L2OutputSubmitter) Start() error {
l.wg.Add(1)
go l.loop()
return nil
}
func (l *L2OutputSubmitter) Stop() {
l.cancel()
close(l.done)
l.wg.Wait()
}
// FetchNextOutputInfo gets the block number of the next proposal.
// It returns: the next block number, if the proposal should be made, error
func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.OutputResponse, bool, error) {
cCtx, cancel := context.WithTimeout(ctx, l.networkTimeout)
defer cancel()
callOpts := &bind.CallOpts{
From: l.txMgr.From(),
Context: cCtx,
}
nextCheckpointBlock, err := l.l2ooContract.NextBlockNumber(callOpts)
if err != nil {
l.log.Error("proposer unable to get next block number", "err", err)
return nil, false, err
}
// Fetch the current L2 heads
cCtx, cancel = context.WithTimeout(ctx, l.networkTimeout)
defer cancel()
status, err := l.rollupClient.SyncStatus(cCtx)
if err != nil {
l.log.Error("proposer unable to get sync status", "err", err)
return nil, false, err
}
// Use either the finalized or safe head depending on the config. Finalized head is default & safer.
var currentBlockNumber *big.Int
if l.allowNonFinalized {
currentBlockNumber = new(big.Int).SetUint64(status.SafeL2.Number)
} else {
currentBlockNumber = new(big.Int).SetUint64(status.FinalizedL2.Number)
}
// Ensure that we do not submit a block in the future
if currentBlockNumber.Cmp(nextCheckpointBlock) < 0 {
l.log.Debug("proposer submission interval has not elapsed", "currentBlockNumber", currentBlockNumber, "nextBlockNumber", nextCheckpointBlock)
return nil, false, nil
}
return l.fetchOutput(ctx, nextCheckpointBlock)
}
func (l *L2OutputSubmitter) fetchOutput(ctx context.Context, block *big.Int) (*eth.OutputResponse, bool, error) {
ctx, cancel := context.WithTimeout(ctx, l.networkTimeout)
defer cancel()
output, err := l.rollupClient.OutputAtBlock(ctx, block.Uint64())
if err != nil {
l.log.Error("failed to fetch output at block %d: %w", block, err)
return nil, false, err
}
if output.Version != supportedL2OutputVersion {
l.log.Error("unsupported l2 output version: %s", output.Version)
return nil, false, errors.New("unsupported l2 output version")
}
if output.BlockRef.Number != block.Uint64() { // sanity check, e.g. in case of bad RPC caching
l.log.Error("invalid blockNumber: next blockNumber is %v, blockNumber of block is %v", block, output.BlockRef.Number)
return nil, false, errors.New("invalid blockNumber")
}
// Always propose if it's part of the Finalized L2 chain. Or if allowed, if it's part of the safe L2 chain.
if !(output.BlockRef.Number <= output.Status.FinalizedL2.Number || (l.allowNonFinalized && output.BlockRef.Number <= output.Status.SafeL2.Number)) {
l.log.Debug("not proposing yet, L2 block is not ready for proposal",
"l2_proposal", output.BlockRef,
"l2_safe", output.Status.SafeL2,
"l2_finalized", output.Status.FinalizedL2,
"allow_non_finalized", l.allowNonFinalized)
return nil, false, nil
}
return output, true, nil
}
// ProposeL2OutputTxData creates the transaction data for the ProposeL2Output function
func (l *L2OutputSubmitter) ProposeL2OutputTxData(output *eth.OutputResponse) ([]byte, error) {
return proposeL2OutputTxData(l.l2ooABI, output)
}
// proposeL2OutputTxData creates the transaction data for the ProposeL2Output function
func proposeL2OutputTxData(abi *abi.ABI, output *eth.OutputResponse) ([]byte, error) {
return abi.Pack(
"proposeL2Output",
output.OutputRoot,
new(big.Int).SetUint64(output.BlockRef.Number),
output.Status.CurrentL1.Hash,
new(big.Int).SetUint64(output.Status.CurrentL1.Number))
}
// We wait until l1head advances beyond blocknum. This is used to make sure proposal tx won't
// immediately fail when checking the l1 blockhash. Note that EstimateGas uses "latest" state to
// execute the transaction by default, meaning inside the call, the head block is considered
// "pending" instead of committed. In the case l1blocknum == l1head then, blockhash(l1blocknum)
// will produce a value of 0 within EstimateGas, and the call will fail when the contract checks
// that l1blockhash matches blockhash(l1blocknum).
func (l *L2OutputSubmitter) waitForL1Head(ctx context.Context, blockNum uint64) error {
ticker := time.NewTicker(l.pollInterval)
defer ticker.Stop()
l1head, err := l.txMgr.BlockNumber(ctx)
if err != nil {
return err
}
for l1head <= blockNum {
l.log.Debug("waiting for l1 head > l1blocknum1+1", "l1head", l1head, "l1blocknum", blockNum)
select {
case <-ticker.C:
l1head, err = l.txMgr.BlockNumber(ctx)
if err != nil {
return err
}
break
case <-l.done:
return fmt.Errorf("L2OutputSubmitter is done()")
} }
}
return nil
}
// sendTransaction creates & sends transactions through the underlying transaction manager. l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig)
func (l *L2OutputSubmitter) sendTransaction(ctx context.Context, output *eth.OutputResponse) error { oplog.SetGlobalLogHandler(l.GetHandler())
err := l.waitForL1Head(ctx, output.Status.HeadL1.Number+1) opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)
if err != nil {
return err
}
data, err := l.ProposeL2OutputTxData(output)
if err != nil {
return err
}
receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
TxData: data,
To: &l.l2ooContractAddr,
GasLimit: 0,
})
if err != nil {
return err
}
if receipt.Status == types.ReceiptStatusFailed {
l.log.Error("proposer tx successfully published but reverted", "tx_hash", receipt.TxHash)
} else {
l.log.Info("proposer tx successfully published",
"tx_hash", receipt.TxHash,
"l1blocknum", output.Status.CurrentL1.Number,
"l1blockhash", output.Status.CurrentL1.Hash)
}
return nil
}
// loop is responsible for creating & submitting the next outputs
func (l *L2OutputSubmitter) loop() {
defer l.wg.Done()
ctx := l.ctx
ticker := time.NewTicker(l.pollInterval) l.Info("Initializing L2Output Submitter")
defer ticker.Stop() return ProposerServiceFromCLIConfig(cliCtx.Context, version, cfg, l)
for {
select {
case <-ticker.C:
output, shouldPropose, err := l.FetchNextOutputInfo(ctx)
if err != nil {
break
}
if !shouldPropose {
break
}
cCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
if err := l.sendTransaction(cCtx, output); err != nil {
l.log.Error("Failed to send proposal transaction",
"err", err,
"l1blocknum", output.Status.CurrentL1.Number,
"l1blockhash", output.Status.CurrentL1.Hash,
"l1head", output.Status.HeadL1.Number)
cancel()
break
}
l.metr.RecordL2BlocksProposed(output.BlockRef)
cancel()
case <-l.done:
return
}
} }
} }
package rpc
import (
"context"
"github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/rpc"
)
type ProposerDriver interface {
StartL2OutputSubmitting() error
StopL2OutputSubmitting() error
}
type adminAPI struct {
*rpc.CommonAdminAPI
b ProposerDriver
}
func NewAdminAPI(dr ProposerDriver, m metrics.RPCMetricer, log log.Logger) *adminAPI {
return &adminAPI{
CommonAdminAPI: rpc.NewCommonAdminAPI(m, log),
b: dr,
}
}
func GetAdminAPI(api *adminAPI) gethrpc.API {
return gethrpc.API{
Namespace: "admin",
Service: api,
}
}
func (a *adminAPI) StartProposer(_ context.Context) error {
return a.b.StartL2OutputSubmitting()
}
func (a *adminAPI) StopProposer(ctx context.Context) error {
return a.b.StopL2OutputSubmitting()
}
package proposer
import (
"context"
"errors"
"fmt"
"io"
"net"
"strconv"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-proposer/metrics"
"github.com/ethereum-optimism/optimism/op-proposer/proposer/rpc"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
type ProposerConfig struct{
// How frequently to poll L2 for new finalized outputs
PollInterval time.Duration
NetworkTimeout time.Duration
L2OutputOracleAddr common.Address
// AllowNonFinalized enables the proposal of safe, but non-finalized L2 blocks.
// The L1 block-hash embedded in the proposal TX is checked and should ensure the proposal
// is never valid on an alternative L1 chain that would produce different L2 data.
// This option is not necessary when higher proposal latency is acceptable and L1 is healthy.
AllowNonFinalized bool
}
type ProposerService struct{
Log log.Logger
Metrics metrics.Metricer
ProposerConfig
TxManager txmgr.TxManager
L1Client *ethclient.Client
RollupClient *sources.RollupClient
driver *L2OutputSubmitter
Version string
pprofSrv *httputil.HTTPServer
metricsSrv *httputil.HTTPServer
rpcServer *oprpc.Server
balanceMetricer io.Closer
stopped atomic.Bool
}
// ProposerServiceFromCLIConfig creates a new ProposerService from a CLIConfig.
// The service components are fully started, except for the driver,
// which will not be submitting state (if it was configured to) until the Start part of the lifecycle.
func ProposerServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) (*ProposerService, error) {
var ps ProposerService
if err := ps.initFromCLIConfig(ctx, version, cfg, log); err != nil {
return nil, errors.Join(err, ps.Stop(ctx)) // try to clean up our failed initialization attempt
}
return &ps, nil
}
func (ps *ProposerService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) error {
ps.Version = version
ps.Log = log
ps.initMetrics(cfg)
ps.PollInterval = cfg.PollInterval
ps.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout
ps.AllowNonFinalized = cfg.AllowNonFinalized
if err := ps.initRPCClients(ctx, cfg); err != nil {
return err
}
if err := ps.initTxManager(cfg); err != nil {
return fmt.Errorf("failed to init Tx manager: %w", err)
}
ps.initBalanceMonitor(cfg)
if err := ps.initMetricsServer(cfg); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
if err := ps.initPProf(cfg); err != nil {
return fmt.Errorf("failed to start pprof server: %w", err)
}
if err := ps.initL2ooAddress(cfg); err != nil {
return fmt.Errorf("failed to init L2ooAddress: %w", err)
}
if err := ps.initDriver(); err != nil {
return fmt.Errorf("failed to init Driver: %w", err)
}
if err := ps.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
ps.Metrics.RecordInfo(ps.Version)
ps.Metrics.RecordUp()
return nil
}
func (ps *ProposerService) initRPCClients(ctx context.Context, cfg *CLIConfig) error {
l1Client, err := dial.DialEthClientWithTimeout(ctx, dial.DefaultDialTimeout, ps.Log, cfg.L1EthRpc)
if err != nil {
return fmt.Errorf("failed to dial L1 RPC: %w", err)
}
ps.L1Client = l1Client
rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, ps.Log, cfg.RollupRpc)
if err != nil {
return fmt.Errorf("failed to dial L2 rollup-client RPC: %w", err)
}
ps.RollupClient = rollupClient
return nil
}
func (ps *ProposerService) initMetrics(cfg *CLIConfig) {
if cfg.MetricsConfig.Enabled {
procName := "default"
ps.Metrics = metrics.NewMetrics(procName)
} else {
ps.Metrics = metrics.NoopMetrics
}
}
// initBalanceMonitor depends on Metrics, L1Client and TxManager to start background-monitoring of the Proposer balance.
func (ps *ProposerService) initBalanceMonitor(cfg *CLIConfig) {
if cfg.MetricsConfig.Enabled {
ps.balanceMetricer = ps.Metrics.StartBalanceMetrics(ps.Log, ps.L1Client, ps.TxManager.From())
}
}
func (ps *ProposerService) initTxManager(cfg *CLIConfig) error {
txManager, err := txmgr.NewSimpleTxManager("proposer", ps.Log, ps.Metrics, cfg.TxMgrConfig)
if err != nil {
return err
}
ps.TxManager = txManager
return nil
}
func (ps *ProposerService) initPProf(cfg *CLIConfig) error {
if !cfg.PprofConfig.Enabled {
return nil
}
log.Debug("starting pprof server", "addr", net.JoinHostPort(cfg.PprofConfig.ListenAddr, strconv.Itoa(cfg.PprofConfig.ListenPort)))
srv, err := oppprof.StartServer(cfg.PprofConfig.ListenAddr, cfg.PprofConfig.ListenPort)
if err != nil {
return err
}
ps.pprofSrv = srv
log.Info("started pprof server", "addr", srv.Addr())
return nil
}
func (ps *ProposerService) initMetricsServer(cfg *CLIConfig) error {
if !cfg.MetricsConfig.Enabled {
ps.Log.Info("metrics disabled")
return nil
}
m, ok := ps.Metrics.(opmetrics.RegistryMetricer)
if !ok {
return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", ps.Metrics)
}
ps.Log.Debug("starting metrics server", "addr", cfg.MetricsConfig.ListenAddr, "port", cfg.MetricsConfig.ListenPort)
metricsSrv, err := opmetrics.StartServer(m.Registry(), cfg.MetricsConfig.ListenAddr, cfg.MetricsConfig.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
ps.Log.Info("started metrics server", "addr", metricsSrv.Addr())
ps.metricsSrv = metricsSrv
return nil
}
func (ps *ProposerService) initL2ooAddress(cfg *CLIConfig) error {
l2ooAddress, err := opservice.ParseAddress(cfg.L2OOAddress)
if err != nil {
return nil
}
ps.L2OutputOracleAddr = l2ooAddress
return nil
}
func (ps *ProposerService) initDriver() error {
driver, err := NewL2OutputSubmitter(DriverSetup{
Log: ps.Log,
Metr: ps.Metrics,
Cfg: ps.ProposerConfig,
Txmgr: ps.TxManager,
L1Client: ps.L1Client,
RollupClient: ps.RollupClient,
})
if err != nil {
return err
}
ps.driver = driver
return nil
}
func (ps *ProposerService) initRPCServer(cfg *CLIConfig) error {
server := oprpc.NewServer(
cfg.RPCConfig.ListenAddr,
cfg.RPCConfig.ListenPort,
ps.Version,
oprpc.WithLogger(ps.Log),
)
if cfg.RPCConfig.EnableAdmin {
adminAPI := rpc.NewAdminAPI(ps.driver, ps.Metrics, ps.Log)
server.AddAPI(rpc.GetAdminAPI(adminAPI))
ps.Log.Info("Admin RPC enabled")
}
ps.Log.Info("Starting JSON-RPC server")
if err := server.Start(); err != nil {
return fmt.Errorf("unable to start RPC server: %w", err)
}
ps.rpcServer = server
return nil
}
// Start runs once upon start of the proposer lifecycle,
// and starts L2Output-submission work if the proposer is configured to start submit data on startup.
func (ps *ProposerService) Start(_ context.Context) error {
ps.driver.Log.Info("Starting Proposer")
return ps.driver.StartL2OutputSubmitting()
}
func (ps *ProposerService) Stopped() bool {
return ps.stopped.Load()
}
// Kill is a convenience method to forcefully, non-gracefully, stop the ProposerService.
func (ps *ProposerService) Kill() error {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ps.Stop(ctx)
}
// Stop fully stops the L2Output-submitter and all its resources gracefully. After stopping, it cannot be restarted.
// See driver.StopL2OutputSubmitting to temporarily stop the L2Output submitter.
func (ps *ProposerService) Stop(ctx context.Context) error {
if ps.stopped.Load() {
return errors.New("already stopped")
}
ps.Log.Info("Stopping Proposer")
var result error
if ps.driver != nil {
if err := ps.driver.StopL2OutputSubmittingIfRunning(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop L2Output submitting: %w", err))
}
}
if ps.rpcServer != nil {
// TODO(7685): the op-service RPC server is not built on top of op-service httputil Server, and has poor shutdown
if err := ps.rpcServer.Stop(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop RPC server: %w", err))
}
}
if ps.pprofSrv != nil {
if err := ps.pprofSrv.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop PProf server: %w", err))
}
}
if ps.balanceMetricer != nil {
if err := ps.balanceMetricer.Close(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close balance metricer: %w", err))
}
}
if ps.metricsSrv != nil {
if err := ps.metricsSrv.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
}
}
if ps.L1Client != nil {
ps.L1Client.Close()
}
if ps.RollupClient != nil {
ps.RollupClient.Close()
}
if result == nil {
ps.stopped.Store(true)
ps.Log.Info("L2Output Submitter stopped")
}
return result
}
var _ cliapp.Lifecycle = (*ProposerService)(nil)
// Driver returns the handler on the L2Output-submitter driver element,
// to start/stop/restart the L2Output-submission work, for use in testing.
func (ps *ProposerService) Driver() rpc.ProposerDriver {
return ps.driver
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment