Commit 7373ce76 authored by Roberto Bayardo's avatar Roberto Bayardo Committed by GitHub

- make state publishing loop abort if the txpool state is not good (#11633)

- protect txpool state vars with a mutex so they can be automically updated to avoid potential race
  condition
parent 323e688d
......@@ -8,7 +8,6 @@ import (
"math/big"
_ "net/http/pprof"
"sync"
"sync/atomic"
"time"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
......@@ -83,6 +82,10 @@ type BatchSubmitter struct {
mutex sync.Mutex
running bool
txpoolMutex sync.Mutex // guards txpoolState and txpoolBlockedBlob
txpoolState int
txpoolBlockedBlob bool
// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
lastStoredBlock eth.BlockID
lastL1Tip eth.L1BlockRef
......@@ -289,7 +292,7 @@ const (
// send a cancellation transaction.
// TxpoolCancelPending -> TxpoolGood:
// happens once the cancel transaction completes, whether successfully or in error.
TxpoolGood int32 = iota
TxpoolGood int = iota
TxpoolBlocked
TxpoolCancelPending
)
......@@ -304,23 +307,25 @@ func (l *BatchSubmitter) loop() {
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop
var (
txpoolState atomic.Int32
txpoolBlockedBlob bool
)
txpoolState.Store(TxpoolGood)
l.txpoolMutex.Lock()
l.txpoolState = TxpoolGood
l.txpoolMutex.Unlock()
go func() {
for {
select {
case r := <-receiptsCh:
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool")
} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
l.txpoolMutex.Lock()
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
l.txpoolState = TxpoolBlocked
l.txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool", "is_blob", r.ID.isBlob)
} else if r.ID.isCancel && l.txpoolState == TxpoolCancelPending {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.txpoolState = TxpoolGood
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.txpoolMutex.Unlock()
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptLoopDone:
......@@ -345,13 +350,7 @@ func (l *BatchSubmitter) loop() {
for {
select {
case <-ticker.C:
if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.cancelBlockingTx(queue, receiptsCh, txpoolBlockedBlob)
}
if txpoolState.Load() != TxpoolGood {
if !l.checkTxpool(queue, receiptsCh) {
continue
}
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
......@@ -433,7 +432,12 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh
l.Log.Info("Txmgr is closed, aborting state publishing")
return
}
if !l.checkTxpool(queue, receiptsCh) {
l.Log.Info("txpool state is not good, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
......@@ -545,10 +549,11 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh
panic(err) // this error should not happen
}
l.Log.Warn("sending a cancellation transaction to unblock txpool", "blocked_blob", isBlockedBlob)
l.queueTx(txData{}, true, candidate, queue, receiptsCh)
l.sendTx(txData{}, true, candidate, queue, receiptsCh)
}
// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// This call will block if the txmgr queue is at the max-pending limit.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
var err error
......@@ -585,11 +590,13 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
candidate = l.calldataTxCandidate(data)
}
l.queueTx(txdata, false, candidate, queue, receiptsCh)
l.sendTx(txdata, false, candidate, queue, receiptsCh)
return nil
}
func (l *BatchSubmitter) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
// sendTx uses the txmgr queue to send the given transaction candidate after setting its
// gaslimit. It will block if the txmgr queue has reached its MaxPendingTransactions limit.
func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
if err != nil {
// we log instead of return an error here because txmgr can do its own gas estimation
......@@ -665,6 +672,23 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil
}
func (l *BatchSubmitter) checkTxpool(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) bool {
l.txpoolMutex.Lock()
if l.txpoolState == TxpoolBlocked {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.txpoolState = TxpoolCancelPending
isBlob := l.txpoolBlockedBlob
l.txpoolMutex.Unlock()
l.cancelBlockingTx(queue, receiptsCh, isBlob)
return false
}
r := l.txpoolState == TxpoolGood
l.txpoolMutex.Unlock()
return r
}
func logFields(xs ...any) (fs []any) {
for _, x := range xs {
switch v := x.(type) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment