Commit 5c1e1983 authored by Michael de Hoog's avatar Michael de Hoog Committed by GitHub

[batcher] derive.ChannelOut factory (#12344)

* Add support for a derive.ChannelOut factory

* Add DriverSetupOption for injecting custom options into the DriverSetup

* Remove factory from NewChannelManager and NewChannelBuilder

* Add ChannelOut factory test

* Add comment about why we use a wrapper
parent 1495f6d0
package batcher package batcher
import ( import (
"fmt"
"math" "math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
...@@ -34,12 +33,8 @@ type channel struct { ...@@ -34,12 +33,8 @@ type channel struct {
maxInclusionBlock uint64 maxInclusionBlock uint64
} }
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (*channel, error) { func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64, channelOut derive.ChannelOut) *channel {
cb, err := NewChannelBuilder(cfg, rollupCfg, latestL1OriginBlockNum) cb := NewChannelBuilderWithChannelOut(cfg, rollupCfg, latestL1OriginBlockNum, channelOut)
if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err)
}
return &channel{ return &channel{
log: log, log: log,
metr: metr, metr: metr,
...@@ -47,7 +42,7 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup ...@@ -47,7 +42,7 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
channelBuilder: cb, channelBuilder: cb,
pendingTransactions: make(map[string]txData), pendingTransactions: make(map[string]txData),
confirmedTransactions: make(map[string]eth.BlockID), confirmedTransactions: make(map[string]eth.BlockID),
}, nil }
} }
// TxFailed records a transaction as failed. It will attempt to resubmit the data // TxFailed records a transaction as failed. It will attempt to resubmit the data
......
...@@ -86,24 +86,28 @@ type ChannelBuilder struct { ...@@ -86,24 +86,28 @@ type ChannelBuilder struct {
// channel out could not be created. // channel out could not be created.
// it acts as a factory for either a span or singular channel out // it acts as a factory for either a span or singular channel out
func NewChannelBuilder(cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (*ChannelBuilder, error) { func NewChannelBuilder(cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (*ChannelBuilder, error) {
co, err := newChannelOut(cfg, rollupCfg) co, err := NewChannelOut(cfg, rollupCfg)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating channel out: %w", err) return nil, fmt.Errorf("creating channel out: %w", err)
} }
return NewChannelBuilderWithChannelOut(cfg, rollupCfg, latestL1OriginBlockNum, co), nil
}
func NewChannelBuilderWithChannelOut(cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64, channelOut derive.ChannelOut) *ChannelBuilder {
cb := &ChannelBuilder{ cb := &ChannelBuilder{
cfg: cfg, cfg: cfg,
rollupCfg: rollupCfg, rollupCfg: rollupCfg,
co: co, co: channelOut,
} }
cb.updateDurationTimeout(latestL1OriginBlockNum) cb.updateDurationTimeout(latestL1OriginBlockNum)
return cb, nil return cb
} }
// newChannelOut creates a new channel out based on the given configuration. // NewChannelOut creates a new channel out based on the given configuration.
func newChannelOut(cfg ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error) { func NewChannelOut(cfg ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error) {
spec := rollup.NewChainSpec(rollupCfg) spec := rollup.NewChainSpec(rollupCfg)
if cfg.BatchType == derive.SpanBatchType { if cfg.BatchType == derive.SpanBatchType {
return derive.NewSpanChannelOut( return derive.NewSpanChannelOut(
......
...@@ -18,6 +18,8 @@ import ( ...@@ -18,6 +18,8 @@ import (
var ErrReorg = errors.New("block does not extend existing chain") var ErrReorg = errors.New("block does not extend existing chain")
type ChannelOutFactory func(cfg ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error)
// channelManager stores a contiguous set of blocks & turns them into channels. // channelManager stores a contiguous set of blocks & turns them into channels.
// Upon receiving tx confirmation (or a tx failure), it does channel error handling. // Upon receiving tx confirmation (or a tx failure), it does channel error handling.
// //
...@@ -32,6 +34,8 @@ type channelManager struct { ...@@ -32,6 +34,8 @@ type channelManager struct {
cfgProvider ChannelConfigProvider cfgProvider ChannelConfigProvider
rollupCfg *rollup.Config rollupCfg *rollup.Config
outFactory ChannelOutFactory
// All blocks since the last request for new tx data. // All blocks since the last request for new tx data.
blocks queue.Queue[*types.Block] blocks queue.Queue[*types.Block]
// The latest L1 block from all the L2 blocks in the most recently closed channel // The latest L1 block from all the L2 blocks in the most recently closed channel
...@@ -59,10 +63,15 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider Channe ...@@ -59,10 +63,15 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider Channe
cfgProvider: cfgProvider, cfgProvider: cfgProvider,
defaultCfg: cfgProvider.ChannelConfig(), defaultCfg: cfgProvider.ChannelConfig(),
rollupCfg: rollupCfg, rollupCfg: rollupCfg,
outFactory: NewChannelOut,
txChannels: make(map[string]*channel), txChannels: make(map[string]*channel),
} }
} }
func (s *channelManager) SetChannelOutFactory(outFactory ChannelOutFactory) {
s.outFactory = outFactory
}
// Clear clears the entire state of the channel manager. // Clear clears the entire state of the channel manager.
// It is intended to be used before launching op-batcher and after an L2 reorg. // It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) { func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
...@@ -265,11 +274,14 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { ...@@ -265,11 +274,14 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
// This will be reassessed at channel submission-time, // This will be reassessed at channel submission-time,
// but this is our best guess at the appropriate values for now. // but this is our best guess at the appropriate values for now.
cfg := s.defaultCfg cfg := s.defaultCfg
pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
channelOut, err := s.outFactory(cfg, s.rollupCfg)
if err != nil { if err != nil {
return fmt.Errorf("creating new channel: %w", err) return fmt.Errorf("creating channel out: %w", err)
} }
pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number, channelOut)
s.currentChannel = pc s.currentChannel = pc
s.channelQueue = append(s.channelQueue, pc) s.channelQueue = append(s.channelQueue, pc)
......
...@@ -668,3 +668,25 @@ func TestChannelManager_Requeue(t *testing.T) { ...@@ -668,3 +668,25 @@ func TestChannelManager_Requeue(t *testing.T) {
require.NotContains(t, m.blocks, blockA) require.NotContains(t, m.blocks, blockA)
} }
func TestChannelManager_ChannelOutFactory(t *testing.T) {
type ChannelOutWrapper struct {
derive.ChannelOut
}
l := testlog.Logger(t, log.LevelCrit)
cfg := channelManagerTestConfig(100, derive.SingularBatchType)
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
m.SetChannelOutFactory(func(cfg ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error) {
co, err := NewChannelOut(cfg, rollupCfg)
if err != nil {
return nil, err
}
// return a wrapper type, to validate that the factory was correctly used by checking the type below
return &ChannelOutWrapper{
ChannelOut: co,
}, nil
})
require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.channelBuilder.co)
}
package batcher package batcher
import ( import (
"fmt"
"io" "io"
"testing" "testing"
...@@ -23,6 +24,14 @@ func zeroFrameTxID(fn uint16) txID { ...@@ -23,6 +24,14 @@ func zeroFrameTxID(fn uint16) txID {
return txID{frameID{frameNumber: fn}} return txID{frameID{frameNumber: fn}}
} }
func newChannelWithChannelOut(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (*channel, error) {
channelOut, err := NewChannelOut(cfg, rollupCfg)
if err != nil {
return nil, fmt.Errorf("creating channel out: %w", err)
}
return newChannel(log, metr, cfg, rollupCfg, latestL1OriginBlockNum, channelOut), nil
}
// TestChannelTimeout tests that the channel manager // TestChannelTimeout tests that the channel manager
// correctly identifies when a pending channel is timed out. // correctly identifies when a pending channel is timed out.
func TestChannelTimeout(t *testing.T) { func TestChannelTimeout(t *testing.T) {
...@@ -121,7 +130,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { ...@@ -121,7 +130,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
require := require.New(t) require := require.New(t)
const n = 6 const n = 6
lgr := testlog.Logger(t, log.LevelWarn) lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{ ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: false, UseBlobs: false,
TargetNumFrames: n, TargetNumFrames: n,
CompressorConfig: compressor.Config{ CompressorConfig: compressor.Config{
...@@ -162,7 +171,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) { ...@@ -162,7 +171,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
require := require.New(t) require := require.New(t)
const n = eth.MaxBlobsPerBlobTx const n = eth.MaxBlobsPerBlobTx
lgr := testlog.Logger(t, log.LevelWarn) lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{ ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: true, UseBlobs: true,
TargetNumFrames: n, TargetNumFrames: n,
CompressorConfig: compressor.Config{ CompressorConfig: compressor.Config{
......
...@@ -76,15 +76,16 @@ type RollupClient interface { ...@@ -76,15 +76,16 @@ type RollupClient interface {
// DriverSetup is the collection of input/output interfaces and configuration that the driver operates on. // DriverSetup is the collection of input/output interfaces and configuration that the driver operates on.
type DriverSetup struct { type DriverSetup struct {
Log log.Logger Log log.Logger
Metr metrics.Metricer Metr metrics.Metricer
RollupConfig *rollup.Config RollupConfig *rollup.Config
Config BatcherConfig Config BatcherConfig
Txmgr txmgr.TxManager Txmgr txmgr.TxManager
L1Client L1Client L1Client L1Client
EndpointProvider dial.L2EndpointProvider EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfigProvider ChannelConfig ChannelConfigProvider
AltDA *altda.DAClient AltDA *altda.DAClient
ChannelOutFactory ChannelOutFactory
} }
// BatchSubmitter encapsulates a service responsible for submitting L2 tx // BatchSubmitter encapsulates a service responsible for submitting L2 tx
...@@ -115,9 +116,13 @@ type BatchSubmitter struct { ...@@ -115,9 +116,13 @@ type BatchSubmitter struct {
// NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup // NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup
func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter { func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
state := NewChannelManager(setup.Log, setup.Metr, setup.ChannelConfig, setup.RollupConfig)
if setup.ChannelOutFactory != nil {
state.SetChannelOutFactory(setup.ChannelOutFactory)
}
return &BatchSubmitter{ return &BatchSubmitter{
DriverSetup: setup, DriverSetup: setup,
state: NewChannelManager(setup.Log, setup.Metr, setup.ChannelConfig, setup.RollupConfig), state: state,
} }
} }
......
...@@ -75,18 +75,20 @@ type BatcherService struct { ...@@ -75,18 +75,20 @@ type BatcherService struct {
NotSubmittingOnStart bool NotSubmittingOnStart bool
} }
type DriverSetupOption func(setup *DriverSetup)
// BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig. // BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig.
// The service components are fully started, except for the driver, // The service components are fully started, except for the driver,
// which will not be submitting batches (if it was configured to) until the Start part of the lifecycle. // which will not be submitting batches (if it was configured to) until the Start part of the lifecycle.
func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) (*BatcherService, error) { func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) (*BatcherService, error) {
var bs BatcherService var bs BatcherService
if err := bs.initFromCLIConfig(ctx, version, cfg, log); err != nil { if err := bs.initFromCLIConfig(ctx, version, cfg, log, opts...); err != nil {
return nil, errors.Join(err, bs.Stop(ctx)) // try to clean up our failed initialization attempt return nil, errors.Join(err, bs.Stop(ctx)) // try to clean up our failed initialization attempt
} }
return &bs, nil return &bs, nil
} }
func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) error { func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) error {
bs.Version = version bs.Version = version
bs.Log = log bs.Log = log
bs.NotSubmittingOnStart = cfg.Stopped bs.NotSubmittingOnStart = cfg.Stopped
...@@ -122,7 +124,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, ...@@ -122,7 +124,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPProf(cfg); err != nil { if err := bs.initPProf(cfg); err != nil {
return fmt.Errorf("failed to init profiling: %w", err) return fmt.Errorf("failed to init profiling: %w", err)
} }
bs.initDriver() bs.initDriver(opts...)
if err := bs.initRPCServer(cfg); err != nil { if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err) return fmt.Errorf("failed to start RPC server: %w", err)
} }
...@@ -315,8 +317,8 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error { ...@@ -315,8 +317,8 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
return nil return nil
} }
func (bs *BatcherService) initDriver() { func (bs *BatcherService) initDriver(opts ...DriverSetupOption) {
bs.driver = NewBatchSubmitter(DriverSetup{ ds := DriverSetup{
Log: bs.Log, Log: bs.Log,
Metr: bs.Metrics, Metr: bs.Metrics,
RollupConfig: bs.RollupConfig, RollupConfig: bs.RollupConfig,
...@@ -326,7 +328,11 @@ func (bs *BatcherService) initDriver() { ...@@ -326,7 +328,11 @@ func (bs *BatcherService) initDriver() {
EndpointProvider: bs.EndpointProvider, EndpointProvider: bs.EndpointProvider,
ChannelConfig: bs.ChannelConfig, ChannelConfig: bs.ChannelConfig,
AltDA: bs.AltDA, AltDA: bs.AltDA,
}) }
for _, opt := range opts {
opt(&ds)
}
bs.driver = NewBatchSubmitter(ds)
} }
func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error { func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment