Commit b3ec4d69 authored by George Knee, committed by GitHub

batcher refactor: flatten out go routines (#12405)

* flatten out batcher goroutines

* move wg increment to parent fn

* ensure mainloop closes before receipts loop

* add comments

* pass a context to both loops

* remove debug lines

* clean up mutex handling in receiptsLoop

* don't need to set default value the first time
* avoid writing to Logger while holding mutex

* typo

* increase log level and include tx.ID

* fix changes from merge, hoist throttling loop goroutine launch to driver

* call done on waitgroup in throttlingLoop

* move function around

hoping this makes the diff nicer
parent e8de7b19
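
For orientation before the diff: a minimal, self-contained sketch of the flattened layout this commit moves to. The names here (service, start, mainLoop, receiptsLoop) are illustrative stand-ins, not the real batcher API; the point is that the parent launches each loop as a sibling goroutine with its own context, and the main loop's deferred cancel guarantees it has fully returned before the receipts loop is told to stop.

```go
// Minimal sketch of the flattened goroutine layout. All names are illustrative
// stand-ins, not the real op-batcher types.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type service struct {
	wg sync.WaitGroup
}

// start launches the sibling loops directly from the parent, analogous to
// hoisting the goroutine launches into StartBatchSubmitting.
func (s *service) start(shutdownCtx context.Context) {
	receiptsCtx, cancelReceipts := context.WithCancel(context.Background())

	s.wg.Add(2)
	go s.receiptsLoop(receiptsCtx)             // receives results
	go s.mainLoop(shutdownCtx, cancelReceipts) // cancels the receipts loop when it returns
}

func (s *service) mainLoop(ctx context.Context, cancelReceipts context.CancelFunc) {
	defer s.wg.Done()
	// The deferred cancel only fires once mainLoop has exited, so the receipts
	// loop is shut down strictly after the main loop ("mainloop closes before
	// receipts loop").
	defer cancelReceipts()
	<-ctx.Done()
	fmt.Println("main loop returning")
}

func (s *service) receiptsLoop(ctx context.Context) {
	defer s.wg.Done()
	<-ctx.Done()
	fmt.Println("receipts loop done")
}

func main() {
	shutdownCtx, shutdown := context.WithCancel(context.Background())
	s := &service{}
	s.start(shutdownCtx)

	time.Sleep(10 * time.Millisecond)
	shutdown()
	s.wg.Wait() // both loops have exited once Wait returns
}
```
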
@@ -111,7 +111,7 @@ type BatchSubmitter struct {
running bool
txpoolMutex sync.Mutex // guards txpoolState and txpoolBlockedBlob
txpoolState int
txpoolState TxPoolState
txpoolBlockedBlob bool
// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
@@ -160,8 +160,20 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
}
}
l.wg.Add(1)
go l.loop()
receiptsCh := make(chan txmgr.TxReceipt[txRef])
receiptsLoopCtx, cancelReceiptsLoopCtx := context.WithCancel(context.Background())
throttlingLoopCtx, cancelThrottlingLoopCtx := context.WithCancel(context.Background())
// DA throttling loop should always be started except for testing (indicated by ThrottleInterval == 0)
if l.Config.ThrottleInterval > 0 {
l.wg.Add(1)
go l.throttlingLoop(throttlingLoopCtx)
} else {
l.Log.Warn("Throttling loop is DISABLED due to 0 throttle-interval. This should not be disabled in prod.")
}
l.wg.Add(2)
go l.processReceiptsLoop(receiptsLoopCtx, receiptsCh) // receives from receiptsCh
go l.mainLoop(l.shutdownCtx, receiptsCh, cancelReceiptsLoopCtx, cancelThrottlingLoopCtx) // sends on receiptsCh
l.Log.Info("Batch Submitter started")
return nil
@@ -390,6 +402,8 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(syncStatus eth.SyncStatus)
// Submitted batch, but it is not valid
// Missed L2 block somehow.
type TxPoolState int
const (
// Txpool states. Possible state transitions:
// TxpoolGood -> TxpoolBlocked:
@@ -399,13 +413,29 @@ const (
// send a cancellation transaction.
// TxpoolCancelPending -> TxpoolGood:
// happens once the cancel transaction completes, whether successfully or in error.
TxpoolGood int = iota
TxpoolGood TxPoolState = iota
TxpoolBlocked
TxpoolCancelPending
)
func (l *BatchSubmitter) loop() {
// setTxPoolState locks the mutex, sets the parameters to the supplied ones, and releases the mutex.
func (l *BatchSubmitter) setTxPoolState(txPoolState TxPoolState, txPoolBlockedBlob bool) {
l.txpoolMutex.Lock()
l.txpoolState = txPoolState
l.txpoolBlockedBlob = txPoolBlockedBlob
l.txpoolMutex.Unlock()
}
// mainLoop periodically:
// - polls the sequencer,
// - prunes the channel manager state (i.e. safe blocks)
// - loads unsafe blocks from the sequencer
// - drives the creation of channels and frames
// - sends transactions to the DA layer
func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopCancel, throttlingLoopCancel context.CancelFunc) {
defer l.wg.Done()
defer receiptsLoopCancel()
defer throttlingLoopCancel()
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
daGroup := &errgroup.Group{}
@@ -419,22 +449,8 @@ func (l *BatchSubmitter) loop() {
l.txpoolState = TxpoolGood
l.txpoolMutex.Unlock()
// start the receipt/result processing loop
receiptsLoopDone := make(chan struct{})
defer close(receiptsLoopDone) // shut down receipt loop
l.l2BlockAdded = make(chan struct{})
defer close(l.l2BlockAdded)
receiptsCh := make(chan txmgr.TxReceipt[txRef])
go l.processReceiptsLoop(receiptsCh, receiptsLoopDone)
// DA throttling loop should always be started except for testing (indicated by ThrottleInterval == 0)
if l.Config.ThrottleInterval > 0 {
throttlingLoopDone := make(chan struct{})
defer close(throttlingLoopDone)
go l.throttlingLoop(throttlingLoopDone)
} else {
l.Log.Warn("Throttling loop is DISABLED due to 0 throttle-interval. This should not be disabled in prod.")
}
ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()
@@ -467,34 +483,33 @@ func (l *BatchSubmitter) loop() {
continue
}
l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)
case <-l.shutdownCtx.Done():
case <-ctx.Done():
l.Log.Warn("main loop returning")
return
}
}
}
func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txRef], receiptsLoopDone chan struct{}) {
// processReceiptsLoop handles transaction receipts from the DA layer
func (l *BatchSubmitter) processReceiptsLoop(ctx context.Context, receiptsCh chan txmgr.TxReceipt[txRef]) {
defer l.wg.Done()
l.Log.Info("Starting receipts processing loop")
for {
select {
case r := <-receiptsCh:
l.txpoolMutex.Lock()
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
l.txpoolState = TxpoolBlocked
l.txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob)
l.setTxPoolState(TxpoolBlocked, r.ID.isBlob)
l.Log.Warn("incompatible tx in txpool", "id", r.ID, "is_blob", r.ID.isBlob)
} else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.txpoolState = TxpoolGood
l.setTxPoolState(TxpoolGood, l.txpoolBlockedBlob)
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.txpoolMutex.Unlock()
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptsLoopDone:
l.Log.Info("Receipts processing loop done")
case <-ctx.Done():
l.Log.Info("Receipt processing loop done")
return
}
}
@@ -504,7 +519,8 @@ func (l *BatchSubmitter) processReceiptsLoop(receiptsCh chan txmgr.TxReceipt[txR
// throttling of incoming data to prevent the backlog from growing too large. By looping & calling the miner API setter
// continuously, we ensure the engine currently in use is always going to be reset to the proper throttling settings
// even in the event of sequencer failover.
func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) {
func (l *BatchSubmitter) throttlingLoop(ctx context.Context) {
defer l.wg.Done()
l.Log.Info("Starting DA throttling loop")
ticker := time.NewTicker(l.Config.ThrottleInterval)
defer ticker.Stop()
@@ -556,7 +572,7 @@ func (l *BatchSubmitter) throttlingLoop(throttlingLoopDone chan struct{}) {
updateParams()
case <-ticker.C:
updateParams()
case <-throttlingLoopDone:
case <-ctx.Done():
l.Log.Info("DA throttling loop done")
return
}
@@ -50,7 +50,7 @@ When an L2 unsafe reorg is detected, the batch submitter will reset its state, a
When a Tx fails, an asynchronous receipts handler is triggered. The channel from whence the Tx's frames came has its `frameCursor` rewound, so that all the frames can be resubmitted in order.
### Channel Times Out
When at Tx is confirmed, an asynchronous receipts handler is triggered. We only update the batcher's state if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. This allows the batcher to start fresh building a new channel starting from the same block -- it does not need to refetch blocks from the sequencer.
When a Tx is confirmed, an asynchronous receipts handler is triggered. We only update the batcher's state if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. This allows the batcher to start fresh building a new channel starting from the same block -- it does not need to refetch blocks from the sequencer.
## Design Principles and Optimization Targets
At the current time, the batcher should be optimized for correctness, simplicity and robustness. It is considered preferable to prioritize these properties, even at the expense of other potentially desirable properties such as frugality. For example, it is preferable to have the batcher resubmit some data from time to time ("wasting" money on data availability costs) instead of avoiding that by e.g. adding some persistent state to the batcher.
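
As a rough companion to the receipt-handling behaviour described above, a hedged sketch of the two rewind paths. The types and fields (channelManager, frameCursor, blockCursor, firstBlock) are simplified stand-ins, not the real op-batcher implementation: a failed tx rewinds the channel's frame cursor so its frames are resubmitted in order, while an on-chain channel timeout rewinds the block cursor to the channel's first block and clears the channel queue.

```go
// Simplified sketch of the rewind behaviour described above; not the real channelManager.
package main

import "fmt"

type channel struct {
	frames      []string // serialized frames, in submission order
	frameCursor int      // index of the next frame to submit
	firstBlock  uint64   // first L2 block number added to this channel
}

type channelManager struct {
	channels    []*channel
	blockCursor uint64 // next L2 block number to pull from the sequencer
}

// onTxFailed rewinds the frame cursor so all of the channel's frames
// are resubmitted in order.
func (m *channelManager) onTxFailed(c *channel) {
	c.frameCursor = 0
}

// onChannelTimedOut rewinds the block cursor to the channel's first block and
// clears the channel queue, so a fresh channel is rebuilt starting from the
// same block without refetching blocks from the sequencer.
func (m *channelManager) onChannelTimedOut(c *channel) {
	m.blockCursor = c.firstBlock
	m.channels = m.channels[:0]
}

func main() {
	c := &channel{frames: []string{"frame-0", "frame-1"}, frameCursor: 2, firstBlock: 100}
	m := &channelManager{channels: []*channel{c}, blockCursor: 103}

	m.onTxFailed(c)
	fmt.Println("frameCursor after failed tx:", c.frameCursor) // 0

	m.onChannelTimedOut(c)
	fmt.Println("blockCursor after timeout:", m.blockCursor, "channels queued:", len(m.channels)) // 100 0
}
```
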