Commit 515de0eb authored by protolambda, committed by GitHub

Merge pull request #7290 from testinprod-io/tip/span-batch-batcher

op-batcher: Span Batch Submission
parents 4bc28dae ea8ffece
......@@ -5,6 +5,7 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types"
......@@ -26,8 +27,8 @@ type channel struct {
confirmedTransactions map[txID]eth.BlockID
}
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) (*channel, error) {
cb, err := newChannelBuilder(cfg)
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) (*channel, error) {
cb, err := newChannelBuilder(cfg, rcfg)
if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err)
}
......
......@@ -8,6 +8,7 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
)
......@@ -58,6 +59,9 @@ type ChannelConfig struct {
// CompressorConfig contains the configuration for creating new compressors.
CompressorConfig compressor.Config
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint
}
// Check validates the [ChannelConfig] parameters.
......@@ -83,6 +87,10 @@ func (cc *ChannelConfig) Check() error {
return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize)
}
if cc.BatchType > derive.SpanBatchType {
return fmt.Errorf("unrecognized batch type: %d", cc.BatchType)
}
return nil
}
......@@ -114,7 +122,7 @@ type channelBuilder struct {
// guaranteed to be a ChannelFullError wrapping the specific reason.
fullErr error
// current channel
co *derive.ChannelOut
co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
// frames data queue, to be sent as txs
......@@ -127,12 +135,16 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
func newChannelBuilder(cfg ChannelConfig, rcfg *rollup.Config) (*channelBuilder, error) {
c, err := cfg.CompressorConfig.NewCompressor()
if err != nil {
return nil, err
}
co, err := derive.NewChannelOut(c)
var spanBatchBuilder *derive.SpanBatchBuilder
if cfg.BatchType == derive.SpanBatchType {
spanBatchBuilder = derive.NewSpanBatchBuilder(rcfg.Genesis.L2Time, rcfg.L2ChainID)
}
co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatchBuilder)
if err != nil {
return nil, err
}
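For context, the construction path now threads the rollup config through so a span channel can seed its SpanBatchBuilder with the L2 genesis time and chain ID. A minimal sketch of that dispatch using only the exported derive API (the comp argument stands in for whatever cfg.CompressorConfig.NewCompressor() returns):

import (
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

// newChannelOutForType mirrors the dispatch above: span channels need a
// SpanBatchBuilder seeded from the rollup config; singular channels pass nil.
func newChannelOutForType(batchType uint, comp derive.Compressor, rcfg *rollup.Config) (derive.ChannelOut, error) {
	var spanBatchBuilder *derive.SpanBatchBuilder
	if batchType == derive.SpanBatchType {
		spanBatchBuilder = derive.NewSpanBatchBuilder(rcfg.Genesis.L2Time, rcfg.L2ChainID)
	}
	return derive.NewChannelOut(batchType, comp, spanBatchBuilder)
}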
......@@ -194,12 +206,12 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
return derive.L1BlockInfo{}, c.FullErr()
}
batch, l1info, err := derive.BlockToBatch(block)
batch, l1info, err := derive.BlockToSingularBatch(block)
if err != nil {
return l1info, fmt.Errorf("converting block to batch: %w", err)
}
if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
if _, err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
c.setFullErr(err)
return l1info, c.FullErr()
} else if err != nil {
......@@ -252,7 +264,7 @@ func (c *channelBuilder) updateDurationTimeout(l1BlockNum uint64) {
// derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the currently set
// timeout.
func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) {
func (c *channelBuilder) updateSwTimeout(batch *derive.SingularBatch) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout, ErrSeqWindowClose)
}
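The narrower *derive.SingularBatch parameter works because the sequencer-window timeout only needs the batch's epoch number. As a worked example of the formula (illustrative numbers, not taken from this diff):

// timeout = epochNum + SeqWindowSize - SubSafetyMargin
// e.g. an L1 origin at block 1000, a 3600-block sequencer window, and a
// 10-block submission safety margin give a deadline of L1 block 4590.
func exampleSwTimeout() uint64 {
	const epochNum, seqWindowSize, subSafetyMargin = 1000, 3600, 10
	return epochNum + seqWindowSize - subSafetyMargin // 4590
}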
......
......@@ -7,6 +7,7 @@ import (
"sync"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
......@@ -28,6 +29,7 @@ type channelManager struct {
log log.Logger
metr metrics.Metricer
cfg ChannelConfig
rcfg *rollup.Config
// All blocks since the last request for new tx data.
blocks []*types.Block
......@@ -45,17 +47,18 @@ type channelManager struct {
closed bool
}
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) *channelManager {
return &channelManager{
log: log,
metr: metr,
cfg: cfg,
rcfg: rcfg,
txChannels: make(map[txID]*channel),
}
}
// Clear clears the entire state of the channel manager.
// It is intended to be used after an L2 reorg.
// It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
......@@ -195,7 +198,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}
pc, err := newChannel(s.log, s.metr, s.cfg)
pc, err := newChannel(s.log, s.metr, s.cfg, s.rcfg)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
}
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -19,11 +20,41 @@ import (
"github.com/stretchr/testify/require"
)
// TestChannelManagerReturnsErrReorg ensures that the channel manager
func TestChannelManagerBatchType(t *testing.T) {
tests := []struct {
name string
f func(t *testing.T, batchType uint)
}{
{"ChannelManagerReturnsErrReorg", ChannelManagerReturnsErrReorg},
{"ChannelManagerReturnsErrReorgWhenDrained", ChannelManagerReturnsErrReorgWhenDrained},
{"ChannelManager_Clear", ChannelManager_Clear},
{"ChannelManager_TxResend", ChannelManager_TxResend},
{"ChannelManagerCloseBeforeFirstUse", ChannelManagerCloseBeforeFirstUse},
{"ChannelManagerCloseNoPendingChannel", ChannelManagerCloseNoPendingChannel},
{"ChannelManagerClosePendingChannel", ChannelManagerClosePendingChannel},
{"ChannelManagerCloseAllTxsFailed", ChannelManagerCloseAllTxsFailed},
}
for _, test := range tests {
test := test
t.Run(test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, derive.SingularBatchType)
})
}
for _, test := range tests {
test := test
t.Run(test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, derive.SpanBatchType)
})
}
}
// ChannelManagerReturnsErrReorg ensures that the channel manager
// detects a reorg when it has cached L1 blocks.
func TestChannelManagerReturnsErrReorg(t *testing.T) {
func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{})
m.Clear()
a := types.NewBlock(&types.Header{
Number: big.NewInt(0),
......@@ -49,9 +80,9 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) {
require.Equal(t, []*types.Block{a, b, c}, m.blocks)
}
// TestChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// ChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// detects a reorg even if it does not have any blocks inside it.
func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
......@@ -61,7 +92,11 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&rollup.Config{},
)
m.Clear()
a := newMiniL2Block(0)
x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff})
......@@ -76,8 +111,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
}
// TestChannelManager_Clear tests clearing the channel manager.
func TestChannelManager_Clear(t *testing.T) {
// ChannelManager_Clear tests clearing the channel manager.
func ChannelManager_Clear(t *testing.T, batchType uint) {
require := require.New(t)
// Create a channel manager
......@@ -96,7 +131,10 @@ func TestChannelManager_Clear(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
// Channel Manager state should be empty by default
require.Empty(m.blocks)
......@@ -104,9 +142,11 @@ func TestChannelManager_Clear(t *testing.T) {
require.Nil(m.currentChannel)
require.Empty(m.channelQueue)
require.Empty(m.txChannels)
// Set the last block
m.Clear()
// Add a block to the channel manager
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
newL1Tip := a.Hash()
l1BlockID := eth.BlockID{
Hash: a.Hash(),
......@@ -153,7 +193,7 @@ func TestChannelManager_Clear(t *testing.T) {
require.Empty(m.txChannels)
}
func TestChannelManager_TxResend(t *testing.T) {
func ChannelManager_TxResend(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlError)
......@@ -165,9 +205,13 @@ func TestChannelManager_TxResend(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
require.NoError(m.AddL2Block(a))
......@@ -195,9 +239,9 @@ func TestChannelManager_TxResend(t *testing.T) {
require.Len(fs, 1)
}
// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager
// ChannelManagerCloseBeforeFirstUse ensures that the channel manager
// will not produce any frames if closed immediately.
func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
......@@ -209,9 +253,13 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
TargetFrameSize: 0,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
m.Close()
......@@ -222,10 +270,10 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with no pending channels, and will not emit any new
// channel frames.
func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
......@@ -237,7 +285,11 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
......@@ -261,25 +313,35 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// ChannelManagerClosePendingChannel ensures that the channel manager
// can gracefully close with a pending channel, and will not produce any
// new channel frames after this point.
func TestChannelManagerClosePendingChannel(t *testing.T) {
func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: 1000,
MaxFrameSize: 10000,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetNumFrames: 100,
TargetFrameSize: 1000,
TargetNumFrames: 1,
TargetFrameSize: 10000,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(50_000)
b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
numTx := 20 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
......@@ -306,11 +368,12 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}
// TestChannelManagerCloseAllTxsFailed ensures that the channel manager
// ChannelManagerCloseAllTxsFailed ensures that the channel manager
// can gracefully close after producing transaction frames if none of these
// have successfully landed on chain.
func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
......@@ -321,9 +384,12 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
TargetFrameSize: 1000,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
}, &defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(50_000)
a := derivetest.RandomL2BlockWithChainId(rng, 50000, defaultTestRollupConfig.L2ChainID)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
......
......@@ -5,6 +5,7 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -20,7 +21,8 @@ func TestChannelTimeout(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100,
})
}, &rollup.Config{})
m.Clear()
// Pending channel is nil so it cannot be timed out
require.Nil(t, m.currentChannel)
......@@ -61,7 +63,8 @@ func TestChannelTimeout(t *testing.T) {
// TestChannelNextTxData checks the nextTxData function.
func TestChannelNextTxData(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear()
// Nil pending channel should return EOF
returnedTxData, err := m.nextTxData(nil)
......@@ -109,7 +112,8 @@ func TestChannelTxConfirmed(t *testing.T) {
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map
ChannelTimeout: 10,
})
}, &rollup.Config{})
m.Clear()
// Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness
......@@ -157,7 +161,8 @@ func TestChannelTxConfirmed(t *testing.T) {
func TestChannelTxFailed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear()
// Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness
......
......@@ -52,6 +52,8 @@ type CLIConfig struct {
Stopped bool
BatchType uint
TxMgrConfig txmgr.CLIConfig
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
......@@ -93,6 +95,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
......
......@@ -74,7 +74,7 @@ type BatchSubmitter struct {
func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
return &BatchSubmitter{
DriverSetup: setup,
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel),
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel, setup.RollupCfg),
}
}
......
......@@ -173,6 +173,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
CompressorConfig: cfg.CompressorConfig.Config(),
BatchType: cfg.BatchType,
}
if err := bs.Channel.Check(); err != nil {
return fmt.Errorf("invalid channel configuration: %w", err)
......
......@@ -76,6 +76,12 @@ var (
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
EnvVars: prefixEnvVars("STOPPED"),
}
BatchTypeFlag = &cli.UintFlag{
Name: "batch-type",
Usage: "The batch type. 0 for SingularBatch and 1 for SpanBatch.",
Value: 0,
EnvVars: prefixEnvVars("BATCH_TYPE"),
}
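Given the prefixEnvVars("BATCH_TYPE") entry above, the flag should also be settable through the OP_BATCHER_BATCH_TYPE environment variable, which is what the docker-compose change at the end of this diff sets to 0.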
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
......@@ -94,6 +100,7 @@ var optionalFlags = []cli.Flag{
MaxL1TxSizeBytesFlag,
StoppedFlag,
SequencerHDPathFlag,
BatchTypeFlag,
}
func init() {
......
......@@ -61,8 +61,11 @@ type ChannelOutIface interface {
OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error)
}
// Compile-time check for ChannelOutIface interface implementation for the ChannelOut type.
var _ ChannelOutIface = (*derive.ChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the SingularChannelOut type.
var _ ChannelOutIface = (*derive.SingularChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the SpanChannelOut type.
var _ ChannelOutIface = (*derive.SpanChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the GarbageChannelOut type.
var _ ChannelOutIface = (*GarbageChannelOut)(nil)
......
......@@ -140,7 +140,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
ApproxComprRatio: 1,
})
require.NoError(t, e, "failed to create compressor")
ch, err = derive.NewChannelOut(c)
ch, err = derive.NewChannelOut(derive.SingularBatchType, c, nil)
}
require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch
......
......@@ -49,6 +49,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
......@@ -679,6 +680,10 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
return nil, fmt.Errorf("unable to start l2 output submitter: %w", err)
}
batchType := derive.SingularBatchType
if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" {
batchType = derive.SpanBatchType
}
batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(),
......@@ -698,7 +703,8 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Level: log.LvlInfo,
Format: oplog.FormatText,
},
Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later
Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later
BatchType: uint(batchType),
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.cfg.Loggers["batcher"])
......
......@@ -48,7 +48,31 @@ type Compressor interface {
FullErr() error
}
type ChannelOut struct {
type ChannelOut interface {
ID() ChannelID
Reset() error
AddBlock(*types.Block) (uint64, error)
AddSingularBatch(*SingularBatch, uint64) (uint64, error)
InputBytes() int
ReadyBytes() int
Flush() error
FullErr() error
Close() error
OutputFrame(*bytes.Buffer, uint64) (uint16, error)
}
func NewChannelOut(batchType uint, compress Compressor, spanBatchBuilder *SpanBatchBuilder) (ChannelOut, error) {
switch batchType {
case SingularBatchType:
return NewSingularChannelOut(compress)
case SpanBatchType:
return NewSpanChannelOut(compress, spanBatchBuilder)
default:
return nil, fmt.Errorf("unrecognized batch type: %d", batchType)
}
}
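A hedged usage sketch of the new factory, programming against the ChannelOut interface; the compressor settings are illustrative and follow the compressor.Config fields seen in the tests above:

import (
	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

// openSpanChannel builds a ChannelOut that encodes its blocks as one SpanBatch.
func openSpanChannel(rcfg *rollup.Config) (derive.ChannelOut, error) {
	cfg := compressor.Config{
		TargetFrameSize:  100_000, // illustrative values
		TargetNumFrames:  1,
		ApproxComprRatio: 0.4,
	}
	comp, err := cfg.NewCompressor()
	if err != nil {
		return nil, err
	}
	sbb := derive.NewSpanBatchBuilder(rcfg.Genesis.L2Time, rcfg.L2ChainID)
	return derive.NewChannelOut(derive.SpanBatchType, comp, sbb)
}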
type SingularChannelOut struct {
id ChannelID
// Frame ID of the next frame to emit. Increment after emitting
frame uint64
......@@ -61,13 +85,13 @@ type ChannelOut struct {
closed bool
}
func (co *ChannelOut) ID() ChannelID {
func (co *SingularChannelOut) ID() ChannelID {
return co.id
}
func NewChannelOut(compress Compressor) (*ChannelOut, error) {
c := &ChannelOut{
id: ChannelID{}, // TODO: use GUID here instead of fully random data
func NewSingularChannelOut(compress Compressor) (*SingularChannelOut, error) {
c := &SingularChannelOut{
id: ChannelID{},
frame: 0,
rlpLength: 0,
compress: compress,
......@@ -80,8 +104,7 @@ func NewChannelOut(compress Compressor) (*ChannelOut, error) {
return c, nil
}
// TODO: reuse ChannelOut for performance
func (co *ChannelOut) Reset() error {
func (co *SingularChannelOut) Reset() error {
co.frame = 0
co.rlpLength = 0
co.compress.Reset()
......@@ -94,27 +117,27 @@ func (co *ChannelOut) Reset() error {
// and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
func (co *SingularChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
batch, _, err := BlockToBatch(block)
batch, l1Info, err := BlockToSingularBatch(block)
if err != nil {
return 0, err
}
return co.AddBatch(batch)
return co.AddSingularBatch(batch, l1Info.SequenceNumber)
}
// AddBatch adds a batch to the channel. It returns the RLP encoded byte size
// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the batch. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
//
// AddBatch should be used together with BlockToBatch if you need to access the
// AddSingularBatch should be used together with BlockToSingularBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
......@@ -122,7 +145,7 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
// We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer
if err := rlp.Encode(&buf, batch); err != nil {
if err := rlp.Encode(&buf, NewSingularBatchData(*batch)); err != nil {
return 0, err
}
if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel {
......@@ -137,28 +160,28 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
}
// InputBytes returns the total amount of RLP-encoded input bytes.
func (co *ChannelOut) InputBytes() int {
func (co *SingularChannelOut) InputBytes() int {
return co.rlpLength
}
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Adding blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *ChannelOut) ReadyBytes() int {
func (co *SingularChannelOut) ReadyBytes() int {
return co.compress.Len()
}
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more
// complete frame. It reduces the compression efficiency.
func (co *ChannelOut) Flush() error {
func (co *SingularChannelOut) Flush() error {
return co.compress.Flush()
}
func (co *ChannelOut) FullErr() error {
func (co *SingularChannelOut) FullErr() error {
return co.compress.FullErr()
}
func (co *ChannelOut) Close() error {
func (co *SingularChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
}
......@@ -173,28 +196,13 @@ func (co *ChannelOut) Close() error {
// Returns io.EOF when the channel is closed & there are no more frames.
// Returns nil if there is still more buffered data.
// Returns an error if it ran into an error during processing.
func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
f := Frame{
ID: co.id,
FrameNumber: uint16(co.frame),
}
func (co *SingularChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
// Check that the maxSize is large enough for the frame overhead size.
if maxSize < FrameV0OverHeadSize {
return 0, ErrMaxFrameSizeTooSmall
}
// Copy data from the local buffer into the frame data buffer
maxDataSize := maxSize - FrameV0OverHeadSize
if maxDataSize > uint64(co.compress.Len()) {
maxDataSize = uint64(co.compress.Len())
// If we are closed & will not spill past the current frame
// mark it as the final frame of the channel.
if co.closed {
f.IsLast = true
}
}
f.Data = make([]byte, maxDataSize)
f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize)
if _, err := io.ReadFull(co.compress, f.Data); err != nil {
return 0, err
......@@ -213,8 +221,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
}
}
// BlockToBatch transforms a block into a batch object that can easily be RLP encoded.
func BlockToBatch(block *types.Block) (*BatchData, L1BlockInfo, error) {
// BlockToSingularBatch transforms a block into a batch object that can easily be RLP encoded.
func BlockToSingularBatch(block *types.Block) (*SingularBatch, L1BlockInfo, error) {
opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions()))
for i, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType {
......@@ -238,15 +246,13 @@ func BlockToBatch(block *types.Block) (*BatchData, L1BlockInfo, error) {
return nil, l1Info, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
}
return NewSingularBatchData(
SingularBatch{
ParentHash: block.ParentHash(),
EpochNum: rollup.Epoch(l1Info.Number),
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
},
), l1Info, nil
return &SingularBatch{
ParentHash: block.ParentHash(),
EpochNum: rollup.Epoch(l1Info.Number),
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
}, l1Info, nil
}
// ForceCloseTxData generates the transaction data for a transaction which will force close
......@@ -303,3 +309,24 @@ func ForceCloseTxData(frames []Frame) ([]byte, error) {
return out.Bytes(), nil
}
// createEmptyFrame creates a new empty Frame with the given information. The frame data must then be copied in from the ChannelOut.
func createEmptyFrame(id ChannelID, frame uint64, readyBytes int, closed bool, maxSize uint64) *Frame {
f := Frame{
ID: id,
FrameNumber: uint16(frame),
}
// Copy data from the local buffer into the frame data buffer
maxDataSize := maxSize - FrameV0OverHeadSize
if maxDataSize > uint64(readyBytes) {
maxDataSize = uint64(readyBytes)
// If we are closed & will not spill past the current frame
// mark it as the final frame of the channel.
if closed {
f.IsLast = true
}
}
f.Data = make([]byte, maxDataSize)
return &f
}
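createEmptyFrame factors out the sizing rule that both channel-out implementations previously inlined. A sketch of just the decision it encodes, written as if inside the derive package (FrameV0OverHeadSize is the 23-byte minimum that Check guards against earlier in this diff):

// frameDataSize reproduces the sizing rule, assuming the caller has already
// checked maxSize >= FrameV0OverHeadSize: a frame carries at most
// maxSize - FrameV0OverHeadSize bytes, and is only marked last when the
// channel is closed and everything remaining fits into this one frame.
func frameDataSize(readyBytes int, closed bool, maxSize uint64) (size uint64, isLast bool) {
	size = maxSize - FrameV0OverHeadSize
	if size > uint64(readyBytes) {
		size = uint64(readyBytes)
		isLast = closed
	}
	return size, isLast
}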
......@@ -29,7 +29,7 @@ func (s *nonCompressor) FullErr() error {
}
func TestChannelOutAddBlock(t *testing.T) {
cout, err := NewChannelOut(&nonCompressor{})
cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil)
require.NoError(t, err)
t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) {
......@@ -50,7 +50,7 @@ func TestChannelOutAddBlock(t *testing.T) {
// max size that is below the fixed frame size overhead of 23, will return
// an error.
func TestOutputFrameSmallMaxSize(t *testing.T) {
cout, err := NewChannelOut(&nonCompressor{})
cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil)
require.NoError(t, err)
// Call OutputFrame with the range of small max size values that err
......@@ -97,42 +97,42 @@ func TestForceCloseTxData(t *testing.T) {
output: "",
},
{
frames: []Frame{Frame{FrameNumber: 0, IsLast: false}, Frame{ID: id, FrameNumber: 1, IsLast: true}},
frames: []Frame{{FrameNumber: 0, IsLast: false}, {ID: id, FrameNumber: 1, IsLast: true}},
errors: true,
output: "",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 0, IsLast: false}},
frames: []Frame{{ID: id, FrameNumber: 0, IsLast: false}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000001",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 0, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 0, IsLast: true}},
errors: false,
output: "00",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: false}},
frames: []Frame{{ID: id, FrameNumber: 1, IsLast: false}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000001",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 1, IsLast: true}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 2, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 2, IsLast: true}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00010000000000",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: false}, Frame{ID: id, FrameNumber: 3, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 1, IsLast: false}, {ID: id, FrameNumber: 3, IsLast: true}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00020000000000",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: false}, Frame{ID: id, FrameNumber: 3, IsLast: true}, Frame{ID: id, FrameNumber: 5, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 1, IsLast: false}, {ID: id, FrameNumber: 3, IsLast: true}, {ID: id, FrameNumber: 5, IsLast: true}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00020000000000",
},
......@@ -152,6 +152,6 @@ func TestForceCloseTxData(t *testing.T) {
func TestBlockToBatchValidity(t *testing.T) {
block := new(types.Block)
_, _, err := BlockToBatch(block)
_, _, err := BlockToSingularBatch(block)
require.ErrorContains(t, err, "has no transactions")
}
......@@ -597,31 +597,32 @@ func NewSpanBatch(singularBatches []*SingularBatch) *SpanBatch {
// SpanBatchBuilder is a utility type to build a SpanBatch by adding SingularBatches one by one.
// It makes it easier to stack SingularBatches and convert them to a RawSpanBatch for encoding.
type SpanBatchBuilder struct {
parentEpoch uint64
genesisTimestamp uint64
chainID *big.Int
spanBatch *SpanBatch
originChangedBit uint
}
func NewSpanBatchBuilder(parentEpoch uint64, genesisTimestamp uint64, chainID *big.Int) *SpanBatchBuilder {
func NewSpanBatchBuilder(genesisTimestamp uint64, chainID *big.Int) *SpanBatchBuilder {
return &SpanBatchBuilder{
parentEpoch: parentEpoch,
genesisTimestamp: genesisTimestamp,
chainID: chainID,
spanBatch: &SpanBatch{},
}
}
func (b *SpanBatchBuilder) AppendSingularBatch(singularBatch *SingularBatch) {
func (b *SpanBatchBuilder) AppendSingularBatch(singularBatch *SingularBatch, seqNum uint64) {
if b.GetBlockCount() == 0 {
b.originChangedBit = 0
if seqNum == 0 {
b.originChangedBit = 1
}
}
b.spanBatch.AppendSingularBatch(singularBatch)
}
func (b *SpanBatchBuilder) GetRawSpanBatch() (*RawSpanBatch, error) {
originChangedBit := 0
if uint64(b.spanBatch.GetStartEpochNum()) != b.parentEpoch {
originChangedBit = 1
}
raw, err := b.spanBatch.ToRawSpanBatch(uint(originChangedBit), b.genesisTimestamp, b.chainID)
raw, err := b.spanBatch.ToRawSpanBatch(b.originChangedBit, b.genesisTimestamp, b.chainID)
if err != nil {
return nil, err
}
......
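The reworked builder no longer needs the parent epoch: a first appended batch with sequence number 0 is by definition the first L2 block of a new L1 epoch, which is exactly the origin-changed condition. A hedged illustration (the batch arguments are hypothetical):

import (
	"math/big"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

func buildSpan(genesisTimestamp uint64, chainID *big.Int, first, second *derive.SingularBatch) (*derive.RawSpanBatch, error) {
	builder := derive.NewSpanBatchBuilder(genesisTimestamp, chainID)
	// The first appended batch fixes originChangedBit for the whole span:
	// seqNum == 0 means the block opened a new L1 epoch.
	builder.AppendSingularBatch(first, 0)
	builder.AppendSingularBatch(second, 1) // later appends leave the bit alone
	return builder.GetRawSpanBatch()
}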
......@@ -493,16 +493,16 @@ func TestSpanBatchBuilder(t *testing.T) {
}
genesisTimeStamp := 1 + singularBatches[0].Timestamp - 128
parentEpoch := uint64(singularBatches[0].EpochNum)
var seqNum uint64 = 1
if originChangedBit == 1 {
parentEpoch -= 1
seqNum = 0
}
spanBatchBuilder := NewSpanBatchBuilder(parentEpoch, genesisTimeStamp, chainID)
spanBatchBuilder := NewSpanBatchBuilder(genesisTimeStamp, chainID)
assert.Equal(t, 0, spanBatchBuilder.GetBlockCount())
for i := 0; i < len(singularBatches); i++ {
spanBatchBuilder.AppendSingularBatch(singularBatches[i])
spanBatchBuilder.AppendSingularBatch(singularBatches[i], seqNum)
assert.Equal(t, i+1, spanBatchBuilder.GetBlockCount())
assert.Equal(t, singularBatches[0].ParentHash.Bytes()[:20], spanBatchBuilder.spanBatch.parentCheck)
assert.Equal(t, singularBatches[i].EpochHash.Bytes()[:20], spanBatchBuilder.spanBatch.l1OriginCheck)
......
package derive
import (
"bytes"
"crypto/rand"
"errors"
"fmt"
"io"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
type SpanChannelOut struct {
id ChannelID
// Frame ID of the next frame to emit. Increment after emitting
frame uint64
// rlpLength is the uncompressed size of the channel. Must be less than MAX_RLP_BYTES_PER_CHANNEL
rlpLength int
// Compressor stage. Write input data to it
compress Compressor
// closed indicates if the channel is closed
closed bool
// spanBatchBuilder contains the information required to build a SpanBatch
spanBatchBuilder *SpanBatchBuilder
// reader contains compressed data for making output frames
reader *bytes.Buffer
}
func (co *SpanChannelOut) ID() ChannelID {
return co.id
}
func NewSpanChannelOut(compress Compressor, spanBatchBuilder *SpanBatchBuilder) (*SpanChannelOut, error) {
c := &SpanChannelOut{
id: ChannelID{},
frame: 0,
rlpLength: 0,
compress: compress,
spanBatchBuilder: spanBatchBuilder,
reader: &bytes.Buffer{},
}
_, err := rand.Read(c.id[:])
if err != nil {
return nil, err
}
return c, nil
}
func (co *SpanChannelOut) Reset() error {
co.frame = 0
co.rlpLength = 0
co.compress.Reset()
co.reader.Reset()
co.closed = false
co.spanBatchBuilder.Reset()
_, err := rand.Read(co.id[:])
return err
}
// AddBlock adds a block to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
func (co *SpanChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
batch, l1Info, err := BlockToSingularBatch(block)
if err != nil {
return 0, err
}
return co.AddSingularBatch(batch, l1Info.SequenceNumber)
}
// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the batch. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
//
// AddSingularBatch should be used together with BlockToSingularBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
//
// The SingularBatch is appended to the channel's SpanBatch.
// A channel can hold only one SpanBatch, and its compressed output must not be read until the channel is closed,
// since the span batch prefix and payload may still change. The channel therefore resets its contents and
// rewrites the entire SpanBatch on every add, and the compressed result is copied to the reader only once the
// channel is full or closed. As a consequence, frames can only be produced once the channel is full or closed.
func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
if co.FullErr() != nil {
// channel is already full
return 0, co.FullErr()
}
var buf bytes.Buffer
// Append Singular batch to its span batch builder
co.spanBatchBuilder.AppendSingularBatch(batch, seqNum)
// Convert Span batch to RawSpanBatch
rawSpanBatch, err := co.spanBatchBuilder.GetRawSpanBatch()
if err != nil {
return 0, fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err)
}
// Encode RawSpanBatch into bytes
if err = rlp.Encode(&buf, NewSpanBatchData(*rawSpanBatch)); err != nil {
return 0, fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err)
}
// Ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
if buf.Len() > MaxRLPBytesPerChannel {
return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes)
}
co.rlpLength = buf.Len()
if co.spanBatchBuilder.GetBlockCount() > 1 {
// Flush compressed data into reader to preserve current result.
// If the channel is full after this block is appended, we should use preserved data.
if err := co.compress.Flush(); err != nil {
return 0, fmt.Errorf("failed to flush compressor: %w", err)
}
_, err = io.Copy(co.reader, co.compress)
if err != nil {
// Must reset reader to avoid partial output
co.reader.Reset()
return 0, fmt.Errorf("failed to copy compressed data to reader: %w", err)
}
}
// Reset compressor to rewrite the entire span batch
co.compress.Reset()
// Avoid using io.Copy here, because we need all or nothing
written, err := co.compress.Write(buf.Bytes())
if co.compress.FullErr() != nil {
err = co.compress.FullErr()
if co.spanBatchBuilder.GetBlockCount() == 1 {
// Do not return CompressorFullErr for the first block in the batch.
// In this case, the reader must be empty; the contents of the compressor will be copied to the reader when the channel is closed.
err = nil
}
// If there is more than one block in the channel, the reader already holds data preserving the
// compression result from before this block was added.
// As a result, this block is not added to the channel, and the channel will be closed.
return uint64(written), err
}
// If the compressor is not full yet, the reader must be reset to avoid submitting invalid frames
co.reader.Reset()
return uint64(written), err
}
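From the caller's perspective, the contract is: a CompressorFullErr means the batch just passed was not included, except for a channel's first block, where the error is suppressed so every channel carries at least one block. A hedged sketch of a caller honoring that (loosely mirroring what channelBuilder.AddBlock does above):

import (
	"errors"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
	"github.com/ethereum/go-ethereum/core/types"
)

// fillChannel adds blocks until the channel is full, then closes it. Blocks
// rejected with CompressorFullErr must be resubmitted in a fresh channel.
func fillChannel(co derive.ChannelOut, blocks []*types.Block) (unadded []*types.Block, err error) {
	for i, block := range blocks {
		if _, err := co.AddBlock(block); errors.Is(err, derive.CompressorFullErr) {
			return blocks[i:], co.Close() // this block was not included
		} else if err != nil {
			return blocks[i:], err
		}
	}
	return nil, co.Close()
}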
// InputBytes returns the total amount of RLP-encoded input bytes.
func (co *SpanChannelOut) InputBytes() int {
return co.rlpLength
}
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Adding blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *SpanChannelOut) ReadyBytes() int {
return co.reader.Len()
}
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more
// complete frame. It reduces the compression efficiency.
func (co *SpanChannelOut) Flush() error {
if err := co.compress.Flush(); err != nil {
return err
}
if co.closed && co.ReadyBytes() == 0 && co.compress.Len() > 0 {
_, err := io.Copy(co.reader, co.compress)
if err != nil {
// Must reset reader to avoid partial output
co.reader.Reset()
return fmt.Errorf("failed to flush compressed data to reader: %w", err)
}
}
return nil
}
func (co *SpanChannelOut) FullErr() error {
return co.compress.FullErr()
}
func (co *SpanChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
}
co.closed = true
if err := co.Flush(); err != nil {
return err
}
return co.compress.Close()
}
// OutputFrame writes a frame to w with a given max size and returns the frame
// number.
// Use `ReadyBytes`, `Flush`, and `Close` to modify the ready buffer.
// Returns an error if the `maxSize` < FrameV0OverHeadSize.
// Returns io.EOF when the channel is closed & there are no more frames.
// Returns nil if there is still more buffered data.
// Returns an error if it ran into an error during processing.
func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
// Check that the maxSize is large enough for the frame overhead size.
if maxSize < FrameV0OverHeadSize {
return 0, ErrMaxFrameSizeTooSmall
}
f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize)
if _, err := io.ReadFull(co.reader, f.Data); err != nil {
return 0, err
}
if err := f.MarshalBinary(w); err != nil {
return 0, err
}
co.frame += 1
fn := f.FrameNumber
if f.IsLast {
return fn, io.EOF
} else {
return fn, nil
}
}
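Frame extraction has the same shape for both implementations: close the channel, then pull frames until OutputFrame returns io.EOF alongside the final valid frame. A hedged sketch (submitFrame is a hypothetical sink):

import (
	"bytes"
	"io"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

func drainFrames(co derive.ChannelOut, maxFrameSize uint64, submitFrame func(fn uint16, data []byte) error) error {
	if err := co.Close(); err != nil {
		return err
	}
	for {
		var buf bytes.Buffer
		fn, err := co.OutputFrame(&buf, maxFrameSize)
		if err != nil && err != io.EOF {
			return err
		}
		if serr := submitFrame(fn, buf.Bytes()); serr != nil {
			return serr
		}
		if err == io.EOF {
			return nil // last frame emitted; channel exhausted
		}
	}
}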
package test
import (
"math/big"
"math/rand"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
......@@ -21,3 +22,13 @@ func RandomL2Block(rng *rand.Rand, txCount int) (*types.Block, []*types.Receipt)
}
return testutils.RandomBlockPrependTxs(rng, txCount, types.NewTx(l1InfoTx))
}
func RandomL2BlockWithChainId(rng *rand.Rand, txCount int, chainId *big.Int) *types.Block {
signer := types.NewLondonSigner(chainId)
block, _ := RandomL2Block(rng, 0)
txs := []*types.Transaction{block.Transactions()[0]} // L1 info deposit TX
for i := 0; i < txCount; i++ {
txs = append(txs, testutils.RandomTx(rng, big.NewInt(int64(rng.Uint32())), signer))
}
return block.WithBody(txs, nil)
}
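The chain-ID-aware helper matters for span batches: span-batch encoding round-trips transaction signatures using the L2 chain ID, so the random test transactions presumably must be signed for the rollup's actual chain ID to encode and decode cleanly, which is why the tests above switch from RandomL2Block to RandomL2BlockWithChainId.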
......@@ -159,6 +159,7 @@ services:
OP_BATCHER_PPROF_ENABLED: "true"
OP_BATCHER_METRICS_ENABLED: "true"
OP_BATCHER_RPC_ENABLE_ADMIN: "true"
OP_BATCHER_BATCH_TYPE: 0
artifact-server:
depends_on:
......