Commit 2011f6a9 authored by OptimismBot, committed by GitHub

Merge pull request #6878 from ethereum-optimism/jg/race_cond_in_batcher

op-batcher: Make channel manager safe for concurrent use
parents dec032a4 7639c974
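The fix is the standard Go pattern for making a type safe for concurrent use: add a sync.Mutex field and acquire it at the top of every exported method, releasing it with defer. A minimal sketch of that pattern, assuming nothing from this PR beyond the locking discipline itself (the counter type below is illustrative, not batcher code):

package main

import (
	"fmt"
	"sync"
)

// counter is a toy stand-in for channelManager: every exported
// method takes the mutex, so goroutines can share one instance
// without any external synchronization.
type counter struct {
	mu sync.Mutex
	n  int
}

func (c *counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock() // released even on early return or panic
	c.n++
}

func (c *counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); c.Inc() }()
	}
	wg.Wait()
	fmt.Println(c.Value()) // always 100; without the mutex this write would race
}

Because every method locks before touching shared state, this is exactly the property the updated doc comment in the diff below claims for channelManager.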
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"sync"
 
 	"github.com/ethereum-optimism/optimism/op-batcher/metrics"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -21,8 +22,9 @@ var ErrReorg = errors.New("block does not extend existing chain")
 // For simplicity, it only creates a single pending channel at a time & waits for
 // the channel to either successfully be submitted or timeout before creating a new
 // channel.
-// Functions on channelManager are not safe for concurrent access.
+// Public functions on channelManager are safe for concurrent access.
 type channelManager struct {
+	mu   sync.Mutex
 	log  log.Logger
 	metr metrics.Metricer
 	cfg  ChannelConfig
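Note that the doc comment now promises safety only for public functions: the lock is taken in the exported methods alone, while unexported helpers such as nextTxData and outputFrames (visible as context in the hunks below) run with the lock already held by their caller. The split matters because Go's sync.Mutex is not reentrant: a helper that called Lock() again would deadlock the goroutine against itself. A sketch of that convention, with illustrative names not taken from the batcher:

package store

import "sync"

type store struct {
	mu   sync.Mutex
	data map[string]string
}

// Get is exported: it acquires the lock, then delegates to the helper.
func (s *store) Get(k string) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.get(k)
}

// get must only be called with s.mu already held. Calling s.mu.Lock()
// here would deadlock, because sync.Mutex does not support recursive locking.
func (s *store) get(k string) (string, bool) {
	v, ok := s.data[k]
	return v, ok
}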
@@ -55,6 +57,8 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig)
 // Clear clears the entire state of the channel manager.
 // It is intended to be used after an L2 reorg.
 func (s *channelManager) Clear() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	s.log.Trace("clearing channel manager state")
 	s.blocks = s.blocks[:0]
 	s.tip = common.Hash{}
@@ -67,6 +71,8 @@ func (s *channelManager) Clear() {
 // TxFailed records a transaction as failed. It will attempt to resubmit the data
 // in the failed transaction.
 func (s *channelManager) TxFailed(id txID) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	if channel, ok := s.txChannels[id]; ok {
 		delete(s.txChannels, id)
 		channel.TxFailed(id)
@@ -84,6 +90,8 @@ func (s *channelManager) TxFailed(id txID) {
 // resubmitted.
 // This function may reset the pending channel if the pending channel has timed out.
 func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	if channel, ok := s.txChannels[id]; ok {
 		delete(s.txChannels, id)
 		done, blocks := channel.TxConfirmed(id, inclusionBlock)
@@ -134,6 +142,8 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 // full, it only returns the remaining frames of this channel until it got
 // successfully fully sent to L1. It returns io.EOF if there's no pending frame.
 func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	var firstWithFrame *channel
 	for _, ch := range s.channelQueue {
 		if ch.HasFrame() {
@@ -298,6 +308,8 @@ func (s *channelManager) outputFrames() error {
 // if the block does not extend the last block loaded into the state. If no
 // blocks were added yet, the parent hash check is skipped.
 func (s *channelManager) AddL2Block(block *types.Block) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
 		return ErrReorg
 	}
@@ -324,6 +336,8 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
 // and prevents the creation of any new channels.
 // Any outputted frames still need to be published.
 func (s *channelManager) Close() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	if s.closed {
 		return nil
 	}
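A change like this is typically verified with Go's built-in race detector, e.g. go test -race ./op-batcher/... (the package path is an assumption inferred from the import paths above). The detector flags unsynchronized concurrent access at runtime; the following minimal test reproduces the kind of race the new mutex eliminates (illustrative code, not from this repository):

package demo

import (
	"sync"
	"testing"
)

// TestRace increments a shared variable from two goroutines without a
// lock; running `go test -race` reports a data race on it.
func TestRace(t *testing.T) {
	var n int
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n++ // unsynchronized read-modify-write: flagged by the race detector
		}()
	}
	wg.Wait()
}

Guarding the shared state with a sync.Mutex, as this PR does for channelManager, makes the equivalent access pattern pass cleanly under -race.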