Commit ac086453 authored by Michael de Hoog

Review comments:

 - Move pending metrics to txmgr package (and switch to int64)
 - Timeout context in tests
 - require.Duration
 - Fix comment docs
parent c38a81a4
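For context on the first review item: pending-transaction counting moves out of the batcher's Metricer and into the tx manager itself, which bumps an atomic counter when Send starts and decrements it when Send returns, reporting each new value to a gauge. A minimal, runnable sketch of that pattern (pendingGauge is a stand-in for the real TxMetricer.RecordPendingTx, not the actual API):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pendingGauge stands in for the new txmgr pending_txs gauge; the real
// code calls TxMetricer.RecordPendingTx, which sets a prometheus.Gauge.
func pendingGauge(pending int64) { fmt.Println("pending txs:", pending) }

// send mirrors the pattern SimpleTxManager.Send adopts in this commit:
// increment an atomic counter on entry, decrement it on exit, and report
// each new value. atomic.Int64 keeps this safe under concurrent Sends.
func send(pending *atomic.Int64) {
	pendingGauge(pending.Add(1))
	defer func() { pendingGauge(pending.Add(-1)) }()
	// ... publish the tx and wait for a receipt ...
}

func main() {
	var pending atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			send(&pending)
		}()
	}
	wg.Wait()
}

The diffs below wire exactly this into SimpleTxManager.Send and thread RecordPendingTx through the TxMetricer interface and its no-op implementation.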
...
@@ -289,7 +289,7 @@ func (l *BatchSubmitter) loop() {
 	defer ticker.Stop()
 
 	receiptsCh := make(chan txmgr.TxReceipt[txData])
-	queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions, l.metr.RecordPendingTx)
+	queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)
 
 	for {
 		select {
...
...
@@ -34,7 +34,6 @@ type Metricer interface {
 	RecordChannelFullySubmitted(id derive.ChannelID)
 	RecordChannelTimedOut(id derive.ChannelID)
 
-	RecordPendingTx(pending uint64)
 	RecordBatchTxSubmitted()
 	RecordBatchTxSuccess()
 	RecordBatchTxFailed()
...
@@ -68,7 +67,6 @@ type Metrics struct {
 	ChannelInputBytesTotal  prometheus.Counter
 	ChannelOutputBytesTotal prometheus.Counter
 
-	PendingTxs prometheus.Gauge
 	BatcherTxEvs opmetrics.EventVec
 }
 
...
@@ -159,12 +157,6 @@ func NewMetrics(procName string) *Metrics {
 			Help:      "Total number of compressed output bytes from a channel.",
 		}),
 
-		PendingTxs: factory.NewGauge(prometheus.GaugeOpts{
-			Namespace: ns,
-			Name:      "pending_txs",
-			Help:      "Number of transactions pending receipts.",
-		}),
-
 		BatcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}),
 	}
 }
...
@@ -264,10 +256,6 @@ func (m *Metrics) RecordChannelTimedOut(id derive.ChannelID) {
 	m.ChannelEvs.Record(StageTimedOut)
 }
 
-func (m *Metrics) RecordPendingTx(pending uint64) {
-	m.PendingTxs.Set(float64(pending))
-}
-
 func (m *Metrics) RecordBatchTxSubmitted() {
 	m.BatcherTxEvs.Record(TxStageSubmitted)
 }
...
...
@@ -29,7 +29,6 @@ func (*noopMetrics) RecordChannelClosed(derive.ChannelID, int, int, int, int, error) {}
 func (*noopMetrics) RecordChannelFullySubmitted(derive.ChannelID) {}
 func (*noopMetrics) RecordChannelTimedOut(derive.ChannelID) {}
 
-func (*noopMetrics) RecordPendingTx(uint64) {}
 func (*noopMetrics) RecordBatchTxSubmitted() {}
 func (*noopMetrics) RecordBatchTxSuccess() {}
 func (*noopMetrics) RecordBatchTxFailed() {}
...
@@ -5,6 +5,7 @@ import "github.com/ethereum/go-ethereum/core/types"
 type NoopTxMetrics struct{}
 
 func (*NoopTxMetrics) RecordNonce(uint64) {}
+func (*NoopTxMetrics) RecordPendingTx(int64) {}
 func (*NoopTxMetrics) RecordGasBumpCount(int) {}
 func (*NoopTxMetrics) RecordTxConfirmationLatency(int64) {}
 func (*NoopTxMetrics) TxConfirmed(*types.Receipt) {}
...
...
@@ -12,6 +12,7 @@ type TxMetricer interface {
 	RecordGasBumpCount(int)
 	RecordTxConfirmationLatency(int64)
 	RecordNonce(uint64)
+	RecordPendingTx(pending int64)
 	TxConfirmed(*types.Receipt)
 	TxPublished(string)
 	RPCError()
...
@@ -24,6 +25,7 @@ type TxMetrics struct {
 	txFeeHistogram     prometheus.Histogram
 	LatencyConfirmedTx prometheus.Gauge
 	currentNonce       prometheus.Gauge
+	pendingTxs         prometheus.Gauge
 	txPublishError     *prometheus.CounterVec
 	publishEvent       metrics.Event
 	confirmEvent       metrics.EventVec
...
@@ -82,6 +84,12 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
 			Help:      "Current nonce of the from address",
 			Subsystem: "txmgr",
 		}),
+		pendingTxs: factory.NewGauge(prometheus.GaugeOpts{
+			Namespace: ns,
+			Name:      "pending_txs",
+			Help:      "Number of transactions pending receipts",
+			Subsystem: "txmgr",
+		}),
 		txPublishError: factory.NewCounterVec(prometheus.CounterOpts{
 			Namespace: ns,
 			Name:      "tx_publish_error_count",
...
@@ -103,6 +111,10 @@ func (t *TxMetrics) RecordNonce(nonce uint64) {
 	t.currentNonce.Set(float64(nonce))
 }
 
+func (t *TxMetrics) RecordPendingTx(pending int64) {
+	t.pendingTxs.Set(float64(pending))
+}
+
 // TxConfirmed records lots of information about the confirmed transaction
 func (t *TxMetrics) TxConfirmed(receipt *types.Receipt) {
 	fee := float64(receipt.EffectiveGasPrice.Uint64() * receipt.GasUsed / params.GWei)
...
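A note on the gauge registered above: Prometheus joins Namespace, Subsystem, and Name into the fully qualified metric name, so the new gauge is exported as <ns>_txmgr_pending_txs. A small sketch, assuming a hypothetical "op_batcher" namespace (the diff leaves ns to the caller):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Mirrors the GaugeOpts from the diff; "op_batcher" is a made-up namespace.
	g := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "op_batcher",
		Subsystem: "txmgr",
		Name:      "pending_txs",
		Help:      "Number of transactions pending receipts",
	})
	g.Set(3)              // three txs currently awaiting receipts
	fmt.Println(g.Desc()) // fqName: "op_batcher_txmgr_pending_txs"
}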
...
@@ -2,12 +2,10 @@ package txmgr
 
 import (
 	"context"
-	"math"
-	"sync"
-	"sync/atomic"
-
 	"github.com/ethereum/go-ethereum/core/types"
 	"golang.org/x/sync/errgroup"
+	"math"
+	"sync"
 )
 
 type TxReceipt[T any] struct {
...
@@ -23,8 +21,6 @@ type Queue[T any] struct {
 	ctx        context.Context
 	txMgr      TxManager
 	maxPending uint64
-	pendingChanged func(uint64)
-	pending        atomic.Uint64
 	groupLock  sync.Mutex
 	groupCtx   context.Context
 	group      *errgroup.Group
...
@@ -34,7 +30,7 @@ type Queue[T any] struct {
 // - maxPending: max number of pending txs at once (0 == no limit)
-// - pendingChanged: called whenever a tx send starts or finishes. The
-//   number of currently pending txs is passed as a parameter.
-func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64, pendingChanged func(uint64)) *Queue[T] {
+func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64) *Queue[T] {
 	if maxPending > math.MaxInt {
 		// ensure we don't overflow as errgroup only accepts int; in reality this will never be an issue
 		maxPending = math.MaxInt
...
@@ -43,7 +39,6 @@ func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64, pendingChanged func(uint64)) *Queue[T] {
 		ctx:        ctx,
 		txMgr:      txMgr,
 		maxPending: maxPending,
-		pendingChanged: pendingChanged,
 	}
 }
...
@@ -59,7 +54,7 @@ func (q *Queue[T]) Wait() {
 // and then send the next tx.
 //
 // The actual tx sending is non-blocking, with the receipt returned on the
-// provided receipt channel .If the channel is unbuffered, the goroutine is
+// provided receipt channel. If the channel is unbuffered, the goroutine is
 // blocked from completing until the channel is read from.
 func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
 	group, ctx := q.groupContext()
...
@@ -85,10 +80,6 @@ func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T])
 }
 
 func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
-	q.pendingChanged(q.pending.Add(1))
-	defer func() {
-		q.pendingChanged(q.pending.Add(^uint64(0))) // -1
-	}()
 	receipt, err := q.txMgr.Send(ctx, candidate)
 	receiptCh <- TxReceipt[T]{
 		ID:      id,
...
@@ -98,11 +89,11 @@ func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
 	return err
 }
 
-// mergeWithGroupContext creates a new Context that is canceled if either the given context is
-// Done, or the group context is canceled. The returned CancelFunc should be called once finished.
+// groupContext returns a Group and a Context to use when sending a tx.
 //
-// If the group context doesn't exist or has already been canceled, a new one is created after
-// waiting for existing group threads to complete.
+// If any of the pending transactions returned an error, the queue's shared error Group is
+// canceled. This method will wait on that Group for all pending transactions to return,
+// and create a new Group with the queue's global context as its parent.
 func (q *Queue[T]) groupContext() (*errgroup.Group, context.Context) {
 	q.groupLock.Lock()
 	defer q.groupLock.Unlock()
...
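With the pendingChanged callback gone, the queue's surface is just NewQueue, Send/TrySend, and Wait. A caller-side sketch under the signatures visible in this diff (TxReceipt's Err field is an assumption; only ID appears above):

package example

import (
	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

// driveQueue sketches the caller side of the slimmed-down queue API: the
// pendingChanged callback is gone, so NewQueue takes only the context,
// the tx manager, and the max-pending bound. Pending-count reporting now
// happens inside TxManager.Send instead of in the queue.
func driveQueue(ctx context.Context, mgr txmgr.TxManager, candidates []txmgr.TxCandidate) {
	receiptsCh := make(chan txmgr.TxReceipt[int], len(candidates))
	queue := txmgr.NewQueue[int](ctx, mgr, 5) // at most 5 txs in flight (0 = no limit)

	for i, c := range candidates {
		queue.Send(i, c, receiptsCh) // non-blocking; the receipt lands on receiptsCh
	}
	queue.Wait() // block until every pending send has returned

	for range candidates {
		r := <-receiptsCh
		fmt.Println("tx", r.ID, "err:", r.Err)
	}
}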
...
@@ -200,8 +200,9 @@ func TestSend(t *testing.T) {
 			}
 			backend.setTxSender(sendTx)
 
-			ctx := context.Background()
-			queue := NewQueue[int](ctx, mgr, test.max, func(uint64) {})
+			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+			defer cancel()
+			queue := NewQueue[int](ctx, mgr, test.max)
 
 			// make all the queue calls given in the test case
 			start := time.Now()
...
@@ -228,6 +229,8 @@ func TestSend(t *testing.T) {
 			queue.Wait()
 			duration := time.Since(start)
 
 			// expect the execution time within a certain window
+			now := time.Now()
+			require.WithinDuration(t, now.Add(test.total), now.Add(duration), 500*time.Millisecond, "unexpected queue transaction timing")
 			require.Greater(t, duration, test.total, "test was faster than expected")
 			require.Less(t, duration, test.total+500*time.Millisecond, "test was slower than expected")
 
 			// check that the nonces match
...
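On the require.WithinDuration change: testify's WithinDuration compares two time.Time values, not durations, so the test anchors both the expected total and the measured duration to the same time.Now() base, turning the assertion into a ±500ms tolerance check on the durations themselves. A standalone illustration of the idiom:

package txmgr_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestWithinDurationIdiom shows the trick used in queue_test.go:
// WithinDuration asserts |expected.Sub(actual)| <= delta on time.Time
// values, so adding two durations to one shared base time turns it into
// a comparison of the durations themselves.
func TestWithinDurationIdiom(t *testing.T) {
	expected := 2 * time.Second         // stand-in for test.total
	measured := 2200 * time.Millisecond // stand-in for the measured duration

	now := time.Now()
	require.WithinDuration(t, now.Add(expected), now.Add(measured), 500*time.Millisecond)
}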
...
@@ -7,6 +7,7 @@ import (
 	"math/big"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum"
...
@@ -88,6 +89,8 @@ type SimpleTxManager struct {
 	nonce     *uint64
 	nonceLock sync.RWMutex
+
+	pending atomic.Int64
 }
 
 // NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
...
@@ -132,6 +135,10 @@ type TxCandidate struct {
 //
 // NOTE: Send can be called concurrently, the nonce will be managed internally.
 func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
+	m.metr.RecordPendingTx(m.pending.Add(1))
+	defer func() {
+		m.metr.RecordPendingTx(m.pending.Add(-1))
+	}()
 	receipt, err := m.send(ctx, candidate)
 	if err != nil {
 		m.resetNonce()
...
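Finally, on the "switch to int64" review item: atomic.Uint64 has no signed Add, so the old queue code decremented with the wraparound idiom Add(^uint64(0)), adding the all-ones value, which is -1 in two's complement. atomic.Int64 takes a signed delta, so the new counter can simply Add(-1). A compilable comparison of the two idioms:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// Old idiom: atomic.Uint64 cannot Add a negative delta, so -1 is
	// expressed as the all-ones value and relies on wraparound.
	var u atomic.Uint64
	u.Add(1)
	u.Add(^uint64(0))     // effectively -1
	fmt.Println(u.Load()) // 0

	// New idiom after the switch to int64: the delta is signed.
	var i atomic.Int64
	i.Add(1)
	i.Add(-1)
	fmt.Println(i.Load()) // 0
}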