Commit e0a34554 authored by George Knee, committed by GitHub

op-batcher: remove unnecessary mutex (#13569)

* Revert "op-batcher: Fix concurrent map write (#13527)"

This reverts commit 90435d07.

* lock channelMgrMutex during recordFailedDARequest

* rename / export pruneSafeBlocks and pruneChannels
parent b36d065e
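
The net effect of the three bullets: the channelManager no longer carries its own sync.Mutex, and serialisation is instead the caller's job, with recordFailedDARequest now taking the batcher's existing channelMgrMutex like every other channelMgr call site. A minimal sketch of the resulting pattern, assuming simplified types and a plain string id (only channelMgrMutex, channelMgr, txChannels, TxFailed and recordFailedDARequest are taken from the real code; the rest is illustrative):

package sketch

import "sync"

// channelManager owns mutable state such as the txChannels map but does no
// locking of its own; callers are expected to serialise access to it.
type channelManager struct {
    txChannels map[string]struct{}
}

// TxFailed mutates the map, so it is only safe while the caller holds the lock.
func (s *channelManager) TxFailed(id string) {
    delete(s.txChannels, id)
}

// BatchSubmitter guards every channelMgr access with a single mutex.
type BatchSubmitter struct {
    channelMgrMutex sync.Mutex
    channelMgr      *channelManager
}

// recordFailedDARequest used to reach into channelMgr without the lock, which
// could race with the main batcher loop (the "concurrent map write"); taking
// channelMgrMutex here makes a manager-internal mutex redundant.
func (l *BatchSubmitter) recordFailedDARequest(id string) {
    l.channelMgrMutex.Lock()
    defer l.channelMgrMutex.Unlock()
    l.channelMgr.TxFailed(id)
}

Because the lock now lives in one place (the BatchSubmitter), the per-manager mtx introduced by #13527 adds nothing, which is why the first bullet reverts it.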
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"io"
 	"math"
-	"sync"
 
 	"github.com/ethereum-optimism/optimism/op-batcher/metrics"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
@@ -56,8 +55,6 @@ type channelManager struct {
 	channelQueue []*channel
 	// used to lookup channels by tx ID upon tx success / failure
 	txChannels map[string]*channel
-
-	mtx sync.Mutex
 }
 
 func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider ChannelConfigProvider, rollupCfg *rollup.Config) *channelManager {
@@ -96,9 +93,6 @@ func (s *channelManager) pendingBlocks() int {
 // TxFailed records a transaction as failed. It will attempt to resubmit the data
 // in the failed transaction.
 func (s *channelManager) TxFailed(_id txID) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
 	id := _id.String()
 	if channel, ok := s.txChannels[id]; ok {
 		delete(s.txChannels, id)
@@ -111,8 +105,6 @@ func (s *channelManager) TxFailed(_id txID) {
 // TxConfirmed marks a transaction as confirmed on L1. Only if the channel timed out
 // the channelManager's state is modified.
 func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
 	id := _id.String()
 	if channel, ok := s.txChannels[id]; ok {
@@ -199,9 +191,6 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 // When switching DA type, the channelManager state will be rebuilt
 // with a new ChannelConfig.
 func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
 	channel, err := s.getReadyChannel(l1Head)
 	if err != nil {
 		return emptyTxData, err
@@ -463,8 +452,8 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info *derive.L1BlockInfo
 var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager")
 
-// pruneSafeBlocks dequeues the provided number of blocks from the internal blocks queue
-func (s *channelManager) pruneSafeBlocks(num int) {
+// PruneSafeBlocks dequeues the provided number of blocks from the internal blocks queue
+func (s *channelManager) PruneSafeBlocks(num int) {
 	_, ok := s.blocks.DequeueN(int(num))
 	if !ok {
 		panic("tried to prune more blocks than available")
@@ -475,8 +464,8 @@ func (s *channelManager) pruneSafeBlocks(num int) {
 	}
 }
 
-// pruneChannels dequeues the provided number of channels from the internal channels queue
-func (s *channelManager) pruneChannels(num int) {
+// PruneChannels dequeues the provided number of channels from the internal channels queue
+func (s *channelManager) PruneChannels(num int) {
 	clearCurrentChannel := false
 	for i := 0; i < num; i++ {
 		if s.channelQueue[i] == s.currentChannel {
......
@@ -550,10 +550,10 @@ func TestChannelManager_PruneBlocks(t *testing.T) {
 			m.blocks = tc.initialQ
 			m.blockCursor = tc.initialBlockCursor
 			if tc.expectedQ != nil {
-				m.pruneSafeBlocks(tc.numChannelsToPrune)
+				m.PruneSafeBlocks(tc.numChannelsToPrune)
 				require.Equal(t, tc.expectedQ, m.blocks)
 			} else {
-				require.Panics(t, func() { m.pruneSafeBlocks(tc.numChannelsToPrune) })
+				require.Panics(t, func() { m.PruneSafeBlocks(tc.numChannelsToPrune) })
 			}
 		})
 	}
@@ -618,11 +618,11 @@ func TestChannelManager_PruneChannels(t *testing.T) {
 			m.channelQueue = tc.initialQ
 			m.currentChannel = tc.initialCurrentChannel
 			if tc.expectedQ != nil {
-				m.pruneChannels(tc.numChannelsToPrune)
+				m.PruneChannels(tc.numChannelsToPrune)
 				require.Equal(t, tc.expectedQ, m.channelQueue)
 				require.Equal(t, tc.expectedCurrentChannel, m.currentChannel)
 			} else {
-				require.Panics(t, func() { m.pruneChannels(tc.numChannelsToPrune) })
+				require.Panics(t, func() { m.PruneChannels(tc.numChannelsToPrune) })
 			}
 		})
 	}
......
@@ -413,8 +413,8 @@ func (l *BatchSubmitter) syncAndPrune(syncStatus *eth.SyncStatus) *inclusiveBloc
 	if syncActions.clearState != nil {
 		l.channelMgr.Clear(*syncActions.clearState)
 	} else {
-		l.channelMgr.pruneSafeBlocks(syncActions.blocksToPrune)
-		l.channelMgr.pruneChannels(syncActions.channelsToPrune)
+		l.channelMgr.PruneSafeBlocks(syncActions.blocksToPrune)
+		l.channelMgr.PruneChannels(syncActions.channelsToPrune)
 	}
 	return syncActions.blocksToLoad
 }
@@ -892,6 +892,8 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
 }
 
 func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
+	l.channelMgrMutex.Lock()
+	defer l.channelMgrMutex.Unlock()
 	if err != nil {
 		l.Log.Warn("DA request failed", logFields(id, err)...)
 	}
......
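
For completeness, a hedged sketch of the caller-side contract that the renamed PruneSafeBlocks / PruneChannels now rely on: with no internal mutex, the pruning done by syncAndPrune is only safe if it runs under the same channelMgrMutex as every other channelMgr access. The lock placement and the simplified queue types below are assumptions for illustration, not the actual driver.go code:

package sketch

import "sync"

// Stand-ins for the real block and channel queues; the real methods dequeue
// from dedicated queue types and panic when asked to prune more than exists.
type channelManager struct {
    blocks       []int
    channelQueue []int
}

func (s *channelManager) PruneSafeBlocks(num int) { s.blocks = s.blocks[num:] }
func (s *channelManager) PruneChannels(num int)   { s.channelQueue = s.channelQueue[num:] }

type BatchSubmitter struct {
    channelMgrMutex sync.Mutex
    channelMgr      *channelManager
}

// syncAndPrune (simplified) holds channelMgrMutex for the whole prune step so
// it cannot interleave with recordFailedDARequest or other channelMgr callers.
func (l *BatchSubmitter) syncAndPrune(blocksToPrune, channelsToPrune int) {
    l.channelMgrMutex.Lock()
    defer l.channelMgrMutex.Unlock()
    l.channelMgr.PruneSafeBlocks(blocksToPrune)
    l.channelMgr.PruneChannels(channelsToPrune)
}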