Commit 743e782b authored by George Knee, committed by GitHub

op-batcher: lock state mutex while computing sync actions (#13330)

* lock state mutex while computing sync actions

* rename batcher.state to batcher.channelManager

Closes https://github.com/ethereum-optimism/optimism/issues/13280

* move channel manager mutex up into driver

encapsulate imperative code and mutex locking/unlocking into new method executeSyncActions
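
A minimal, self-contained sketch of this pattern (simplified stand-in types, not the op-batcher code itself; in the final diff below the method ends up named `syncAndPrune` after a later rename): the driver owns the mutex, and one method takes the lock, computes the sync actions, mutates the guarded state, and returns only what the caller needs, so slow work happens outside the critical section.

```go
// Sketch under assumed, simplified types: driver, syncStatus and blockRange
// are hypothetical stand-ins for the batcher's real structures.
package main

import (
	"fmt"
	"sync"
)

type blockRange struct{ start, end uint64 }

type syncStatus struct{ safeL2, unsafeL2 uint64 }

type driver struct {
	mu     sync.Mutex // guards blocks (the channel-manager state in the real batcher)
	blocks []uint64
}

// syncAndPrune holds the mutex for the whole decision + mutation, then releases it
// before the caller does any slow work (RPC calls, loading blocks, publishing).
func (d *driver) syncAndPrune(s syncStatus) *blockRange {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Decide: drop blocks that are now safe on L1.
	kept := d.blocks[:0]
	for _, b := range d.blocks {
		if b > s.safeL2 {
			kept = append(kept, b)
		}
	}
	d.blocks = kept

	// Return the range of fresh unsafe blocks the caller should load (without the lock held).
	if s.unsafeL2 <= s.safeL2 {
		return nil
	}
	return &blockRange{start: s.safeL2 + 1, end: s.unsafeL2}
}

func main() {
	d := &driver{blocks: []uint64{8, 9, 10}}
	r := d.syncAndPrune(syncStatus{safeL2: 9, unsafeL2: 12})
	fmt.Println(d.blocks, *r) // [10] {10 12}
}
```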

* don't use pointer to mutex

* rename

* throttlingLoop uses channelMgrMutex

* unblock throttling loop by sending pendingBytes on a channel
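
Roughly the hand-off pattern this introduces, as a standalone sketch with simplified names: the main loop offers the freshly computed pending-bytes figure on a channel via select/default so it never blocks on the throttling loop, and the throttling loop caches the latest value so it can also re-evaluate on its regular ticks.

```go
// Minimal sketch of the non-blocking hand-off (simplified names, not the real driver).
package main

import (
	"fmt"
	"time"
)

func main() {
	pendingBytesUpdated := make(chan int64)
	done := make(chan struct{})

	// Throttling loop: react immediately to updates, and re-evaluate on a ticker
	// using the cached value.
	go func() {
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		cached := int64(0)
		for {
			select {
			case pb := <-pendingBytesUpdated:
				cached = pb
				fmt.Println("update:", pb)
			case <-ticker.C:
				fmt.Println("tick, cached:", cached)
			case <-done:
				return
			}
		}
	}()

	// Main-loop side: offer the new value without blocking if the receiver is busy.
	for i := int64(1); i <= 3; i++ {
		select {
		case pendingBytesUpdated <- i * 1000:
		default: // receiver busy: skip, a fresh value will be offered on the next block
		}
		time.Sleep(30 * time.Millisecond)
	}
	close(done)
}
```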

* disable proposer in 4844 system test

it mostly adds noise to the logs

* remove unused ticker

* Revert "remove unused ticker"

This reverts commit b2e9762c5e6e92e8b62771439d864d9985dcc943.

* reinstate throttle on tick

use cached value

* make the main loop release the chMgr lock more often

* simplify

* add some more mutex calls

* simplify some more

* push mutex locking down into l.publishTxToL1
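
The idea, sketched with hypothetical simplified types: hold the lock only around the shared-state access that pulls the next payload, and release it before the slow network publish so other goroutines can keep reading and updating the state.

```go
// Sketch of narrowing the critical section (not the op-batcher code itself).
package main

import (
	"fmt"
	"sync"
	"time"
)

type publisher struct {
	mu    sync.Mutex
	queue []string // stands in for the channel manager's pending tx data
}

func (p *publisher) publishNext() bool {
	// Critical section: just pop the next item.
	p.mu.Lock()
	if len(p.queue) == 0 {
		p.mu.Unlock()
		return false
	}
	txdata := p.queue[0]
	p.queue = p.queue[1:]
	p.mu.Unlock()

	// Slow work happens without the lock held.
	time.Sleep(10 * time.Millisecond) // stands in for the L1 send
	fmt.Println("published", txdata)
	return true
}

func main() {
	p := &publisher{queue: []string{"frame-0", "frame-1"}}
	for p.publishNext() {
	}
}
```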

* do not signal the throttling loop inside publishTxToL1

This is a change in behaviour that is outside the scope of this PR.

* prefer deferring mutex unlock
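
A tiny illustration of why defer is preferred here: the unlock is paired with the lock on every return path, which is easy to get wrong with explicit Unlock calls.

```go
// Generic Go idiom, not project code.
package main

import "sync"

type store struct {
	mu   sync.Mutex
	data map[string]int
}

// Preferred: defer releases the lock on all paths, including early returns.
func (s *store) get(k string) (int, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.data[k]
	if !ok {
		return 0, false // unlock still happens here
	}
	return v, true
}

func main() {
	s := &store{data: map[string]int{"a": 1}}
	_, _ = s.get("a")
	_, _ = s.get("missing")
}
```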
parent 2932a181
@@ -5,7 +5,6 @@ import (
     "fmt"
     "io"
     "math"
-    "sync"

     "github.com/ethereum-optimism/optimism/op-batcher/metrics"
     "github.com/ethereum-optimism/optimism/op-node/rollup"
@@ -29,7 +28,6 @@ type ChannelOutFactory func(cfg ChannelConfig, rollupCfg *rollup.Config) (derive
 // channel.
 // Public functions on channelManager are safe for concurrent access.
 type channelManager struct {
-    mu          sync.Mutex
     log         log.Logger
     metr        metrics.Metricer
     cfgProvider ChannelConfigProvider
@@ -78,8 +76,6 @@ func (s *channelManager) SetChannelOutFactory(outFactory ChannelOutFactory) {
 // Clear clears the entire state of the channel manager.
 // It is intended to be used before launching op-batcher and after an L2 reorg.
 func (s *channelManager) Clear(l1OriginLastSubmittedChannel eth.BlockID) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
     s.log.Trace("clearing channel manager state")
     s.blocks.Clear()
     s.blockCursor = 0
@@ -97,8 +93,6 @@ func (s *channelManager) pendingBlocks() int {
 // TxFailed records a transaction as failed. It will attempt to resubmit the data
 // in the failed transaction.
 func (s *channelManager) TxFailed(_id txID) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
     id := _id.String()
     if channel, ok := s.txChannels[id]; ok {
         delete(s.txChannels, id)
@@ -111,8 +105,7 @@ func (s *channelManager) TxFailed(_id txID) {
 // TxConfirmed marks a transaction as confirmed on L1. Only if the channel timed out
 // the channelManager's state is modified.
 func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
     id := _id.String()
     if channel, ok := s.txChannels[id]; ok {
         delete(s.txChannels, id)
@@ -155,7 +148,7 @@ func (s *channelManager) handleChannelInvalidated(c *channel) {
         }
         s.rewindToBlock(blockID)
     } else {
-        s.log.Debug("channelManager.handleChanneInvalidated: channel had no blocks")
+        s.log.Debug("channelManager.handleChannelInvalidated: channel had no blocks")
     }

     // Trim provided channel and any older channels:
@@ -198,8 +191,6 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 // When switching DA type, the channelManager state will be rebuilt
 // with a new ChannelConfig.
 func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
     channel, err := s.getReadyChannel(l1Head)
     if err != nil {
         return emptyTxData, err
@@ -437,9 +428,6 @@ func (s *channelManager) outputFrames() error {
 // if the block does not extend the last block loaded into the state. If no
 // blocks were added yet, the parent hash check is skipped.
 func (s *channelManager) AddL2Block(block *types.Block) error {
-    s.mu.Lock()
-    defer s.mu.Unlock()
     if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
         return ErrReorg
     }
...
@@ -105,7 +105,7 @@ type BatchSubmitter struct {
     killCtx       context.Context
     cancelKillCtx context.CancelFunc

-    l2BlockAdded chan struct{} // notifies the throttling loop whenever an l2 block is added
+    pendingBytesUpdated chan int64 // notifies the throttling with the new pending bytes

     mutex   sync.Mutex
     running bool
@@ -114,7 +114,8 @@ type BatchSubmitter struct {
     txpoolState       TxPoolState
     txpoolBlockedBlob bool

-    state           *channelManager
+    channelMgrMutex sync.Mutex // guards channelMgr and prevCurrentL1
+    channelMgr      *channelManager
     prevCurrentL1   eth.L1BlockRef // cached CurrentL1 from the last syncStatus
 }
@@ -126,7 +127,7 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
     }
     return &BatchSubmitter{
         DriverSetup: setup,
-        state:       state,
+        channelMgr:  state,
     }
 }
@@ -293,13 +294,15 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
         return nil, fmt.Errorf("getting L2 block: %w", err)
     }

-    if err := l.state.AddL2Block(block); err != nil {
+    l.channelMgrMutex.Lock()
+    defer l.channelMgrMutex.Unlock()
+    if err := l.channelMgr.AddL2Block(block); err != nil {
         return nil, fmt.Errorf("adding L2 block to state: %w", err)
     }

     // notify the throttling loop it may be time to initiate throttling without blocking
     select {
-    case l.l2BlockAdded <- struct{}{}:
+    case l.pendingBytesUpdated <- l.channelMgr.PendingDABytes():
     default:
     }
@@ -387,6 +390,35 @@ func (l *BatchSubmitter) setTxPoolState(txPoolState TxPoolState, txPoolBlockedBl
     l.txpoolMutex.Unlock()
 }

+// syncAndPrune computes actions to take based on the current sync status, prunes the channel manager state
+// and returns blocks to load.
+func (l *BatchSubmitter) syncAndPrune(syncStatus *eth.SyncStatus) *inclusiveBlockRange {
+    l.channelMgrMutex.Lock()
+    defer l.channelMgrMutex.Unlock()
+
+    // Decide appropriate actions
+    syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.channelMgr.blocks, l.channelMgr.channelQueue, l.Log)
+
+    if outOfSync {
+        // If the sequencer is out of sync
+        // do nothing and wait to see if it has
+        // got in sync on the next tick.
+        l.Log.Warn("Sequencer is out of sync, retrying next tick.")
+        return syncActions.blocksToLoad
+    }
+
+    l.prevCurrentL1 = syncStatus.CurrentL1
+
+    // Manage existing state / garbage collection
+    if syncActions.clearState != nil {
+        l.channelMgr.Clear(*syncActions.clearState)
+    } else {
+        l.channelMgr.pruneSafeBlocks(syncActions.blocksToPrune)
+        l.channelMgr.pruneChannels(syncActions.channelsToPrune)
+    }
+
+    return syncActions.blocksToLoad
+}
+
 // mainLoop periodically:
 // - polls the sequencer,
 // - prunes the channel manager state (i.e. safe blocks)
@@ -410,8 +442,8 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR
     l.txpoolState = TxpoolGood
     l.txpoolMutex.Unlock()

-    l.l2BlockAdded = make(chan struct{})
-    defer close(l.l2BlockAdded)
+    l.pendingBytesUpdated = make(chan int64)
+    defer close(l.pendingBytesUpdated)

     ticker := time.NewTicker(l.Config.PollInterval)
     defer ticker.Stop()
@@ -430,30 +462,11 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR
                 continue
             }

-            // Decide appropriate actions
-            syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.state.blocks, l.state.channelQueue, l.Log)
-            if outOfSync {
-                // If the sequencer is out of sync
-                // do nothing and wait to see if it has
-                // got in sync on the next tick.
-                l.Log.Warn("Sequencer is out of sync, retrying next tick.")
-                continue
-            }
-
-            l.prevCurrentL1 = syncStatus.CurrentL1
-
-            // Manage existing state / garbage collection
-            if syncActions.clearState != nil {
-                l.state.Clear(*syncActions.clearState)
-            } else {
-                l.state.pruneSafeBlocks(syncActions.blocksToPrune)
-                l.state.pruneChannels(syncActions.channelsToPrune)
-            }
-
-            if syncActions.blocksToLoad != nil {
+            blocksToLoad := l.syncAndPrune(syncStatus)
+
+            if blocksToLoad != nil {
                 // Get fresh unsafe blocks
-                if err := l.loadBlocksIntoState(l.shutdownCtx, syncActions.blocksToLoad.start, syncActions.blocksToLoad.end); errors.Is(err, ErrReorg) {
+                if err := l.loadBlocksIntoState(l.shutdownCtx, blocksToLoad.start, blocksToLoad.end); errors.Is(err, ErrReorg) {
                     l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err)
                     l.waitNodeSyncAndClearState()
                     continue
@@ -461,6 +474,7 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR
             }

             l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)
         case <-ctx.Done():
             if err := queue.Wait(); err != nil {
                 l.Log.Error("error waiting for transactions to complete", "err", err)
@@ -506,7 +520,7 @@ func (l *BatchSubmitter) throttlingLoop(ctx context.Context) {
     ticker := time.NewTicker(l.Config.ThrottleInterval)
     defer ticker.Stop()

-    updateParams := func() {
+    updateParams := func(pendingBytes int64) {
         ctx, cancel := context.WithTimeout(l.shutdownCtx, l.Config.NetworkTimeout)
         defer cancel()
         cl, err := l.EndpointProvider.EthClient(ctx)
@@ -514,7 +528,7 @@ func (l *BatchSubmitter) throttlingLoop(ctx context.Context) {
             l.Log.Error("Can't reach sequencer execution RPC", "err", err)
             return
         }
-        pendingBytes := l.state.PendingDABytes()
         maxTxSize := uint64(0)
         maxBlockSize := l.Config.ThrottleAlwaysBlockSize
         if pendingBytes > int64(l.Config.ThrottleThreshold) {
@@ -550,12 +564,14 @@ func (l *BatchSubmitter) throttlingLoop(ctx context.Context) {
         }
     }

+    cachedPendingBytes := int64(0)
     for {
         select {
-        case <-l.l2BlockAdded:
-            updateParams()
         case <-ticker.C:
-            updateParams()
+            updateParams(int64(cachedPendingBytes))
+        case pendingBytes := <-l.pendingBytesUpdated:
+            cachedPendingBytes = pendingBytes
+            updateParams(pendingBytes)
         case <-ctx.Done():
             l.Log.Info("DA throttling loop done")
             return
@@ -621,7 +637,9 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh
             l.Log.Info("txpool state is not good, aborting state publishing")
             return
         }
         err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daGroup)
         if err != nil {
             if err != io.EOF {
                 l.Log.Error("Error publishing tx to l1", "err", err)
@@ -647,7 +665,9 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
             return false
         } else {
             l.Log.Info("Clearing state with safe L1 origin", "origin", l1SafeOrigin)
-            l.state.Clear(l1SafeOrigin)
+            l.channelMgrMutex.Lock()
+            defer l.channelMgrMutex.Unlock()
+            l.channelMgr.Clear(l1SafeOrigin)
             return true
         }
     }
@@ -668,7 +688,9 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
             }
         case <-ctx.Done():
             l.Log.Warn("Clearing state cancelled")
-            l.state.Clear(eth.BlockID{})
+            l.channelMgrMutex.Lock()
+            defer l.channelMgrMutex.Unlock()
+            l.channelMgr.Clear(eth.BlockID{})
             return
         }
     }
@@ -676,6 +698,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
 // publishTxToL1 submits a single state tx to the L1
 func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
     // send all available transactions
     l1tip, err := l.l1Tip(ctx)
     if err != nil {
@@ -686,7 +709,9 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
     // Collect next transaction data. This pulls data out of the channel, so we need to make sure
     // to put it back if ever da or txmgr requests fail, by calling l.recordFailedDARequest/recordFailedTx.
-    txdata, err := l.state.TxData(l1tip.ID())
+    l.channelMgrMutex.Lock()
+    txdata, err := l.channelMgr.TxData(l1tip.ID())
+    l.channelMgrMutex.Unlock()
     if err == io.EOF {
         l.Log.Trace("No transaction data available")
@@ -870,18 +895,22 @@ func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
     if err != nil {
         l.Log.Warn("DA request failed", logFields(id, err)...)
     }
-    l.state.TxFailed(id)
+    l.channelMgr.TxFailed(id)
 }

 func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
+    l.channelMgrMutex.Lock()
+    defer l.channelMgrMutex.Unlock()
     l.Log.Warn("Transaction failed to send", logFields(id, err)...)
-    l.state.TxFailed(id)
+    l.channelMgr.TxFailed(id)
 }

 func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
+    l.channelMgrMutex.Lock()
+    defer l.channelMgrMutex.Unlock()
     l.Log.Info("Transaction confirmed", logFields(id, receipt)...)
     l1block := eth.ReceiptBlockID(receipt)
-    l.state.TxConfirmed(id, l1block)
+    l.channelMgr.TxConfirmed(id, l1block)
 }

 // l1Tip gets the current L1 tip as a L1BlockRef. The passed context is assumed
...
@@ -476,6 +476,6 @@ func (bs *BatcherService) ThrottlingTestDriver() *TestBatchSubmitter {
     tbs := &TestBatchSubmitter{
         BatchSubmitter: bs.driver,
     }
-    tbs.BatchSubmitter.state.metr = new(metrics.ThrottlingMetrics)
+    tbs.BatchSubmitter.channelMgr.metr = new(metrics.ThrottlingMetrics)
     return tbs
 }
@@ -27,7 +27,7 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
     }
     var candidate *txmgr.TxCandidate
     var err error
-    cc := l.state.cfgProvider.ChannelConfig()
+    cc := l.channelMgr.cfgProvider.ChannelConfig()
     if cc.UseBlobs {
         candidate = l.calldataTxCandidate([]byte{})
     } else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
...
@@ -92,6 +92,7 @@ func testSystem4844E2E(t *testing.T, multiBlob bool, daType batcherFlags.DataAva
         }
     }()

+    cfg.DisableProposer = true // disable L2 output submission for this test
     sys, err := cfg.Start(t, action)
     require.NoError(t, err, "Error starting up system")
...