Commit 582b56e1 authored by mergify[bot], committed by GitHub

Merge branch 'develop' into inphi/fpp-boot

parents b6a28df8 57c008ef
@@ -34,6 +34,7 @@ require (
 	github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
 	golang.org/x/crypto v0.6.0
 	golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
+	golang.org/x/sync v0.1.0
 	golang.org/x/term v0.5.0
 	golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
 )
@@ -176,7 +177,6 @@ require (
 	go.uber.org/zap v1.24.0 // indirect
 	golang.org/x/mod v0.8.0 // indirect
 	golang.org/x/net v0.7.0 // indirect
-	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.5.0 // indirect
 	golang.org/x/text v0.7.0 // indirect
 	golang.org/x/tools v0.6.0 // indirect
...
@@ -199,6 +199,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 		return txData{}, io.EOF
 	}

+	// we have blocks, but we cannot add them to the channel right now
+	if s.pendingChannel != nil && s.pendingChannel.IsFull() {
+		return txData{}, io.EOF
+	}
+
 	if err := s.ensurePendingChannel(l1Head); err != nil {
 		return txData{}, err
 	}
...
@@ -28,6 +28,7 @@ type Config struct {
 	NetworkTimeout         time.Duration
 	PollInterval           time.Duration
+	MaxPendingTransactions uint64

 	// RollupConfig is queried at startup
 	Rollup *rollup.Config
@@ -76,6 +77,10 @@ type CLIConfig struct {
 	// and creating a new batch.
 	PollInterval time.Duration

+	// MaxPendingTransactions is the maximum number of concurrent pending
+	// transactions sent to the transaction manager.
+	MaxPendingTransactions uint64
+
 	// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
 	MaxL1TxSize uint64
@@ -128,6 +133,7 @@ func NewConfig(ctx *cli.Context) CLIConfig {
 		PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),

 		/* Optional Flags */
+		MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name),
 		MaxChannelDuration:     ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
 		MaxL1TxSize:            ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
 		TargetL1TxSize:         ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
...
@@ -79,6 +79,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
 		L2Client:               l2Client,
 		RollupNode:             rollupClient,
 		PollInterval:           cfg.PollInterval,
+		MaxPendingTransactions: cfg.MaxPendingTransactions,
 		NetworkTimeout:         cfg.TxMgrConfig.NetworkTimeout,
 		TxManager:              txManager,
 		Rollup:                 rcfg,
@@ -286,13 +287,23 @@ func (l *BatchSubmitter) loop() {
 	ticker := time.NewTicker(l.PollInterval)
 	defer ticker.Stop()

+	receiptsCh := make(chan txmgr.TxReceipt[txData])
+	queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)
+
 	for {
 		select {
 		case <-ticker.C:
 			l.loadBlocksIntoState(l.shutdownCtx)
-			l.publishStateToL1(l.killCtx)
+			l.publishStateToL1(queue, receiptsCh, false)
+		case r := <-receiptsCh:
+			l.handleReceipt(r)
 		case <-l.shutdownCtx.Done():
-			l.publishStateToL1(l.killCtx)
+			err := l.state.Close()
+			if err != nil {
+				l.log.Error("error closing the channel manager", "err", err)
+			}
+			l.publishStateToL1(queue, receiptsCh, true)
 			return
 		}
 	}
@@ -300,28 +311,45 @@ func (l *BatchSubmitter) loop() {

 // publishStateToL1 loops through the block data loaded into `state` and
 // submits the associated data to the L1 in the form of channel frames.
-func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
+func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData], drain bool) {
+	txDone := make(chan struct{})
+	// sending/waiting and receipt reading must happen on separate goroutines to avoid deadlocks
+	go func() {
+		defer func() {
+			if drain {
+				// if draining, we wait for all transactions to complete
+				queue.Wait()
+			}
+			close(txDone)
+		}()
+		for {
+			err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
+			if err != nil {
+				if drain && err != io.EOF {
+					l.log.Error("error sending tx while draining state", "err", err)
+				}
+				return
+			}
+		}
+	}()
+
 	for {
-		// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
-		// produced. Any remaining frames must still be published to the L1 to prevent stalling.
-		select {
-		case <-ctx.Done():
-			err := l.state.Close()
-			if err != nil {
-				l.log.Error("error closing the channel manager", "err", err)
-			}
-		case <-l.shutdownCtx.Done():
-			err := l.state.Close()
-			if err != nil {
-				l.log.Error("error closing the channel manager", "err", err)
-			}
-		default:
+		select {
+		case r := <-receiptsCh:
+			l.handleReceipt(r)
+		case <-txDone:
+			return
 		}
 	}
 }
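The restructured publishStateToL1 follows a standard Go pattern: sends run in one goroutine while the caller drains the receipt channel, so the unbuffered channel cannot deadlock either side. A minimal self-contained sketch of the same pattern (the names here are illustrative, not from this diff):

package main

import "fmt"

// worker stands in for the tx-sending goroutine: it pushes results on an
// unbuffered channel and closes done when it has nothing left to send.
func worker(results chan<- int, done chan<- struct{}) {
	defer close(done)
	for i := 0; i < 3; i++ {
		results <- i // blocks until the consumer reads, like receiptsCh
	}
}

func main() {
	results := make(chan int)
	done := make(chan struct{})
	go worker(results, done)
	// the consumer loop mirrors the select over receiptsCh and txDone
	for {
		select {
		case r := <-results:
			fmt.Println("handled", r)
		case <-done:
			return
		}
	}
}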
+// publishTxToL1 submits a single state tx to the L1
+func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
+	// send all available transactions
 	l1tip, err := l.l1Tip(ctx)
 	if err != nil {
 		l.log.Error("Failed to query L1 tip", "error", err)
-		return
+		return err
 	}
 	l.recordL1Tip(l1tip)

@@ -329,41 +357,44 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
 	txdata, err := l.state.TxData(l1tip.ID())
 	if err == io.EOF {
 		l.log.Trace("no transaction data available")
-		break
+		return err
 	} else if err != nil {
 		l.log.Error("unable to get tx data", "err", err)
-		break
+		return err
 	}
-	// Record TX Status
-	if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil {
-		l.recordFailedTx(txdata.ID(), err)
-	} else {
-		l.recordConfirmedTx(txdata.ID(), receipt)
-	}
+
+	l.sendTransaction(txdata, queue, receiptsCh)
+	return nil
 }
 // sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
 // It currently uses the underlying `txmgr` to handle transaction sending & price management.
 // This is a blocking method. It should not be called concurrently.
-func (l *BatchSubmitter) sendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
+func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) {
 	// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
+	data := txdata.Bytes()
 	intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
 	if err != nil {
-		return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
+		l.log.Error("Failed to calculate intrinsic gas", "error", err)
+		return
 	}
-	// Send the transaction through the txmgr
-	if receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
+	candidate := txmgr.TxCandidate{
 		To:       &l.Rollup.BatchInboxAddress,
 		TxData:   data,
 		GasLimit: intrinsicGas,
-	}); err != nil {
-		l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
-		return nil, err
-	} else {
-		l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
-		return receipt, nil
 	}
+	queue.Send(txdata, candidate, receiptsCh)
+}
+
+func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
+	// Record TX Status
+	if r.Err != nil {
+		l.log.Warn("unable to publish tx", "err", r.Err, "data_size", r.ID.Len())
+		l.recordFailedTx(r.ID.ID(), r.Err)
+	} else {
+		l.log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", r.ID.Len())
+		l.recordConfirmedTx(r.ID.ID(), r.Receipt)
+	}
 }
...
@@ -26,6 +26,10 @@ func (td *txData) Bytes() []byte {
 	return append([]byte{derive.DerivationVersion0}, td.frame.data...)
 }

+func (td *txData) Len() int {
+	return 1 + len(td.frame.data)
+}
+
 // Frame returns the single frame of this tx data.
 //
 // Note: when the batcher is changed to possibly send multiple frames per tx,
...
@@ -2,6 +2,7 @@ package flags

 import (
 	"fmt"
+	"time"

 	"github.com/urfave/cli"

@@ -33,21 +34,27 @@ var (
 		Usage:  "HTTP provider URL for Rollup node",
 		EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"),
 	}
+	// Optional flags
 	SubSafetyMarginFlag = cli.Uint64Flag{
 		Name: "sub-safety-margin",
 		Usage: "The batcher tx submission safety margin (in #L1-blocks) to subtract " +
 			"from a channel's timeout and sequencing window, to guarantee safe inclusion " +
 			"of a channel on L1.",
+		Value:  10,
 		EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SUB_SAFETY_MARGIN"),
 	}
 	PollIntervalFlag = cli.DurationFlag{
 		Name: "poll-interval",
-		Usage: "Delay between querying L2 for more transactions and " +
-			"creating a new batch",
+		Usage:  "How frequently to poll L2 for new blocks",
+		Value:  6 * time.Second,
 		EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
 	}
-
-	// Optional flags
+	MaxPendingTransactionsFlag = cli.Uint64Flag{
+		Name:   "max-pending-tx",
+		Usage:  "The maximum number of pending transactions. 0 for no limit.",
+		Value:  1,
+		EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_PENDING_TX"),
+	}
 	MaxChannelDurationFlag = cli.Uint64Flag{
 		Name:  "max-channel-duration",
 		Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
@@ -91,11 +98,12 @@ var requiredFlags = []cli.Flag{
 	L1EthRpcFlag,
 	L2EthRpcFlag,
 	RollupRpcFlag,
-	SubSafetyMarginFlag,
-	PollIntervalFlag,
 }

 var optionalFlags = []cli.Flag{
+	SubSafetyMarginFlag,
+	PollIntervalFlag,
+	MaxPendingTransactionsFlag,
 	MaxChannelDurationFlag,
 	MaxL1TxSizeBytesFlag,
 	TargetL1TxSizeBytesFlag,
...
@@ -19,6 +19,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-chain-ops/crossdomain"
 	"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
 	"github.com/ethereum-optimism/optimism/op-chain-ops/util"
+	"github.com/ethereum-optimism/optimism/op-node/rollup"

 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/hexutil"
@@ -933,14 +934,12 @@ func createOutput(
 		LatestBlockhash:          header.Hash(),
 	}

-	// TODO(mark): import the function from `op-node` to compute the hash
-	// instead of doing this. Will update when testing against mainnet.
-	localOutputRootHash := crypto.Keccak256Hash(
-		outputRootProof.Version[:],
-		outputRootProof.StateRoot[:],
-		outputRootProof.MessagePasserStorageRoot[:],
-		outputRootProof.LatestBlockhash[:],
-	)
+	// Compute the output root locally
+	l2OutputRoot, err := rollup.ComputeL2OutputRoot(&outputRootProof)
+	localOutputRootHash := common.Hash(l2OutputRoot)
+	if err != nil {
+		return nil, bindings.TypesOutputRootProof{}, nil, err
+	}

 	// ensure that the locally computed hash matches
 	if l2Output.OutputRoot != localOutputRootHash {
...
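For reference, the inline code removed above computed the output root as keccak256 over the four 32-byte proof fields, and rollup.ComputeL2OutputRoot presumably implements the same construction. A hedged sketch of that hash, using go-ethereum's crypto package:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// outputRoot reproduces the removed inline computation:
// keccak256(version ++ stateRoot ++ messagePasserStorageRoot ++ latestBlockhash)
func outputRoot(version, stateRoot, storageRoot, blockHash common.Hash) common.Hash {
	return crypto.Keccak256Hash(version[:], stateRoot[:], storageRoot[:], blockHash[:])
}

func main() {
	fmt.Println(outputRoot(common.Hash{}, common.Hash{0x01}, common.Hash{0x02}, common.Hash{0x03}).Hex())
}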
@@ -596,6 +596,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
 		L1EthRpc:               sys.Nodes["l1"].WSEndpoint(),
 		L2EthRpc:               sys.Nodes["sequencer"].WSEndpoint(),
 		RollupRpc:              sys.RollupNodes["sequencer"].HTTPEndpoint(),
+		MaxPendingTransactions: 1,
 		MaxChannelDuration:     1,
 		MaxL1TxSize:            120_000,
 		TargetL1TxSize:         100_000,
...
@@ -2,6 +2,7 @@ package flags

 import (
 	"fmt"
+	"time"

 	"github.com/urfave/cli"

@@ -32,13 +33,14 @@ var (
 		Usage:  "Address of the L2OutputOracle contract",
 		EnvVar: opservice.PrefixEnvVar(envVarPrefix, "L2OO_ADDRESS"),
 	}
+	// Optional flags
 	PollIntervalFlag = cli.DurationFlag{
 		Name: "poll-interval",
-		Usage: "Delay between querying L2 for more transactions and " +
-			"creating a new batch",
+		Usage:  "How frequently to poll L2 for new blocks",
+		Value:  6 * time.Second,
 		EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
 	}
-
-	// Optional flags
 	AllowNonFinalizedFlag = cli.BoolFlag{
 		Name:  "allow-non-finalized",
 		Usage: "Allow the proposer to submit proposals for L2 blocks derived from non-finalized L1 blocks.",
@@ -52,10 +54,10 @@ var requiredFlags = []cli.Flag{
 	L1EthRpcFlag,
 	RollupRpcFlag,
 	L2OOAddressFlag,
-	PollIntervalFlag,
 }

 var optionalFlags = []cli.Flag{
+	PollIntervalFlag,
 	AllowNonFinalizedFlag,
 }
...
@@ -5,6 +5,7 @@ import "github.com/ethereum/go-ethereum/core/types"
 type NoopTxMetrics struct{}

 func (*NoopTxMetrics) RecordNonce(uint64)                {}
+func (*NoopTxMetrics) RecordPendingTx(int64)             {}
 func (*NoopTxMetrics) RecordGasBumpCount(int)            {}
 func (*NoopTxMetrics) RecordTxConfirmationLatency(int64) {}
 func (*NoopTxMetrics) TxConfirmed(*types.Receipt)        {}
...
@@ -12,6 +12,7 @@ type TxMetricer interface {
 	RecordGasBumpCount(int)
 	RecordTxConfirmationLatency(int64)
 	RecordNonce(uint64)
+	RecordPendingTx(pending int64)
 	TxConfirmed(*types.Receipt)
 	TxPublished(string)
 	RPCError()
@@ -24,6 +25,7 @@ type TxMetrics struct {
 	txFeeHistogram     prometheus.Histogram
 	LatencyConfirmedTx prometheus.Gauge
 	currentNonce       prometheus.Gauge
+	pendingTxs         prometheus.Gauge
 	txPublishError     *prometheus.CounterVec
 	publishEvent       metrics.Event
 	confirmEvent       metrics.EventVec
@@ -82,6 +84,12 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
 			Help:      "Current nonce of the from address",
 			Subsystem: "txmgr",
 		}),
+		pendingTxs: factory.NewGauge(prometheus.GaugeOpts{
+			Namespace: ns,
+			Name:      "pending_txs",
+			Help:      "Number of transactions pending receipts",
+			Subsystem: "txmgr",
+		}),
 		txPublishError: factory.NewCounterVec(prometheus.CounterOpts{
 			Namespace: ns,
 			Name:      "tx_publish_error_count",
@@ -103,6 +111,10 @@ func (t *TxMetrics) RecordNonce(nonce uint64) {
 	t.currentNonce.Set(float64(nonce))
 }

+func (t *TxMetrics) RecordPendingTx(pending int64) {
+	t.pendingTxs.Set(float64(pending))
+}
+
 // TxConfirmed records lots of information about the confirmed transaction
 func (t *TxMetrics) TxConfirmed(receipt *types.Receipt) {
 	fee := float64(receipt.EffectiveGasPrice.Uint64() * receipt.GasUsed / params.GWei)
...
package txmgr
import (
"context"
"math"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"golang.org/x/sync/errgroup"
)
type TxReceipt[T any] struct {
// ID can be used to identify unique tx receipts within the receipt channel
ID T
// Receipt result from the transaction send
Receipt *types.Receipt
// Err contains any error that occurred during the tx send
Err error
}
type Queue[T any] struct {
ctx context.Context
txMgr TxManager
maxPending uint64
groupLock sync.Mutex
groupCtx context.Context
group *errgroup.Group
}
// NewQueue creates a new transaction sending Queue, with the following parameters:
//   - ctx: the context used for all transaction sends
//   - txMgr: the TxManager used to send each transaction
//   - maxPending: max number of pending txs at once (0 == no limit)
func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64) *Queue[T] {
if maxPending > math.MaxInt {
// ensure we don't overflow as errgroup only accepts int; in reality this will never be an issue
maxPending = math.MaxInt
}
return &Queue[T]{
ctx: ctx,
txMgr: txMgr,
maxPending: maxPending,
}
}
// Wait waits for all pending txs to complete (or fail).
func (q *Queue[T]) Wait() {
if q.group == nil {
return
}
_ = q.group.Wait()
}
// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
group, ctx := q.groupContext()
group.Go(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
// TrySend sends the next tx, but only if the number of pending txs is below the
// max pending.
//
// Returns false if there is no room in the queue to send. Otherwise, the
// transaction is queued and this method returns true.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
group, ctx := q.groupContext()
return group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
receipt, err := q.txMgr.Send(ctx, candidate)
receiptCh <- TxReceipt[T]{
ID: id,
Receipt: receipt,
Err: err,
}
return err
}
// groupContext returns a Group and a Context to use when sending a tx.
//
// If any of the pending transactions returned an error, the queue's shared error Group is
// canceled. This method will wait on that Group for all pending transactions to return,
// and create a new Group with the queue's global context as its parent.
func (q *Queue[T]) groupContext() (*errgroup.Group, context.Context) {
q.groupLock.Lock()
defer q.groupLock.Unlock()
if q.groupCtx == nil || q.groupCtx.Err() != nil {
// no group exists, or the existing context has an error, so we need to wait
// for existing group threads to complete (if any) and create a new group
if q.group != nil {
_ = q.group.Wait()
}
q.group, q.groupCtx = errgroup.WithContext(q.ctx)
if q.maxPending > 0 {
q.group.SetLimit(int(q.maxPending))
}
}
return q.group, q.groupCtx
}
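A short usage sketch of the new Queue API (the helper below is illustrative, not part of this diff; txMgr is assumed to be an already-constructed TxManager):

package example

import (
	"context"
	"log"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

// submitAll queues every candidate (at most 5 in flight) and reads all
// receipts from the unbuffered channel so the senders can complete.
func submitAll(ctx context.Context, txMgr txmgr.TxManager, candidates []txmgr.TxCandidate) {
	queue := txmgr.NewQueue[int](ctx, txMgr, 5)
	receiptCh := make(chan txmgr.TxReceipt[int])

	go func() {
		for i, c := range candidates {
			queue.Send(i, c, receiptCh) // blocks while the queue is at its limit
		}
		queue.Wait() // block until every in-flight send has finished
	}()

	for range candidates {
		r := <-receiptCh
		if r.Err != nil {
			log.Printf("tx %d failed: %v", r.ID, r.Err)
		} else {
			log.Printf("tx %d confirmed: %s", r.ID, r.Receipt.TxHash)
		}
	}
}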
package txmgr
import (
"context"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
type queueFunc func(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool
func sendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
q.Send(id, candidate, receiptCh)
return true
}
func trySendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
return q.TrySend(id, candidate, receiptCh)
}
type queueCall struct {
call queueFunc // queue call (either Send or TrySend, use function helpers above)
queued bool // true if the send was queued
txErr bool // true if the tx send should return an error
}
type testTx struct {
sendErr bool // true if the send for this tx should return an error
}
type testCase struct {
name string // name of the test
max uint64 // max concurrency of the queue
calls []queueCall // calls to the queue
txs []testTx // txs to generate from the factory (and potentially error in send)
nonces []uint64 // expected sent tx nonces after all calls are made
total time.Duration // approx. total time it should take to complete all queue calls
}
type mockBackendWithNonce struct {
mockBackend
}
func newMockBackendWithNonce(g *gasPricer) *mockBackendWithNonce {
return &mockBackendWithNonce{
mockBackend: mockBackend{
g: g,
minedTxs: make(map[common.Hash]minedTxInfo),
},
}
}
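// NonceAt reports the number of mined transactions, so the suggested nonce
// always equals how many of the sender's txs have already landed.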
func (b *mockBackendWithNonce) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return uint64(len(b.minedTxs)), nil
}
func TestSend(t *testing.T) {
testCases := []testCase{
{
name: "success",
max: 5,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "no limit",
max: 0,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "single threaded",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: trySendQueueFunc, queued: false},
},
txs: []testTx{
{},
},
nonces: []uint64{0},
total: 1 * time.Second,
},
{
name: "single threaded blocking",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
},
nonces: []uint64{0, 1, 2},
total: 3 * time.Second,
},
{
name: "dual threaded blocking",
max: 2,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
{},
{},
},
nonces: []uint64{0, 1, 2, 3, 4},
total: 3 * time.Second,
},
{
name: "subsequent txs fail after tx failure",
max: 1,
calls: []queueCall{
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true, txErr: true},
{call: sendQueueFunc, queued: true, txErr: true},
},
txs: []testTx{
{},
{sendErr: true},
{},
},
nonces: []uint64{0, 1, 1},
total: 3 * time.Second,
},
}
for _, test := range testCases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
conf := configWithNumConfs(1)
conf.ReceiptQueryInterval = 1 * time.Second // simulate a network send
conf.ResubmissionTimeout = 2 * time.Second // resubmit to detect errors
conf.SafeAbortNonceTooLowCount = 1
backend := newMockBackendWithNonce(newGasPricer(3))
mgr := &SimpleTxManager{
chainID: conf.ChainID,
name: "TEST",
cfg: conf,
backend: backend,
l: testlog.Logger(t, log.LvlCrit),
metr: &metrics.NoopTxMetrics{},
}
// track the nonces, and return any expected errors from tx sending
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index := int(tx.Data()[0])
nonces = append(nonces, tx.Nonce())
var testTx *testTx
if index < len(test.txs) {
testTx = &test.txs[index]
}
if testTx != nil && testTx.sendErr {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
backend.mine(&txHash, tx.GasFeeCap())
return nil
}
backend.setTxSender(sendTx)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
queue := NewQueue[int](ctx, mgr, test.max)
// make all the queue calls given in the test case
start := time.Now()
for i, c := range test.calls {
msg := fmt.Sprintf("Call %d", i)
c := c
receiptCh := make(chan TxReceipt[int], 1)
candidate := TxCandidate{
TxData: []byte{byte(i)},
To: &common.Address{},
}
queued := c.call(i, candidate, receiptCh, queue)
require.Equal(t, c.queued, queued, msg)
go func() {
r := <-receiptCh
if c.txErr {
require.Error(t, r.Err, msg)
} else {
require.NoError(t, r.Err, msg)
}
}()
}
// wait for the queue to drain (all txs complete or failed)
queue.Wait()
duration := time.Since(start)
// expect the execution time within a certain window
now := time.Now()
require.WithinDuration(t, now.Add(test.total), now.Add(duration), 500*time.Millisecond, "unexpected queue transaction timing")
// check that the nonces match
slices.Sort(nonces)
require.Equal(t, test.nonces, nonces, "expected nonces do not match")
})
}
}
@@ -4,10 +4,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
 	"math/big"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/ethereum/go-ethereum"
@@ -38,7 +38,7 @@ type TxManager interface {
 	// It can be stopped by cancelling the provided context; however, the transaction
 	// may be included on L1 even if the context is cancelled.
 	//
-	// NOTE: Send should be called by AT MOST one caller at a time.
+	// NOTE: Send can be called concurrently; the nonce is managed internally.
 	Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)

 	// From returns the sending address associated with the instance of the transaction manager.
@@ -84,6 +84,11 @@ type SimpleTxManager struct {
 	backend ETHBackend
 	l       log.Logger
 	metr    metrics.TxMetricer
+
+	nonce     *uint64
+	nonceLock sync.RWMutex
+
+	pending atomic.Int64
 }

 // NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
@@ -126,8 +131,21 @@ type TxCandidate struct {
 // The transaction manager handles all signing. If and only if the gas limit is 0, the
 // transaction manager will do a gas estimation.
 //
-// NOTE: Send should be called by AT MOST one caller at a time.
+// NOTE: Send can be called concurrently; the nonce is managed internally.
 func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
+	m.metr.RecordPendingTx(m.pending.Add(1))
+	defer func() {
+		m.metr.RecordPendingTx(m.pending.Add(-1))
+	}()
+	receipt, err := m.send(ctx, candidate)
+	if err != nil {
+		m.resetNonce()
+	}
+	return receipt, err
+}
+
+// send performs the actual transaction creation and sending.
+func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
 	if m.cfg.TxSendTimeout != 0 {
 		var cancel context.CancelFunc
 		ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
@@ -137,7 +155,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
 	if err != nil {
 		return nil, fmt.Errorf("failed to create the tx: %w", err)
 	}
-	return m.send(ctx, tx)
+	return m.sendTx(ctx, tx)
 }

 // craftTx creates the signed transaction
@@ -153,15 +171,10 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
 	}
 	gasFeeCap := calcGasFeeCap(basefee, gasTipCap)

-	// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
-	childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
-	defer cancel()
-	nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
+	nonce, err := m.nextNonce(ctx)
 	if err != nil {
-		m.metr.RPCError()
-		return nil, fmt.Errorf("failed to get nonce: %w", err)
+		return nil, err
 	}
-	m.metr.RecordNonce(nonce)

 	rawTx := &types.DynamicFeeTx{
 		ChainID: m.chainID,
@@ -192,14 +205,48 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
 		rawTx.Gas = gas
 	}

-	ctx, cancel = context.WithTimeout(ctx, m.cfg.NetworkTimeout)
+	ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
 	defer cancel()
 	return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx))
 }

+// nextNonce returns a nonce to use for the next transaction. It uses
+// eth_getTransactionCount with "latest" once, and then subsequent calls simply
+// increment this number. If the transaction manager is reset, it will query the
+// eth_getTransactionCount nonce again.
+func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) {
+	m.nonceLock.Lock()
+	defer m.nonceLock.Unlock()
+
+	if m.nonce == nil {
+		// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
+		childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
+		defer cancel()
+		nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
+		if err != nil {
+			m.metr.RPCError()
+			return 0, fmt.Errorf("failed to get nonce: %w", err)
+		}
+		m.nonce = &nonce
+	} else {
+		*m.nonce++
+	}
+
+	m.metr.RecordNonce(*m.nonce)
+	return *m.nonce, nil
+}
+
+// resetNonce resets the internal nonce tracking. This is called if any pending send
+// returns an error.
+func (m *SimpleTxManager) resetNonce() {
+	m.nonceLock.Lock()
+	defer m.nonceLock.Unlock()
+	m.nonce = nil
+}
+
 // send submits the same transaction several times with increasing gas prices as necessary.
 // It waits for the transaction to be confirmed on chain.
-func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
+func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
 	var wg sync.WaitGroup
 	defer wg.Wait()
 	ctx, cancel := context.WithCancel(ctx)
...
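Since Send is now safe for concurrent callers, client code can fan out submissions directly. A minimal sketch under that assumption (the helper and its names are illustrative):

package example

import (
	"context"
	"log"
	"sync"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

// sendConcurrently fires off all candidates at once; the manager assigns
// nonces internally and resets its cached nonce if any send fails.
func sendConcurrently(ctx context.Context, m txmgr.TxManager, candidates []txmgr.TxCandidate) {
	var wg sync.WaitGroup
	for _, c := range candidates {
		c := c
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := m.Send(ctx, c); err != nil {
				log.Printf("send failed: %v", err)
			}
		}()
	}
	wg.Wait()
}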
@@ -277,7 +277,7 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	receipt, err := h.mgr.send(ctx, tx)
+	receipt, err := h.mgr.sendTx(ctx, tx)
 	require.Nil(t, err)
 	require.NotNil(t, receipt)
 	require.Equal(t, gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
@@ -305,7 +305,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	receipt, err := h.mgr.send(ctx, tx)
+	receipt, err := h.mgr.sendTx(ctx, tx)
 	require.Equal(t, err, context.DeadlineExceeded)
 	require.Nil(t, receipt)
 }
@@ -334,7 +334,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	receipt, err := h.mgr.send(ctx, tx)
+	receipt, err := h.mgr.sendTx(ctx, tx)
 	require.Nil(t, err)
 	require.NotNil(t, receipt)
 	require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
@@ -365,7 +365,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	receipt, err := h.mgr.send(ctx, tx)
+	receipt, err := h.mgr.sendTx(ctx, tx)
 	require.Equal(t, err, context.DeadlineExceeded)
 	require.Nil(t, receipt)
 }
@@ -443,7 +443,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	receipt, err := h.mgr.send(ctx, tx)
+	receipt, err := h.mgr.sendTx(ctx, tx)
 	require.Nil(t, err)
 	require.NotNil(t, receipt)
@@ -478,7 +478,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	receipt, err := h.mgr.send(ctx, tx)
+	receipt, err := h.mgr.sendTx(ctx, tx)
 	require.Nil(t, err)
 	require.NotNil(t, receipt)
 	require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
@@ -523,7 +523,7 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	receipt, err := h.mgr.send(ctx, tx)
+	receipt, err := h.mgr.sendTx(ctx, tx)
 	require.Nil(t, err)
 	require.NotNil(t, receipt)
 	require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
@@ -870,3 +870,40 @@ func TestErrStringMatch(t *testing.T) {
 		})
 	}
 }
func TestNonceReset(t *testing.T) {
conf := configWithNumConfs(1)
conf.SafeAbortNonceTooLowCount = 1
h := newTestHarnessWithConfig(t, conf)
index := -1
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index++
nonces = append(nonces, tx.Nonce())
// fail every 3rd tx
if index%3 == 0 {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap())
return nil
}
h.backend.setTxSender(sendTx)
ctx := context.Background()
for i := 0; i < 8; i++ {
_, err := h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
// expect every 3rd tx to fail
if i%3 == 0 {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
// internal nonce tracking should be reset every 3rd tx
require.Equal(t, []uint64{0, 0, 1, 2, 0, 1, 2, 0}, nonces)
}
@@ -31,6 +31,7 @@ require (
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 	github.com/edsrzf/mmap-go v1.1.0 // indirect
+	github.com/emirpasic/gods v1.18.1 // indirect
 	github.com/fjl/memsize v0.0.1 // indirect
 	github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
 	github.com/go-ole/go-ole v1.2.6 // indirect
...
@@ -140,6 +140,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
 github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
 github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
 github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
+github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
...
package avg_sliding_window
import (
"time"
lm "github.com/emirpasic/gods/maps/linkedhashmap"
)
type Clock interface {
Now() time.Time
}
// DefaultClock provides a clock that gets current time from the system time
type DefaultClock struct{}
func NewDefaultClock() *DefaultClock {
return &DefaultClock{}
}
func (c DefaultClock) Now() time.Time {
return time.Now()
}
// AdjustableClock provides a static clock to easily override the system time
type AdjustableClock struct {
now time.Time
}
func NewAdjustableClock(now time.Time) *AdjustableClock {
return &AdjustableClock{now: now}
}
func (c *AdjustableClock) Now() time.Time {
return c.now
}
func (c *AdjustableClock) Set(now time.Time) {
c.now = now
}
type bucket struct {
sum float64
qty uint
}
// AvgSlidingWindow calculates moving averages efficiently.
// Data points are rounded to the nearest bucket of size `bucketSize`,
// and evicted when they are too old based on `windowLength`.
type AvgSlidingWindow struct {
bucketSize time.Duration
windowLength time.Duration
clock Clock
buckets *lm.Map
qty uint
sum float64
}
type SlidingWindowOpts func(sw *AvgSlidingWindow)
func NewSlidingWindow(opts ...SlidingWindowOpts) *AvgSlidingWindow {
sw := &AvgSlidingWindow{
buckets: lm.New(),
}
for _, opt := range opts {
opt(sw)
}
if sw.bucketSize == 0 {
sw.bucketSize = time.Second
}
if sw.windowLength == 0 {
sw.windowLength = 5 * time.Minute
}
if sw.clock == nil {
sw.clock = NewDefaultClock()
}
return sw
}
func WithWindowLength(windowLength time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.windowLength = windowLength
}
}
func WithBucketSize(bucketSize time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.bucketSize = bucketSize
}
}
func WithClock(clock Clock) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.clock = clock
}
}
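// inWindow reports whether a (bucket-rounded) timestamp falls inside the
// half-open window (now-windowLength, now].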
func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
return windowStart.Before(t) && !t.After(now)
}
// Add inserts a new data point into the window, with value `val` at the current time
func (sw *AvgSlidingWindow) Add(val float64) {
t := sw.clock.Now()
sw.AddWithTime(t, val)
}
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
sw.advance()
key := t.Round(sw.bucketSize)
if !sw.inWindow(key) {
return
}
var b *bucket
current, found := sw.buckets.Get(key)
if !found {
b = &bucket{}
} else {
b = current.(*bucket)
}
// update bucket
bsum := b.sum
b.qty += 1
b.sum = bsum + val
// update window
wsum := sw.sum
sw.qty += 1
sw.sum = wsum - bsum + b.sum
sw.buckets.Put(key, b)
}
// advance evicts old data points
func (sw *AvgSlidingWindow) advance() {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
keys := sw.buckets.Keys()
for _, k := range keys {
if k.(time.Time).After(windowStart) {
break
}
val, _ := sw.buckets.Get(k)
b := val.(*bucket)
sw.buckets.Remove(k)
if sw.buckets.Size() > 0 {
sw.qty -= b.qty
sw.sum = sw.sum - b.sum
} else {
sw.qty = 0
sw.sum = 0.0
}
}
}
// Avg retrieves the current average for the sliding window
func (sw *AvgSlidingWindow) Avg() float64 {
sw.advance()
if sw.qty == 0 {
return 0
}
return sw.sum / float64(sw.qty)
}
// Sum retrieves the current sum for the sliding window
func (sw *AvgSlidingWindow) Sum() float64 {
sw.advance()
return sw.sum
}
// Count retrieves the data point count for the sliding window
func (sw *AvgSlidingWindow) Count() uint {
sw.advance()
return sw.qty
}
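A small usage sketch of the sliding window driven by the adjustable clock (the values are arbitrary; this would live in a separate file of the same package):

package avg_sliding_window

import (
	"fmt"
	"time"
)

func ExampleAvgSlidingWindow() {
	start := time.Date(2023, 3, 1, 0, 0, 0, 0, time.UTC)
	clock := NewAdjustableClock(start)
	sw := NewSlidingWindow(
		WithWindowLength(10*time.Second),
		WithBucketSize(time.Second),
		WithClock(clock),
	)

	sw.Add(4) // first bucket
	sw.Add(8) // same bucket: bucket sum is now 12
	clock.Set(start.Add(5 * time.Second))
	sw.Add(6) // second bucket

	fmt.Println(sw.Avg(), sw.Count()) // (4+8+6)/3 = 6, count 3

	clock.Set(start.Add(30 * time.Second)) // both buckets fall out of the window
	fmt.Println(sw.Avg(), sw.Count())      // 0, 0
}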