Commit 25985c12 authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

op-batcher: Multi-blob Support (#9779)

* op-batcher: Prepare multi-frame support

* op-batcher: adapt tests to multi-frame txData

* op-batcher: add multi-blob transaction support

The existing configuration parameter TargetNumFrames can be used to specify
the desired number of blobs per transaction.

* op-batcher: improve blobs configuration (for testing)

* op-e2e: add multi-blob batcher test

* op-batcher: consolidate txID String & TerminalString impls

and add a test for it.

* op-batcher: Fix config test

* op-e2e: Improve multi-blob test to assert full blobs

* op-batcher: resolve open TODOs & renames (multi-blob)

* op-batcher: Test channel.NextTxData for single and multi frame txs
parent 9c888f6a
......@@ -21,10 +21,10 @@ type channel struct {
// pending channel builder
channelBuilder *ChannelBuilder
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID]txData
// Set of unconfirmed txID -> tx data. For tx resubmission
pendingTransactions map[string]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID
confirmedTransactions map[string]eth.BlockID
// True if confirmed TX list is updated. Set to false after updated min/max inclusion blocks.
confirmedTxUpdated bool
......@@ -44,20 +44,20 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
metr: metr,
cfg: cfg,
channelBuilder: cb,
pendingTransactions: make(map[txID]txData),
confirmedTransactions: make(map[txID]eth.BlockID),
pendingTransactions: make(map[string]txData),
confirmedTransactions: make(map[string]eth.BlockID),
}, nil
}
// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channel) TxFailed(id txID) {
func (s *channel) TxFailed(id string) {
if data, ok := s.pendingTransactions[id]; ok {
s.log.Trace("marked transaction as failed", "id", id)
// Requeue all frames of the failed tx so they can be included in a
// future transaction.
s.channelBuilder.PushFrame(data.Frame())
s.channelBuilder.PushFrames(data.Frames()...)
delete(s.pendingTransactions, id)
} else {
s.log.Warn("unknown transaction marked as failed", "id", id)
......@@ -70,7 +70,7 @@ func (s *channel) TxFailed(id txID) {
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channel) TxConfirmed(id txID, inclusionBlock eth.BlockID) (bool, []*types.Block) {
func (s *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*types.Block) {
s.metr.RecordBatchTxSubmitted()
s.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok {
......@@ -146,20 +146,33 @@ func (s *channel) ID() derive.ChannelID {
return s.channelBuilder.ID()
}
// NextTxData returns the next tx data packet.
// If cfg.MultiFrameTxs is false, it returns txData with a single frame.
// If cfg.MultiFrameTxs is true, it will read frames from its channel builder
// until it either doesn't have more frames or the target number of frames is reached.
//
// NextTxData should only be called after HasTxData returned true.
func (s *channel) NextTxData() txData {
frame := s.channelBuilder.NextFrame()
txdata := txData{frame}
id := txdata.ID()
nf := s.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf)}
for i := 0; i < nf && s.channelBuilder.HasFrame(); i++ {
frame := s.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}
s.log.Trace("returning next tx data", "id", id)
id := txdata.ID().String()
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames))
s.pendingTransactions[id] = txdata
return txdata
}
func (s *channel) HasFrame() bool {
return s.channelBuilder.HasFrame()
func (s *channel) HasTxData() bool {
if s.IsFull() || !s.cfg.MultiFrameTxs {
return s.channelBuilder.HasFrame()
}
// collect enough frames if channel is not full yet
return s.channelBuilder.PendingFrames() >= int(s.cfg.MaxFramesPerTx())
}
func (s *channel) IsFull() bool {
......
......@@ -62,6 +62,17 @@ type ChannelConfig struct {
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint
// Whether to put all frames of a channel inside a single tx.
// Should only be used for blob transactions.
MultiFrameTxs bool
}
// MaxFramesPerTx returns the maximum number of frames to put into a single
// transaction: 1 if multi-frame transactions are disabled, otherwise the
// compressor's target number of frames.
func (cc *ChannelConfig) MaxFramesPerTx() int {
	if !cc.MultiFrameTxs {
		return 1
	}
	return cc.CompressorConfig.TargetNumFrames
}
// Check validates the [ChannelConfig] parameters.
......@@ -91,6 +102,10 @@ func (cc *ChannelConfig) Check() error {
return fmt.Errorf("unrecognized batch type: %d", cc.BatchType)
}
if nf := cc.CompressorConfig.TargetNumFrames; nf < 1 {
return fmt.Errorf("invalid number of frames %d", nf)
}
return nil
}
......@@ -449,11 +464,13 @@ func (c *ChannelBuilder) NextFrame() frameData {
return f
}
// PushFrame adds the frame back to the internal frames queue. Panics if not of
// PushFrames adds the frames back to the internal frames queue. Panics if not of
// the same channel.
func (c *ChannelBuilder) PushFrame(frame frameData) {
if frame.id.chID != c.ID() {
panic("wrong channel")
func (c *ChannelBuilder) PushFrames(frames ...frameData) {
for _, f := range frames {
if f.id.chID != c.ID() {
panic("wrong channel")
}
c.frames = append(c.frames, f)
}
c.frames = append(c.frames, frame)
}
......@@ -410,7 +410,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
require.NoError(t, err)
// Push one frame into to the channel builder
expectedTx := txID{chID: co.ID(), frameNumber: fn}
expectedTx := txID{frameID{chID: co.ID(), frameNumber: fn}}
expectedBytes := buf.Bytes()
frameData := frameData{
id: frameID{
......@@ -419,14 +419,14 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
},
data: expectedBytes,
}
cb.PushFrame(frameData)
cb.PushFrames(frameData)
// There should only be 1 frame in the channel builder
require.Equal(t, 1, cb.PendingFrames())
// We should be able to increment to the next frame
constructedFrame := cb.NextFrame()
require.Equal(t, expectedTx, constructedFrame.id)
require.Equal(t, expectedTx[0], constructedFrame.id)
require.Equal(t, expectedBytes, constructedFrame.data)
require.Equal(t, 0, cb.PendingFrames())
......@@ -462,7 +462,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
},
data: buf.Bytes(),
}
cb.PushFrame(frame)
cb.PushFrames(frame)
})
}
......
......@@ -41,7 +41,7 @@ type channelManager struct {
// channels to read frame data from, for writing batches onchain
channelQueue []*channel
// used to lookup channels by tx ID upon tx success / failure
txChannels map[txID]*channel
txChannels map[string]*channel
// if set to true, prevents production of any new channel frames
closed bool
......@@ -53,7 +53,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig,
metr: metr,
cfg: cfg,
rollupCfg: rollupCfg,
txChannels: make(map[txID]*channel),
txChannels: make(map[string]*channel),
}
}
......@@ -68,14 +68,15 @@ func (s *channelManager) Clear() {
s.closed = false
s.currentChannel = nil
s.channelQueue = nil
s.txChannels = make(map[txID]*channel)
s.txChannels = make(map[string]*channel)
}
// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channelManager) TxFailed(id txID) {
func (s *channelManager) TxFailed(_id txID) {
s.mu.Lock()
defer s.mu.Unlock()
id := _id.String()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
channel.TxFailed(id)
......@@ -92,9 +93,10 @@ func (s *channelManager) TxFailed(id txID) {
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
s.mu.Lock()
defer s.mu.Unlock()
id := _id.String()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
done, blocks := channel.TxConfirmed(id, inclusionBlock)
......@@ -130,40 +132,40 @@ func (s *channelManager) removePendingChannel(channel *channel) {
// nextTxData pops off s.datas & handles updating the internal state
func (s *channelManager) nextTxData(channel *channel) (txData, error) {
if channel == nil || !channel.HasFrame() {
if channel == nil || !channel.HasTxData() {
s.log.Trace("no next tx data")
return txData{}, io.EOF // TODO: not enough data error instead
}
tx := channel.NextTxData()
s.txChannels[tx.ID()] = channel
s.txChannels[tx.ID().String()] = channel
return tx, nil
}
// TxData returns the next tx data that should be submitted to L1.
//
// If the pending channel is full, it only returns the remaining tx data of
// this channel until it got successfully fully sent to L1. It returns io.EOF
// if there's no pending tx data.
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
s.mu.Lock()
defer s.mu.Unlock()
var firstWithFrame *channel
var firstWithTxData *channel
for _, ch := range s.channelQueue {
if ch.HasFrame() {
firstWithFrame = ch
if ch.HasTxData() {
firstWithTxData = ch
break
}
}
dataPending := firstWithFrame != nil && firstWithFrame.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))
// Short circuit if there is a pending frame or the channel manager is closed.
// Short circuit if there is pending tx data or the channel manager is closed.
if dataPending || s.closed {
return s.nextTxData(firstWithFrame)
return s.nextTxData(firstWithTxData)
}
// No pending frame, so we have to add new blocks to the channel
// No pending tx data, so we have to add new blocks to the channel
// If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 {
......@@ -385,7 +387,7 @@ func (s *channelManager) Close() error {
}
}
if s.currentChannel.HasFrame() {
if s.currentChannel.HasTxData() {
// Make it clear to the caller that there is remaining pending work.
return ErrPendingAfterClose
}
......
......@@ -217,7 +217,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {
txdata0, err := m.TxData(eth.BlockID{})
require.NoError(err)
txdata0bytes := txdata0.Bytes()
txdata0bytes := txdata0.CallData()
data0 := make([]byte, len(txdata0bytes))
// make sure we have a clone for later comparison
copy(data0, txdata0bytes)
......@@ -232,7 +232,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {
txdata1, err := m.TxData(eth.BlockID{})
require.NoError(err)
data1 := txdata1.Bytes()
data1 := txdata1.CallData()
require.Equal(data1, data0)
fs, err := derive.ParseFrames(data1)
require.NoError(err)
......
......@@ -4,6 +4,7 @@ import (
"io"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
......@@ -14,6 +15,14 @@ import (
"github.com/stretchr/testify/require"
)
// singleFrameTxID creates a txID containing a single frame of the given
// channel id and frame number.
func singleFrameTxID(cid derive.ChannelID, fn uint16) txID {
	return txID{frameID{chID: cid, frameNumber: fn}}
}
// zeroFrameTxID creates a single-frame txID with a zero channel id, for tests
// that don't care about the channel.
func zeroFrameTxID(fn uint16) txID {
	return txID{frameID{frameNumber: fn}}
}
// TestChannelTimeout tests that the channel manager
// correctly identifies when a pending channel is timed out.
func TestChannelTimeout(t *testing.T) {
......@@ -39,8 +48,8 @@ func TestChannelTimeout(t *testing.T) {
// Manually set a confirmed transactions
// To avoid other methods clearing state
channel.confirmedTransactions[frameID{frameNumber: 0}] = eth.BlockID{Number: 0}
channel.confirmedTransactions[frameID{frameNumber: 1}] = eth.BlockID{Number: 99}
channel.confirmedTransactions[zeroFrameTxID(0).String()] = eth.BlockID{Number: 0}
channel.confirmedTransactions[zeroFrameTxID(1).String()] = eth.BlockID{Number: 99}
channel.confirmedTxUpdated = true
// Since the ChannelTimeout is 100, the
......@@ -50,9 +59,7 @@ func TestChannelTimeout(t *testing.T) {
// Add a confirmed transaction with a higher number
// than the ChannelTimeout
channel.confirmedTransactions[frameID{
frameNumber: 2,
}] = eth.BlockID{
channel.confirmedTransactions[zeroFrameTxID(2).String()] = eth.BlockID{
Number: 101,
}
channel.confirmedTxUpdated = true
......@@ -62,8 +69,8 @@ func TestChannelTimeout(t *testing.T) {
require.True(t, timeout)
}
// TestChannelNextTxData checks the nextTxData function.
func TestChannelNextTxData(t *testing.T) {
// TestChannelManager_NextTxData tests the nextTxData function.
func TestChannelManager_NextTxData(t *testing.T) {
log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear()
......@@ -92,19 +99,105 @@ func TestChannelNextTxData(t *testing.T) {
frameNumber: uint16(0),
},
}
channel.channelBuilder.PushFrame(frame)
channel.channelBuilder.PushFrames(frame)
require.Equal(t, 1, channel.PendingFrames())
// Now the nextTxData function should return the frame
returnedTxData, err = m.nextTxData(channel)
expectedTxData := txData{frame}
expectedChannelID := expectedTxData.ID()
expectedTxData := singleFrameTxData(frame)
expectedChannelID := expectedTxData.ID().String()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, channel.PendingFrames())
require.Equal(t, expectedTxData, channel.pendingTransactions[expectedChannelID])
}
// TestChannel_NextTxData_singleFrameTx tests that with multi-frame
// transactions disabled, NextTxData returns exactly one frame per tx data,
// draining the channel builder frame by frame regardless of TargetNumFrames.
func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
	require := require.New(t)
	const n = 6
	lgr := testlog.Logger(t, log.LevelWarn)
	ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
		MultiFrameTxs: false,
		CompressorConfig: compressor.Config{
			TargetNumFrames: n,
		},
	}, &rollup.Config{})
	require.NoError(err)
	chID := ch.ID()

	mockframes := makeMockFrameDatas(chID, n+1)
	// put multiple frames into channel, but less than target
	ch.channelBuilder.PushFrames(mockframes[:n-1]...)

	// requireTxData asserts that the channel has tx data and that the next tx
	// data holds exactly mock frame i (single data byte i, frame number i).
	requireTxData := func(i int) {
		require.True(ch.HasTxData(), "expected tx data %d", i)
		txdata := ch.NextTxData()
		require.Len(txdata.frames, 1)
		frame := txdata.frames[0]
		require.Len(frame.data, 1)
		require.EqualValues(i, frame.data[0])
		require.Equal(frameID{chID: chID, frameNumber: uint16(i)}, frame.id)
	}

	for i := 0; i < n-1; i++ {
		requireTxData(i)
	}
	require.False(ch.HasTxData())

	// put in last two
	ch.channelBuilder.PushFrames(mockframes[n-1 : n+1]...)
	for i := n - 1; i < n+1; i++ {
		requireTxData(i)
	}
	require.False(ch.HasTxData())
}
// TestChannel_NextTxData_multiFrameTx tests that with multi-frame
// transactions enabled, HasTxData only reports true once the target number of
// frames is pending, and NextTxData then bundles them into a single tx data.
func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
	require := require.New(t)
	const n = 6
	lgr := testlog.Logger(t, log.LevelWarn)
	ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
		MultiFrameTxs: true,
		CompressorConfig: compressor.Config{
			TargetNumFrames: n,
		},
	}, &rollup.Config{})
	require.NoError(err)
	chID := ch.ID()

	mockframes := makeMockFrameDatas(chID, n+1)
	// put multiple frames into channel, but less than target
	ch.channelBuilder.PushFrames(mockframes[:n-1]...)
	require.False(ch.HasTxData())

	// put in last two
	ch.channelBuilder.PushFrames(mockframes[n-1 : n+1]...)
	require.True(ch.HasTxData())
	txdata := ch.NextTxData()
	require.Len(txdata.frames, n)
	for i := 0; i < n; i++ {
		frame := txdata.frames[i]
		require.Len(frame.data, 1)
		require.EqualValues(i, frame.data[0])
		require.Equal(frameID{chID: chID, frameNumber: uint16(i)}, frame.id)
	}
	require.False(ch.HasTxData(), "no tx data expected with single pending frame")
}
// makeMockFrameDatas creates n mock frames for channel id, where frame i
// carries frame number i and the single data byte i.
func makeMockFrameDatas(id derive.ChannelID, n int) []frameData {
	frames := make([]frameData, n)
	for i := range frames {
		frames[i] = frameData{
			data: []byte{byte(i)},
			id: frameID{
				chID:        id,
				frameNumber: uint16(i),
			},
		}
	}
	return frames
}
// TestChannelTxConfirmed checks the [ChannelManager.TxConfirmed] function.
func TestChannelTxConfirmed(t *testing.T) {
// Create a channel manager
......@@ -128,15 +221,15 @@ func TestChannelTxConfirmed(t *testing.T) {
frameNumber: uint16(0),
},
}
m.currentChannel.channelBuilder.PushFrame(frame)
m.currentChannel.channelBuilder.PushFrames(frame)
require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := txData{frame}
expectedTxData := singleFrameTxData(frame)
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID.String()])
require.Len(t, m.currentChannel.pendingTransactions, 1)
// An unknown pending transaction should not be marked as confirmed
......@@ -144,7 +237,7 @@ func TestChannelTxConfirmed(t *testing.T) {
actualChannelID := m.currentChannel.ID()
unknownChannelID := derive.ChannelID([derive.ChannelIDLength]byte{0x69})
require.NotEqual(t, actualChannelID, unknownChannelID)
unknownTxID := frameID{chID: unknownChannelID, frameNumber: 0}
unknownTxID := singleFrameTxID(unknownChannelID, 0)
blockID := eth.BlockID{Number: 0, Hash: common.Hash{0x69}}
m.TxConfirmed(unknownTxID, blockID)
require.Empty(t, m.currentChannel.confirmedTransactions)
......@@ -156,7 +249,7 @@ func TestChannelTxConfirmed(t *testing.T) {
m.TxConfirmed(expectedChannelID, blockID)
require.Empty(t, m.currentChannel.pendingTransactions)
require.Len(t, m.currentChannel.confirmedTransactions, 1)
require.Equal(t, blockID, m.currentChannel.confirmedTransactions[expectedChannelID])
require.Equal(t, blockID, m.currentChannel.confirmedTransactions[expectedChannelID.String()])
}
// TestChannelTxFailed checks the [ChannelManager.TxFailed] function.
......@@ -177,22 +270,22 @@ func TestChannelTxFailed(t *testing.T) {
frameNumber: uint16(0),
},
}
m.currentChannel.channelBuilder.PushFrame(frame)
m.currentChannel.channelBuilder.PushFrames(frame)
require.Equal(t, 1, m.currentChannel.PendingFrames())
returnedTxData, err := m.nextTxData(m.currentChannel)
expectedTxData := txData{frame}
expectedTxData := singleFrameTxData(frame)
expectedChannelID := expectedTxData.ID()
require.NoError(t, err)
require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID.String()])
require.Len(t, m.currentChannel.pendingTransactions, 1)
// Trying to mark an unknown pending transaction as failed
// shouldn't modify state
m.TxFailed(frameID{})
m.TxFailed(zeroFrameTxID(0))
require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID])
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID.String()])
// Now we still have a pending transaction
// Let's mark it as failed
......
......@@ -52,6 +52,7 @@ type CLIConfig struct {
MaxPendingTransactions uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
// If using blobs, this setting is ignored and the max blob size is used.
MaxL1TxSize uint64
Stopped bool
......@@ -62,6 +63,10 @@ type CLIConfig struct {
// the data availability type to use for posting batches, e.g. blobs vs calldata.
DataAvailabilityType flags.DataAvailabilityType
// TestUseMaxTxSizeForBlobs allows to set the blob size with MaxL1TxSize.
// Should only be used for testing purposes.
TestUseMaxTxSizeForBlobs bool
// ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint.
ActiveSequencerCheckDuration time.Duration
......@@ -91,11 +96,17 @@ func (c *CLIConfig) Check() error {
return errors.New("must set PollInterval")
}
if c.MaxL1TxSize <= 1 {
return errors.New("MaxL1TxSize must be greater than 0")
return errors.New("MaxL1TxSize must be greater than 1")
}
if target, max := c.CompressorConfig.TargetL1TxSizeBytes, c.MaxL1TxSize; target > max {
return fmt.Errorf("target tx size > max, %d > %d", target, max)
}
if c.BatchType > 1 {
return fmt.Errorf("unknown batch type: %v", c.BatchType)
}
if c.DataAvailabilityType == flags.BlobsType && c.CompressorConfig.TargetNumFrames > 6 {
return errors.New("too many frames for blob transactions, max 6")
}
if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) {
return fmt.Errorf("unknown data availability type: %q", c.DataAvailabilityType)
}
......
......@@ -70,7 +70,7 @@ func TestBatcherConfig(t *testing.T) {
{
name: "max L1 tx size too small",
override: func(c *batcher.CLIConfig) { c.MaxL1TxSize = 0 },
errString: "MaxL1TxSize must be greater than 0",
errString: "MaxL1TxSize must be greater than 1",
},
{
name: "invalid batch type close",
......
......@@ -375,29 +375,32 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes()
// if plasma DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UsePlasma {
data, err = l.PlasmaDA.SetInput(ctx, data)
if err != nil {
l.Log.Error("Failed to post input to Plasma DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata, err)
return nil
}
}
var candidate *txmgr.TxCandidate
if l.Config.UseBlobs {
if candidate, err = l.blobTxCandidate(data); err != nil {
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
return fmt.Errorf("could not create blob tx candidate: %w", err)
}
l.Metr.RecordBlobUsedBytes(len(data))
} else {
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("unexpected number of frames in calldata tx", "num_frames", nf)
}
data := txdata.CallData()
// if plasma DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UsePlasma {
data, err = l.PlasmaDA.SetInput(ctx, data)
if err != nil {
l.Log.Error("Failed to post input to Plasma DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata, err)
return nil
}
}
candidate = l.calldataTxCandidate(data)
}
......@@ -413,15 +416,19 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
return nil
}
func (l *BatchSubmitter) blobTxCandidate(data []byte) (*txmgr.TxCandidate, error) {
l.Log.Info("building Blob transaction candidate", "size", len(data))
var b eth.Blob
if err := b.FromData(data); err != nil {
return nil, fmt.Errorf("data could not be converted to blob: %w", err)
func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
blobs, err := data.Blobs()
if err != nil {
return nil, fmt.Errorf("generating blobs for tx data: %w", err)
}
size := data.Len()
lastSize := len(data.frames[len(data.frames)-1].data)
l.Log.Info("building Blob transaction candidate",
"size", size, "last_size", lastSize, "num_blobs", len(blobs))
l.Metr.RecordBlobUsedBytes(lastSize)
return &txmgr.TxCandidate{
To: &l.RollupConfig.BatchInboxAddress,
Blobs: []*eth.Blob{&b},
Blobs: blobs,
}, nil
}
......@@ -477,7 +484,7 @@ func logFields(xs ...any) (fs []any) {
for _, x := range xs {
switch v := x.(type) {
case txData:
fs = append(fs, "frame_id", v.ID(), "data_len", v.Len())
fs = append(fs, "tx_id", v.ID(), "data_len", v.Len())
case *types.Receipt:
fs = append(fs, "tx", v.TxHash, "block", eth.ReceiptBlockID(v))
case error:
......
......@@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
......@@ -190,6 +191,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
SeqWindowSize: bs.RollupConfig.SeqWindowSize,
ChannelTimeout: bs.RollupConfig.ChannelTimeout,
MaxChannelDuration: cfg.MaxChannelDuration,
MaxFrameSize: cfg.MaxL1TxSize, // reset for blobs
SubSafetyMargin: cfg.SubSafetyMargin,
CompressorConfig: cfg.CompressorConfig.Config(),
BatchType: cfg.BatchType,
......@@ -197,16 +199,23 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
switch cfg.DataAvailabilityType {
case flags.BlobsType:
bs.ChannelConfig.MaxFrameSize = eth.MaxBlobDataSize
if !cfg.TestUseMaxTxSizeForBlobs {
bs.ChannelConfig.MaxFrameSize = eth.MaxBlobDataSize
}
bs.ChannelConfig.MultiFrameTxs = true
bs.UseBlobs = true
case flags.CalldataType:
bs.ChannelConfig.MaxFrameSize = cfg.MaxL1TxSize
bs.UseBlobs = false
default:
return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
}
bs.ChannelConfig.MaxFrameSize-- // subtract 1 byte for version
if bs.ChannelConfig.CompressorConfig.Kind == compressor.ShadowKind {
// shadow compressor guarantees to not go over target size, so can use max size
bs.ChannelConfig.CompressorConfig.TargetFrameSize = bs.ChannelConfig.MaxFrameSize
}
if bs.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
bs.Log.Error("Cannot use Blob data before Ecotone!") // log only, the batcher may not be actively running.
}
......
......@@ -2,8 +2,10 @@ package batcher
import (
"fmt"
"strings"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// txData represents the data for a single transaction.
......@@ -12,47 +14,89 @@ import (
// might change in the future to allow for multiple frames from possibly
// different channels.
type txData struct {
frame frameData
frames []frameData
}
// ID returns the id for this transaction data. It can be used as a map key.
// singleFrameTxData creates a txData holding exactly the given single frame.
func singleFrameTxData(frame frameData) txData {
	return txData{frames: []frameData{frame}}
}
// ID returns the id for this transaction data. Its String() can be used as a
// map key.
func (td *txData) ID() txID {
	// Collect the frame ids of all frames of this tx, in order.
	id := make(txID, 0, len(td.frames))
	for _, f := range td.frames {
		id = append(id, f.id)
	}
	return id
}
// CallData returns the transaction data as calldata.
// It's a version byte (0) followed by the concatenated frames for this transaction.
func (td *txData) CallData() []byte {
	// Version byte first, then all frame payloads back to back.
	out := make([]byte, 0, 1+td.Len())
	out = append(out, derive.DerivationVersion0)
	for i := range td.frames {
		out = append(out, td.frames[i].data...)
	}
	return out
}
// Bytes returns the transaction data. It's a version byte (0) followed by the
// concatenated frames for this transaction.
func (td *txData) Bytes() []byte {
return append([]byte{derive.DerivationVersion0}, td.frame.data...)
// Blobs converts this tx data's frames into blobs, one blob per frame.
// Each blob contains a version byte (0) followed by that frame's data.
// It returns an error if a frame (plus version byte) doesn't fit into a blob.
func (td *txData) Blobs() ([]*eth.Blob, error) {
	blobs := make([]*eth.Blob, 0, len(td.frames))
	for i, f := range td.frames {
		var blob eth.Blob
		if err := blob.FromData(append([]byte{derive.DerivationVersion0}, f.data...)); err != nil {
			// add the frame index for context; caller wraps with the tx id
			return nil, fmt.Errorf("converting frame %d to blob: %w", i, err)
		}
		blobs = append(blobs, &blob)
	}
	return blobs, nil
}
func (td *txData) Len() int {
return 1 + len(td.frame.data)
// Len returns the sum of all the sizes of data in all frames.
// Len only counts the data itself and doesn't account for the version byte(s).
func (td *txData) Len() int {
	total := 0
	for i := range td.frames {
		total += len(td.frames[i].data)
	}
	return total
}
// Frame returns the single frame of this tx data.
//
// Note: when the batcher is changed to possibly send multiple frames per tx,
// this should be changed to a func Frames() []frameData.
func (td *txData) Frame() frameData {
return td.frame
// Frames returns all frames of this tx data.
func (td *txData) Frames() []frameData {
	return td.frames
}
// txID is an opaque identifier for a transaction.
// It's internal fields should not be inspected after creation & are subject to change.
// This ID must be trivially comparable & work as a map key.
//
// Note: transactions currently only hold a single frame, so it can be
// identified by the frame. This needs to be changed once the batcher is changed
// to send multiple frames per tx.
type txID = frameID
// Its internal fields should not be inspected after creation & are subject to change.
// Its String() can be used for comparisons and works as a map key.
type txID []frameID
// String returns a string representation of the txID, suitable for use as a
// map key.
func (id txID) String() string {
	return id.string(func(id derive.ChannelID) string { return id.String() })
}
// TerminalString implements log.TerminalStringer, formatting a string for console
// output during logging.
func (id txID) TerminalString() string {
	return id.string(func(id derive.ChannelID) string { return id.TerminalString() })
}
// string formats the txID, rendering channel IDs with chIDStringer.
// Consecutive frames of the same channel are grouped as chID:fn0+fn1+...,
// and groups of different channels are separated by '|'.
func (id txID) string(chIDStringer func(id derive.ChannelID) string) string {
	var (
		sb       strings.Builder
		curChID  derive.ChannelID
		started  bool // whether at least one group has been started
	)
	for _, f := range id {
		if started && f.chID == curChID {
			// another frame number in the current channel's group
			fmt.Fprintf(&sb, "+%d", f.frameNumber)
			continue
		}
		if started {
			sb.WriteByte('|')
		}
		// Start a new group. Using an explicit flag instead of comparing
		// curChID against the zero ChannelID makes a leading all-zero channel
		// id render correctly as "chID:fn" instead of a bare "+fn".
		started = true
		curChID = f.chID
		fmt.Fprintf(&sb, "%s:%d", chIDStringer(f.chID), f.frameNumber)
	}
	return sb.String()
}
package batcher
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestTxID_String tests the txID String method for empty, nil, single-frame
// and multi-channel, multi-frame transaction ids.
func TestTxID_String(t *testing.T) {
	for _, test := range []struct {
		desc   string
		id     txID
		expStr string
	}{
		{
			desc:   "empty",
			id:     []frameID{},
			expStr: "",
		},
		{
			desc:   "nil",
			id:     nil,
			expStr: "",
		},
		{
			desc: "single",
			id: []frameID{{
				chID:        [16]byte{0: 0xca, 15: 0xaf},
				frameNumber: 42,
			}},
			expStr: "ca0000000000000000000000000000af:42",
		},
		{
			// frames of the same channel are grouped with '+', channels are
			// separated by '|'
			desc: "multi",
			id: []frameID{
				{
					chID:        [16]byte{0: 0xca, 15: 0xaf},
					frameNumber: 42,
				},
				{
					chID:        [16]byte{0: 0xca, 15: 0xaf},
					frameNumber: 33,
				},
				{
					chID:        [16]byte{0: 0xbe, 15: 0xef},
					frameNumber: 0,
				},
				{
					chID:        [16]byte{0: 0xbe, 15: 0xef},
					frameNumber: 128,
				},
			},
			expStr: "ca0000000000000000000000000000af:42+33|be0000000000000000000000000000ef:0+128",
		},
	} {
		t.Run(test.desc, func(t *testing.T) {
			require.Equal(t, test.expStr, test.id.String())
		})
	}
}
......@@ -5,13 +5,11 @@ import (
)
type Config struct {
// TargetFrameSize to target when creating channel frames. Note that if the
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close the target is to the
// max frame size.
// TargetFrameSize to target when creating channel frames.
// It is guaranteed that a frame will never be larger.
TargetFrameSize uint64
// TargetNumFrames to create in this channel. If the realized compression ratio
// is worse than approxComprRatio, additional leftover frame(s) might get created.
// TargetNumFrames to create in this channel. If the first block that is added
// doesn't fit within a single frame, more frames might be created.
TargetNumFrames int
// ApproxComprRatio to assume. Should be slightly smaller than average from
// experiments to avoid the chances of creating a small additional leftover frame.
......
package geth
import (
"context"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
var ErrNotFound = errors.New("not found")
// FindBlock finds the first block for which the predicate [pred] matches
// and returns it. It starts at [from] and iterates until [to], inclusively,
// using the provided [client]. It supports both search directions, forwards
// and backwards.
func FindBlock(client *ethclient.Client,
	from, to int, timeout time.Duration,
	pred func(*types.Block) (bool, error),
) (*types.Block, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Step forwards or backwards depending on the relative order of the bounds.
	step := 1
	if to < from {
		step = -1
	}

	for n := from; ; n += step {
		block, err := client.BlockByNumber(ctx, big.NewInt(int64(n)))
		if err != nil {
			return nil, fmt.Errorf("fetching block[%d]: %w", n, err)
		}
		ok, err := pred(block)
		if err != nil {
			return nil, fmt.Errorf("predicate error[%d]: %w", n, err)
		}
		if ok {
			return block, nil
		}
		// [to] is part of the search range, so only stop after checking it.
		if n == to {
			return nil, ErrNotFound
		}
	}
}
......@@ -3,6 +3,7 @@ package op_e2e
import (
"context"
"math/big"
"math/rand"
"testing"
"time"
......@@ -11,22 +12,43 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
gethutils "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
// TestSystem4844E2E runs the SystemE2E test with 4844 enabled on L1,
// and active on the rollup in the op-batcher and verifier.
func TestSystem4844E2E(t *testing.T) {
	// Run the same scenario once per blob mode, in a fixed order.
	for _, tc := range []struct {
		name      string
		multiBlob bool
	}{
		{name: "single-blob", multiBlob: false},
		{name: "multi-blob", multiBlob: true},
	} {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			testSystem4844E2E(t, tc.multiBlob)
		})
	}
}
func testSystem4844E2E(t *testing.T, multiBlob bool) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
cfg.DataAvailabilityType = batcherFlags.BlobsType
const maxBlobs = 6
var maxL1TxSize int
if multiBlob {
cfg.BatcherTargetNumFrames = 6
cfg.BatcherUseMaxTxSizeForBlobs = true
// leads to 6 blobs for an L2 block with a user tx with 400 random bytes
// while all other L2 blocks take 1 blob (deposit tx)
maxL1TxSize = derive.FrameV0OverHeadSize + 100
cfg.BatcherMaxL1TxSizeBytes = uint64(maxL1TxSize)
}
genesisActivation := hexutil.Uint64(0)
cfg.DeployConfig.L1CancunTimeOffset = &genesisActivation
......@@ -77,6 +99,10 @@ func TestSystem4844E2E(t *testing.T) {
opts.Value = big.NewInt(1_000_000_000)
opts.Nonce = 1 // Already have deposit
opts.ToAddr = &common.Address{0xff, 0xff}
// put some random data in the tx to make it fill up 6 blobs (multi-blob case)
opts.Data = testutils.RandomData(rand.New(rand.NewSource(420)), 400)
opts.Gas, err = core.IntrinsicGas(opts.Data, nil, false, true, true, false)
require.NoError(t, err)
opts.VerifyOnClients(l2Verif)
})
......@@ -108,11 +134,61 @@ func TestSystem4844E2E(t *testing.T) {
// wait for chain to be marked as "safe" (i.e. confirm batch-submission works)
stat, err := rollupClient.SyncStatus(context.Background())
require.NoError(t, err)
return stat.SafeL2.Number > 0
return stat.SafeL2.Number >= receipt.BlockNumber.Uint64()
}, time.Second*20, time.Second, "expected L2 to be batch-submitted and labeled as safe")
// check that the L2 tx is still canonical
seqBlock, err = l2Seq.BlockByNumber(context.Background(), receipt.BlockNumber)
require.NoError(t, err)
require.Equal(t, seqBlock.Hash(), receipt.BlockHash, "receipt block must match canonical block at tx inclusion height")
// find L1 block that contained the blob(s) batch tx
tip, err := l1Client.HeaderByNumber(context.Background(), nil)
require.NoError(t, err)
var blobTx *types.Transaction
blobBlock, err := gethutils.FindBlock(l1Client, int(tip.Number.Int64()), 0, 5*time.Second,
func(b *types.Block) (bool, error) {
for _, tx := range b.Transactions() {
if tx.Type() != types.BlobTxType {
continue
}
// expect to find at least one tx with multiple blobs in multi-blob case
if !multiBlob || len(tx.BlobHashes()) > 1 {
blobTx = tx
return true, nil
}
}
return false, nil
})
require.NoError(t, err)
numBlobs := len(blobTx.BlobHashes())
if !multiBlob {
require.NotZero(t, numBlobs, "single-blob: expected to find L1 blob tx")
} else {
require.Equal(t, maxBlobs, numBlobs, "multi-blob: expected to find L1 blob tx with 6 blobs")
// blob tx should have filled up all but last blob
bcl := sys.L1BeaconHTTPClient()
hashes := toIndexedBlobHashes(blobTx.BlobHashes()...)
sidecars, err := bcl.BeaconBlobSideCars(context.Background(), false, sys.L1Slot(blobBlock.Time()), hashes)
require.NoError(t, err)
require.Len(t, sidecars.Data, maxBlobs)
for i := 0; i < maxBlobs-1; i++ {
data, err := sidecars.Data[i].Blob.ToData()
require.NoError(t, err)
require.Len(t, data, maxL1TxSize)
}
// last blob should only be partially filled
data, err := sidecars.Data[maxBlobs-1].Blob.ToData()
require.NoError(t, err)
require.Less(t, len(data), maxL1TxSize)
}
}
// toIndexedBlobHashes pairs each blob hash with its positional index in the
// argument list, as expected by the beacon blob-sidecar query.
func toIndexedBlobHashes(hs ...common.Hash) []eth.IndexedBlobHash {
	indexed := make([]eth.IndexedBlobHash, len(hs))
	for i, h := range hs {
		indexed[i] = eth.IndexedBlobHash{Index: uint64(i), Hash: h}
	}
	return indexed
}
......@@ -58,6 +58,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -67,9 +68,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)
var (
testingJWTSecret = [32]byte{123}
)
var testingJWTSecret = [32]byte{123}
func newTxMgrConfig(l1Addr string, privKey *ecdsa.PrivateKey) txmgr.CLIConfig {
return txmgr.CLIConfig{
......@@ -220,6 +219,14 @@ type SystemConfig struct {
// Max L1 tx size for the batcher transactions
BatcherMaxL1TxSizeBytes uint64
// Target number of frames to create per channel. Can be used to create
// multi-blob transactions.
// Default is 1 if unset.
BatcherTargetNumFrames uint64
// whether to actually use BatcherMaxL1TxSizeBytes for blobs, instead of max blob size
BatcherUseMaxTxSizeForBlobs bool
// SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time
SupportL1TimeTravel bool
......@@ -307,6 +314,11 @@ func (sys *System) L1BeaconEndpoint() string {
return sys.L1BeaconAPIAddr
}
// L1BeaconHTTPClient returns a beacon HTTP client pointed at the system's
// L1 beacon API endpoint, logging through the test logger.
func (sys *System) L1BeaconHTTPClient() *sources.BeaconHTTPClient {
	logger := testlog.Logger(sys.t, log.LevelInfo).New("component", "beaconClient")
	httpClient := client.NewBasicHTTPClient(sys.L1BeaconEndpoint(), logger)
	return sources.NewBeaconHTTPClient(httpClient)
}
func (sys *System) NodeEndpoint(name string) string {
return selectEndpoint(sys.EthInstances[name])
}
......@@ -344,6 +356,11 @@ func (sys *System) L2Genesis() *core.Genesis {
return sys.L2GenesisCfg
}
// L1Slot converts an L1 timestamp into its slot number, computed from the
// configured L1 genesis timestamp and L1 block time.
func (sys *System) L1Slot(l1Timestamp uint64) uint64 {
	genesisTime := uint64(sys.Cfg.DeployConfig.L1GenesisBlockTimestamp)
	return (l1Timestamp - genesisTime) / sys.Cfg.DeployConfig.L1BlockTime
}
func (sys *System) Close() {
if !sys.closed.CompareAndSwap(false, true) {
// Already closed.
......@@ -795,20 +812,26 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
if cfg.DeployConfig.L2GenesisDeltaTimeOffset != nil && *cfg.DeployConfig.L2GenesisDeltaTimeOffset == hexutil.Uint64(0) {
batchType = derive.SpanBatchType
}
// batcher defaults if unset
batcherMaxL1TxSizeBytes := cfg.BatcherMaxL1TxSizeBytes
if batcherMaxL1TxSizeBytes == 0 {
batcherMaxL1TxSizeBytes = 240_000
}
batcherTargetNumFrames := cfg.BatcherTargetNumFrames
if batcherTargetNumFrames == 0 {
batcherTargetNumFrames = 1
}
batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxPendingTransactions: cfg.MaxPendingTransactions,
MaxChannelDuration: 1,
MaxL1TxSize: batcherMaxL1TxSizeBytes,
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxPendingTransactions: cfg.MaxPendingTransactions,
MaxChannelDuration: 1,
MaxL1TxSize: batcherMaxL1TxSizeBytes,
TestUseMaxTxSizeForBlobs: cfg.BatcherUseMaxTxSizeForBlobs,
CompressorConfig: compressor.CLIConfig{
TargetL1TxSizeBytes: cfg.BatcherTargetL1TxSizeBytes,
TargetNumFrames: 1,
TargetNumFrames: int(batcherTargetNumFrames),
ApproxComprRatio: 0.4,
},
SubSafetyMargin: 4,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment