Commit 3d632f70 authored by Michael de Hoog's avatar Michael de Hoog

Remove concurrent resets from txmgr

parent 6a2d3381
...@@ -4,8 +4,6 @@ import ( ...@@ -4,8 +4,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync/atomic"
"math/big" "math/big"
"strings" "strings"
"sync" "sync"
...@@ -89,11 +87,7 @@ type SimpleTxManager struct { ...@@ -89,11 +87,7 @@ type SimpleTxManager struct {
metr metrics.TxMetricer metr metrics.TxMetricer
nonce *uint64 nonce *uint64
lock sync.RWMutex nonceLock sync.RWMutex
wg sync.WaitGroup
resetC chan struct{}
resetWG sync.WaitGroup
resetting atomic.Bool
} }
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config. // NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
...@@ -110,7 +104,6 @@ func NewSimpleTxManager(name string, l log.Logger, m metrics.TxMetricer, cfg CLI ...@@ -110,7 +104,6 @@ func NewSimpleTxManager(name string, l log.Logger, m metrics.TxMetricer, cfg CLI
backend: conf.Backend, backend: conf.Backend,
l: l.New("service", name), l: l.New("service", name),
metr: m, metr: m,
resetC: make(chan struct{}),
}, nil }, nil
} }
...@@ -139,50 +132,15 @@ type TxCandidate struct { ...@@ -139,50 +132,15 @@ type TxCandidate struct {
// //
// NOTE: Send can be called concurrently, the nonce will be managed internally. // NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) { func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
m.resetWG.Wait() // wait for any current resets
receipt, err := m.send(ctx, candidate) receipt, err := m.send(ctx, candidate)
if err != nil { if err != nil {
m.reset() m.resetNonce()
} }
return receipt, err return receipt, err
} }
// reset the transaction manager. All currently pending transactions will receive
// a ResetErr error. This is called if any pending send returns an error.
func (m *SimpleTxManager) reset() {
m.resetWG.Add(1)
defer m.resetWG.Done()
if m.resetting.Swap(true) {
// already resetting
return
}
close(m.getResetChannel())
m.wg.Wait()
m.lock.Lock()
defer m.lock.Unlock()
m.nonce = nil
m.resetC = make(chan struct{})
m.resetting.Store(false)
}
// getResetChannel is a thread-safe getter for the channel that is closed upon
// transaction manager resets.
func (m *SimpleTxManager) getResetChannel() chan struct{} {
m.lock.RLock()
defer m.lock.RUnlock()
return m.resetC
}
// send performs the actual transaction creation and sending. // send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) { func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
m.wg.Add(1)
defer m.wg.Done()
if m.cfg.TxSendTimeout != 0 { if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout) ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
...@@ -252,8 +210,8 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (* ...@@ -252,8 +210,8 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
// increment this number. If the transaction manager is reset, it will query the // increment this number. If the transaction manager is reset, it will query the
// eth_getTransactionCount nonce again. // eth_getTransactionCount nonce again.
func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) { func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) {
m.lock.Lock() m.nonceLock.Lock()
defer m.lock.Unlock() defer m.nonceLock.Unlock()
if m.nonce == nil { if m.nonce == nil {
// Fetch the sender's nonce from the latest known block (nil `blockNumber`) // Fetch the sender's nonce from the latest known block (nil `blockNumber`)
...@@ -273,6 +231,14 @@ func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) { ...@@ -273,6 +231,14 @@ func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) {
return *m.nonce, nil return *m.nonce, nil
} }
// resetNonce resets the internal nonce tracking. This is called if any pending send
// returns an error.
func (m *SimpleTxManager) resetNonce() {
m.nonceLock.Lock()
defer m.nonceLock.Unlock()
m.nonce = nil
}
// send submits the same transaction several times with increasing gas prices as necessary. // send submits the same transaction several times with increasing gas prices as necessary.
// It waits for the transaction to be confirmed on chain. // It waits for the transaction to be confirmed on chain.
func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
...@@ -294,7 +260,6 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t ...@@ -294,7 +260,6 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
ticker := time.NewTicker(m.cfg.ResubmissionTimeout) ticker := time.NewTicker(m.cfg.ResubmissionTimeout)
defer ticker.Stop() defer ticker.Stop()
resetChan := m.getResetChannel()
bumpCounter := 0 bumpCounter := 0
for { for {
...@@ -318,9 +283,6 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t ...@@ -318,9 +283,6 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, ctx.Err()
case <-resetChan:
return nil, ResetErr
case receipt := <-receiptChan: case receipt := <-receiptChan:
m.metr.RecordGasBumpCount(bumpCounter) m.metr.RecordGasBumpCount(bumpCounter)
m.metr.TxConfirmed(receipt) m.metr.TxConfirmed(receipt)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment