Commit e74c7d91 authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

TXManager: add IsClosed to TxMgr and use check in BatchSubmitter (#9470)

* add IsClosed to TxMgr and use check in BatchSubmitter

* add generated mock

* move isClosed above publish in sending loop

* more IsClosed
parent 176478d7
...@@ -280,6 +280,12 @@ func (l *BatchSubmitter) loop() { ...@@ -280,6 +280,12 @@ func (l *BatchSubmitter) loop() {
case r := <-receiptsCh: case r := <-receiptsCh:
l.handleReceipt(r) l.handleReceipt(r)
case <-l.shutdownCtx.Done(): case <-l.shutdownCtx.Done():
// if the txmgr is closed, we stop the transaction sending
// don't even bother draining the queue, as all sending will fail
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
return
}
// This removes any never-submitted pending channels, so these do not have to be drained with transactions. // This removes any never-submitted pending channels, so these do not have to be drained with transactions.
// Any remaining unfinished channel is terminated, so its data gets submitted. // Any remaining unfinished channel is terminated, so its data gets submitted.
err := l.state.Close() err := l.state.Close()
...@@ -304,13 +310,19 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh ...@@ -304,13 +310,19 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh
// send/wait and receipt reading must be on a separate goroutines to avoid deadlocks // send/wait and receipt reading must be on a separate goroutines to avoid deadlocks
go func() { go func() {
defer func() { defer func() {
if drain { // if draining, we wait for all transactions to complete
// if draining, we wait for all transactions to complete // if the txmgr is closed, there is no need to wait as all transactions will fail
if drain && !l.Txmgr.IsClosed() {
queue.Wait() queue.Wait()
} }
close(txDone) close(txDone)
}() }()
for { for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, no further receipts expected")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh) err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil { if err != nil {
if drain && err != io.EOF { if drain && err != io.EOF {
......
...@@ -93,6 +93,10 @@ type stubTxMgr struct { ...@@ -93,6 +93,10 @@ type stubTxMgr struct {
sending map[byte]chan *types.Receipt sending map[byte]chan *types.Receipt
} }
func (s *stubTxMgr) IsClosed() bool {
return false
}
func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*types.Receipt, error) { func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*types.Receipt, error) {
ch := s.recordTx(candidate) ch := s.recordTx(candidate)
return <-ch, nil return <-ch, nil
......
...@@ -68,6 +68,10 @@ func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt, ...@@ -68,6 +68,10 @@ func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt,
func (f fakeTxMgr) Close() { func (f fakeTxMgr) Close() {
} }
func (f fakeTxMgr) IsClosed() bool {
return false
}
func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Client, rollupCl *sources.RollupClient) *L2Proposer { func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Client, rollupCl *sources.RollupClient) *L2Proposer {
proposerConfig := proposer.ProposerConfig{ proposerConfig := proposer.ProposerConfig{
PollInterval: time.Second, PollInterval: time.Second,
......
...@@ -64,6 +64,20 @@ func (_m *TxManager) From() common.Address { ...@@ -64,6 +64,20 @@ func (_m *TxManager) From() common.Address {
return r0 return r0
} }
// IsClosed provides a mock function with given fields:
func (_m *TxManager) IsClosed() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// Send provides a mock function with given fields: ctx, candidate // Send provides a mock function with given fields: ctx, candidate
func (_m *TxManager) Send(ctx context.Context, candidate txmgr.TxCandidate) (*types.Receipt, error) { func (_m *TxManager) Send(ctx context.Context, candidate txmgr.TxCandidate) (*types.Receipt, error) {
ret := _m.Called(ctx, candidate) ret := _m.Called(ctx, candidate)
......
...@@ -74,6 +74,7 @@ type TxManager interface { ...@@ -74,6 +74,7 @@ type TxManager interface {
// Close the underlying connection // Close the underlying connection
Close() Close()
IsClosed() bool
} }
// ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine // ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine
...@@ -795,6 +796,11 @@ func (m *SimpleTxManager) checkBlobFeeLimits(blobBaseFee, bumpedBlobFee *big.Int ...@@ -795,6 +796,11 @@ func (m *SimpleTxManager) checkBlobFeeLimits(blobBaseFee, bumpedBlobFee *big.Int
return nil return nil
} }
// IsClosed returns true if the tx manager is closed.
func (m *SimpleTxManager) IsClosed() bool {
return m.closed.Load()
}
// calcThresholdValue returns ceil(x * priceBumpPercent / 100) for non-blob txs, or // calcThresholdValue returns ceil(x * priceBumpPercent / 100) for non-blob txs, or
// ceil(x * blobPriceBumpPercent / 100) for blob txs. // ceil(x * blobPriceBumpPercent / 100) for blob txs.
// It guarantees that x is increased by at least 1 // It guarantees that x is increased by at least 1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment