Commit 87f50eb5 authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

txmgr,op-batcher: Improve logging & metrics (#8766)

* txmgr,op-batcher: Improve logging & metrics

- Standardize field names
- Add more fields to tx logging
- Add metrics for basefee and tip cap
- Log frame id and transaction hash together

* txmgr: Remove goroutine use in queue test

Assertions should be made from the main test routine, if possible.
I had local test failures where assertions where made after the main
routine returned already.

Also, the existing test leaked goroutines for transactions that
weren't queued.

A waitgroup can be used here alternatively, but it's cleaner to just
assert in the main routine.
parent 059ab7fd
......@@ -241,7 +241,7 @@ func (s *channelManager) processBlocks() error {
} else if err != nil {
return fmt.Errorf("adding block[%d] to channel builder: %w", i, err)
}
s.log.Debug("Added block to channel", "id", s.currentChannel.ID(), "block", block)
s.log.Debug("Added block to channel", "id", s.currentChannel.ID(), "block", eth.ToBlockID(block))
blocksAdded += 1
latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
......@@ -359,7 +359,7 @@ func (s *channelManager) Close() error {
// Any pending state can be proactively cleared if there are no submitted transactions
for _, ch := range s.channelQueue {
if ch.NoneSubmitted() {
s.log.Info("Channel has no past or pending submission - dropping", "id", ch.ID(), "")
s.log.Info("Channel has no past or pending submission - dropping", "id", ch.ID())
s.removePendingChannel(ch)
} else {
s.log.Info("Channel is in-flight and will need to be submitted after close", "id", ch.ID(), "confirmed", len(ch.confirmedTransactions), "pending", len(ch.pendingTransactions))
......
......@@ -334,7 +334,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.Log.Error("Failed to query L1 tip", "error", err)
l.Log.Error("Failed to query L1 tip", "err", err)
return err
}
l.recordL1Tip(l1tip)
......@@ -361,7 +361,7 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txDat
data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
l.Log.Error("Failed to calculate intrinsic gas", "error", err)
l.Log.Error("Failed to calculate intrinsic gas", "err", err)
return
}
......@@ -376,11 +376,9 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txDat
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// Record TX Status
if r.Err != nil {
l.Log.Warn("unable to publish tx", "err", r.Err, "data_size", r.ID.Len())
l.recordFailedTx(r.ID.ID(), r.Err)
l.recordFailedTx(r.ID, r.Err)
} else {
l.Log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", r.ID.Len())
l.recordConfirmedTx(r.ID.ID(), r.Receipt)
l.recordConfirmedTx(r.ID, r.Receipt)
}
}
......@@ -392,15 +390,15 @@ func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
l.Metr.RecordLatestL1Block(l1tip)
}
func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.Log.Warn("Failed to send transaction", "err", err)
l.state.TxFailed(id)
func (l *BatchSubmitter) recordFailedTx(txd txData, err error) {
l.Log.Warn("Transaction failed to send", logFields(txd, err)...)
l.state.TxFailed(txd.ID())
}
func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
l.Log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
l1block := eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash}
l.state.TxConfirmed(id, l1block)
func (l *BatchSubmitter) recordConfirmedTx(txd txData, receipt *types.Receipt) {
l.Log.Info("Transaction confirmed", logFields(txd, receipt)...)
l1block := eth.ReceiptBlockID(receipt)
l.state.TxConfirmed(txd.ID(), l1block)
}
// l1Tip gets the current L1 tip as a L1BlockRef. The passed context is assumed
......@@ -414,3 +412,19 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
}
return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil
}
func logFields(xs ...any) (fs []any) {
for _, x := range xs {
switch v := x.(type) {
case txData:
fs = append(fs, "frame_id", v.ID(), "data_len", v.Len())
case *types.Receipt:
fs = append(fs, "tx", v.TxHash, "block", eth.ReceiptBlockID(v))
case error:
fs = append(fs, "err", v)
default:
fs = append(fs, "ERROR", fmt.Sprintf("logFields: unknown type: %T", x))
}
}
return fs
}
......@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
type BlockID struct {
......@@ -21,6 +22,14 @@ func (id BlockID) TerminalString() string {
return fmt.Sprintf("%s:%d", id.Hash.TerminalString(), id.Number)
}
func ReceiptBlockID(r *types.Receipt) BlockID {
return BlockID{Number: r.BlockNumber.Uint64(), Hash: r.BlockHash}
}
func HeaderBlockID(h *types.Header) BlockID {
return BlockID{Number: h.Number.Uint64(), Hash: h.Hash()}
}
type L2BlockRef struct {
Hash common.Hash `json:"hash"`
Number uint64 `json:"number"`
......
package metrics
import "github.com/ethereum/go-ethereum/core/types"
import (
"math/big"
"github.com/ethereum/go-ethereum/core/types"
)
type NoopTxMetrics struct{}
......@@ -10,4 +14,6 @@ func (*NoopTxMetrics) RecordGasBumpCount(int) {}
func (*NoopTxMetrics) RecordTxConfirmationLatency(int64) {}
func (*NoopTxMetrics) TxConfirmed(*types.Receipt) {}
func (*NoopTxMetrics) TxPublished(string) {}
func (*NoopTxMetrics) RecordBasefee(*big.Int) {}
func (*NoopTxMetrics) RecordTipCap(*big.Int) {}
func (*NoopTxMetrics) RPCError() {}
package metrics
import (
"math/big"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
......@@ -15,6 +17,8 @@ type TxMetricer interface {
RecordPendingTx(pending int64)
TxConfirmed(*types.Receipt)
TxPublished(string)
RecordBasefee(*big.Int)
RecordTipCap(*big.Int)
RPCError()
}
......@@ -29,6 +33,8 @@ type TxMetrics struct {
txPublishError *prometheus.CounterVec
publishEvent *metrics.Event
confirmEvent metrics.EventVec
basefee prometheus.Gauge
tipCap prometheus.Gauge
rpcError prometheus.Counter
}
......@@ -98,6 +104,18 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
}, []string{"error"}),
confirmEvent: metrics.NewEventVec(factory, ns, "txmgr", "confirm", "tx confirm", []string{"status"}),
publishEvent: metrics.NewEvent(factory, ns, "txmgr", "publish", "tx publish"),
basefee: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "basefee_wei",
Help: "Latest L1 basefee (in Wei)",
Subsystem: "txmgr",
}),
tipCap: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "tipcap_wei",
Help: "Latest L1 suggested tip cap (in Wei)",
Subsystem: "txmgr",
}),
rpcError: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "rpc_error_count",
......@@ -122,7 +140,6 @@ func (t *TxMetrics) TxConfirmed(receipt *types.Receipt) {
t.TxL1GasFee.Set(fee)
t.txFees.Add(fee)
t.txFeeHistogram.Observe(fee)
}
func (t *TxMetrics) RecordGasBumpCount(times int) {
......@@ -141,6 +158,16 @@ func (t *TxMetrics) TxPublished(errString string) {
}
}
func (t *TxMetrics) RecordBasefee(basefee *big.Int) {
bff, _ := basefee.Float64()
t.basefee.Set(bff)
}
func (t *TxMetrics) RecordTipCap(tipcap *big.Int) {
tcf, _ := tipcap.Float64()
t.tipCap.Set(tcf)
}
func (t *TxMetrics) RPCError() {
t.rpcError.Inc()
}
......@@ -10,7 +10,7 @@ import (
)
type TxReceipt[T any] struct {
// ID can be used to identify unique tx receipts within the recept channel
// ID can be used to identify unique tx receipts within the receipt channel
ID T
// Receipt result from the transaction send
Receipt *types.Receipt
......@@ -28,6 +28,8 @@ type Queue[T any] struct {
}
// NewQueue creates a new transaction sending Queue, with the following parameters:
// - ctx: runtime context of the queue. If canceled, all ongoing send processes are canceled.
// - txMgt: transaction managre to use for transaction sending
// - maxPending: max number of pending txs at once (0 == no limit)
func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64) *Queue[T] {
if maxPending > math.MaxInt {
......
......@@ -55,7 +55,7 @@ func (b *mockBackendWithNonce) NonceAt(ctx context.Context, account common.Addre
return uint64(len(b.minedTxs)), nil
}
func TestSend(t *testing.T) {
func TestQueue_Send(t *testing.T) {
testCases := []struct {
name string // name of the test
max uint64 // max concurrency of the queue
......@@ -204,24 +204,16 @@ func TestSend(t *testing.T) {
// make all the queue calls given in the test case
start := time.Now()
receiptChs := make([]chan TxReceipt[int], len(test.calls))
for i, c := range test.calls {
msg := fmt.Sprintf("Call %d", i)
c := c
receiptCh := make(chan TxReceipt[int], 1)
candidate := TxCandidate{
TxData: []byte{byte(i)},
To: &common.Address{},
}
queued := c.call(i, candidate, receiptCh, queue)
receiptChs[i] = make(chan TxReceipt[int], 1)
queued := c.call(i, candidate, receiptChs[i], queue)
require.Equal(t, c.queued, queued, msg)
go func() {
r := <-receiptCh
if c.txErr {
require.Error(t, r.Err, msg)
} else {
require.NoError(t, r.Err, msg)
}
}()
}
// wait for the queue to drain (all txs complete or failed)
queue.Wait()
......@@ -232,6 +224,20 @@ func TestSend(t *testing.T) {
// check that the nonces match
slices.Sort(nonces)
require.Equal(t, test.nonces, nonces, "expected nonces do not match")
// check receipts
for i, c := range test.calls {
if !c.queued {
// non-queued txs won't have a tx result
continue
}
msg := fmt.Sprintf("Receipt %d", i)
r := <-receiptChs[i]
if c.txErr {
require.Error(t, r.Err, msg)
} else {
require.NoError(t, r.Err, msg)
}
}
})
}
}
......@@ -17,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
)
......@@ -143,6 +144,14 @@ func (m *SimpleTxManager) Close() {
m.backend.Close()
}
func (m *SimpleTxManager) txLogger(tx *types.Transaction, logGas bool) log.Logger {
fields := []any{"tx", tx.Hash(), "nonce", tx.Nonce()}
if logGas {
fields = append(fields, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap(), "gasLimit", tx.Gas())
}
return m.l.New(fields)
}
// TxCandidate is a transaction candidate that can be submitted to ask the
// [TxManager] to construct a transaction with gas price bounds.
type TxCandidate struct {
......@@ -327,7 +336,7 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
}
// If we see lots of unrecoverable errors (and no pending transactions) abort sending the transaction.
if sendState.ShouldAbortImmediately() {
m.l.Warn("Aborting transaction submission")
m.txLogger(tx, false).Warn("Aborting transaction submission")
return nil, errors.New("aborted transaction sending")
}
tx = publishAndWait(tx, true)
......@@ -347,10 +356,7 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
// it will bump the fees and retry.
// Returns the latest fee bumped tx, and a boolean indicating whether the tx was sent or not
func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction, sendState *SendState, bumpFeesImmediately bool) (*types.Transaction, bool) {
updateLogFields := func(tx *types.Transaction) log.Logger {
return m.l.New("hash", tx.Hash(), "nonce", tx.Nonce(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
}
l := updateLogFields(tx)
l := m.txLogger(tx, true)
l.Info("Publishing transaction")
......@@ -364,7 +370,7 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction,
}
tx = newTx
sendState.bumpCount++
l = updateLogFields(tx)
l = m.txLogger(tx, true)
}
bumpFeesImmediately = true // bump fees next loop
......@@ -423,7 +429,7 @@ func (m *SimpleTxManager) waitForTx(ctx context.Context, tx *types.Transaction,
receipt, err := m.waitMined(ctx, tx, sendState)
if err != nil {
// this will happen if the tx was successfully replaced by a tx with bumped fees
log.Info("Transaction receipt not found", "err", err)
m.txLogger(tx, true).Info("Transaction receipt not found", "err", err)
return
}
select {
......@@ -457,15 +463,15 @@ func (m *SimpleTxManager) queryReceipt(ctx context.Context, txHash common.Hash,
receipt, err := m.backend.TransactionReceipt(ctx, txHash)
if errors.Is(err, ethereum.NotFound) {
sendState.TxNotMined(txHash)
m.l.Trace("Transaction not yet mined", "hash", txHash)
m.l.Trace("Transaction not yet mined", "tx", txHash)
return nil
} else if err != nil {
m.metr.RPCError()
m.l.Info("Receipt retrieval failed", "hash", txHash, "err", err)
m.l.Info("Receipt retrieval failed", "tx", txHash, "err", err)
return nil
} else if receipt == nil {
m.metr.RPCError()
m.l.Warn("Receipt and error are both nil", "hash", txHash)
m.l.Warn("Receipt and error are both nil", "tx", txHash)
return nil
}
......@@ -473,14 +479,17 @@ func (m *SimpleTxManager) queryReceipt(ctx context.Context, txHash common.Hash,
sendState.TxMined(txHash)
txHeight := receipt.BlockNumber.Uint64()
tipHeight, err := m.backend.BlockNumber(ctx)
tip, err := m.backend.HeaderByNumber(ctx, nil)
if err != nil {
m.l.Error("Unable to fetch block number", "err", err)
m.metr.RPCError()
m.l.Error("Unable to fetch tip", "err", err)
return nil
}
m.l.Debug("Transaction mined, checking confirmations", "hash", txHash, "txHeight", txHeight,
"tipHeight", tipHeight, "numConfirmations", m.cfg.NumConfirmations)
m.metr.RecordBasefee(tip.BaseFee)
m.l.Debug("Transaction mined, checking confirmations", "tx", txHash,
"block", eth.ReceiptBlockID(receipt), "tip", eth.HeaderBlockID(tip),
"numConfirmations", m.cfg.NumConfirmations)
// The transaction is considered confirmed when
// txHeight+numConfirmations-1 <= tipHeight. Note that the -1 is
......@@ -489,14 +498,17 @@ func (m *SimpleTxManager) queryReceipt(ctx context.Context, txHash common.Hash,
// transaction should be confirmed when txHeight is equal to
// tipHeight. The equation is rewritten in this form to avoid
// underflows.
tipHeight := tip.Number.Uint64()
if txHeight+m.cfg.NumConfirmations <= tipHeight+1 {
m.l.Info("Transaction confirmed", "hash", txHash)
m.l.Info("Transaction confirmed", "tx", txHash,
"block", eth.ReceiptBlockID(receipt),
"effectiveGasPrice", receipt.EffectiveGasPrice)
return receipt
}
// Safe to subtract since we know the LHS above is greater.
confsRemaining := (txHeight + m.cfg.NumConfirmations) - (tipHeight + 1)
m.l.Debug("Transaction not yet confirmed", "hash", txHash, "confsRemaining", confsRemaining)
m.l.Debug("Transaction not yet confirmed", "tx", txHash, "confsRemaining", confsRemaining)
return nil
}
......@@ -506,10 +518,10 @@ func (m *SimpleTxManager) queryReceipt(ctx context.Context, txHash common.Hash,
// doesn't linger in the mempool. Finally to avoid runaway price increases, fees are capped at a
// `feeLimitMultiplier` multiple of the suggested values.
func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
m.l.Info("bumping gas price for tx", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap(), "gaslimit", tx.Gas())
m.txLogger(tx, true).Info("bumping gas price for transaction")
tip, basefee, err := m.suggestGasPriceCaps(ctx)
if err != nil {
m.l.Warn("failed to get suggested gas tip and basefee", "err", err)
m.txLogger(tx, false).Warn("failed to get suggested gas tip and basefee", "err", err)
return nil, err
}
bumpedTip, bumpedFee := updateFees(tx.GasTipCap(), tx.GasFeeCap(), tip, basefee, m.l)
......@@ -542,12 +554,12 @@ func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transa
// original tx can get included in a block just before the above call. In this case the
// error is due to the tx reverting with message "block number must be equal to next
// expected block number"
m.l.Warn("failed to re-estimate gas", "err", err, "gaslimit", tx.Gas(),
m.l.Warn("failed to re-estimate gas", "err", err, "tx", tx.Hash(), "gaslimit", tx.Gas(),
"gasFeeCap", bumpedFee, "gasTipCap", bumpedTip)
return nil, err
}
if tx.Gas() != gas {
m.l.Info("re-estimated gas differs", "oldgas", tx.Gas(), "newgas", gas,
m.l.Info("re-estimated gas differs", "tx", tx.Hash(), "oldgas", tx.Gas(), "newgas", gas,
"gasFeeCap", bumpedFee, "gasTipCap", bumpedTip)
}
rawTx.Gas = gas
......@@ -556,7 +568,7 @@ func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transa
defer cancel()
newTx, err := m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx))
if err != nil {
m.l.Warn("failed to sign new transaction", "err", err)
m.l.Warn("failed to sign new transaction", "err", err, "tx", tx.Hash())
return tx, nil
}
return newTx, nil
......@@ -582,6 +594,8 @@ func (m *SimpleTxManager) suggestGasPriceCaps(ctx context.Context) (*big.Int, *b
} else if head.BaseFee == nil {
return nil, nil, errors.New("txmgr does not support pre-london blocks that do not have a basefee")
}
m.metr.RecordBasefee(head.BaseFee)
m.metr.RecordTipCap(tip)
return tip, head.BaseFee, nil
}
......
......@@ -203,7 +203,15 @@ func (b *mockBackend) CallContract(ctx context.Context, call ethereum.CallMsg, b
}
func (b *mockBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
b.mu.RLock()
defer b.mu.RUnlock()
num := big.NewInt(int64(b.blockHeight))
if number != nil {
num.Set(number)
}
return &types.Header{
Number: num,
BaseFee: b.g.basefee(),
}, nil
}
......@@ -696,6 +704,7 @@ func TestManagerErrsOnZeroConfs(t *testing.T) {
// inner loop of WaitMined properly handles this case.
type failingBackend struct {
returnSuccessBlockNumber bool
returnSuccessHeader bool
returnSuccessReceipt bool
baseFee, gasTip *big.Int
}
......@@ -727,8 +736,14 @@ func (b *failingBackend) TransactionReceipt(
}, nil
}
func (b *failingBackend) HeaderByNumber(_ context.Context, _ *big.Int) (*types.Header, error) {
func (b *failingBackend) HeaderByNumber(ctx context.Context, _ *big.Int) (*types.Header, error) {
if !b.returnSuccessHeader {
b.returnSuccessHeader = true
return nil, errRpcFailure
}
return &types.Header{
Number: big.NewInt(1),
BaseFee: b.baseFee,
}, nil
}
......@@ -800,8 +815,9 @@ func TestWaitMinedReturnsReceiptAfterFailure(t *testing.T) {
func doGasPriceIncrease(t *testing.T, txTipCap, txFeeCap, newTip, newBaseFee int64) (*types.Transaction, *types.Transaction) {
borkedBackend := failingBackend{
gasTip: big.NewInt(newTip),
baseFee: big.NewInt(newBaseFee),
gasTip: big.NewInt(newTip),
baseFee: big.NewInt(newBaseFee),
returnSuccessHeader: true,
}
mgr := &SimpleTxManager{
......@@ -933,8 +949,9 @@ func testIncreaseGasPriceLimit(t *testing.T, lt gasPriceLimitTest) {
borkedTip := int64(10)
borkedFee := int64(45)
borkedBackend := failingBackend{
gasTip: big.NewInt(borkedTip),
baseFee: big.NewInt(borkedFee),
gasTip: big.NewInt(borkedTip),
baseFee: big.NewInt(borkedFee),
returnSuccessHeader: true,
}
mgr := &SimpleTxManager{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment