Commit d2569b3e authored by Michael de Hoog's avatar Michael de Hoog

Multiple pending channels

parent a48e53c1
package batcher
import (
"fmt"
"math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type channel struct {
log log.Logger
metr metrics.Metricer
cfg ChannelConfig
// pending channel builder
channelBuilder *channelBuilder
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID
}
func newPendingChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) (*channel, error) {
cb, err := newChannelBuilder(cfg)
if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err)
}
return &channel{
log: log,
metr: metr,
cfg: cfg,
channelBuilder: cb,
pendingTransactions: make(map[txID]txData),
confirmedTransactions: make(map[txID]eth.BlockID),
}, nil
}
// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channel) TxFailed(id txID) bool {
if data, ok := s.pendingTransactions[id]; ok {
s.log.Trace("marked transaction as failed", "id", id)
// Note: when the batcher is changed to send multiple frames per tx,
// this needs to be changed to iterate over all frames of the tx data
// and re-queue them.
s.channelBuilder.PushFrame(data.Frame())
delete(s.pendingTransactions, id)
} else {
s.log.Warn("unknown transaction marked as failed", "id", id)
}
s.metr.RecordBatchTxFailed()
return len(s.pendingTransactions) == 0 && len(s.confirmedTransactions) == 0
}
// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channel) TxConfirmed(id txID, inclusionBlock eth.BlockID) (bool, []*types.Block) {
s.metr.RecordBatchTxSubmitted()
s.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok {
s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
// TODO: This can occur if we clear the channel while there are still pending transactions
// We need to keep track of stale transactions instead
return false, nil
}
delete(s.pendingTransactions, id)
s.confirmedTransactions[id] = inclusionBlock
s.channelBuilder.FramePublished(inclusionBlock.Number)
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if s.isTimedOut() {
s.metr.RecordChannelTimedOut(s.ID())
s.log.Warn("Channel timed out", "id", s.ID())
return true, s.channelBuilder.Blocks()
}
// If we are done with this channel, record that.
if s.isFullySubmitted() {
s.metr.RecordChannelFullySubmitted(s.ID())
s.log.Info("Channel is fully submitted", "id", s.ID())
return true, nil
}
return false, nil
}
// pendingChannelIsTimedOut returns true if submitted channel has timed out.
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func (s *channel) isTimedOut() bool {
if len(s.confirmedTransactions) == 0 {
return false
}
// If there are confirmed transactions, find the first + last confirmed block numbers
min := uint64(math.MaxUint64)
max := uint64(0)
for _, inclusionBlock := range s.confirmedTransactions {
if inclusionBlock.Number < min {
min = inclusionBlock.Number
}
if inclusionBlock.Number > max {
max = inclusionBlock.Number
}
}
return max-min >= s.cfg.ChannelTimeout
}
// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func (s *channel) isFullySubmitted() bool {
return s.IsFull() && len(s.pendingTransactions)+s.NumFrames() == 0
}
func (s *channel) NoneSubmitted() bool {
return len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0
}
func (s *channel) ID() derive.ChannelID {
return s.channelBuilder.ID()
}
func (s *channel) NextTxData() txData {
frame := s.channelBuilder.NextFrame()
txdata := txData{frame}
id := txdata.ID()
s.log.Trace("returning next tx data", "id", id)
s.pendingTransactions[id] = txdata
return txdata
}
func (s *channel) HasFrame() bool {
return s.channelBuilder.HasFrame()
}
func (s *channel) IsFull() bool {
return s.channelBuilder.IsFull()
}
func (s *channel) FullErr() error {
return s.channelBuilder.FullErr()
}
func (s *channel) RegisterL1Block(l1BlockNum uint64) {
s.channelBuilder.RegisterL1Block(l1BlockNum)
}
func (s *channel) AddBlock(block *types.Block) (derive.L1BlockInfo, error) {
return s.channelBuilder.AddBlock(block)
}
func (s *channel) InputBytes() int {
return s.channelBuilder.InputBytes()
}
func (s *channel) ReadyBytes() int {
return s.channelBuilder.ReadyBytes()
}
func (s *channel) OutputBytes() int {
return s.channelBuilder.OutputBytes()
}
func (s *channel) NumFrames() int {
return s.channelBuilder.NumFrames()
}
func (s *channel) OutputFrames() error {
return s.channelBuilder.OutputFrames()
}
func (s *channel) Close() {
s.channelBuilder.Close()
}
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
...@@ -34,13 +33,9 @@ type channelManager struct { ...@@ -34,13 +33,9 @@ type channelManager struct {
tip common.Hash tip common.Hash
// Pending data returned by TxData waiting on Tx Confirmed/Failed // Pending data returned by TxData waiting on Tx Confirmed/Failed
currentChannel *channel
// pending channel builder channelQueue []*channel
pendingChannel *channelBuilder txChannels map[txID]*channel
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID
// if set to true, prevents production of any new channel frames // if set to true, prevents production of any new channel frames
closed bool closed bool
...@@ -51,9 +46,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) ...@@ -51,9 +46,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig)
log: log, log: log,
metr: metr, metr: metr,
cfg: cfg, cfg: cfg,
txChannels: make(map[txID]*channel),
pendingTransactions: make(map[txID]txData),
confirmedTransactions: make(map[txID]eth.BlockID),
} }
} }
...@@ -64,27 +57,23 @@ func (s *channelManager) Clear() { ...@@ -64,27 +57,23 @@ func (s *channelManager) Clear() {
s.blocks = s.blocks[:0] s.blocks = s.blocks[:0]
s.tip = common.Hash{} s.tip = common.Hash{}
s.closed = false s.closed = false
s.clearPendingChannel() s.currentChannel = nil
s.channelQueue = nil
s.txChannels = make(map[txID]*channel)
} }
// TxFailed records a transaction as failed. It will attempt to resubmit the data // TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction. // in the failed transaction.
func (s *channelManager) TxFailed(id txID) { func (s *channelManager) TxFailed(id txID) {
if data, ok := s.pendingTransactions[id]; ok { if channel, ok := s.txChannels[id]; ok {
s.log.Trace("marked transaction as failed", "id", id) delete(s.txChannels, id)
// Note: when the batcher is changed to send multiple frames per tx, empty := channel.TxFailed(id)
// this needs to be changed to iterate over all frames of the tx data if s.closed && empty {
// and re-queue them. s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", channel.ID())
s.pendingChannel.PushFrame(data.Frame()) s.removePendingChannel(channel)
delete(s.pendingTransactions, id)
} else {
s.log.Warn("unknown transaction marked as failed", "id", id)
} }
} else {
s.metr.RecordBatchTxFailed() s.log.Warn("transaction from unknown channel marked as failed", "id", id)
if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 && s.pendingChannel != nil {
s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID())
s.clearPendingChannel()
} }
} }
...@@ -93,89 +82,48 @@ func (s *channelManager) TxFailed(id txID) { ...@@ -93,89 +82,48 @@ func (s *channelManager) TxFailed(id txID) {
// resubmitted. // resubmitted.
// This function may reset the pending channel if the pending channel has timed out. // This function may reset the pending channel if the pending channel has timed out.
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
s.metr.RecordBatchTxSubmitted() if channel, ok := s.txChannels[id]; ok {
s.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock) delete(s.txChannels, id)
if _, ok := s.pendingTransactions[id]; !ok { done, blocks := channel.TxConfirmed(id, inclusionBlock)
s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock) s.blocks = append(blocks, s.blocks...)
// TODO: This can occur if we clear the channel while there are still pending transactions if done {
// We need to keep track of stale transactions instead s.removePendingChannel(channel)
return
} }
delete(s.pendingTransactions, id) } else {
s.confirmedTransactions[id] = inclusionBlock s.log.Warn("transaction from unknown channel marked as confirmed", "id", id)
s.pendingChannel.FramePublished(inclusionBlock.Number)
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if s.pendingChannelIsTimedOut() {
s.metr.RecordChannelTimedOut(s.pendingChannel.ID())
s.log.Warn("Channel timed out", "id", s.pendingChannel.ID())
s.blocks = append(s.pendingChannel.Blocks(), s.blocks...)
s.clearPendingChannel()
}
// If we are done with this channel, record that.
if s.pendingChannelIsFullySubmitted() {
s.metr.RecordChannelFullySubmitted(s.pendingChannel.ID())
s.log.Info("Channel is fully submitted", "id", s.pendingChannel.ID())
s.clearPendingChannel()
} }
s.metr.RecordBatchTxSubmitted()
s.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
} }
// clearPendingChannel resets all pending state back to an initialized but empty state. // removePendingChannel removes the given completed channel from the manager's state.
// TODO: Create separate "pending" state func (s *channelManager) removePendingChannel(channel *channel) {
func (s *channelManager) clearPendingChannel() { if s.currentChannel == channel {
s.pendingChannel = nil s.currentChannel = nil
s.pendingTransactions = make(map[txID]txData)
s.confirmedTransactions = make(map[txID]eth.BlockID)
}
// pendingChannelIsTimedOut returns true if submitted channel has timed out.
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func (s *channelManager) pendingChannelIsTimedOut() bool {
if s.pendingChannel == nil {
return false // no channel to be timed out
}
// No confirmed transactions => not timed out
if len(s.confirmedTransactions) == 0 {
return false
} }
// If there are confirmed transactions, find the first + last confirmed block numbers index := -1
min := uint64(math.MaxUint64) for i, c := range s.channelQueue {
max := uint64(0) if c == channel {
for _, inclusionBlock := range s.confirmedTransactions { index = i
if inclusionBlock.Number < min { break
min = inclusionBlock.Number
}
if inclusionBlock.Number > max {
max = inclusionBlock.Number
} }
} }
return max-min >= s.cfg.ChannelTimeout if index < 0 {
} s.log.Warn("channel not found in channel queue", "id", channel.ID())
return
// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func (s *channelManager) pendingChannelIsFullySubmitted() bool {
if s.pendingChannel == nil {
return false // todo: can decide either way here. Nonsensical answer though
} }
return s.pendingChannel.IsFull() && len(s.pendingTransactions)+s.pendingChannel.NumFrames() == 0 s.channelQueue = append(s.channelQueue[:index], s.channelQueue[index+1:]...)
} }
// nextTxData pops off s.datas & handles updating the internal state // nextTxData pops off s.datas & handles updating the internal state
func (s *channelManager) nextTxData() (txData, error) { func (s *channelManager) nextTxData(channel *channel) (txData, error) {
if s.pendingChannel == nil || !s.pendingChannel.HasFrame() { if channel == nil || !channel.HasFrame() {
s.log.Trace("no next tx data") s.log.Trace("no next tx data")
return txData{}, io.EOF // TODO: not enough data error instead return txData{}, io.EOF // TODO: not enough data error instead
} }
tx := channel.NextTxData()
frame := s.pendingChannel.NextFrame() s.txChannels[tx.ID()] = channel
txdata := txData{frame} return tx, nil
id := txdata.ID()
s.log.Trace("returning next tx data", "id", id)
s.pendingTransactions[id] = txdata
return txdata, nil
} }
// TxData returns the next tx data that should be submitted to L1. // TxData returns the next tx data that should be submitted to L1.
...@@ -184,12 +132,20 @@ func (s *channelManager) nextTxData() (txData, error) { ...@@ -184,12 +132,20 @@ func (s *channelManager) nextTxData() (txData, error) {
// full, it only returns the remaining frames of this channel until it got // full, it only returns the remaining frames of this channel until it got
// successfully fully sent to L1. It returns io.EOF if there's no pending frame. // successfully fully sent to L1. It returns io.EOF if there's no pending frame.
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame() var firstWithFrame *channel
for _, ch := range s.channelQueue {
if ch.HasFrame() {
firstWithFrame = ch
break
}
}
dataPending := firstWithFrame != nil && firstWithFrame.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks)) s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
// Short circuit if there is a pending frame or the channel manager is closed. // Short circuit if there is a pending frame or the channel manager is closed.
if dataPending || s.closed { if dataPending || s.closed {
return s.nextTxData() return s.nextTxData(firstWithFrame)
} }
// No pending frame, so we have to add new blocks to the channel // No pending frame, so we have to add new blocks to the channel
...@@ -199,12 +155,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { ...@@ -199,12 +155,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return txData{}, io.EOF return txData{}, io.EOF
} }
// we have blocks, but we cannot add them to the channel right now if err := s.ensureChannelWithRoom(l1Head); err != nil {
if s.pendingChannel != nil && s.pendingChannel.IsFull() {
return txData{}, io.EOF
}
if err := s.ensurePendingChannel(l1Head); err != nil {
return txData{}, err return txData{}, err
} }
...@@ -221,35 +172,36 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { ...@@ -221,35 +172,36 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return txData{}, err return txData{}, err
} }
return s.nextTxData() return s.nextTxData(s.currentChannel)
} }
func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error { func (s *channelManager) ensureChannelWithRoom(l1Head eth.BlockID) error {
if s.pendingChannel != nil { if s.currentChannel != nil && !s.currentChannel.IsFull() {
return nil return nil
} }
cb, err := newChannelBuilder(s.cfg) pc, err := newPendingChannel(s.log, s.metr, s.cfg)
if err != nil { if err != nil {
return fmt.Errorf("creating new channel: %w", err) return fmt.Errorf("creating new channel: %w", err)
} }
s.pendingChannel = cb s.currentChannel = pc
s.channelQueue = append(s.channelQueue, pc)
s.log.Info("Created channel", s.log.Info("Created channel",
"id", cb.ID(), "id", pc.ID(),
"l1Head", l1Head, "l1Head", l1Head,
"blocks_pending", len(s.blocks)) "blocks_pending", len(s.blocks))
s.metr.RecordChannelOpened(cb.ID(), len(s.blocks)) s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))
return nil return nil
} }
// registerL1Block registers the given block at the pending channel. // registerL1Block registers the given block at the pending channel.
func (s *channelManager) registerL1Block(l1Head eth.BlockID) { func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
s.pendingChannel.RegisterL1Block(l1Head.Number) s.currentChannel.RegisterL1Block(l1Head.Number)
s.log.Debug("new L1-block registered at channel builder", s.log.Debug("new L1-block registered at channel builder",
"l1Head", l1Head, "l1Head", l1Head,
"channel_full", s.pendingChannel.IsFull(), "channel_full", s.currentChannel.IsFull(),
"full_reason", s.pendingChannel.FullErr(), "full_reason", s.currentChannel.FullErr(),
) )
} }
...@@ -262,7 +214,7 @@ func (s *channelManager) processBlocks() error { ...@@ -262,7 +214,7 @@ func (s *channelManager) processBlocks() error {
latestL2ref eth.L2BlockRef latestL2ref eth.L2BlockRef
) )
for i, block := range s.blocks { for i, block := range s.blocks {
l1info, err := s.pendingChannel.AddBlock(block) l1info, err := s.currentChannel.AddBlock(block)
if errors.As(err, &_chFullErr) { if errors.As(err, &_chFullErr) {
// current block didn't get added because channel is already full // current block didn't get added because channel is already full
break break
...@@ -272,7 +224,7 @@ func (s *channelManager) processBlocks() error { ...@@ -272,7 +224,7 @@ func (s *channelManager) processBlocks() error {
blocksAdded += 1 blocksAdded += 1
latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info) latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
// current block got added but channel is now full // current block got added but channel is now full
if s.pendingChannel.IsFull() { if s.currentChannel.IsFull() {
break break
} }
} }
...@@ -288,34 +240,34 @@ func (s *channelManager) processBlocks() error { ...@@ -288,34 +240,34 @@ func (s *channelManager) processBlocks() error {
s.metr.RecordL2BlocksAdded(latestL2ref, s.metr.RecordL2BlocksAdded(latestL2ref,
blocksAdded, blocksAdded,
len(s.blocks), len(s.blocks),
s.pendingChannel.InputBytes(), s.currentChannel.InputBytes(),
s.pendingChannel.ReadyBytes()) s.currentChannel.ReadyBytes())
s.log.Debug("Added blocks to channel", s.log.Debug("Added blocks to channel",
"blocks_added", blocksAdded, "blocks_added", blocksAdded,
"blocks_pending", len(s.blocks), "blocks_pending", len(s.blocks),
"channel_full", s.pendingChannel.IsFull(), "channel_full", s.currentChannel.IsFull(),
"input_bytes", s.pendingChannel.InputBytes(), "input_bytes", s.currentChannel.InputBytes(),
"ready_bytes", s.pendingChannel.ReadyBytes(), "ready_bytes", s.currentChannel.ReadyBytes(),
) )
return nil return nil
} }
func (s *channelManager) outputFrames() error { func (s *channelManager) outputFrames() error {
if err := s.pendingChannel.OutputFrames(); err != nil { if err := s.currentChannel.OutputFrames(); err != nil {
return fmt.Errorf("creating frames with channel builder: %w", err) return fmt.Errorf("creating frames with channel builder: %w", err)
} }
if !s.pendingChannel.IsFull() { if !s.currentChannel.IsFull() {
return nil return nil
} }
inBytes, outBytes := s.pendingChannel.InputBytes(), s.pendingChannel.OutputBytes() inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes()
s.metr.RecordChannelClosed( s.metr.RecordChannelClosed(
s.pendingChannel.ID(), s.currentChannel.ID(),
len(s.blocks), len(s.blocks),
s.pendingChannel.NumFrames(), s.currentChannel.NumFrames(),
inBytes, inBytes,
outBytes, outBytes,
s.pendingChannel.FullErr(), s.currentChannel.FullErr(),
) )
var comprRatio float64 var comprRatio float64
...@@ -323,12 +275,12 @@ func (s *channelManager) outputFrames() error { ...@@ -323,12 +275,12 @@ func (s *channelManager) outputFrames() error {
comprRatio = float64(outBytes) / float64(inBytes) comprRatio = float64(outBytes) / float64(inBytes)
} }
s.log.Info("Channel closed", s.log.Info("Channel closed",
"id", s.pendingChannel.ID(), "id", s.currentChannel.ID(),
"blocks_pending", len(s.blocks), "blocks_pending", len(s.blocks),
"num_frames", s.pendingChannel.NumFrames(), "num_frames", s.currentChannel.NumFrames(),
"input_bytes", inBytes, "input_bytes", inBytes,
"output_bytes", outBytes, "output_bytes", outBytes,
"full_reason", s.pendingChannel.FullErr(), "full_reason", s.currentChannel.FullErr(),
"compr_ratio", comprRatio, "compr_ratio", comprRatio,
) )
return nil return nil
...@@ -369,15 +321,17 @@ func (s *channelManager) Close() error { ...@@ -369,15 +321,17 @@ func (s *channelManager) Close() error {
s.closed = true s.closed = true
// Any pending state can be proactively cleared if there are no submitted transactions // Any pending state can be proactively cleared if there are no submitted transactions
if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { for _, ch := range s.channelQueue {
s.clearPendingChannel() if ch.NoneSubmitted() {
s.removePendingChannel(ch)
}
} }
if s.pendingChannel == nil { if s.currentChannel == nil {
return nil return nil
} }
s.pendingChannel.Close() s.currentChannel.Close()
return s.outputFrames() return s.outputFrames()
} }
...@@ -19,50 +19,6 @@ import ( ...@@ -19,50 +19,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// TestPendingChannelTimeout tests that the channel manager
// correctly identifies when a pending channel is timed out.
func TestPendingChannelTimeout(t *testing.T) {
// Create a new channel manager with a ChannelTimeout
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100,
})
// Pending channel is nil so is cannot be timed out
timeout := m.pendingChannelIsTimedOut()
require.False(t, timeout)
// Set the pending channel
require.NoError(t, m.ensurePendingChannel(eth.BlockID{}))
// There are no confirmed transactions so
// the pending channel cannot be timed out
timeout = m.pendingChannelIsTimedOut()
require.False(t, timeout)
// Manually set a confirmed transactions
// To avoid other methods clearing state
m.confirmedTransactions[frameID{frameNumber: 0}] = eth.BlockID{Number: 0}
m.confirmedTransactions[frameID{frameNumber: 1}] = eth.BlockID{Number: 99}
// Since the ChannelTimeout is 100, the
// pending channel should not be timed out
timeout = m.pendingChannelIsTimedOut()
require.False(t, timeout)
// Add a confirmed transaction with a higher number
// than the ChannelTimeout
m.confirmedTransactions[frameID{
frameNumber: 2,
}] = eth.BlockID{
Number: 101,
}
// Now the pending channel should be timed out
timeout = m.pendingChannelIsTimedOut()
require.True(t, timeout)
}
// TestChannelManagerReturnsErrReorg ensures that the channel manager // TestChannelManagerReturnsErrReorg ensures that the channel manager
// detects a reorg when it has cached L1 blocks. // detects a reorg when it has cached L1 blocks.
func TestChannelManagerReturnsErrReorg(t *testing.T) { func TestChannelManagerReturnsErrReorg(t *testing.T) {
...@@ -101,7 +57,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -101,7 +57,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
ChannelConfig{ ChannelConfig{
MaxFrameSize: 120_000, MaxFrameSize: 120_000,
CompressorConfig: compressor.Config{ CompressorConfig: compressor.Config{
TargetFrameSize: 0, TargetFrameSize: 1,
TargetNumFrames: 1,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
}) })
...@@ -119,46 +76,6 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -119,46 +76,6 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
require.ErrorIs(t, m.AddL2Block(x), ErrReorg) require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
} }
// TestChannelManagerNextTxData checks the nextTxData function.
func TestChannelManagerNextTxData(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
// Nil pending channel should return EOF
returnedTxData, err := m.nextTxData()
require.ErrorIs(t, err, io.EOF)
require.Equal(t, txData{}, returnedTxData)
// Set the pending channel
// The nextTxData function should still return EOF
// since the pending channel has no frames
require.NoError(t, m.ensurePendingChannel(eth.BlockID{}))
returnedTxData, err = m.nextTxData()
require.ErrorIs(t, err, io.EOF)
require.Equal(t, txData{}, returnedTxData)
// Manually push a frame into the pending channel
channelID := m.pendingChannel.ID()
frame := frameData{
data: []byte{},
id: frameID{
chID: channelID,
frameNumber: uint16(0),
},
}
m.pendingChannel.PushFrame(frame)
require.Equal(t, 1, m.pendingChannel.NumFrames())
// Now the nextTxData function should return the frame
returnedTxData, err = m.nextTxData()
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.pendingChannel.NumFrames())
require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID])
}
// TestChannelManager_Clear tests clearing the channel manager. // TestChannelManager_Clear tests clearing the channel manager.
func TestChannelManager_Clear(t *testing.T) { func TestChannelManager_Clear(t *testing.T) {
require := require.New(t) require := require.New(t)
...@@ -184,9 +101,9 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -184,9 +101,9 @@ func TestChannelManager_Clear(t *testing.T) {
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
require.Empty(m.blocks) require.Empty(m.blocks)
require.Equal(common.Hash{}, m.tip) require.Equal(common.Hash{}, m.tip)
require.Nil(m.pendingChannel) require.Nil(m.currentChannel)
require.Empty(m.pendingTransactions) require.Empty(m.channelQueue)
require.Empty(m.confirmedTransactions) require.Empty(m.txChannels)
// Add a block to the channel manager // Add a block to the channel manager
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -198,22 +115,22 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -198,22 +115,22 @@ func TestChannelManager_Clear(t *testing.T) {
require.NoError(m.AddL2Block(a)) require.NoError(m.AddL2Block(a))
// Make sure there is a channel builder // Make sure there is a channel builder
require.NoError(m.ensurePendingChannel(l1BlockID)) require.NoError(m.ensureChannelWithRoom(l1BlockID))
require.NotNil(m.pendingChannel) require.NotNil(m.currentChannel)
require.Len(m.confirmedTransactions, 0) require.Len(m.currentChannel.confirmedTransactions, 0)
// Process the blocks // Process the blocks
// We should have a pending channel with 1 frame // We should have a pending channel with 1 frame
// and no more blocks since processBlocks consumes // and no more blocks since processBlocks consumes
// the list // the list
require.NoError(m.processBlocks()) require.NoError(m.processBlocks())
require.NoError(m.pendingChannel.co.Flush()) require.NoError(m.currentChannel.channelBuilder.co.Flush())
require.NoError(m.pendingChannel.OutputFrames()) require.NoError(m.currentChannel.OutputFrames())
_, err := m.nextTxData() _, err := m.nextTxData(m.currentChannel)
require.NoError(err) require.NoError(err)
require.Len(m.blocks, 0) require.Len(m.blocks, 0)
require.Equal(newL1Tip, m.tip) require.Equal(newL1Tip, m.tip)
require.Len(m.pendingTransactions, 1) require.Len(m.currentChannel.pendingTransactions, 1)
// Add a new block so we can test clearing // Add a new block so we can test clearing
// the channel manager with a full state // the channel manager with a full state
...@@ -231,104 +148,9 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -231,104 +148,9 @@ func TestChannelManager_Clear(t *testing.T) {
// Check that the entire channel manager state cleared // Check that the entire channel manager state cleared
require.Empty(m.blocks) require.Empty(m.blocks)
require.Equal(common.Hash{}, m.tip) require.Equal(common.Hash{}, m.tip)
require.Nil(m.pendingChannel) require.Nil(m.currentChannel)
require.Empty(m.pendingTransactions) require.Empty(m.channelQueue)
require.Empty(m.confirmedTransactions) require.Empty(m.txChannels)
}
// TestChannelManagerTxConfirmed checks the [ChannelManager.TxConfirmed] function.
func TestChannelManagerTxConfirmed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
// Need to set the channel timeout here so we don't clear pending
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and reseting the pendingChannels map
ChannelTimeout: 10,
})
// Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness
require.NoError(t, m.ensurePendingChannel(eth.BlockID{}))
channelID := m.pendingChannel.ID()
frame := frameData{
data: []byte{},
id: frameID{
chID: channelID,
frameNumber: uint16(0),
},
}
m.pendingChannel.PushFrame(frame)
require.Equal(t, 1, m.pendingChannel.NumFrames())
returnedTxData, err := m.nextTxData()
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.pendingChannel.NumFrames())
require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID])
require.Len(t, m.pendingTransactions, 1)
// An unknown pending transaction should not be marked as confirmed
// and should not be removed from the pending transactions map
actualChannelID := m.pendingChannel.ID()
unknownChannelID := derive.ChannelID([derive.ChannelIDLength]byte{0x69})
require.NotEqual(t, actualChannelID, unknownChannelID)
unknownTxID := frameID{chID: unknownChannelID, frameNumber: 0}
blockID := eth.BlockID{Number: 0, Hash: common.Hash{0x69}}
m.TxConfirmed(unknownTxID, blockID)
require.Empty(t, m.confirmedTransactions)
require.Len(t, m.pendingTransactions, 1)
// Now let's mark the pending transaction as confirmed
// and check that it is removed from the pending transactions map
// and added to the confirmed transactions map
m.TxConfirmed(expectedChannelID, blockID)
require.Empty(t, m.pendingTransactions)
require.Len(t, m.confirmedTransactions, 1)
require.Equal(t, blockID, m.confirmedTransactions[expectedChannelID])
}
// TestChannelManagerTxFailed checks the [ChannelManager.TxFailed] function.
func TestChannelManagerTxFailed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
// Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness
require.NoError(t, m.ensurePendingChannel(eth.BlockID{}))
channelID := m.pendingChannel.ID()
frame := frameData{
data: []byte{},
id: frameID{
chID: channelID,
frameNumber: uint16(0),
},
}
m.pendingChannel.PushFrame(frame)
require.Equal(t, 1, m.pendingChannel.NumFrames())
returnedTxData, err := m.nextTxData()
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.pendingChannel.NumFrames())
require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID])
require.Len(t, m.pendingTransactions, 1)
// Trying to mark an unknown pending transaction as failed
// shouldn't modify state
m.TxFailed(frameID{})
require.Equal(t, 0, m.pendingChannel.NumFrames())
require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID])
// Now we still have a pending transaction
// Let's mark it as failed
m.TxFailed(expectedChannelID)
require.Empty(t, m.pendingTransactions)
// There should be a frame in the pending channel now
require.Equal(t, 1, m.pendingChannel.NumFrames())
} }
func TestChannelManager_TxResend(t *testing.T) { func TestChannelManager_TxResend(t *testing.T) {
...@@ -339,7 +161,8 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -339,7 +161,8 @@ func TestChannelManager_TxResend(t *testing.T) {
ChannelConfig{ ChannelConfig{
MaxFrameSize: 120_000, MaxFrameSize: 120_000,
CompressorConfig: compressor.Config{ CompressorConfig: compressor.Config{
TargetFrameSize: 0, TargetFrameSize: 1,
TargetNumFrames: 1,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
}) })
......
package batcher
import (
"io"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// TestChannelTimeout tests that the channel manager
// correctly identifies when a pending channel is timed out.
func TestChannelTimeout(t *testing.T) {
// Create a new channel manager with a ChannelTimeout
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100,
})
// Pending channel is nil so is cannot be timed out
require.Nil(t, m.currentChannel)
// Set the pending channel
require.NoError(t, m.ensureChannelWithRoom(eth.BlockID{}))
channel := m.currentChannel
require.NotNil(t, channel)
// There are no confirmed transactions so
// the pending channel cannot be timed out
timeout := channel.isTimedOut()
require.False(t, timeout)
// Manually set a confirmed transactions
// To avoid other methods clearing state
channel.confirmedTransactions[frameID{frameNumber: 0}] = eth.BlockID{Number: 0}
channel.confirmedTransactions[frameID{frameNumber: 1}] = eth.BlockID{Number: 99}
// Since the ChannelTimeout is 100, the
// pending channel should not be timed out
timeout = channel.isTimedOut()
require.False(t, timeout)
// Add a confirmed transaction with a higher number
// than the ChannelTimeout
channel.confirmedTransactions[frameID{
frameNumber: 2,
}] = eth.BlockID{
Number: 101,
}
// Now the pending channel should be timed out
timeout = channel.isTimedOut()
require.True(t, timeout)
}
// TestChannelNextTxData checks the nextTxData function.
func TestChannelNextTxData(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
// Nil pending channel should return EOF
returnedTxData, err := m.nextTxData(nil)
require.ErrorIs(t, err, io.EOF)
require.Equal(t, txData{}, returnedTxData)
// Set the pending channel
// The nextTxData function should still return EOF
// since the pending channel has no frames
require.NoError(t, m.ensureChannelWithRoom(eth.BlockID{}))
channel := m.currentChannel
require.NotNil(t, channel)
returnedTxData, err = m.nextTxData(channel)
require.ErrorIs(t, err, io.EOF)
require.Equal(t, txData{}, returnedTxData)
// Manually push a frame into the pending channel
channelID := channel.ID()
frame := frameData{
data: []byte{},
id: frameID{
chID: channelID,
frameNumber: uint16(0),
},
}
channel.channelBuilder.PushFrame(frame)
require.Equal(t, 1, channel.NumFrames())
// Now the nextTxData function should return the frame
returnedTxData, err = m.nextTxData(channel)
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, channel.NumFrames())
require.Equal(t, expectedTxData, channel.pendingTransactions[expectedChannelID])
}
// TestChannelManagerTxConfirmed checks the [ChannelManager.TxConfirmed] function.
func TestChannelManagerTxConfirmed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
// Need to set the channel timeout here so we don't clear pending
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and reseting the pendingChannels map
ChannelTimeout: 10,
})
// Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness
require.NoError(t, m.ensureChannelWithRoom(eth.BlockID{}))
channelID := m.currentChannel.ID()
frame := frameData{
data: []byte{},
id: frameID{
chID: channelID,
frameNumber: uint16(0),
},
}
m.currentChannel.channelBuilder.PushFrame(frame)
require.Equal(t, 1, m.currentChannel.NumFrames())
returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.currentChannel.NumFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
require.Len(t, m.currentChannel.pendingTransactions, 1)
// An unknown pending transaction should not be marked as confirmed
// and should not be removed from the pending transactions map
actualChannelID := m.currentChannel.ID()
unknownChannelID := derive.ChannelID([derive.ChannelIDLength]byte{0x69})
require.NotEqual(t, actualChannelID, unknownChannelID)
unknownTxID := frameID{chID: unknownChannelID, frameNumber: 0}
blockID := eth.BlockID{Number: 0, Hash: common.Hash{0x69}}
m.TxConfirmed(unknownTxID, blockID)
require.Empty(t, m.currentChannel.confirmedTransactions)
require.Len(t, m.currentChannel.pendingTransactions, 1)
// Now let's mark the pending transaction as confirmed
// and check that it is removed from the pending transactions map
// and added to the confirmed transactions map
m.TxConfirmed(expectedChannelID, blockID)
require.Empty(t, m.currentChannel.pendingTransactions)
require.Len(t, m.currentChannel.confirmedTransactions, 1)
require.Equal(t, blockID, m.currentChannel.confirmedTransactions[expectedChannelID])
}
// TestChannelManagerTxFailed checks the [ChannelManager.TxFailed] function.
func TestChannelManagerTxFailed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
// Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness
require.NoError(t, m.ensureChannelWithRoom(eth.BlockID{}))
channelID := m.currentChannel.ID()
frame := frameData{
data: []byte{},
id: frameID{
chID: channelID,
frameNumber: uint16(0),
},
}
m.currentChannel.channelBuilder.PushFrame(frame)
require.Equal(t, 1, m.currentChannel.NumFrames())
returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.currentChannel.NumFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
require.Len(t, m.currentChannel.pendingTransactions, 1)
// Trying to mark an unknown pending transaction as failed
// shouldn't modify state
m.TxFailed(frameID{})
require.Equal(t, 0, m.currentChannel.NumFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
// Now we still have a pending transaction
// Let's mark it as failed
m.TxFailed(expectedChannelID)
require.Empty(t, m.currentChannel.pendingTransactions)
// There should be a frame in the pending channel now
require.Equal(t, 1, m.currentChannel.NumFrames())
}
...@@ -16,7 +16,8 @@ type Config struct { ...@@ -16,7 +16,8 @@ type Config struct {
// ApproxComprRatio to assume. Should be slightly smaller than average from // ApproxComprRatio to assume. Should be slightly smaller than average from
// experiments to avoid the chances of creating a small additional leftover frame. // experiments to avoid the chances of creating a small additional leftover frame.
ApproxComprRatio float64 ApproxComprRatio float64
// Kind of compressor to use. Must // Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor
// will default to RatioKind.
Kind string Kind string
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment