Commit 05503827 authored by Sebastian Stammler's avatar Sebastian Stammler

op-batcher: Refactor frame & tx data handling

Introduces two separate types for frame data and tx data to make a clear
distinction between the two. Also prepares the batcher for future changes
when it will support sending multiple frames per tx.
parent e251dac4
...@@ -54,6 +54,16 @@ func (c ChannelConfig) InputThreshold() uint64 { ...@@ -54,6 +54,16 @@ func (c ChannelConfig) InputThreshold() uint64 {
return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio) return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio)
} }
type frameID struct {
chID derive.ChannelID
frameNumber uint16
}
type frameData struct {
data []byte
id frameID
}
// channelBuilder uses a ChannelOut to create a channel with output frame // channelBuilder uses a ChannelOut to create a channel with output frame
// size approximation. // size approximation.
type channelBuilder struct { type channelBuilder struct {
...@@ -76,7 +86,7 @@ type channelBuilder struct { ...@@ -76,7 +86,7 @@ type channelBuilder struct {
// list of blocks in the channel. Saved in case the channel must be rebuilt // list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block blocks []*types.Block
// frames data queue, to be send as txs // frames data queue, to be send as txs
frames []taggedData frames []frameData
} }
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) { func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
...@@ -319,7 +329,7 @@ func (c *channelBuilder) outputFrame() error { ...@@ -319,7 +329,7 @@ func (c *channelBuilder) outputFrame() error {
c.setFullErr(ErrMaxFrameIndex) c.setFullErr(ErrMaxFrameIndex)
} }
frame := taggedData{ frame := frameData{
id: txID{chID: c.co.ID(), frameNumber: fn}, id: txID{chID: c.co.ID(), frameNumber: fn},
data: buf.Bytes(), data: buf.Bytes(),
} }
...@@ -343,23 +353,23 @@ func (c *channelBuilder) NumFrames() int { ...@@ -343,23 +353,23 @@ func (c *channelBuilder) NumFrames() int {
// NextFrame returns the next available frame. // NextFrame returns the next available frame.
// HasFrame must be called prior to check if there's a next frame available. // HasFrame must be called prior to check if there's a next frame available.
// Panics if called when there's no next frame. // Panics if called when there's no next frame.
func (c *channelBuilder) NextFrame() (txID, []byte) { func (c *channelBuilder) NextFrame() frameData {
if len(c.frames) == 0 { if len(c.frames) == 0 {
panic("no next frame") panic("no next frame")
} }
f := c.frames[0] f := c.frames[0]
c.frames = c.frames[1:] c.frames = c.frames[1:]
return f.id, f.data return f
} }
// PushFrame adds the frame back to the internal frames queue. Panics if not of // PushFrame adds the frame back to the internal frames queue. Panics if not of
// the same channel. // the same channel.
func (c *channelBuilder) PushFrame(id txID, frame []byte) { func (c *channelBuilder) PushFrame(frame frameData) {
if id.chID != c.ID() { if frame.id.chID != c.ID() {
panic("wrong channel") panic("wrong channel")
} }
c.frames = append(c.frames, taggedData{id: id, data: frame}) c.frames = append(c.frames, frame)
} }
var ( var (
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"math" "math"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -15,29 +14,6 @@ import ( ...@@ -15,29 +14,6 @@ import (
var ErrReorg = errors.New("block does not extend existing chain") var ErrReorg = errors.New("block does not extend existing chain")
// txID is an opaque identifier for a transaction.
// It's internal fields should not be inspected after creation & are subject to change.
// This ID must be trivially comparable & work as a map key.
type txID struct {
chID derive.ChannelID
frameNumber uint16
}
func (id txID) String() string {
return fmt.Sprintf("%s:%d", id.chID.String(), id.frameNumber)
}
// TerminalString implements log.TerminalStringer, formatting a string for console
// output during logging.
func (id txID) TerminalString() string {
return fmt.Sprintf("%s:%d", id.chID.TerminalString(), id.frameNumber)
}
type taggedData struct {
data []byte
id txID
}
// channelManager stores a contiguous set of blocks & turns them into channels. // channelManager stores a contiguous set of blocks & turns them into channels.
// Upon receiving tx confirmation (or a tx failure), it does channel error handling. // Upon receiving tx confirmation (or a tx failure), it does channel error handling.
// //
...@@ -59,7 +35,7 @@ type channelManager struct { ...@@ -59,7 +35,7 @@ type channelManager struct {
// pending channel builder // pending channel builder
pendingChannel *channelBuilder pendingChannel *channelBuilder
// Set of unconfirmed txID -> frame data. For tx resubmission // Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID][]byte pendingTransactions map[txID]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out // Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID confirmedTransactions map[txID]eth.BlockID
} }
...@@ -68,7 +44,7 @@ func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager { ...@@ -68,7 +44,7 @@ func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager {
return &channelManager{ return &channelManager{
log: log, log: log,
cfg: cfg, cfg: cfg,
pendingTransactions: make(map[txID][]byte), pendingTransactions: make(map[txID]txData),
confirmedTransactions: make(map[txID]eth.BlockID), confirmedTransactions: make(map[txID]eth.BlockID),
} }
} }
...@@ -87,7 +63,10 @@ func (s *channelManager) Clear() { ...@@ -87,7 +63,10 @@ func (s *channelManager) Clear() {
func (s *channelManager) TxFailed(id txID) { func (s *channelManager) TxFailed(id txID) {
if data, ok := s.pendingTransactions[id]; ok { if data, ok := s.pendingTransactions[id]; ok {
s.log.Trace("marked transaction as failed", "id", id) s.log.Trace("marked transaction as failed", "id", id)
s.pendingChannel.PushFrame(id, data[1:]) // strip the version byte // Note: when the batcher is changed to send multiple frames per tx,
// this needs to be changed to iterate over all frames of the tx data
// and re-queue them.
s.pendingChannel.PushFrame(data.Frame())
delete(s.pendingTransactions, id) delete(s.pendingTransactions, id)
} else { } else {
s.log.Warn("unknown transaction marked as failed", "id", id) s.log.Warn("unknown transaction marked as failed", "id", id)
...@@ -128,7 +107,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { ...@@ -128,7 +107,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
// TODO: Create separate "pending" state // TODO: Create separate "pending" state
func (s *channelManager) clearPendingChannel() { func (s *channelManager) clearPendingChannel() {
s.pendingChannel = nil s.pendingChannel = nil
s.pendingTransactions = make(map[txID][]byte) s.pendingTransactions = make(map[txID]txData)
s.confirmedTransactions = make(map[txID]eth.BlockID) s.confirmedTransactions = make(map[txID]eth.BlockID)
} }
...@@ -166,21 +145,19 @@ func (s *channelManager) pendingChannelIsFullySubmitted() bool { ...@@ -166,21 +145,19 @@ func (s *channelManager) pendingChannelIsFullySubmitted() bool {
} }
// nextTxData pops off s.datas & handles updating the internal state // nextTxData pops off s.datas & handles updating the internal state
func (s *channelManager) nextTxData() ([]byte, txID, error) { func (s *channelManager) nextTxData() (txData, error) {
if s.pendingChannel == nil || !s.pendingChannel.HasFrame() { if s.pendingChannel == nil || !s.pendingChannel.HasFrame() {
s.log.Trace("no next tx data") s.log.Trace("no next tx data")
return nil, txID{}, io.EOF // TODO: not enough data error instead return txData{}, io.EOF // TODO: not enough data error instead
} }
id, data := s.pendingChannel.NextFrame() frame := s.pendingChannel.NextFrame()
// prepend version byte for first frame of transaction txdata := txData{frame}
// TODO: more memory efficient solution; shouldn't be responsibility of id := txdata.ID()
// channelBuilder though.
data = append([]byte{0}, data...)
s.log.Trace("returning next tx data", "id", id) s.log.Trace("returning next tx data", "id", id)
s.pendingTransactions[id] = data s.pendingTransactions[id] = txdata
return data, id, nil return txdata, nil
} }
// TxData returns the next tx data that should be submitted to L1. // TxData returns the next tx data that should be submitted to L1.
...@@ -188,7 +165,7 @@ func (s *channelManager) nextTxData() ([]byte, txID, error) { ...@@ -188,7 +165,7 @@ func (s *channelManager) nextTxData() ([]byte, txID, error) {
// It currently only uses one frame per transaction. If the pending channel is // It currently only uses one frame per transaction. If the pending channel is
// full, it only returns the remaining frames of this channel until it got // full, it only returns the remaining frames of this channel until it got
// successfully fully sent to L1. It returns io.EOF if there's no pending frame. // successfully fully sent to L1. It returns io.EOF if there's no pending frame.
func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) { func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame() dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks)) s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
...@@ -201,15 +178,15 @@ func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) { ...@@ -201,15 +178,15 @@ func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) {
// If we have no saved blocks, we will not be able to create valid frames // If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 { if len(s.blocks) == 0 {
return nil, txID{}, io.EOF return txData{}, io.EOF
} }
if err := s.ensurePendingChannel(l1Head); err != nil { if err := s.ensurePendingChannel(l1Head); err != nil {
return nil, txID{}, err return txData{}, err
} }
if err := s.processBlocks(); err != nil { if err := s.processBlocks(); err != nil {
return nil, txID{}, err return txData{}, err
} }
// Register current L1 head only after all pending blocks have been // Register current L1 head only after all pending blocks have been
...@@ -218,7 +195,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) { ...@@ -218,7 +195,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) {
s.registerL1Block(l1Head) s.registerL1Block(l1Head)
if err := s.pendingChannel.OutputFrames(); err != nil { if err := s.pendingChannel.OutputFrames(); err != nil {
return nil, txID{}, fmt.Errorf("creating frames with channel builder: %w", err) return txData{}, fmt.Errorf("creating frames with channel builder: %w", err)
} }
return s.nextTxData() return s.nextTxData()
......
...@@ -77,9 +77,9 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -77,9 +77,9 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
err = m.AddL2Block(a) err = m.AddL2Block(a)
require.NoError(t, err) require.NoError(t, err)
_, _, err = m.TxData(eth.BlockID{}) _, err = m.TxData(eth.BlockID{})
require.NoError(t, err) require.NoError(t, err)
_, _, err = m.TxData(eth.BlockID{}) _, err = m.TxData(eth.BlockID{})
require.ErrorIs(t, err, io.EOF) require.ErrorIs(t, err, io.EOF)
err = m.AddL2Block(x) err = m.AddL2Block(x)
require.ErrorIs(t, err, batcher.ErrReorg) require.ErrorIs(t, err, batcher.ErrReorg)
......
...@@ -280,7 +280,7 @@ func (l *BatchSubmitter) loop() { ...@@ -280,7 +280,7 @@ func (l *BatchSubmitter) loop() {
} }
// Collect next transaction data // Collect next transaction data
data, id, err := l.state.TxData(l1tip.ID()) txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF { if err == io.EOF {
l.log.Trace("no transaction data available") l.log.Trace("no transaction data available")
break // local for loop break // local for loop
...@@ -289,10 +289,10 @@ func (l *BatchSubmitter) loop() { ...@@ -289,10 +289,10 @@ func (l *BatchSubmitter) loop() {
break break
} }
// Record TX Status // Record TX Status
if receipt, err := l.txMgr.SendTransaction(l.ctx, data); err != nil { if receipt, err := l.txMgr.SendTransaction(l.ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(id, err) l.recordFailedTx(txdata.ID(), err)
} else { } else {
l.recordConfirmedTx(id, receipt) l.recordConfirmedTx(txdata.ID(), receipt)
} }
// hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending // hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending
......
package batcher
import (
"fmt"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
// txData represents the data for a single transaction.
//
// Note: The batcher currently sends exactly one frame per transaction. This
// might change in the future to allow for multiple frames from possibly
// different channels.
type txData struct {
frame frameData
}
// ID returns the id for this transaction data. It can be used as a map key.
func (td *txData) ID() txID {
return td.frame.id
}
// Bytes returns the transaction data. It's a version byte (0) followed by the
// concatenated frames for this transaction.
func (td *txData) Bytes() []byte {
return append([]byte{derive.DerivationVersion0}, td.frame.data...)
}
// Frame returns the single frame of this tx data.
//
// Note: when the batcher is changed to possibly send multiple frames per tx,
// this should be changed to a func Frames() []frameData.
func (td *txData) Frame() frameData {
return td.frame
}
// txID is an opaque identifier for a transaction.
// It's internal fields should not be inspected after creation & are subject to change.
// This ID must be trivially comparable & work as a map key.
//
// Note: transactions currently only hold a single frame, so it can be
// identified by the frame. This needs to be changed once the batcher is changed
// to send multiple frames per tx.
type txID = frameID
func (id txID) String() string {
return fmt.Sprintf("%s:%d", id.chID.String(), id.frameNumber)
}
// TerminalString implements log.TerminalStringer, formatting a string for console
// output during logging.
func (id txID) TerminalString() string {
return fmt.Sprintf("%s:%d", id.chID.TerminalString(), id.frameNumber)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment