Commit 4abae618 authored by Joshua Gutow, committed by GitHub

op-batcher: Rewrite ChannelManager (#3859)

* op-batcher: Rewrite ChannelManager

The channel manager is now responsible for tracking in flight frames
and properly handling different error conditions.

* op-batcher: String() & TerminalString() for txID

* op-batcher: Remove channelPending bool

* op-batcher: Fix wraparound bug in timeout check

* op-batcher: Comments + Logging

* op-batcher: Properly exit loop
parent 9940c8cd
@@ -3,122 +3,274 @@ package op_batcher
import (
"bytes"
"errors"
"fmt"
"io"
"math"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
var ErrReorg = errors.New("block does not extend existing chain")
// txID is an opaque identifier for a transaction.
// Its internal fields should not be inspected after creation & are subject to change.
// This ID must be trivially comparable & work as a map key.
type txID struct {
chID derive.ChannelID
frameNumber uint16
}
func (id txID) String() string {
return fmt.Sprintf("%s:%d", id.chID.String(), id.frameNumber)
}
// TerminalString implements log.TerminalStringer, formatting a string for console
// output during logging.
func (id txID) TerminalString() string {
return fmt.Sprintf("%s:%d", id.chID.TerminalString(), id.frameNumber)
}
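// Illustrative sketch (not part of this commit): because txID contains only
// comparable fields, it supports == and works directly as a map key, which is
// what the pendingTransactions & confirmedTransactions maps below rely on.
func exampleTxIDAsMapKey() {
	pending := make(map[txID][]byte)
	id := txID{frameNumber: 7} // zero-value chID, just for illustration
	pending[id] = []byte("frame data")
	fmt.Println(id, len(pending)) // the txID prints via String() as "<chID>:7"
}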
type taggedData struct {
data []byte
id txID
}
// channelManager stores a contiguous set of blocks & turns them into channels.
// Upon receiving tx confirmation (or a tx failure), it does channel error handling.
//
// For simplicity, it only creates a single pending channel at a time & waits for
// the channel to either successfully be submitted or timeout before creating a new
// channel.
// Functions on channelManager are not safe for concurrent access.
type channelManager struct {
log log.Logger
channelTimeout uint64
// All blocks since the last request for new tx data.
blocks []*types.Block
datas []taggedData
// Pending data returned by TxData waiting on Tx Confirmed/Failed
// id of the pending channel
pendingChannel derive.ChannelID
// list of blocks in the channel. Saved in case the channel must be rebuilt
pendingBlocks []*types.Block
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID][]byte
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID
}
func NewChannelManager(log log.Logger, channelTimeout uint64) *channelManager {
return &channelManager{
log: log,
channelTimeout: channelTimeout,
pendingTransactions: make(map[txID][]byte),
confirmedTransactions: make(map[txID]eth.BlockID),
}
}
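// Usage sketch (illustrative only; the real driver loop lives in the batch
// submitter): drive the manager through its submit -> confirm/fail cycle.
// `send` is a hypothetical stand-in for the transaction manager.
func exampleDriveChannelManager(m *channelManager, l1Head eth.L1BlockRef, send func([]byte) (eth.BlockID, error)) {
	for {
		data, id, err := m.TxData(l1Head)
		if err == io.EOF {
			return // nothing to submit right now
		} else if err != nil {
			return // e.g. channel creation failed
		}
		if inclusion, err := send(data); err != nil {
			m.TxFailed(id) // frame is re-queued for resubmission
		} else {
			m.TxConfirmed(id, inclusion) // may detect a timeout or a fully submitted channel
		}
	}
}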
// Clear clears the entire state of the channel manager.
// It is intended to be used after an L2 reorg.
func (s *channelManager) Clear() {
s.log.Trace("clearing channel manager state")
s.blocks = s.blocks[:0]
s.datas = s.datas[:0]
}
// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channelManager) TxFailed(id txID) {
if data, ok := s.pendingTransactions[id]; ok {
s.log.Trace("marked transaction as failed", "id", id)
s.datas = append(s.datas, taggedData{data, id})
delete(s.pendingTransactions, id)
} else {
s.log.Info("marked transaction as failed despite having no record of it.", "id", id)
}
}
// TxConfirmed marks a transaction as confirmed on L1. Unfortunately, even if all frames in
// a channel have been marked as confirmed on L1, the channel may still be invalid & need to
// be resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
s.log.Trace("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok {
s.log.Info("marked transaction as confirmed despite having no record of it", "id", id, "block", inclusionBlock)
// TODO: This can occur if we clear the channel while there are still pending transactions
// We need to keep track of stale transactions instead
return
}
delete(s.pendingTransactions, id)
s.confirmedTransactions[id] = inclusionBlock
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if s.pendingChannelIsTimedOut() {
s.log.Warn("Channel timed out", "chID", s.pendingChannel)
s.blocks = append(s.pendingBlocks, s.blocks...)
s.clearPendingChannel()
}
// If we are done with this channel, record that.
if s.pendingChannelIsFullySubmitted() {
s.log.Info("Channel is fully submitted", "chID", s.pendingChannel)
s.clearPendingChannel()
}
}
// clearPendingChannel resets all pending state back to an initialized but empty state.
// TODO: Create separate "pending" state
func (s *channelManager) clearPendingChannel() {
s.pendingChannel = derive.ChannelID{}
s.pendingBlocks = nil
s.pendingTransactions = make(map[txID][]byte)
s.confirmedTransactions = make(map[txID]eth.BlockID)
}
// pendingChannelIsTimedOut returns true if the submitted channel has timed out.
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func (s *channelManager) pendingChannelIsTimedOut() bool {
if s.pendingChannel == (derive.ChannelID{}) {
return false // no channel to be timed out
}
// No confirmed transactions => not timed out
if len(s.confirmedTransactions) == 0 {
return false
}
// If there are confirmed transactions, find the first + last confirmed block numbers
min := uint64(math.MaxUint64)
max := uint64(0)
for _, inclusionBlock := range s.confirmedTransactions {
if inclusionBlock.Number < min {
min = inclusionBlock.Number
}
if inclusionBlock.Number > max {
max = inclusionBlock.Number
}
}
return max-min >= s.channelTimeout
}
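// Worked example of the check above: with channelTimeout = 100, frames
// confirmed in L1 blocks 1000 and 1099 give max-min = 99 < 100 (the channel
// is still live), while 1000 and 1100 give max-min = 100, so the channel has
// timed out. Because max >= min always holds, max-min cannot wrap around
// uint64, which is the point of the "Fix wraparound bug" change noted above.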
// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func (s *channelManager) pendingChannelIsFullySubmitted() bool {
if s.pendingChannel == (derive.ChannelID{}) {
return false // todo: can decide either way here. Nonsensical answer though
}
return len(s.pendingTransactions)+len(s.datas) == 0
}
// blocksToFrames turns a set of blocks into a set of frames inside a channel.
// It will only create a single channel, which contains up to `MAX_RLP_BYTES`
// of block data. Any blocks not added to the channel are returned. Frames are
// capped at the supplied max frame size.
func blocksToFrames(blocks []*types.Block, maxFrameSize uint64) (derive.ChannelID, [][]byte, []*types.Block, error) {
ch, err := derive.NewChannelOut()
if err != nil {
return derive.ChannelID{}, nil, nil, err
}
// TODO: use peek/pop paradigm here instead of manually slicing
i := 0
for ; i < len(blocks); i++ {
if err := ch.AddBlock(blocks[i]); err == derive.ErrTooManyRLPBytes {
break
} else if err != nil {
return derive.ChannelID{}, nil, nil, err
}
// TODO: limit the RLP size of the channel to be lower than the limit to enable
// channels to be fully submitted on time.
}
if err := ch.Close(); err != nil {
return derive.ChannelID{}, nil, nil, err
}
var frames [][]byte
for {
var buf bytes.Buffer
buf.WriteByte(derive.DerivationVersion0)
err := ch.OutputFrame(&buf, maxFrameSize-1)
if err != io.EOF && err != nil {
return derive.ChannelID{}, nil, nil, err
}
frames = append(frames, buf.Bytes())
if err == io.EOF {
break
}
}
// Say i = 0, 1 are added to the channel, but i = 2 returns ErrTooManyRLPBytes. i remains 2 & is inclusive, so this works.
// Say all blocks are added, i will be len(blocks) after exiting the loop (but never inside the loop).
return ch.ID(), frames, blocks[i:], nil
}
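// Note on the frame loop above: every tx payload is one DerivationVersion0
// byte followed by at most maxFrameSize-1 bytes of frame, so the whole
// payload fits within maxFrameSize. A minimal sanity check (illustrative only):
func exampleFramesFitLimit(frames [][]byte, maxFrameSize uint64) bool {
	for _, f := range frames {
		if uint64(len(f)) > maxFrameSize {
			return false // version byte + frame exceeded the budget
		}
	}
	return true
}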
// nextTxData pops off s.datas & handles updating the internal state
func (s *channelManager) nextTxData() ([]byte, txID, error) {
if len(s.datas) != 0 {
r := s.datas[0]
s.log.Trace("returning next tx data", "id", r.id)
s.pendingTransactions[r.id] = r.data
s.datas = s.datas[1:]
return r.data, r.id, nil
} else {
return nil, txID{}, io.EOF // TODO: not enough data error instead
}
}
// TxData returns the next tx.data that should be submitted to L1.
// It is very simple & currently ignores the l1Head provided (this will change).
// It may buffer very large channels as well.
func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
channelPending := s.pendingChannel != (derive.ChannelID{})
s.log.Debug("Requested tx data", "l1Head", l1Head, "channel_pending", channelPending, "block_count", len(s.blocks))
// Short circuit if there is a pending channel.
// We either submit the next frame from that channel or return io.EOF
// while all of its frames await confirmation.
if channelPending {
return s.nextTxData()
}
// If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 {
return nil, txID{}, io.EOF
}
// Select range of blocks
end := len(s.blocks)
if end > 100 {
end = 100
}
blocks := s.blocks[:end]
s.blocks = s.blocks[end:]
chID, frames, leftOverBlocks, err := blocksToFrames(blocks, 120_000)
// If the serialized range of blocks was too large, restore the
// blocks that could not be included inside the channel
if len(leftOverBlocks) != 0 {
s.blocks = append(leftOverBlocks, s.blocks...)
}
// TODO: Ensure that len(frames) < math.MaxUint16. Should not happen though. One tricky part
// is ensuring that s.blocks is properly restored.
if err != nil {
s.log.Warn("Failed to create channel from blocks", "err", err)
return nil, txID{}, err
}
s.log.Info("Created channel", "chID", chID, "frame_count", len(frames), "l1Head", l1Head)
var t []taggedData
for i, data := range frames {
t = append(t, taggedData{data: data, id: txID{chID: chID, frameNumber: uint16(i)}})
}
// Load up pending state. Note: the pending transactions map is populated by nextTxData
s.datas = t
s.pendingChannel = chID
s.pendingBlocks = blocks[:len(blocks)-len(leftOverBlocks)] // only the blocks that made it into the channel
return s.nextTxData()
}
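// Worked example for the pending-block bookkeeping above (illustrative):
// if 100 blocks are handed to blocksToFrames and 40 of them do not fit,
// leftOverBlocks has length 40 and the first 60 blocks are in the channel,
// so pendingBlocks must be blocks[:len(blocks)-len(leftOverBlocks)], i.e.
// blocks[:60]. Slicing by len(leftOverBlocks) alone would keep blocks[:40].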
// AddL2Block saves an L2 block to the internal state. It returns ErrReorg
// if the block does not extend the last block loaded into the state.
// If no block is already in the channel, the parent hash check is skipped.
// TODO: Phantom last block b/c if the local state is fully drained we can reorg without realizing it.
func (s *channelManager) AddL2Block(block *types.Block) error {
if len(s.blocks) > 0 {
if s.blocks[len(s.blocks)-1].Hash() != block.ParentHash() {
......
@@ -150,7 +150,7 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
txMgr: NewTransactionManger(l, txManagerConfig, batchInboxAddress, chainID, sequencerPrivKey, l1Client),
done: make(chan struct{}),
log: l,
state: NewChannelManager(l, cfg.ChannelTimeout),
// TODO: this context only exists because the event loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
ctx: ctx,
@@ -234,7 +234,7 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
if l.lastStoredBlock == (eth.BlockID{}) {
l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
l.lastStoredBlock = syncStatus.SafeL2.ID()
} else if l.lastStoredBlock.Number < syncStatus.SafeL2.Number {
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastStoredBlock, "safe", syncStatus.SafeL2)
l.lastStoredBlock = syncStatus.SafeL2.ID()
}
@@ -269,18 +269,33 @@ func (l *BatchSubmitter) loop() {
l.loadBlocksIntoState(l.ctx)
// Empty the state after loading into it on every iteration.
blockLoop:
for {
// Collect the output frame
data, id, err := l.state.TxData(eth.L1BlockRef{})
if err == io.EOF {
l.log.Trace("no transaction data available")
break // local for loop
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
break
}
// Record TX Status
if receipt, err := l.txMgr.SendTransaction(l.ctx, data); err != nil {
l.log.Error("Failed to send transaction", "err", err)
l.state.TxFailed(id)
} else {
l.log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
l.state.TxConfirmed(id, eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash})
}
// Hack to exit this loop. The proper fix is to request another tx send (or
// parallel tx sending) from the channel manager rather than draining the
// channel in a loop. This loop stalls because if the context is cancelled
// while sending, the pending txns will never be fully cleared.
select {
case <-l.ctx.Done():
break blockLoop
default:
}
}
......