Commit 8dc16176 authored by Michael de Hoog's avatar Michael de Hoog

Code review fixes

parent 753c36ca
...@@ -294,13 +294,22 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t ...@@ -294,13 +294,22 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
sendState := NewSendState(m.cfg.SafeAbortNonceTooLowCount, m.cfg.TxNotInMempoolTimeout) sendState := NewSendState(m.cfg.SafeAbortNonceTooLowCount, m.cfg.TxNotInMempoolTimeout)
receiptChan := make(chan *types.Receipt, 1) receiptChan := make(chan *types.Receipt, 1)
sendTx := func(tx *types.Transaction, bumpFees bool) *types.Transaction { publishAndWait := func(tx *types.Transaction, bumpFees bool) *types.Transaction {
return m.publishAndWaitForTx(ctx, tx, sendState, receiptChan, bumpFees, wg.Done) tx, published := m.publishTx(ctx, tx, sendState, bumpFees)
if published {
go func() {
defer wg.Done()
m.waitForTx(ctx, tx, sendState, receiptChan)
}()
} else {
wg.Done()
}
return tx
} }
// Immediately publish a transaction before starting the resumbission loop // Immediately publish a transaction before starting the resumbission loop
wg.Add(1) wg.Add(1)
tx = sendTx(tx, false) tx = publishAndWait(tx, false)
ticker := time.NewTicker(m.cfg.ResubmissionTimeout) ticker := time.NewTicker(m.cfg.ResubmissionTimeout)
defer ticker.Stop() defer ticker.Stop()
...@@ -318,7 +327,7 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t ...@@ -318,7 +327,7 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
return nil, errors.New("aborted transaction sending") return nil, errors.New("aborted transaction sending")
} }
wg.Add(1) wg.Add(1)
tx = sendTx(tx, true) tx = publishAndWait(tx, true)
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, ctx.Err()
...@@ -331,43 +340,35 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t ...@@ -331,43 +340,35 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
} }
} }
// publishAndWaitForTx publishes the transaction to the transaction pool and then waits for it with [waitMined]. // publishTx publishes the transaction to the transaction pool. If it receives any underpriced errors
// It should be called in a new go-routine. It will send the receipt to receiptChan in a non-blocking way if a receipt is found // it will bump the fees and retry.
// for the transaction. // Returns the latest fee bumped tx, and a boolean indicating whether the tx was sent or not
func (m *SimpleTxManager) publishAndWaitForTx(ctx context.Context, tx *types.Transaction, sendState *SendState, receiptChan chan *types.Receipt, bumpFees bool, done func()) *types.Transaction { func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction, sendState *SendState, bumpFeesImmediately bool) (*types.Transaction, bool) {
waiting := false
defer func() {
if !waiting {
done()
}
}()
updateLogFields := func(tx *types.Transaction) log.Logger { updateLogFields := func(tx *types.Transaction) log.Logger {
return m.l.New("hash", tx.Hash(), "nonce", tx.Nonce(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) return m.l.New("hash", tx.Hash(), "nonce", tx.Nonce(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
} }
l := updateLogFields(tx) l := updateLogFields(tx)
l.Info("Publishing transaction") l.Info("Publishing transaction")
t := time.Now()
for { for {
if bumpFees { if bumpFeesImmediately {
newTx, err := m.increaseGasPrice(ctx, tx) newTx, err := m.increaseGasPrice(ctx, tx)
if err != nil { if err != nil {
l.Error("unable to increase gas", "err", err) l.Error("unable to increase gas", "err", err)
m.metr.TxPublished("bump_failed") m.metr.TxPublished("bump_failed")
return tx return tx, false
} }
tx = newTx tx = newTx
sendState.bumpCount++ sendState.bumpCount++
l = updateLogFields(tx) l = updateLogFields(tx)
} }
bumpFees = true // bump fees next loop bumpFeesImmediately = true // bump fees next loop
if sendState.IsWaitingForConfirmation() { if sendState.IsWaitingForConfirmation() {
// there is a chance the previous tx goes into "waiting for confirmation" state // there is a chance the previous tx goes into "waiting for confirmation" state
// during the increaseGasPrice call; continue waiting rather than resubmit the tx // during the increaseGasPrice call; continue waiting rather than resubmit the tx
return tx return tx, false
} }
cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout) cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
...@@ -376,7 +377,9 @@ func (m *SimpleTxManager) publishAndWaitForTx(ctx context.Context, tx *types.Tra ...@@ -376,7 +377,9 @@ func (m *SimpleTxManager) publishAndWaitForTx(ctx context.Context, tx *types.Tra
sendState.ProcessSendError(err) sendState.ProcessSendError(err)
if err == nil { if err == nil {
break m.metr.TxPublished("")
log.Info("Transaction successfully published")
return tx, true
} }
switch { switch {
...@@ -405,15 +408,14 @@ func (m *SimpleTxManager) publishAndWaitForTx(ctx context.Context, tx *types.Tra ...@@ -405,15 +408,14 @@ func (m *SimpleTxManager) publishAndWaitForTx(ctx context.Context, tx *types.Tra
} }
// on non-underpriced error return immediately; will retry on next resubmission timeout // on non-underpriced error return immediately; will retry on next resubmission timeout
return tx return tx, false
} }
}
m.metr.TxPublished("") // waitForTx calls waitMined, and then sends the receipt to receiptChan in a non-blocking way if a receipt is found
log.Info("Transaction successfully published") // for the transaction. It should be called in a separate goroutine.
func (m *SimpleTxManager) waitForTx(ctx context.Context, tx *types.Transaction, sendState *SendState, receiptChan chan *types.Receipt) {
waiting = true t := time.Now()
go func() {
defer done()
// Poll for the transaction to be ready & then send the result to receiptChan // Poll for the transaction to be ready & then send the result to receiptChan
receipt, err := m.waitMined(ctx, tx, sendState) receipt, err := m.waitMined(ctx, tx, sendState)
if err != nil { if err != nil {
...@@ -426,9 +428,6 @@ func (m *SimpleTxManager) publishAndWaitForTx(ctx context.Context, tx *types.Tra ...@@ -426,9 +428,6 @@ func (m *SimpleTxManager) publishAndWaitForTx(ctx context.Context, tx *types.Tra
m.metr.RecordTxConfirmationLatency(time.Since(t).Milliseconds()) m.metr.RecordTxConfirmationLatency(time.Since(t).Milliseconds())
default: default:
} }
}()
return tx
} }
// waitMined waits for the transaction to be mined or for the context to be cancelled. // waitMined waits for the transaction to be mined or for the context to be cancelled.
...@@ -512,7 +511,7 @@ func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transa ...@@ -512,7 +511,7 @@ func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transa
} }
bumpedTip, bumpedFee := updateFees(tx.GasTipCap(), tx.GasFeeCap(), tip, basefee, m.l) bumpedTip, bumpedFee := updateFees(tx.GasTipCap(), tx.GasFeeCap(), tip, basefee, m.l)
// Make sure increase is at most 5x the suggested values // Make sure increase is at most [FeeLimitMultiplier] the suggested values
maxTip := new(big.Int).Mul(tip, big.NewInt(int64(m.cfg.FeeLimitMultiplier))) maxTip := new(big.Int).Mul(tip, big.NewInt(int64(m.cfg.FeeLimitMultiplier)))
if bumpedTip.Cmp(maxTip) > 0 { if bumpedTip.Cmp(maxTip) > 0 {
return nil, fmt.Errorf("bumped tip 0x%s is over %dx multiple of the suggested value", bumpedTip.Text(16), m.cfg.FeeLimitMultiplier) return nil, fmt.Errorf("bumped tip 0x%s is over %dx multiple of the suggested value", bumpedTip.Text(16), m.cfg.FeeLimitMultiplier)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment