diff --git a/op-batcher/channel_manager.go b/op-batcher/channel_manager.go
index 8c029c92323c7dd9f9317e63865fa63b5306d627..a39a614212f9fa3d2d75d211d0ea108bf7c06f70 100644
--- a/op-batcher/channel_manager.go
+++ b/op-batcher/channel_manager.go
@@ -3,122 +3,274 @@ package op_batcher
 import (
 	"bytes"
 	"errors"
+	"fmt"
 	"io"
+	"math"
 
 	"github.com/ethereum-optimism/optimism/op-node/eth"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/log"
 )
 
 var ErrReorg = errors.New("block does not extend existing chain")
 
+// txID is an opaque identifier for a transaction.
+// Its internal fields should not be inspected after creation & are subject to change.
+// This ID must be trivially comparable & work as a map key.
 type txID struct {
 	chID        derive.ChannelID
 	frameNumber uint16
 }
 
+func (id txID) String() string {
+	return fmt.Sprintf("%s:%d", id.chID.String(), id.frameNumber)
+}
+
+// TerminalString implements log.TerminalStringer, formatting a string for console
+// output during logging.
+func (id txID) TerminalString() string {
+	return fmt.Sprintf("%s:%d", id.chID.TerminalString(), id.frameNumber)
+}
+
 type taggedData struct {
 	data []byte
 	id   txID
 }
 
+// channelManager stores a contiguous set of blocks & turns them into channels.
+// Upon receiving tx confirmation (or a tx failure), it does channel error handling.
+//
+// For simplicity, it only creates a single pending channel at a time & waits for
+// the channel either to be successfully submitted or to time out before creating
+// a new channel.
+// Functions on channelManager are not safe for concurrent access.
 type channelManager struct {
-	// All blocks since the last request for new tx data
+	log            log.Logger
+	channelTimeout uint64
+
+	// All blocks since the last request for new tx data.
 	blocks []*types.Block
 	datas  []taggedData
+
+	// Pending data returned by TxData, waiting on TxConfirmed/TxFailed.
+	// id of the pending channel
+	pendingChannel derive.ChannelID
+	// list of blocks in the channel. Saved in case the channel must be rebuilt
+	pendingBlocks []*types.Block
+	// Set of unconfirmed txID -> frame data. For tx resubmission
+	pendingTransactions map[txID][]byte
+	// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
+	confirmedTransactions map[txID]eth.BlockID
+}
+
+func NewChannelManager(log log.Logger, channelTimeout uint64) *channelManager {
+	return &channelManager{
+		log:                   log,
+		channelTimeout:        channelTimeout,
+		pendingTransactions:   make(map[txID][]byte),
+		confirmedTransactions: make(map[txID]eth.BlockID),
+	}
 }
 
 // Clear clears the entire state of the channel manager.
 // It is intended to be used after an L2 reorg.
 func (s *channelManager) Clear() {
+	s.log.Trace("clearing channel manager state")
 	s.blocks = s.blocks[:0]
 	s.datas = s.datas[:0]
 }
 
-// func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
-// 	// todo: implement
-// }
+// TxFailed records a transaction as failed. It will attempt to resubmit the data
+// in the failed transaction.
+func (s *channelManager) TxFailed(id txID) {
+	if data, ok := s.pendingTransactions[id]; ok {
+		s.log.Trace("marked transaction as failed", "id", id)
+		s.datas = append(s.datas, taggedData{data, id})
+		delete(s.pendingTransactions, id)
+	} else {
+		s.log.Info("marked transaction as failed despite having no record of it", "id", id)
+	}
+}
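+
+// Reviewer note (not part of the original patch): a failed frame is not resent
+// directly; it is pushed back onto s.datas, handed out again by a later TxData
+// call, and only then re-tracked in pendingTransactions by nextTxData.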
 
-// TxData returns the next tx.data that should be submitted to L1.
-// It is very simple & currently ignores the l1Head provided (this will change).
-// It may buffer very large channels as well.
-func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
-	// Note: l1Head is not actually used in this function.
+// TxConfirmed marks a transaction as confirmed on L1. Unfortunately, even if all
+// frames in a channel have been marked as confirmed on L1, the channel may still
+// be invalid & need to be resubmitted.
+// This function may reset the pending channel if the pending channel has timed out.
+func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
+	s.log.Trace("marked transaction as confirmed", "id", id, "block", inclusionBlock)
+	if _, ok := s.pendingTransactions[id]; !ok {
+		s.log.Info("marked transaction as confirmed despite having no record of it", "id", id, "block", inclusionBlock)
+		// TODO: This can occur if we clear the channel while there are still pending transactions.
+		// We need to keep track of stale transactions instead.
+		return
+	}
+	delete(s.pendingTransactions, id)
+	s.confirmedTransactions[id] = inclusionBlock
 
-	// Return a pre-existing frame if we have it.
-	if len(s.datas) != 0 {
-		r := s.datas[0]
-		s.datas = s.datas[1:]
-		return r.data, r.id, nil
+	// If this channel timed out, put the pending blocks back into the local saved blocks
+	// and then reset this state so it can try to build a new channel.
+	if s.pendingChannelIsTimedOut() {
+		s.log.Warn("Channel timed out", "chID", s.pendingChannel)
+		s.blocks = append(s.pendingBlocks, s.blocks...)
+		s.clearPendingChannel()
+	}
+	// If we are done with this channel, record that.
+	if s.pendingChannelIsFullySubmitted() {
+		s.log.Info("Channel is fully submitted", "chID", s.pendingChannel)
+		s.clearPendingChannel()
 	}
+}
 
-	// Also return io.EOF if we cannot create a channel
-	if len(s.blocks) == 0 {
-		return nil, txID{}, io.EOF
+// clearPendingChannel resets all pending state back to an initialized but empty state.
+// TODO: Create separate "pending" state
+func (s *channelManager) clearPendingChannel() {
+	s.pendingChannel = derive.ChannelID{}
+	s.pendingBlocks = nil
+	s.pendingTransactions = make(map[txID][]byte)
+	s.confirmedTransactions = make(map[txID]eth.BlockID)
+}
+
+// pendingChannelIsTimedOut returns true if the submitted channel has timed out.
+// A channel has timed out if the difference in L1 inclusion blocks between the
+// first & last included frames is greater than or equal to the channel timeout.
+func (s *channelManager) pendingChannelIsTimedOut() bool {
+	if s.pendingChannel == (derive.ChannelID{}) {
+		return false // no channel to be timed out
+	}
+	// No confirmed transactions => not timed out
+	if len(s.confirmedTransactions) == 0 {
+		return false
 	}
+	// If there are confirmed transactions, find the first & last confirmed block numbers
+	min := uint64(math.MaxUint64)
+	max := uint64(0)
+	for _, inclusionBlock := range s.confirmedTransactions {
+		if inclusionBlock.Number < min {
+			min = inclusionBlock.Number
+		}
+		if inclusionBlock.Number > max {
+			max = inclusionBlock.Number
+		}
+	}
+	return max-min >= s.channelTimeout
+}
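+
+// Worked example (reviewer note, not part of the original patch): with
+// channelTimeout = 100, a channel whose first frame was confirmed in L1 block
+// 1000 and whose last frame was confirmed in block 1100 gives max-min = 100,
+// which is >= 100, so the channel is considered timed out and will be rebuilt.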
 
-	// Add all pending blocks to a channel
+// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
+func (s *channelManager) pendingChannelIsFullySubmitted() bool {
+	if s.pendingChannel == (derive.ChannelID{}) {
+		return false // TODO: we could decide either way here, though returning true would be nonsensical
+	}
+	return len(s.pendingTransactions)+len(s.datas) == 0
+}
+
+// blocksToFrames turns a set of blocks into a set of frames inside a channel.
+// It will only create a single channel, which contains up to `MAX_RLP_BYTES`. Any
+// blocks not added to the channel are returned. It uses the supplied max frame size.
+func blocksToFrames(blocks []*types.Block, maxFrameSize uint64) (derive.ChannelID, [][]byte, []*types.Block, error) {
 	ch, err := derive.NewChannelOut()
 	if err != nil {
-		return nil, txID{}, err
+		return derive.ChannelID{}, nil, nil, err
 	}
-	// TODO: use peek/pop paradigm here instead of manually slicing
+
 	i := 0
-	// Cap length at 100 blocks
-	l := len(s.blocks)
-	if l > 100 {
-		l = 100
-	}
-	for ; i < l; i++ {
-		if err := ch.AddBlock(s.blocks[i]); err == derive.ErrTooManyRLPBytes {
+	for ; i < len(blocks); i++ {
+		if err := ch.AddBlock(blocks[i]); err == derive.ErrTooManyRLPBytes {
 			break
 		} else if err != nil {
-			return nil, txID{}, err
+			return derive.ChannelID{}, nil, nil, err
 		}
-		// TODO: limit the RLP size of the channel to be lower than the limit to enable
-		// channels to be fully submitted on time.
 	}
 	if err := ch.Close(); err != nil {
-		return nil, txID{}, err
+		return derive.ChannelID{}, nil, nil, err
 	}
 
-	var t []taggedData
-	frameNumber := uint16(0)
+	var frames [][]byte
 	for {
 		var buf bytes.Buffer
 		buf.WriteByte(derive.DerivationVersion0)
-		err := ch.OutputFrame(&buf, 120_000)
+		err := ch.OutputFrame(&buf, maxFrameSize-1)
 		if err != io.EOF && err != nil {
-			return nil, txID{}, err
+			return derive.ChannelID{}, nil, nil, err
 		}
-
-		t = append(t, taggedData{
-			data: buf.Bytes(),
-			id:   txID{ch.ID(), frameNumber},
-		})
-		frameNumber += 1
+		frames = append(frames, buf.Bytes())
 		if err == io.EOF {
 			break
 		}
 	}
+	return ch.ID(), frames, blocks[i:], nil
+}
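+
+// Reviewer note (not part of the original patch): OutputFrame is capped at
+// maxFrameSize-1 because each frame is prefixed with the one-byte
+// DerivationVersion0, so a finished frame is at most maxFrameSize bytes of tx data.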
 
-	s.datas = append(s.datas, t...)
-	// Say i = 0, 1 are added to the channel, but i = 2 returns ErrTooManyRLPBytes. i remains 2 & is inclusive, so this works.
-	// Say all blocks are added, i will be len(blocks) after exiting the loop (but never inside the loop).
-	s.blocks = s.blocks[i:]
-
-	if len(s.datas) == 0 {
+
+// nextTxData pops off s.datas & handles updating the internal state
+func (s *channelManager) nextTxData() ([]byte, txID, error) {
+	if len(s.datas) != 0 {
+		r := s.datas[0]
+		s.log.Trace("returning next tx data", "id", r.id)
+		s.pendingTransactions[r.id] = r.data
+		s.datas = s.datas[1:]
+		return r.data, r.id, nil
+	} else {
 		return nil, txID{}, io.EOF // TODO: not enough data error instead
 	}
+}
+
+// TxData returns the next tx.data that should be submitted to L1.
+// It is very simple & currently ignores the l1Head provided (this will change).
+// It may buffer very large channels as well.
+func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
+	channelPending := s.pendingChannel != (derive.ChannelID{})
+	s.log.Debug("Requested tx data", "l1Head", l1Head, "channel_pending", channelPending, "block_count", len(s.blocks))
+
+	// Short circuit if there is a pending channel: we either submit the next frame
+	// from that channel or return io.EOF while its confirmations are outstanding.
+	if channelPending {
+		return s.nextTxData()
+	}
+	// If we have no saved blocks, we will not be able to create valid frames
+	if len(s.blocks) == 0 {
+		return nil, txID{}, io.EOF
+	}
+
+	// Select a range of blocks
+	end := len(s.blocks)
+	if end > 100 {
+		end = 100
+	}
+	blocks := s.blocks[:end]
+	s.blocks = s.blocks[end:]
+
+	chID, frames, leftOverBlocks, err := blocksToFrames(blocks, 120_000)
+	// If the range of blocks serialized to too large a payload, restore the
+	// blocks that could not be included inside the channel
+	if len(leftOverBlocks) != 0 {
+		s.blocks = append(leftOverBlocks, s.blocks...)
+	}
+	// TODO: Ensure that len(frames) < math.MaxUint16. Should not happen though. One tricky part
+	// is ensuring that s.blocks is properly restored.
+	if err != nil {
+		s.log.Warn("Failed to create channel from blocks", "err", err)
+		return nil, txID{}, err
+	}
+	s.log.Info("Created channel", "chID", chID, "frame_count", len(frames), "l1Head", l1Head)
+
+	var t []taggedData
+	for i, data := range frames {
+		t = append(t, taggedData{data: data, id: txID{chID: chID, frameNumber: uint16(i)}})
+	}
+
+	// Load up pending state. Note: pending transactions is taken care of by nextTxData
+	s.datas = t
+	s.pendingChannel = chID
+	s.pendingBlocks = blocks[:len(leftOverBlocks)]
+
+	return s.nextTxData()
-	r := s.datas[0]
-	s.datas = s.datas[1:]
-	return r.data, r.id, nil
 }
 
 // AddL2Block saves an L2 block to the internal state. It returns ErrReorg
 // if the block does not extend the last block loaded into the state.
 // If no block is already in the channel, the parent hash check is skipped.
+// TODO: Phantom last block, b/c if the local state is fully drained we can reorg without realizing it.
 func (s *channelManager) AddL2Block(block *types.Block) error {
 	if len(s.blocks) > 0 {
 		if s.blocks[len(s.blocks)-1].Hash() != block.ParentHash() {
diff --git a/op-batcher/driver.go b/op-batcher/driver.go
index cb8f36cda6ce10a353ddfc9f4dd5a0a619d1185a..081c30bf31d0883c31a8a11df59dadcddf637c07 100644
--- a/op-batcher/driver.go
+++ b/op-batcher/driver.go
@@ -150,7 +150,7 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
 		txMgr: NewTransactionManger(l, txManagerConfig, batchInboxAddress, chainID, sequencerPrivKey, l1Client),
 		done:  make(chan struct{}),
 		log:   l,
-		state: new(channelManager),
+		state: NewChannelManager(l, cfg.ChannelTimeout),
 		// TODO: this context only exists because the event loop doesn't reach done
 		// if the tx manager is blocking forever due to e.g. insufficient balance.
 		ctx: ctx,
@@ -234,7 +234,7 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
 	if l.lastStoredBlock == (eth.BlockID{}) {
 		l.log.Info("Starting batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
 		l.lastStoredBlock = syncStatus.SafeL2.ID()
-	} else if l.lastStoredBlock.Number <= syncStatus.SafeL2.Number {
+	} else if l.lastStoredBlock.Number < syncStatus.SafeL2.Number {
 		l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastStoredBlock, "safe", syncStatus.SafeL2)
 		l.lastStoredBlock = syncStatus.SafeL2.ID()
 	}
@@ -269,18 +269,33 @@ func (l *BatchSubmitter) loop() {
 			l.loadBlocksIntoState(l.ctx)
 
 			// Empty the state after loading into it on every iteration.
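+			// Reviewer note (not part of the original patch): blockLoop labels
+			// the publishing loop so the ctx.Done() check at the bottom of the
+			// loop body can break out of it from inside the select statement.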
+		blockLoop:
 			for {
 				// Collect the output frame
-				data, _, err := l.state.TxData(eth.L1BlockRef{})
+				data, id, err := l.state.TxData(eth.L1BlockRef{})
 				if err == io.EOF {
+					l.log.Trace("no transaction data available")
 					break // local for loop
 				} else if err != nil {
 					l.log.Error("unable to get tx data", "err", err)
 					break
 				}
-				// Drop receipt + error for now
-				if _, err := l.txMgr.SendTransaction(l.ctx, data); err != nil {
+				// Record TX status
+				if receipt, err := l.txMgr.SendTransaction(l.ctx, data); err != nil {
 					l.log.Error("Failed to send transaction", "err", err)
+					l.state.TxFailed(id)
+				} else {
+					l.log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
+					l.state.TxConfirmed(id, eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash})
+				}
+
+				// Hack to exit this loop. The proper fix is to request another tx send, or to do
+				// parallel tx sending from the channel manager, rather than sending the channel in
+				// a loop. This stalls b/c if the context is cancelled while sending, it will never
+				// fully clear the pending txns.
+				select {
+				case <-l.ctx.Done():
+					break blockLoop
+				default:
+				}
 			}
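
Reviewer sketch (not part of the patch): the driver loop above is the lifecycle the new
confirmation API expects — TxData hands out one frame and tracks it as pending, and exactly
one of TxFailed/TxConfirmed must be reported back for that id. A minimal Go sketch, assuming
a hypothetical sendTx helper in place of txMgr.SendTransaction (imports elided):

	// submitFrames drains the channel manager, reporting each tx result back to it.
	func submitFrames(ctx context.Context, state *channelManager, sendTx func(context.Context, []byte) (*types.Receipt, error)) error {
		for {
			data, id, err := state.TxData(eth.L1BlockRef{})
			if err == io.EOF {
				return nil // nothing left to submit
			} else if err != nil {
				return err
			}
			if receipt, err := sendTx(ctx, data); err != nil {
				// Failed frames are re-queued & handed out again by a later TxData call.
				state.TxFailed(id)
			} else {
				// Confirmation may complete the channel, or time it out & trigger a
				// rebuild if its frames landed too far apart on L1.
				state.TxConfirmed(id, eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash})
			}
		}
	}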