Commit 90435d07 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

op-batcher: Fix concurrent map write (#13527)

mtx on channel_manager too
parent 3772b5f3
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"math" "math"
"sync"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -55,6 +56,8 @@ type channelManager struct { ...@@ -55,6 +56,8 @@ type channelManager struct {
channelQueue []*channel channelQueue []*channel
// used to lookup channels by tx ID upon tx success / failure // used to lookup channels by tx ID upon tx success / failure
txChannels map[string]*channel txChannels map[string]*channel
mtx sync.Mutex
} }
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider ChannelConfigProvider, rollupCfg *rollup.Config) *channelManager { func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider ChannelConfigProvider, rollupCfg *rollup.Config) *channelManager {
...@@ -93,6 +96,9 @@ func (s *channelManager) pendingBlocks() int { ...@@ -93,6 +96,9 @@ func (s *channelManager) pendingBlocks() int {
// TxFailed records a transaction as failed. It will attempt to resubmit the data // TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction. // in the failed transaction.
func (s *channelManager) TxFailed(_id txID) { func (s *channelManager) TxFailed(_id txID) {
s.mtx.Lock()
defer s.mtx.Unlock()
id := _id.String() id := _id.String()
if channel, ok := s.txChannels[id]; ok { if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id) delete(s.txChannels, id)
...@@ -105,6 +111,8 @@ func (s *channelManager) TxFailed(_id txID) { ...@@ -105,6 +111,8 @@ func (s *channelManager) TxFailed(_id txID) {
// TxConfirmed marks a transaction as confirmed on L1. Only if the channel timed out // TxConfirmed marks a transaction as confirmed on L1. Only if the channel timed out
// the channelManager's state is modified. // the channelManager's state is modified.
func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) { func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
s.mtx.Lock()
defer s.mtx.Unlock()
id := _id.String() id := _id.String()
if channel, ok := s.txChannels[id]; ok { if channel, ok := s.txChannels[id]; ok {
...@@ -191,6 +199,9 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) { ...@@ -191,6 +199,9 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
// When switching DA type, the channelManager state will be rebuilt // When switching DA type, the channelManager state will be rebuilt
// with a new ChannelConfig. // with a new ChannelConfig.
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
s.mtx.Lock()
defer s.mtx.Unlock()
channel, err := s.getReadyChannel(l1Head) channel, err := s.getReadyChannel(l1Head)
if err != nil { if err != nil {
return emptyTxData, err return emptyTxData, err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment