Commit b4929fa1 authored by Sebastian Stammler's avatar Sebastian Stammler

op-batcher: Add metrics to batcher

parent 10d27cc1
...@@ -12,9 +12,9 @@ import ( ...@@ -12,9 +12,9 @@ import (
gethrpc "github.com/ethereum/go-ethereum/rpc" gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
) )
...@@ -36,9 +36,10 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -36,9 +36,10 @@ func Main(version string, cliCtx *cli.Context) error {
} }
l := oplog.NewLogger(cfg.LogConfig) l := oplog.NewLogger(cfg.LogConfig)
m := metrics.NewMetrics("default")
l.Info("Initializing Batch Submitter") l.Info("Initializing Batch Submitter")
batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l) batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l, m)
if err != nil { if err != nil {
l.Error("Unable to create Batch Submitter", "error", err) l.Error("Unable to create Batch Submitter", "error", err)
return err return err
...@@ -64,16 +65,15 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -64,16 +65,15 @@ func Main(version string, cliCtx *cli.Context) error {
}() }()
} }
registry := opmetrics.NewRegistry()
metricsCfg := cfg.MetricsConfig metricsCfg := cfg.MetricsConfig
if metricsCfg.Enabled { if metricsCfg.Enabled {
l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort) l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
go func() { go func() {
if err := opmetrics.ListenAndServe(ctx, registry, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil { if err := m.Serve(ctx, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
l.Error("error starting metrics server", err) l.Error("error starting metrics server", err)
} }
}() }()
opmetrics.LaunchBalanceMetrics(ctx, l, registry, "", batchSubmitter.L1Client, batchSubmitter.From) m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.From)
} }
rpcCfg := cfg.RPCConfig rpcCfg := cfg.RPCConfig
...@@ -95,6 +95,9 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -95,6 +95,9 @@ func Main(version string, cliCtx *cli.Context) error {
return fmt.Errorf("error starting RPC server: %w", err) return fmt.Errorf("error starting RPC server: %w", err)
} }
m.RecordInfo(version)
m.RecordUp()
interruptChannel := make(chan os.Signal, 1) interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, []os.Signal{ signal.Notify(interruptChannel, []os.Signal{
os.Interrupt, os.Interrupt,
......
...@@ -68,7 +68,7 @@ func addNonsenseBlock(cb *channelBuilder) error { ...@@ -68,7 +68,7 @@ func addNonsenseBlock(cb *channelBuilder) error {
a := types.NewBlock(&types.Header{ a := types.NewBlock(&types.Header{
Number: big.NewInt(0), Number: big.NewInt(0),
}, txs, nil, nil, trie.NewStackTrie(nil)) }, txs, nil, nil, trie.NewStackTrie(nil))
err = cb.AddBlock(a) _, err = cb.AddBlock(a)
return err return err
} }
...@@ -98,7 +98,7 @@ func buildTooLargeRlpEncodedBlockBatch(cb *channelBuilder) error { ...@@ -98,7 +98,7 @@ func buildTooLargeRlpEncodedBlockBatch(cb *channelBuilder) error {
// When a batch is constructed from the block and // When a batch is constructed from the block and
// then rlp encoded in the channel out, the size // then rlp encoded in the channel out, the size
// will exceed [derive.MaxRLPBytesPerChannel] // will exceed [derive.MaxRLPBytesPerChannel]
err := cb.AddBlock(block) _, err := cb.AddBlock(block)
return err return err
} }
...@@ -462,7 +462,7 @@ func TestOutputFramesMaxFrameIndex(t *testing.T) { ...@@ -462,7 +462,7 @@ func TestOutputFramesMaxFrameIndex(t *testing.T) {
a := types.NewBlock(&types.Header{ a := types.NewBlock(&types.Header{
Number: big.NewInt(0), Number: big.NewInt(0),
}, txs, nil, nil, trie.NewStackTrie(nil)) }, txs, nil, nil, trie.NewStackTrie(nil))
err = cb.AddBlock(a) _, err = cb.AddBlock(a)
if cb.IsFull() { if cb.IsFull() {
fullErr := cb.FullErr() fullErr := cb.FullErr()
require.ErrorIs(t, fullErr, ErrMaxFrameIndex) require.ErrorIs(t, fullErr, ErrMaxFrameIndex)
......
...@@ -6,7 +6,9 @@ import ( ...@@ -6,7 +6,9 @@ import (
"io" "io"
"math" "math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -22,8 +24,9 @@ var ErrReorg = errors.New("block does not extend existing chain") ...@@ -22,8 +24,9 @@ var ErrReorg = errors.New("block does not extend existing chain")
// channel. // channel.
// Functions on channelManager are not safe for concurrent access. // Functions on channelManager are not safe for concurrent access.
type channelManager struct { type channelManager struct {
log log.Logger log log.Logger
cfg ChannelConfig metr metrics.Metricer
cfg ChannelConfig
// All blocks since the last request for new tx data. // All blocks since the last request for new tx data.
blocks []*types.Block blocks []*types.Block
...@@ -40,10 +43,12 @@ type channelManager struct { ...@@ -40,10 +43,12 @@ type channelManager struct {
confirmedTransactions map[txID]eth.BlockID confirmedTransactions map[txID]eth.BlockID
} }
func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager { func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {
return &channelManager{ return &channelManager{
log: log, log: log,
cfg: cfg, metr: metr,
cfg: cfg,
pendingTransactions: make(map[txID]txData), pendingTransactions: make(map[txID]txData),
confirmedTransactions: make(map[txID]eth.BlockID), confirmedTransactions: make(map[txID]eth.BlockID),
} }
...@@ -71,6 +76,8 @@ func (s *channelManager) TxFailed(id txID) { ...@@ -71,6 +76,8 @@ func (s *channelManager) TxFailed(id txID) {
} else { } else {
s.log.Warn("unknown transaction marked as failed", "id", id) s.log.Warn("unknown transaction marked as failed", "id", id)
} }
s.metr.RecordBatchTxFailed()
} }
// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in // TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
...@@ -78,7 +85,8 @@ func (s *channelManager) TxFailed(id txID) { ...@@ -78,7 +85,8 @@ func (s *channelManager) TxFailed(id txID) {
// resubmitted. // resubmitted.
// This function may reset the pending channel if the pending channel has timed out. // This function may reset the pending channel if the pending channel has timed out.
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
s.log.Trace("marked transaction as confirmed", "id", id, "block", inclusionBlock) s.metr.RecordBatchTxSubmitted()
s.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok { if _, ok := s.pendingTransactions[id]; !ok {
s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock) s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
// TODO: This can occur if we clear the channel while there are still pending transactions // TODO: This can occur if we clear the channel while there are still pending transactions
...@@ -92,13 +100,15 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { ...@@ -92,13 +100,15 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
// If this channel timed out, put the pending blocks back into the local saved blocks // If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel. // and then reset this state so it can try to build a new channel.
if s.pendingChannelIsTimedOut() { if s.pendingChannelIsTimedOut() {
s.log.Warn("Channel timed out", "chID", s.pendingChannel.ID()) s.metr.RecordChannelTimedOut(s.pendingChannel.ID())
s.log.Warn("Channel timed out", "id", s.pendingChannel.ID())
s.blocks = append(s.pendingChannel.Blocks(), s.blocks...) s.blocks = append(s.pendingChannel.Blocks(), s.blocks...)
s.clearPendingChannel() s.clearPendingChannel()
} }
// If we are done with this channel, record that. // If we are done with this channel, record that.
if s.pendingChannelIsFullySubmitted() { if s.pendingChannelIsFullySubmitted() {
s.log.Info("Channel is fully submitted", "chID", s.pendingChannel.ID()) s.metr.RecordChannelFullySubmitted(s.pendingChannel.ID())
s.log.Info("Channel is fully submitted", "id", s.pendingChannel.ID())
s.clearPendingChannel() s.clearPendingChannel()
} }
} }
...@@ -194,8 +204,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { ...@@ -194,8 +204,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
// all pending blocks be included in this channel for submission. // all pending blocks be included in this channel for submission.
s.registerL1Block(l1Head) s.registerL1Block(l1Head)
if err := s.pendingChannel.OutputFrames(); err != nil { if err := s.outputFrames(); err != nil {
return txData{}, fmt.Errorf("creating frames with channel builder: %w", err) return txData{}, err
} }
return s.nextTxData() return s.nextTxData()
...@@ -211,7 +221,11 @@ func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error { ...@@ -211,7 +221,11 @@ func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error {
return fmt.Errorf("creating new channel: %w", err) return fmt.Errorf("creating new channel: %w", err)
} }
s.pendingChannel = cb s.pendingChannel = cb
s.log.Info("Created channel", "chID", cb.ID(), "l1Head", l1Head) s.log.Info("Created channel",
"id", cb.ID(),
"l1Head", l1Head,
"blocks_pending", len(s.blocks))
s.metr.RecordChannelOpened(cb.ID(), len(s.blocks))
return nil return nil
} }
...@@ -229,28 +243,27 @@ func (s *channelManager) registerL1Block(l1Head eth.BlockID) { ...@@ -229,28 +243,27 @@ func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
// processBlocks adds blocks from the blocks queue to the pending channel until // processBlocks adds blocks from the blocks queue to the pending channel until
// either the queue got exhausted or the channel is full. // either the queue got exhausted or the channel is full.
func (s *channelManager) processBlocks() error { func (s *channelManager) processBlocks() error {
var blocksAdded int var (
var _chFullErr *ChannelFullError // throw away, just for type checking blocksAdded int
_chFullErr *ChannelFullError // throw away, just for type checking
latestL2ref eth.L2BlockRef
)
for i, block := range s.blocks { for i, block := range s.blocks {
if err := s.pendingChannel.AddBlock(block); errors.As(err, &_chFullErr) { l1info, err := s.pendingChannel.AddBlock(block)
if errors.As(err, &_chFullErr) {
// current block didn't get added because channel is already full // current block didn't get added because channel is already full
break break
} else if err != nil { } else if err != nil {
return fmt.Errorf("adding block[%d] to channel builder: %w", i, err) return fmt.Errorf("adding block[%d] to channel builder: %w", i, err)
} }
blocksAdded += 1 blocksAdded += 1
latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
// current block got added but channel is now full // current block got added but channel is now full
if s.pendingChannel.IsFull() { if s.pendingChannel.IsFull() {
break break
} }
} }
s.log.Debug("Added blocks to channel",
"blocks_added", blocksAdded,
"channel_full", s.pendingChannel.IsFull(),
"blocks_pending", len(s.blocks)-blocksAdded,
"input_bytes", s.pendingChannel.InputBytes(),
)
if blocksAdded == len(s.blocks) { if blocksAdded == len(s.blocks) {
// all blocks processed, reuse slice // all blocks processed, reuse slice
s.blocks = s.blocks[:0] s.blocks = s.blocks[:0]
...@@ -258,6 +271,53 @@ func (s *channelManager) processBlocks() error { ...@@ -258,6 +271,53 @@ func (s *channelManager) processBlocks() error {
// remove processed blocks // remove processed blocks
s.blocks = s.blocks[blocksAdded:] s.blocks = s.blocks[blocksAdded:]
} }
s.metr.RecordL2BlocksAdded(latestL2ref,
blocksAdded,
len(s.blocks),
s.pendingChannel.InputBytes(),
s.pendingChannel.ReadyBytes())
s.log.Debug("Added blocks to channel",
"blocks_added", blocksAdded,
"blocks_pending", len(s.blocks),
"channel_full", s.pendingChannel.IsFull(),
"input_bytes", s.pendingChannel.InputBytes(),
"ready_bytes", s.pendingChannel.ReadyBytes(),
)
return nil
}
func (s *channelManager) outputFrames() error {
if err := s.pendingChannel.OutputFrames(); err != nil {
return fmt.Errorf("creating frames with channel builder: %w", err)
}
if !s.pendingChannel.IsFull() {
return nil
}
inBytes, outBytes := s.pendingChannel.InputBytes(), s.pendingChannel.OutputBytes()
s.metr.RecordChannelClosed(
s.pendingChannel.ID(),
len(s.blocks),
s.pendingChannel.NumFrames(),
inBytes,
outBytes,
s.pendingChannel.FullErr(),
)
var comprRatio float64
if inBytes > 0 {
comprRatio = float64(outBytes) / float64(inBytes)
}
s.log.Info("Channel closed",
"id", s.pendingChannel.ID(),
"blocks_pending", len(s.blocks),
"num_frames", s.pendingChannel.NumFrames(),
"input_bytes", inBytes,
"output_bytes", outBytes,
"full_reason", s.pendingChannel.FullErr(),
"compr_ratio", comprRatio,
)
return nil return nil
} }
...@@ -273,3 +333,14 @@ func (s *channelManager) AddL2Block(block *types.Block) error { ...@@ -273,3 +333,14 @@ func (s *channelManager) AddL2Block(block *types.Block) error {
return nil return nil
} }
func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) eth.L2BlockRef {
return eth.L2BlockRef{
Hash: block.Hash(),
Number: block.NumberU64(),
ParentHash: block.ParentHash(),
Time: block.Time(),
L1Origin: eth.BlockID{Hash: l1info.BlockHash, Number: l1info.Number},
SequenceNumber: l1info.SequenceNumber,
}
}
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test" derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
...@@ -23,7 +24,7 @@ import ( ...@@ -23,7 +24,7 @@ import (
func TestPendingChannelTimeout(t *testing.T) { func TestPendingChannelTimeout(t *testing.T) {
// Create a new channel manager with a ChannelTimeout // Create a new channel manager with a ChannelTimeout
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, ChannelConfig{ m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100, ChannelTimeout: 100,
}) })
...@@ -67,7 +68,7 @@ func TestPendingChannelTimeout(t *testing.T) { ...@@ -67,7 +68,7 @@ func TestPendingChannelTimeout(t *testing.T) {
// detects a reorg when it has cached L1 blocks. // detects a reorg when it has cached L1 blocks.
func TestChannelManagerReturnsErrReorg(t *testing.T) { func TestChannelManagerReturnsErrReorg(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, ChannelConfig{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
a := types.NewBlock(&types.Header{ a := types.NewBlock(&types.Header{
Number: big.NewInt(0), Number: big.NewInt(0),
...@@ -101,11 +102,12 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) { ...@@ -101,11 +102,12 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) {
// detects a reorg even if it does not have any blocks inside it. // detects a reorg even if it does not have any blocks inside it.
func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, ChannelConfig{ m := NewChannelManager(log, metrics.NoopMetrics,
TargetFrameSize: 0, ChannelConfig{
MaxFrameSize: 120_000, TargetFrameSize: 0,
ApproxComprRatio: 1.0, MaxFrameSize: 120_000,
}) ApproxComprRatio: 1.0,
})
l1Block := types.NewBlock(&types.Header{ l1Block := types.NewBlock(&types.Header{
BaseFee: big.NewInt(10), BaseFee: big.NewInt(10),
Difficulty: common.Big0, Difficulty: common.Big0,
...@@ -138,7 +140,7 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -138,7 +140,7 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
// TestChannelManagerNextTxData checks the nextTxData function. // TestChannelManagerNextTxData checks the nextTxData function.
func TestChannelManagerNextTxData(t *testing.T) { func TestChannelManagerNextTxData(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, ChannelConfig{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
// Nil pending channel should return EOF // Nil pending channel should return EOF
returnedTxData, err := m.nextTxData() returnedTxData, err := m.nextTxData()
...@@ -181,7 +183,7 @@ func TestClearChannelManager(t *testing.T) { ...@@ -181,7 +183,7 @@ func TestClearChannelManager(t *testing.T) {
// Create a channel manager // Create a channel manager
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
m := NewChannelManager(log, ChannelConfig{ m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
// Need to set the channel timeout here so we don't clear pending // Need to set the channel timeout here so we don't clear pending
// channels on confirmation. This would result in [TxConfirmed] // channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and reseting the pendingChannels map // clearing confirmed transactions, and reseting the pendingChannels map
...@@ -254,7 +256,7 @@ func TestClearChannelManager(t *testing.T) { ...@@ -254,7 +256,7 @@ func TestClearChannelManager(t *testing.T) {
func TestChannelManagerTxConfirmed(t *testing.T) { func TestChannelManagerTxConfirmed(t *testing.T) {
// Create a channel manager // Create a channel manager
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, ChannelConfig{ m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
// Need to set the channel timeout here so we don't clear pending // Need to set the channel timeout here so we don't clear pending
// channels on confirmation. This would result in [TxConfirmed] // channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and reseting the pendingChannels map // clearing confirmed transactions, and reseting the pendingChannels map
...@@ -308,7 +310,7 @@ func TestChannelManagerTxConfirmed(t *testing.T) { ...@@ -308,7 +310,7 @@ func TestChannelManagerTxConfirmed(t *testing.T) {
func TestChannelManagerTxFailed(t *testing.T) { func TestChannelManagerTxFailed(t *testing.T) {
// Create a channel manager // Create a channel manager
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, ChannelConfig{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
// Let's add a valid pending transaction to the channel // Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness // manager so we can demonstrate correctness
...@@ -351,11 +353,12 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -351,11 +353,12 @@ func TestChannelManager_TxResend(t *testing.T) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
m := NewChannelManager(log, ChannelConfig{ m := NewChannelManager(log, metrics.NoopMetrics,
TargetFrameSize: 0, ChannelConfig{
MaxFrameSize: 120_000, TargetFrameSize: 0,
ApproxComprRatio: 1.0, MaxFrameSize: 120_000,
}) ApproxComprRatio: 1.0,
})
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
...@@ -20,18 +21,21 @@ import ( ...@@ -20,18 +21,21 @@ import (
) )
type Config struct { type Config struct {
log log.Logger log log.Logger
L1Client *ethclient.Client metr metrics.Metricer
L2Client *ethclient.Client L1Client *ethclient.Client
RollupNode *sources.RollupClient L2Client *ethclient.Client
PollInterval time.Duration RollupNode *sources.RollupClient
PollInterval time.Duration
From common.Address
TxManagerConfig txmgr.Config TxManagerConfig txmgr.Config
From common.Address
// RollupConfig is queried at startup // RollupConfig is queried at startup
Rollup *rollup.Config Rollup *rollup.Config
// Channel creation parameters // Channel builder parameters
Channel ChannelConfig Channel ChannelConfig
} }
......
...@@ -10,7 +10,9 @@ import ( ...@@ -10,7 +10,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -34,13 +36,14 @@ type BatchSubmitter struct { ...@@ -34,13 +36,14 @@ type BatchSubmitter struct {
// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head. // lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
lastStoredBlock eth.BlockID lastStoredBlock eth.BlockID
lastL1Tip eth.L1BlockRef
state *channelManager state *channelManager
} }
// NewBatchSubmitterFromCLIConfig initializes the BatchSubmitter, gathering any resources // NewBatchSubmitterFromCLIConfig initializes the BatchSubmitter, gathering any resources
// that will be needed during operation. // that will be needed during operation.
func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitter, error) { func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) {
ctx := context.Background() ctx := context.Background()
signer, fromAddress, err := opcrypto.SignerFactoryFromConfig(l, cfg.PrivateKey, cfg.Mnemonic, cfg.SequencerHDPath, cfg.SignerConfig) signer, fromAddress, err := opcrypto.SignerFactoryFromConfig(l, cfg.PrivateKey, cfg.Mnemonic, cfg.SequencerHDPath, cfg.SignerConfig)
...@@ -104,12 +107,12 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte ...@@ -104,12 +107,12 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte
return nil, err return nil, err
} }
return NewBatchSubmitter(ctx, batcherCfg, l) return NewBatchSubmitter(ctx, batcherCfg, l, m)
} }
// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources // NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
// that will be needed during operation. // that will be needed during operation.
func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger) (*BatchSubmitter, error) { func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) {
balance, err := cfg.L1Client.BalanceAt(ctx, cfg.From, nil) balance, err := cfg.L1Client.BalanceAt(ctx, cfg.From, nil)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -118,12 +121,14 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger) (*BatchSub ...@@ -118,12 +121,14 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger) (*BatchSub
cfg.log = l cfg.log = l
cfg.log.Info("creating batch submitter", "submitter_addr", cfg.From, "submitter_bal", balance) cfg.log.Info("creating batch submitter", "submitter_addr", cfg.From, "submitter_bal", balance)
cfg.metr = m
return &BatchSubmitter{ return &BatchSubmitter{
Config: cfg, Config: cfg,
txMgr: NewTransactionManager(l, txMgr: NewTransactionManager(l,
cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID, cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID,
cfg.From, cfg.L1Client), cfg.From, cfg.L1Client),
state: NewChannelManager(l, cfg.Channel), state: NewChannelManager(l, m, cfg.Channel),
}, nil }, nil
} }
...@@ -187,13 +192,16 @@ func (l *BatchSubmitter) Stop() error { ...@@ -187,13 +192,16 @@ func (l *BatchSubmitter) Stop() error {
func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) { func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
start, end, err := l.calculateL2BlockRangeToStore(ctx) start, end, err := l.calculateL2BlockRangeToStore(ctx)
if err != nil { if err != nil {
l.log.Trace("was not able to calculate L2 block range", "err", err) l.log.Warn("Error calculating L2 block range", "err", err)
return
} else if start.Number == end.Number {
return return
} }
var latestBlock *types.Block
// Add all blocks to "state" // Add all blocks to "state"
for i := start.Number + 1; i < end.Number+1; i++ { for i := start.Number + 1; i < end.Number+1; i++ {
id, err := l.loadBlockIntoState(ctx, i) block, err := l.loadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) { if errors.Is(err, ErrReorg) {
l.log.Warn("Found L2 reorg", "block_number", i) l.log.Warn("Found L2 reorg", "block_number", i)
l.state.Clear() l.state.Clear()
...@@ -203,24 +211,34 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) { ...@@ -203,24 +211,34 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
l.log.Warn("failed to load block into state", "err", err) l.log.Warn("failed to load block into state", "err", err)
return return
} }
l.lastStoredBlock = id l.lastStoredBlock = eth.ToBlockID(block)
latestBlock = block
} }
l2ref, err := derive.L2BlockToBlockRef(latestBlock, &l.Rollup.Genesis)
if err != nil {
l.log.Warn("Invalid L2 block loaded into state", "err", err)
return
}
l.metr.RecordL2BlocksLoaded(l2ref)
} }
// loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded. // loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (eth.BlockID, error) { func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
ctx, cancel := context.WithTimeout(ctx, networkTimeout) ctx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber)) block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
cancel()
if err != nil { if err != nil {
return eth.BlockID{}, err return nil, fmt.Errorf("getting L2 block: %w", err)
} }
if err := l.state.AddL2Block(block); err != nil { if err := l.state.AddL2Block(block); err != nil {
return eth.BlockID{}, err return nil, fmt.Errorf("adding L2 block to state: %w", err)
} }
id := eth.ToBlockID(block)
l.log.Info("added L2 block to local state", "block", id, "tx_count", len(block.Transactions()), "time", block.Time()) l.log.Info("added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time())
return id, nil return block, nil
} }
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. // calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
...@@ -283,6 +301,7 @@ func (l *BatchSubmitter) loop() { ...@@ -283,6 +301,7 @@ func (l *BatchSubmitter) loop() {
l.log.Error("Failed to query L1 tip", "error", err) l.log.Error("Failed to query L1 tip", "error", err)
break break
} }
l.recordL1Tip(l1tip)
// Collect next transaction data // Collect next transaction data
txdata, err := l.state.TxData(l1tip.ID()) txdata, err := l.state.TxData(l1tip.ID())
...@@ -316,6 +335,14 @@ func (l *BatchSubmitter) loop() { ...@@ -316,6 +335,14 @@ func (l *BatchSubmitter) loop() {
} }
} }
func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
if l.lastL1Tip == l1tip {
return
}
l.lastL1Tip = l1tip
l.metr.RecordLatestL1Block(l1tip)
}
func (l *BatchSubmitter) recordFailedTx(id txID, err error) { func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.log.Warn("Failed to send transaction", "err", err) l.log.Warn("Failed to send transaction", "err", err)
l.state.TxFailed(id) l.state.TxFailed(id)
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"time" "time"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
...@@ -341,7 +342,7 @@ func TestMigration(t *testing.T) { ...@@ -341,7 +342,7 @@ func TestMigration(t *testing.T) {
Format: "text", Format: "text",
}, },
PrivateKey: hexPriv(secrets.Batcher), PrivateKey: hexPriv(secrets.Batcher),
}, lgr.New("module", "batcher")) }, lgr.New("module", "batcher"), batchermetrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
batcher.StopIfRunning() batcher.StopIfRunning()
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis" "github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
...@@ -600,7 +601,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -600,7 +601,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
Format: "text", Format: "text",
}, },
PrivateKey: hexPriv(cfg.Secrets.Batcher), PrivateKey: hexPriv(cfg.Secrets.Batcher),
}, sys.cfg.Loggers["batcher"]) }, sys.cfg.Loggers["batcher"], batchermetrics.NoopMetrics)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err) return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment