Commit 106993f8 authored by George Knee, committed by GitHub

op-batcher: Move decision about data availability type to channel submission time (#12002)

* tidy up godoc

* move data availability config decision to channel submission time

instead of channel creation time

Also, cache the ChannelConfig whenever we switch DA type so it is used by default for new channels

* fix test

* formatting changes

* respond to PR comments

* add unit test for Requeue method

* reduce number of txs in test block

* improve test (more blocks in queue)

* hoist pending tx management up

* wip

* tidy up test

* wip

* fix

* refactor to do requeue before calling nextTxData

* introduce ErrInsufficientData

do not return nextTxData from channel which was discarded by requeue

* run test until nonzero data is returned by TxData

* break up and improve error logic

* fix test to anticipate ErrInsufficientData

* after requeuing, call nextTxData again

* remove unnecessary checks

* move err declaration to top of file

* add some comments and whitespace

* hoist lock back up to TxData

* rename variable to blocksToRequeue

* remove panic

* add comment

* use deterministic RNG and NoneCompressor in test

* test: increase block size to fill channel more quickly

* remove ErrInsufficientData

replace with io.EOF as before

* tidy up

* typo
parent 4f1e8a70
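Before diving into the hunks, here is a condensed sketch of the new `TxData` flow, as a reading aid only. It is drawn from the `channel_manager.go` changes below, with logging and some error paths trimmed, and it assumes the surrounding `batcher` package context; the authoritative version is in the diff itself:

```go
// Condensed restatement of the new TxData logic (not part of the commit).
func (s *channelManager) txDataSketch(l1Head eth.BlockID) (txData, error) {
	channel, err := s.getReadyChannel(l1Head)
	if err != nil {
		return emptyTxData, err
	}
	// A channel that has already started submitting keeps its DA type.
	if !channel.NoneSubmitted() {
		return s.nextTxData(channel)
	}
	// Otherwise, ask the provider which DA type is currently optimal.
	if newCfg := s.cfgProvider.ChannelConfig(); newCfg.UseBlobs != s.defaultCfg.UseBlobs {
		// Switching DA type: rebuild unsubmitted channels under the new config.
		s.Requeue(newCfg)
		if channel, err = s.getReadyChannel(l1Head); err != nil {
			return emptyTxData, err
		}
	}
	return s.nextTxData(channel)
}
```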
@@ -155,9 +155,9 @@ func (s *channel) ID() derive.ChannelID {
 	return s.channelBuilder.ID()
 }
 
-// NextTxData returns the next tx data packet.
-// If cfg.MultiFrameTxs is false, it returns txData with a single frame.
-// If cfg.MultiFrameTxs is true, it will read frames from its channel builder
+// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
+// If cfg.UseBlobs is false, it returns txData with a single frame.
+// If cfg.UseBlobs is true, it will read frames from its channel builder
 // until it either doesn't have more frames or the target number of frames is reached.
 //
 // NextTxData should only be called after HasTxData returned true.
@@ -177,10 +177,11 @@ func (s *channel) NextTxData() txData {
 }
 
 func (s *channel) HasTxData() bool {
-	if s.IsFull() || !s.cfg.UseBlobs {
+	if s.IsFull() || // If the channel is full, we should start to submit it
+		!s.cfg.UseBlobs { // If using calldata, we only send one frame per tx
 		return s.channelBuilder.HasFrame()
 	}
-	// collect enough frames if channel is not full yet
+	// Collect enough frames if channel is not full yet
 	return s.channelBuilder.PendingFrames() >= int(s.cfg.MaxFramesPerTx())
 }
...
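The `HasTxData` predicate above encodes when a tx is worth building: a full channel or a calldata channel submits as soon as a single frame exists, while a blob channel waits until a whole multi-frame tx can be filled. A standalone restatement, purely for illustration (all names here are hypothetical stand-ins for the channel's state, not the real API):

```go
package main

import "fmt"

// hasTxData restates the predicate from channel.HasTxData in standalone form.
func hasTxData(isFull, useBlobs bool, pendingFrames, maxFramesPerTx int) bool {
	if isFull || !useBlobs {
		// Full channel or calldata: submit as soon as one frame exists.
		return pendingFrames > 0
	}
	// Blobs: wait until a whole multi-frame tx can be filled.
	return pendingFrames >= maxFramesPerTx
}

func main() {
	// With blobs and MaxFramesPerTx=6, five pending frames are not enough...
	fmt.Println(hasTxData(false, true, 5, 6)) // false
	// ...unless the channel is already full.
	fmt.Println(hasTxData(true, true, 5, 6)) // true
}
```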
@@ -417,12 +417,12 @@ func (c *ChannelBuilder) HasFrame() bool {
 }
 
 // PendingFrames returns the number of pending frames in the frames queue.
-// It is larger zero iff HasFrames() returns true.
+// It is larger than zero iff HasFrame() returns true.
 func (c *ChannelBuilder) PendingFrames() int {
 	return len(c.frames)
 }
 
-// NextFrame returns the next available frame.
+// NextFrame dequeues the next available frame.
 // HasFrame must be called prior to check if there's a next frame available.
 // Panics if called when there's no next frame.
 func (c *ChannelBuilder) NextFrame() frameData {
...
@@ -51,8 +51,8 @@ type ChannelConfig struct {
 	UseBlobs bool
 }
 
-// ChannelConfig returns a copy of itself. This makes a ChannelConfig a static
-// ChannelConfigProvider of itself.
+// ChannelConfig returns a copy of the receiver.
+// This allows the receiver to be a static ChannelConfigProvider of itself.
 func (cc ChannelConfig) ChannelConfig() ChannelConfig {
 	return cc
 }
...
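The comment above describes the provider pattern this commit leans on: anything with a `ChannelConfig()` method can act as a config source. A minimal sketch of that shape, assuming the interface matches how `cfgProvider` is used in this diff (the real declaration may differ):

```go
// Assumed interface shape, inferred from cfgProvider.ChannelConfig() calls
// in this diff.
type ChannelConfigProvider interface {
	ChannelConfig() ChannelConfig
}

// ChannelConfig's value-receiver method makes any ChannelConfig a static
// provider of itself, while DynamicEthChannelConfig chooses between two
// configs at call time.
var _ ChannelConfigProvider = ChannelConfig{}
```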
@@ -48,6 +48,10 @@ func NewDynamicEthChannelConfig(lgr log.Logger,
 	return dec
 }
 
+// ChannelConfig will perform an estimate of the cost per byte for
+// calldata and for blobs, given current market conditions: it will return
+// the appropriate ChannelConfig depending on which is cheaper. It makes
+// assumptions about the typical makeup of channel data.
 func (dec *DynamicEthChannelConfig) ChannelConfig() ChannelConfig {
 	ctx, cancel := context.WithTimeout(context.Background(), dec.timeout)
 	defer cancel()
...
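For intuition, the comparison described in that doc comment boils down to price per byte: calldata costs roughly 16 gas per nonzero byte at the execution base fee (EIP-2028), while a blob byte costs one unit of blob gas at the blob base fee (EIP-4844). A simplified, self-contained sketch of that comparison; the real method additionally uses the batcher's gas pricer and accounts for per-tx fixed costs and the typical makeup of channel data:

```go
package main

import (
	"fmt"
	"math/big"
)

// cheaperToUseBlobs is an illustrative stand-in for the estimate described
// above, not the actual op-batcher implementation.
func cheaperToUseBlobs(baseFee, blobBaseFee *big.Int) bool {
	// Calldata: ~16 gas per nonzero byte, priced at the execution base fee.
	calldataWeiPerByte := new(big.Int).Mul(baseFee, big.NewInt(16))
	// Blobs: one blob gas per byte (a blob holds 131072 bytes), priced at
	// the blob base fee, so wei-per-byte is just the blob base fee.
	return blobBaseFee.Cmp(calldataWeiPerByte) < 0
}

func main() {
	baseFee := big.NewInt(30_000_000_000)    // 30 gwei
	blobBaseFee := big.NewInt(1_000_000_000) // 1 gwei, far below 16 * 30 gwei
	fmt.Println(cheaperToUseBlobs(baseFee, blobBaseFee)) // true
}
```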
@@ -35,6 +35,8 @@ type channelManager struct {
 	blocks []*types.Block
 	// The latest L1 block from all the L2 blocks in the most recently closed channel
 	l1OriginLastClosedChannel eth.BlockID
+	// The default ChannelConfig to use for the next channel
+	defaultCfg ChannelConfig
 	// last block hash - for reorg detection
 	tip common.Hash
@@ -54,6 +56,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider ChannelConfigProvider,
 		log:         log,
 		metr:        metr,
 		cfgProvider: cfgProvider,
+		defaultCfg:  cfgProvider.ChannelConfig(),
 		rollupCfg:   rollupCfg,
 		txChannels:  make(map[string]*channel),
 	}
@@ -133,7 +136,8 @@ func (s *channelManager) removePendingChannel(channel *channel) {
 	s.channelQueue = append(s.channelQueue[:index], s.channelQueue[index+1:]...)
 }
 
-// nextTxData pops off s.datas & handles updating the internal state
+// nextTxData dequeues frames from the channel and returns them encoded in a transaction.
+// It also updates the internal tx -> channels mapping.
 func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 	if channel == nil || !channel.HasTxData() {
 		s.log.Trace("no next tx data")
@@ -146,12 +150,51 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 
 // TxData returns the next tx data that should be submitted to L1.
 //
-// If the pending channel is
+// If the current channel is
 // full, it only returns the remaining frames of this channel until it got
 // successfully fully sent to L1. It returns io.EOF if there's no pending tx data.
+//
+// It will decide whether to switch DA type automatically.
+// When switching DA type, the channelManager state will be rebuilt
+// with a new ChannelConfig.
 func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	channel, err := s.getReadyChannel(l1Head)
+	if err != nil {
+		return emptyTxData, err
+	}
+	// If the channel has already started being submitted,
+	// return now and ensure no requeueing happens
+	if !channel.NoneSubmitted() {
+		return s.nextTxData(channel)
+	}
+	// Call provider method to reassess optimal DA type
+	newCfg := s.cfgProvider.ChannelConfig()
+	// No change:
+	if newCfg.UseBlobs == s.defaultCfg.UseBlobs {
+		s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type",
+			"useBlobs", s.defaultCfg.UseBlobs)
+		return s.nextTxData(channel)
+	}
+	// Change:
+	s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeueing blocks...",
+		"useBlobsBefore", s.defaultCfg.UseBlobs,
+		"useBlobsAfter", newCfg.UseBlobs)
+	s.Requeue(newCfg)
+	channel, err = s.getReadyChannel(l1Head)
+	if err != nil {
+		return emptyTxData, err
+	}
+	return s.nextTxData(channel)
+}
+
+// getReadyChannel returns the next channel ready to submit data, or an error.
+// It adds blocks from the block queue to the current channel and generates frames for it.
+func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
 	var firstWithTxData *channel
 	for _, ch := range s.channelQueue {
 		if ch.HasTxData() {
@@ -160,27 +203,31 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 		}
 	}
 
-	dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
+	dataPending := firstWithTxData != nil
 	s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))
 
-	// Short circuit if there is pending tx data or the channel manager is closed.
-	if dataPending || s.closed {
-		return s.nextTxData(firstWithTxData)
+	// Short circuit if there is pending tx data or the channel manager is closed
+	if dataPending {
+		return firstWithTxData, nil
+	}
+
+	if s.closed {
+		return nil, io.EOF
 	}
 
 	// No pending tx data, so we have to add new blocks to the channel
 	// If we have no saved blocks, we will not be able to create valid frames
 	if len(s.blocks) == 0 {
-		return txData{}, io.EOF
+		return nil, io.EOF
 	}
 
 	if err := s.ensureChannelWithSpace(l1Head); err != nil {
-		return txData{}, err
+		return nil, err
 	}
 
 	if err := s.processBlocks(); err != nil {
-		return txData{}, err
+		return nil, err
 	}
 
 	// Register current L1 head only after all pending blocks have been
@@ -189,10 +236,10 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 	s.registerL1Block(l1Head)
 
 	if err := s.outputFrames(); err != nil {
-		return txData{}, err
+		return nil, err
 	}
 
-	return s.nextTxData(s.currentChannel)
+	return s.currentChannel, nil
 }
 
 // ensureChannelWithSpace ensures currentChannel is populated with a channel that has
@@ -203,7 +250,10 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
 		return nil
 	}
 
-	cfg := s.cfgProvider.ChannelConfig()
+	// We reuse the ChannelConfig from the last channel.
+	// This will be reassessed at channel submission time,
+	// but this is our best guess at the appropriate values for now.
+	cfg := s.defaultCfg
 	pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
 	if err != nil {
 		return fmt.Errorf("creating new channel: %w", err)
@@ -228,7 +278,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
 	return nil
 }
 
-// registerL1Block registers the given block at the pending channel.
+// registerL1Block registers the given block at the current channel.
 func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
 	s.currentChannel.CheckTimeout(l1Head.Number)
 	s.log.Debug("new L1-block registered at channel builder",
@@ -238,7 +288,7 @@ func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
 	)
 }
 
-// processBlocks adds blocks from the blocks queue to the pending channel until
+// processBlocks adds blocks from the blocks queue to the current channel until
 // either the queue got exhausted or the channel is full.
 func (s *channelManager) processBlocks() error {
 	var (
@@ -288,6 +338,7 @@ func (s *channelManager) processBlocks() error {
 	return nil
 }
 
+// outputFrames generates frames for the current channel, and computes and logs the compression ratio.
 func (s *channelManager) outputFrames() error {
 	if err := s.currentChannel.OutputFrames(); err != nil {
 		return fmt.Errorf("creating frames with channel builder: %w", err)
@@ -339,6 +390,7 @@ func (s *channelManager) outputFrames() error {
 func (s *channelManager) AddL2Block(block *types.Block) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
 	if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
 		return ErrReorg
 	}
@@ -414,3 +466,26 @@ func (s *channelManager) Close() error {
 	}
 	return nil
 }
+
+// Requeue rebuilds the channel manager state by
+// rewinding blocks back from the channel queue, and setting the defaultCfg.
+func (s *channelManager) Requeue(newCfg ChannelConfig) {
+	newChannelQueue := []*channel{}
+	blocksToRequeue := []*types.Block{}
+	for _, channel := range s.channelQueue {
+		if !channel.NoneSubmitted() {
+			newChannelQueue = append(newChannelQueue, channel)
+			continue
+		}
+		blocksToRequeue = append(blocksToRequeue, channel.channelBuilder.Blocks()...)
+	}
+
+	// We put the blocks back at the front of the queue:
+	s.blocks = append(blocksToRequeue, s.blocks...)
+	// Channels which were already being submitted are put back
+	s.channelQueue = newChannelQueue
+	s.currentChannel = nil
+	// Setting the defaultCfg will cause new channels
+	// to pick up the new ChannelConfig
+	s.defaultCfg = newCfg
+}
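Seen from the caller's side, `TxData` now hides the DA decision entirely: a driver-style loop just drains it until `io.EOF`. A hedged sketch of such a loop, assuming the `batcher` package context plus `errors` and `io` imports; `publish` is a hypothetical stand-in, and the real loop in `driver.go` is more involved:

```go
// drain submits everything the channelManager currently has ready.
func drain(m *channelManager, l1Head eth.BlockID, publish func(txData) error) error {
	for {
		data, err := m.TxData(l1Head)
		if errors.Is(err, io.EOF) {
			return nil // nothing (more) ready to submit right now
		}
		if err != nil {
			return err
		}
		// data.asBlob reflects the DA type chosen at submission time,
		// so the caller needs no awareness of the blobs/calldata switch.
		if err := publish(data); err != nil {
			return err
		}
	}
}
```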
 package batcher
 
 import (
+	"errors"
 	"io"
 	"math/big"
 	"math/rand"
@@ -483,3 +484,174 @@ func TestChannelManager_ChannelCreation(t *testing.T) {
 		})
 	}
 }
+
+// FakeDynamicEthChannelConfig is a ChannelConfigProvider which always returns
+// either a blob- or calldata-based config depending on its internal chooseBlobs
+// switch.
+type FakeDynamicEthChannelConfig struct {
+	DynamicEthChannelConfig
+	chooseBlobs bool
+}
+
+func (f *FakeDynamicEthChannelConfig) ChannelConfig() ChannelConfig {
+	if f.chooseBlobs {
+		return f.blobConfig
+	}
+	return f.calldataConfig
+}
+
+func newFakeDynamicEthChannelConfig(lgr log.Logger,
+	reqTimeout time.Duration) *FakeDynamicEthChannelConfig {
+	calldataCfg := ChannelConfig{
+		MaxFrameSize:    120_000 - 1,
+		TargetNumFrames: 1,
+	}
+	blobCfg := ChannelConfig{
+		MaxFrameSize:    eth.MaxBlobDataSize - 1,
+		TargetNumFrames: 3, // gets closest to amortized fixed tx costs
+		UseBlobs:        true,
+	}
+	calldataCfg.InitNoneCompressor()
+	blobCfg.InitNoneCompressor()
+	return &FakeDynamicEthChannelConfig{
+		chooseBlobs: false,
+		DynamicEthChannelConfig: *NewDynamicEthChannelConfig(
+			lgr,
+			reqTimeout,
+			&mockGasPricer{},
+			blobCfg,
+			calldataCfg),
+	}
+}
+
+// TestChannelManager_TxData seeds the channel manager with blocks and triggers the
+// blocks->channels pipeline multiple times. Values are chosen such that a channel
+// is created under one set of market conditions, and then submitted under a different
+// set of market conditions. The test asserts that the DA type is changed at channel
+// submission time.
+func TestChannelManager_TxData(t *testing.T) {
+	type TestCase struct {
+		name                            string
+		chooseBlobsWhenChannelCreated   bool
+		chooseBlobsWhenChannelSubmitted bool
+	}
+
+	tt := []TestCase{
+		{"blobs->blobs", true, true},
+		{"calldata->calldata", false, false},
+		{"blobs->calldata", true, false},
+		{"calldata->blobs", false, true},
+	}
+
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			l := testlog.Logger(t, log.LevelCrit)
+
+			cfg := newFakeDynamicEthChannelConfig(l, 1000)
+			cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated
+			m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
+			require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs)
+
+			// Seed channel manager with a block
+			rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+			blockA := derivetest.RandomL2BlockWithChainId(rng, 200, defaultTestRollupConfig.L2ChainID)
+			m.blocks = []*types.Block{blockA}
+
+			// Call TxData a first time to trigger blocks->channels pipeline
+			_, err := m.TxData(eth.BlockID{})
+			require.ErrorIs(t, err, io.EOF)
+
+			// The test requires us to have something in the channel queue
+			// at this point, but not yet ready to send and not full
+			require.NotEmpty(t, m.channelQueue)
+			require.False(t, m.channelQueue[0].IsFull())
+
+			// Simulate updated market conditions
+			// by possibly flipping the state of the
+			// fake channel provider
+			l.Info("updating market conditions", "chooseBlobs", tc.chooseBlobsWhenChannelSubmitted)
+			cfg.chooseBlobs = tc.chooseBlobsWhenChannelSubmitted

+			// Add a block and call TxData until
+			// we get some data to submit
+			var data txData
+			for {
+				m.blocks = []*types.Block{blockA}
+				data, err = m.TxData(eth.BlockID{})
+				if err == nil && data.Len() > 0 {
+					break
+				}
+				if !errors.Is(err, io.EOF) {
+					require.NoError(t, err)
+				}
+			}
+
+			require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.asBlob)
+			require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.UseBlobs)
+		})
+	}
+}
+
+// TestChannelManager_Requeue seeds the channel manager with blocks,
+// takes a state snapshot, triggers the blocks->channels pipeline,
+// and then calls Requeue. Finally, it asserts the channel manager's
+// state is equal to the snapshot. It repeats this for a channel
+// which has a pending transaction and verifies that Requeue is then
+// a no-op.
+func TestChannelManager_Requeue(t *testing.T) {
+	l := testlog.Logger(t, log.LevelCrit)
+	cfg := channelManagerTestConfig(100, derive.SingularBatchType)
+	m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
+
+	// Seed channel manager with blocks
+	rng := rand.New(rand.NewSource(99))
+	blockA := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
+	blockB := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
+
+	// This is the snapshot of channel manager state we want to reinstate
+	// when we requeue
+	stateSnapshot := []*types.Block{blockA, blockB}
+	m.blocks = stateSnapshot
+	require.Empty(t, m.channelQueue)
+
+	// Trigger the blocks -> channelQueue data pipelining
+	require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
+	require.NotEmpty(t, m.channelQueue)
+	require.NoError(t, m.processBlocks())
+
+	// Assert that at least one block was processed into the channel
+	require.NotContains(t, m.blocks, blockA)
+
+	// Call the function we are testing
+	m.Requeue(m.defaultCfg)
+
+	// Ensure we got back to the state above
+	require.Equal(t, m.blocks, stateSnapshot)
+	require.Empty(t, m.channelQueue)
+
+	// Trigger the blocks -> channelQueue data pipelining again
+	require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
+	require.NotEmpty(t, m.channelQueue)
+	require.NoError(t, m.processBlocks())
+
+	// Assert that at least one block was processed into the channel
+	require.NotContains(t, m.blocks, blockA)
+
+	// Now mark the 0th channel in the queue as already
+	// starting to send on chain
+	channel0 := m.channelQueue[0]
+	channel0.pendingTransactions["foo"] = txData{}
+	require.False(t, channel0.NoneSubmitted())
+
+	// Call the function we are testing
+	m.Requeue(m.defaultCfg)
+
+	// The requeue shouldn't affect the pending channel
+	require.Contains(t, m.channelQueue, channel0)
+	require.NotContains(t, m.blocks, blockA)
+}
@@ -86,8 +86,8 @@ func TestChannelManager_NextTxData(t *testing.T) {
 	require.Equal(t, txData{}, returnedTxData)
 
 	// Set the pending channel
-	// The nextTxData function should still return EOF
-	// since the pending channel has no frames
+	// The nextTxData function should still return io.EOF
+	// since the current channel has no frames
 	require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
 	channel := m.currentChannel
 	require.NotNil(t, channel)
...
@@ -194,6 +194,7 @@ func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error {
 //  2. Check if the sync status is valid or if we are all the way up to date
 //  3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?)
 //  4. Load all new blocks into the local state.
+//
 // If there is a reorg, it will reset the last stored block but not clear the internal state so
 // the state can be flushed to L1.
 func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
...
@@ -50,6 +50,7 @@ func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) {
 		Log:              testlog.Logger(t, log.LevelDebug),
 		Metr:             metrics.NoopMetrics,
 		RollupConfig:     cfg,
+		ChannelConfig:    defaultTestChannelConfig(),
 		EndpointProvider: ep,
 	}), ep
 }
...