Commit d8cda3f1 authored by Joshua Gutow

op-batcher: Make channel manager safe for concurrent use

Because the batcher sends transactions concurrently, the main loop
fetches transaction data and records transaction status updates in
different goroutines. This led to concurrent access of the txChannels
map, which causes a panic.

This commit adds locks to the public methods for the channel manager
to make it safe for concurrent use.
Co-authored-by: bnoieh <135800952+bnoieh@users.noreply.github.com>
parent 4bb2e6c6
......@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
......@@ -21,8 +22,9 @@ var ErrReorg = errors.New("block does not extend existing chain")
// For simplicity, it only creates a single pending channel at a time & waits for
// the channel to either successfully be submitted or timeout before creating a new
// channel.
// Functions on channelManager are not safe for concurrent access.
// Public functions on channelManager are safe for concurrent access.
type channelManager struct {
mu sync.Mutex
log log.Logger
metr metrics.Metricer
cfg ChannelConfig
......@@ -55,6 +57,8 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig)
// Clear clears the entire state of the channel manager.
// It is intended to be used after an L2 reorg.
func (s *channelManager) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
s.log.Trace("clearing channel manager state")
s.blocks = s.blocks[:0]
s.tip = common.Hash{}
......@@ -67,6 +71,8 @@ func (s *channelManager) Clear() {
// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channelManager) TxFailed(id txID) {
s.mu.Lock()
defer s.mu.Unlock()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
channel.TxFailed(id)
......@@ -84,6 +90,8 @@ func (s *channelManager) TxFailed(id txID) {
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
s.mu.Lock()
defer s.mu.Unlock()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
done, blocks := channel.TxConfirmed(id, inclusionBlock)
......@@ -134,6 +142,8 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
// full, it only returns the remaining frames of this channel until it got
// successfully fully sent to L1. It returns io.EOF if there's no pending frame.
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
s.mu.Lock()
defer s.mu.Unlock()
var firstWithFrame *channel
for _, ch := range s.channelQueue {
if ch.HasFrame() {
......@@ -298,6 +308,8 @@ func (s *channelManager) outputFrames() error {
// if the block does not extend the last block loaded into the state. If no
// blocks were added yet, the parent hash check is skipped.
func (s *channelManager) AddL2Block(block *types.Block) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
return ErrReorg
}
......@@ -324,6 +336,8 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
// and prevents the creation of any new channels.
// Any outputted frames still need to be published.
func (s *channelManager) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment