Commit d332a210 authored by Felipe Andrade's avatar Felipe Andrade

metrics, instrumented clients, already known nonce handling and retry

parent 6ffc10fb
...@@ -4,31 +4,61 @@ go 1.20 ...@@ -4,31 +4,61 @@ go 1.20
require ( require (
github.com/BurntSushi/toml v1.3.2 github.com/BurntSushi/toml v1.3.2
github.com/ethereum-optimism/optimism/op-service v0.10.14-0.20230209153120-0338ea88dff7
github.com/ethereum-optimism/optimism/op-signer v0.1.1 github.com/ethereum-optimism/optimism/op-signer v0.1.1
github.com/ethereum/go-ethereum v1.12.0 github.com/ethereum/go-ethereum v1.12.0
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/rs/cors v1.8.2 github.com/rs/cors v1.8.2
) )
require ( require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/VictoriaMetrics/fastcache v1.10.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/ethereum-optimism/optimism/op-service v0.10.14-0.20230209153120-0338ea88dff7 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.1 // indirect github.com/go-stack/stack v1.8.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/gorilla/websocket v1.5.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rivo/uniseg v0.3.4 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.5.0 // indirect github.com/tklauser/numcpus v0.5.0 // indirect
github.com/urfave/cli v1.22.9 // indirect github.com/urfave/cli v1.22.9 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.1.0 // indirect golang.org/x/crypto v0.1.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/sys v0.7.0 // indirect golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
) )
This diff is collapsed.
...@@ -57,12 +57,17 @@ type ProviderConfig struct { ...@@ -57,12 +57,17 @@ type ProviderConfig struct {
Disabled bool `toml:"disabled"` Disabled bool `toml:"disabled"`
Network string `toml:"network"` Network string `toml:"network"`
URL string `toml:"url"` URL string `toml:"url"`
ReadOnly bool `toml:"read_only"` ReadOnly bool `toml:"read_only"`
ReadInterval TOMLDuration `toml:"read_interval"` ReadInterval TOMLDuration `toml:"read_interval"`
SendInterval TOMLDuration `toml:"send_interval"` SendInterval TOMLDuration `toml:"send_interval"`
Wallet string `toml:"wallet"` SendTransactionRetryInterval TOMLDuration `toml:"send_transaction_retry_interval"`
SendTransactionRetryTimeout TOMLDuration `toml:"send_transaction_retry_timeout"`
ReceiptRetrievalInterval TOMLDuration `toml:"receipt_retrieval_interval"` ReceiptRetrievalInterval TOMLDuration `toml:"receipt_retrieval_interval"`
ReceiptRetrievalTimeout TOMLDuration `toml:"receipt_retrieval_timeout"` ReceiptRetrievalTimeout TOMLDuration `toml:"receipt_retrieval_timeout"`
Wallet string `toml:"wallet"`
} }
func New(file string) (*Config, error) { func New(file string) (*Config, error) {
...@@ -143,8 +148,11 @@ func (c *Config) Validate() error { ...@@ -143,8 +148,11 @@ func (c *Config) Validate() error {
if provider.SendInterval == 0 { if provider.SendInterval == 0 {
return errors.Errorf("provider [%s] send_interval is missing", name) return errors.Errorf("provider [%s] send_interval is missing", name)
} }
if provider.Wallet == "" { if provider.SendTransactionRetryInterval == 0 {
return errors.Errorf("provider [%s] wallet is missing", name) return errors.Errorf("provider [%s] send_transaction_retry_interval is missing", name)
}
if provider.SendTransactionRetryTimeout == 0 {
return errors.Errorf("provider [%s] send_transaction_retry_timeout is missing", name)
} }
if provider.ReceiptRetrievalInterval == 0 { if provider.ReceiptRetrievalInterval == 0 {
return errors.Errorf("provider [%s] receipt_retrieval_interval is missing", name) return errors.Errorf("provider [%s] receipt_retrieval_interval is missing", name)
...@@ -152,6 +160,9 @@ func (c *Config) Validate() error { ...@@ -152,6 +160,9 @@ func (c *Config) Validate() error {
if provider.ReceiptRetrievalTimeout == 0 { if provider.ReceiptRetrievalTimeout == 0 {
return errors.Errorf("provider [%s] receipt_retrieval_timeout is missing", name) return errors.Errorf("provider [%s] receipt_retrieval_timeout is missing", name)
} }
if provider.Wallet == "" {
return errors.Errorf("provider [%s] wallet is missing", name)
}
if _, ok := c.Wallets[provider.Wallet]; !ok { if _, ok := c.Wallets[provider.Wallet]; !ok {
return errors.Errorf("provider [%s] has an invalid wallet [%s]", name, provider.Wallet) return errors.Errorf("provider [%s] has an invalid wallet [%s]", name, provider.Wallet)
} }
......
package clients
import (
"context"
"op-ufm/pkg/metrics"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
type InstrumentedEthClient struct {
c *ethclient.Client
providerName string
}
func Dial(providerName string, url string) (*InstrumentedEthClient, error) {
start := time.Now()
c, err := ethclient.Dial(url)
if err != nil {
metrics.RecordError(providerName, "ethclient.Dial")
return nil, err
}
metrics.RecordRPCLatency(providerName, "ethclient", "Dial", time.Since(start))
return &InstrumentedEthClient{c: c, providerName: providerName}, nil
}
func (i *InstrumentedEthClient) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) {
start := time.Now()
tx, isPending, err := i.c.TransactionByHash(ctx, hash)
if err != nil {
if !i.IgnorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.TransactionByHash")
}
return nil, false, err
}
metrics.RecordRPCLatency(i.providerName, "ethclient", "TransactionByHash", time.Since(start))
return tx, isPending, err
}
func (i *InstrumentedEthClient) PendingNonceAt(ctx context.Context, address string) (uint64, error) {
start := time.Now()
nonce, err := i.c.PendingNonceAt(ctx, common.HexToAddress(address))
if err != nil {
metrics.RecordError(i.providerName, "ethclient.PendingNonceAt")
return 0, err
}
metrics.RecordRPCLatency(i.providerName, "ethclient", "PendingNonceAt", time.Since(start))
return nonce, err
}
func (i *InstrumentedEthClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
start := time.Now()
receipt, err := i.c.TransactionReceipt(ctx, txHash)
if err != nil {
if !i.IgnorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.TransactionReceipt")
}
return nil, err
}
metrics.RecordRPCLatency(i.providerName, "ethclient", "TransactionReceipt", time.Since(start))
return receipt, err
}
func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.Transaction) error {
start := time.Now()
err := i.c.SendTransaction(ctx, tx)
if err != nil {
if !i.IgnorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.SendTransaction")
}
return err
}
metrics.RecordRPCLatency(i.providerName, "ethclient", "SendTransaction", time.Since(start))
return err
}
func (i *InstrumentedEthClient) IgnorableErrors(err error) bool {
msg := err.Error()
// we dont use errors.Is because eth client actually uses errors.New,
// therefore creating an incomparable instance :(
return msg == ethereum.NotFound.Error() ||
msg == txpool.ErrAlreadyKnown.Error() ||
msg == core.ErrNonceTooLow.Error()
}
package clients
import (
"context"
"math/big"
"op-ufm/pkg/metrics"
"time"
optls "github.com/ethereum-optimism/optimism/op-service/tls"
signer "github.com/ethereum-optimism/optimism/op-signer/client"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type InstrumentedSignerClient struct {
c *signer.SignerClient
providerName string
}
func NewSignerClient(providerName string, logger log.Logger, endpoint string, tlsConfig optls.CLIConfig) (*InstrumentedSignerClient, error) {
start := time.Now()
c, err := signer.NewSignerClient(logger, endpoint, tlsConfig)
if err != nil {
metrics.RecordError(providerName, "signer.NewSignerClient")
return nil, err
}
metrics.RecordRPCLatency(providerName, "signer", "NewSignerClient", time.Since(start))
return &InstrumentedSignerClient{c: c, providerName: providerName}, nil
}
func (i *InstrumentedSignerClient) SignTransaction(ctx context.Context, chainId *big.Int, tx *types.Transaction) (*types.Transaction, error) {
start := time.Now()
tx, err := i.c.SignTransaction(ctx, chainId, tx)
if err != nil {
metrics.RecordError(i.providerName, "signer.SignTransaction")
return nil, err
}
metrics.RecordRPCLatency(i.providerName, "signer", "SignTransaction", time.Since(start))
return tx, err
}
package metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const (
MetricsNamespace = "ufm"
)
var (
errorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "errors_total",
Help: "Count of errors.",
}, []string{
"provider",
"error",
})
rpcLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "rpc_latency",
Help: "RPC latency per provider, client and method (ms)",
}, []string{
"provider",
"client",
"method",
})
roundTripLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "roundtrip_latency",
Help: "Round trip latency per provider (ms)",
}, []string{
"provider",
})
gasUsed = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "gas_used",
Help: "Gas used per provider",
}, []string{
"provider",
})
firstSeenLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "first_seen_latency",
Help: "First seen latency latency per provider (ms)",
}, []string{
"provider_source",
"provider_seen",
})
providerToProviderLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "provider_to_provider_latency",
Help: "Provider to provider latency (ms)",
}, []string{
"provider_source",
"provider_seen",
})
networkTransactionsInFlight = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "transactions_inflight",
Help: "Transactions in flight, per network",
}, []string{
"network",
})
)
func RecordError(provider string, errorLabel string) {
errorsTotal.WithLabelValues(provider, errorLabel).Inc()
}
func RecordRPCLatency(provider string, client string, method string, latency time.Duration) {
rpcLatency.WithLabelValues(provider, client, method).Set(float64(latency.Milliseconds()))
}
func RecordRoundTripLatency(provider string, latency time.Duration) {
roundTripLatency.WithLabelValues(provider).Set(float64(latency.Milliseconds()))
}
func RecordGasUsed(provider string, val uint64) {
gasUsed.WithLabelValues(provider).Set(float64(val))
}
func RecordFirstSeenLatency(provider_source string, provider_seen string, latency time.Duration) {
firstSeenLatency.WithLabelValues(provider_source, provider_seen).Set(float64(latency.Milliseconds()))
}
func RecordProviderToProviderLatency(provider_source string, provider_seen string, latency time.Duration) {
firstSeenLatency.WithLabelValues(provider_source, provider_seen).Set(float64(latency.Milliseconds()))
}
func RecordTransactionsInFlight(network string, count int) {
networkTransactionsInFlight.WithLabelValues(network).Set(float64(count))
}
package provider
import (
"context"
"github.com/ethereum/go-ethereum/ethclient"
)
func (p *Provider) dial(ctx context.Context) (*ethclient.Client, error) {
return ethclient.Dial(p.config.URL)
}
...@@ -2,6 +2,8 @@ package provider ...@@ -2,6 +2,8 @@ package provider
import ( import (
"context" "context"
"op-ufm/pkg/metrics"
"op-ufm/pkg/metrics/clients"
"time" "time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
...@@ -11,43 +13,69 @@ import ( ...@@ -11,43 +13,69 @@ import (
// Heartbeat polls for expected in-flight transactions // Heartbeat polls for expected in-flight transactions
func (p *Provider) Heartbeat(ctx context.Context) { func (p *Provider) Heartbeat(ctx context.Context) {
log.Debug("heartbeat", "provider", p.name) log.Debug("heartbeat", "provider", p.name, "inflight", len(p.txPool.Transactions))
if len(p.txPool.Transactions) == 0 { metrics.RecordTransactionsInFlight(p.config.Network, len(p.txPool.Transactions))
log.Debug("no in-flight txs", "provider", p.name)
// let's exclude transactions already seen by this provider, or originated by it
expectedTransactions := make([]*TransactionState, 0, len(p.txPool.Transactions))
alreadySeen := 0
for _, st := range p.txPool.Transactions {
if st.ProviderSentTo == p.name {
continue
}
if _, exist := st.SeenBy[p.name]; exist {
alreadySeen++
continue
}
expectedTransactions = append(expectedTransactions, st)
}
if len(expectedTransactions) == 0 {
log.Debug("no expected txs", "count", len(p.txPool.Transactions), "provider", p.name, "alreadySeen", alreadySeen)
return return
} }
ethClient, err := p.dial(ctx) client, err := clients.Dial(p.name, p.config.URL)
if err != nil { if err != nil {
log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err) log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err)
} }
log.Debug("checking in-flight tx", "count", len(p.txPool.Transactions), "provider", p.name) log.Debug("checking in-flight tx", "count", len(p.txPool.Transactions), "provider", p.name, "alreadySeen", alreadySeen)
for hash, st := range p.txPool.Transactions { for _, st := range expectedTransactions {
log.Debug(hash, "st", st) hash := st.Hash.Hex()
_, isPending, err := ethClient.TransactionByHash(ctx, st.Hash) _, isPending, err := client.TransactionByHash(ctx, st.Hash)
if err != nil && !errors.Is(err, ethereum.NotFound) { if err != nil && !errors.Is(err, ethereum.NotFound) {
log.Error("cant check transaction", "provider", p.name, "url", p.config.URL, "err", err) log.Error("cant check transaction", "provider", p.name, "hash", hash, "url", p.config.URL, "err", err)
continue continue
} }
log.Debug("got transaction", "provider", p.name, "hash", hash, "isPending", isPending) log.Debug("got transaction", "provider", p.name, "hash", hash, "isPending", isPending)
// mark transaction as seen by this provider
st.M.Lock() st.M.Lock()
if st.FirstSeen.IsZero() { if st.FirstSeen.IsZero() {
st.FirstSeen = time.Now() st.FirstSeen = time.Now()
firstSeenLatency := time.Since(st.SentAt)
metrics.RecordFirstSeenLatency(st.ProviderSentTo, p.name, time.Since(st.SentAt))
log.Info("transaction first seen",
"hash", hash,
"firstSeenLatency", firstSeenLatency,
"provider_source", st.ProviderSentTo,
"provider_seen", p.name)
} }
if _, exist := st.SeenBy[p.name]; !exist { if _, exist := st.SeenBy[p.name]; !exist {
st.SeenBy[p.name] = time.Now() st.SeenBy[p.name] = time.Now()
metrics.RecordProviderToProviderLatency(st.ProviderSentTo, p.name, time.Since(st.SentAt))
} }
st.M.Unlock() st.M.Unlock()
// check if transaction have been seen by all providers
p.txPool.M.Lock() p.txPool.M.Lock()
// every provider has seen this transaction
if len(st.SeenBy) == p.txPool.Expected { if len(st.SeenBy) == p.txPool.Expected {
log.Debug("transaction seen by all", "hash", hash) log.Debug("transaction seen by all", "hash", hash, "expected", p.txPool.Expected, "seenBy", len(st.SeenBy))
delete(p.txPool.Transactions, hash) delete(p.txPool.Transactions, st.Hash.Hex())
} }
p.txPool.M.Unlock() p.txPool.M.Unlock()
} }
......
...@@ -39,7 +39,7 @@ func (p *Provider) Start(ctx context.Context) { ...@@ -39,7 +39,7 @@ func (p *Provider) Start(ctx context.Context) {
p.cancelFunc = cancelFunc p.cancelFunc = cancelFunc
schedule(providerCtx, time.Duration(p.config.ReadInterval), p.Heartbeat) schedule(providerCtx, time.Duration(p.config.ReadInterval), p.Heartbeat)
if !p.config.ReadOnly { if !p.config.ReadOnly {
schedule(providerCtx, time.Duration(p.config.SendInterval), p.Roundtrip) schedule(providerCtx, time.Duration(p.config.SendInterval), p.RoundTrip)
} }
} }
...@@ -49,6 +49,14 @@ func (p *Provider) Shutdown() { ...@@ -49,6 +49,14 @@ func (p *Provider) Shutdown() {
} }
} }
func (p *Provider) Name() string {
return p.name
}
func (p *Provider) URL() string {
return p.config.URL
}
func schedule(ctx context.Context, interval time.Duration, handler func(ctx context.Context)) { func schedule(ctx context.Context, interval time.Duration, handler func(ctx context.Context)) {
go func() { go func() {
for { for {
......
...@@ -2,80 +2,116 @@ package provider ...@@ -2,80 +2,116 @@ package provider
import ( import (
"context" "context"
"op-ufm/pkg/metrics"
iclients "op-ufm/pkg/metrics/clients"
"time" "time"
"github.com/ethereum-optimism/optimism/op-service/tls" "github.com/ethereum-optimism/optimism/op-service/tls"
signer "github.com/ethereum-optimism/optimism/op-signer/client"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
// Roundtrip send a new transaction to measure round trip latency // RoundTrip send a new transaction to measure round trip latency
func (p *Provider) Roundtrip(ctx context.Context) { func (p *Provider) RoundTrip(ctx context.Context) {
log.Debug("roundtrip", "provider", p.name) log.Debug("roundtrip", "provider", p.name)
ethClient, err := p.dial(ctx) client, err := iclients.Dial(p.name, p.config.URL)
if err != nil { if err != nil {
log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err) log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err)
return
} }
nonce, err := p.nonce(ctx, ethClient) nonce, err := client.PendingNonceAt(ctx, p.walletConfig.Address)
if err != nil { if err != nil {
log.Error("cant get nounce", "provider", p.name, "err", err) log.Error("cant get nounce", "provider", p.name, "err", err)
return
} }
txHash := common.Hash{}
attempt := 0
startedAt := time.Now()
for {
tx := p.createTx(nonce) tx := p.createTx(nonce)
txHash = tx.Hash()
signedTx, err := p.sign(ctx, tx) signedTx, err := p.sign(ctx, tx)
if err != nil { if err != nil {
log.Error("cant sign tx", "tx", tx, "err", err) log.Error("cant sign tx", "provider", p.name, "tx", tx, "err", err)
return
} }
err = ethClient.SendTransaction(ctx, signedTx) txHash = signedTx.Hash()
err = client.SendTransaction(ctx, signedTx)
if err != nil { if err != nil {
if err.Error() == txpool.ErrAlreadyKnown.Error() || err.Error() == core.ErrNonceTooLow.Error() {
if time.Since(startedAt) >= time.Duration(p.config.SendTransactionRetryTimeout) {
log.Error("send transaction timed out (known already)", "provider", p.name, "hash", txHash.Hex(), "elapsed", time.Since(startedAt), "attempt", attempt, "nonce", nonce)
metrics.RecordError(p.name, "ethclient.SendTransaction.nonce")
return
}
log.Warn("tx already known, incrementing nonce and trying again", "provider", p.name, "nonce", nonce)
time.Sleep(time.Duration(p.config.SendTransactionRetryInterval))
nonce++
attempt++
if attempt%10 == 0 {
log.Debug("retrying send transaction...", "provider", p.name, "attempt", attempt, "nonce", nonce, "elapsed", time.Since(startedAt))
}
} else {
log.Error("cant send transaction", "provider", p.name, "err", err) log.Error("cant send transaction", "provider", p.name, "err", err)
return
}
} else {
break
}
} }
txHash := signedTx.Hash()
log.Info("transaction sent", "hash", txHash.Hex()) log.Info("transaction sent", "provider", p.name, "hash", txHash.Hex(), "nonce", nonce)
// add to pool // add to pool
sentAt := time.Now() sentAt := time.Now()
p.txPool.M.Lock() p.txPool.M.Lock()
p.txPool.Transactions[txHash.Hex()] = &TransactionState{ p.txPool.Transactions[txHash.Hex()] = &TransactionState{
Hash: txHash, Hash: txHash,
ProviderSentTo: p.name,
SentAt: sentAt, SentAt: sentAt,
SeenBy: make(map[string]time.Time), SeenBy: make(map[string]time.Time),
} }
p.txPool.M.Unlock() p.txPool.M.Unlock()
var receipt *types.Receipt var receipt *types.Receipt
attempt := 0 attempt = 0
for receipt == nil { for receipt == nil {
if time.Since(sentAt) >= time.Duration(p.config.ReceiptRetrievalTimeout) { if time.Since(sentAt) >= time.Duration(p.config.ReceiptRetrievalTimeout) {
log.Error("receipt retrieval timed out", "provider", p.name, "hash", "elapsed", time.Since(sentAt)) log.Error("receipt retrieval timed out", "provider", p.name, "hash", "elapsed", time.Since(sentAt))
break return
} }
time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval)) time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval))
if attempt%10 == 0 { if attempt%10 == 0 {
log.Debug("checking for receipt...", "attempt", attempt, "elapsed", time.Since(sentAt)) log.Debug("checking for receipt...", "provider", p.name, "attempt", attempt, "elapsed", time.Since(sentAt))
} }
receipt, err = ethClient.TransactionReceipt(ctx, txHash) receipt, err = client.TransactionReceipt(ctx, txHash)
if err != nil && !errors.Is(err, ethereum.NotFound) { if err != nil && !errors.Is(err, ethereum.NotFound) {
log.Error("cant get receipt for transaction", "provider", p.name, "hash", txHash.Hex(), "err", err) log.Error("cant get receipt for transaction", "provider", p.name, "hash", txHash.Hex(), "err", err)
break return
} }
attempt++ attempt++
} }
roundtrip := time.Since(sentAt) roundtrip := time.Since(sentAt)
metrics.RecordRoundTripLatency(p.name, roundtrip)
metrics.RecordGasUsed(p.name, receipt.GasUsed)
log.Info("got transaction receipt", "hash", txHash.Hex(), log.Info("got transaction receipt", "hash", txHash.Hex(),
"roundtrip", roundtrip, "roundtrip", roundtrip,
"provider", p.name,
"blockNumber", receipt.BlockNumber, "blockNumber", receipt.BlockNumber,
"blockHash", receipt.BlockHash, "blockHash", receipt.BlockHash,
"gasUsed", receipt.GasUsed) "gasUsed", receipt.GasUsed)
...@@ -94,7 +130,6 @@ func (p *Provider) createTx(nonce uint64) *types.Transaction { ...@@ -94,7 +130,6 @@ func (p *Provider) createTx(nonce uint64) *types.Transaction {
Value: &p.walletConfig.TxValue, Value: &p.walletConfig.TxValue,
Data: data, Data: data,
}) })
// log.Debug("tx", "tx", tx)
return tx return tx
} }
...@@ -112,7 +147,7 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran ...@@ -112,7 +147,7 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran
TLSCert: p.signerConfig.TLSCert, TLSCert: p.signerConfig.TLSCert,
TLSKey: p.signerConfig.TLSKey, TLSKey: p.signerConfig.TLSKey,
} }
client, err := signer.NewSignerClient(log.Root(), p.signerConfig.URL, tlsConfig) client, err := iclients.NewSignerClient(p.name, log.Root(), p.signerConfig.URL, tlsConfig)
log.Debug("signerclient", "client", client, "err", err) log.Debug("signerclient", "client", client, "err", err)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -132,8 +167,3 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran ...@@ -132,8 +167,3 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran
return nil, errors.New("invalid signer method") return nil, errors.New("invalid signer method")
} }
} }
func (p *Provider) nonce(ctx context.Context, client *ethclient.Client) (uint64, error) {
fromAddress := common.HexToAddress(p.walletConfig.Address)
return client.PendingNonceAt(ctx, fromAddress)
}
...@@ -25,6 +25,7 @@ type TransactionState struct { ...@@ -25,6 +25,7 @@ type TransactionState struct {
M sync.Mutex M sync.Mutex
SentAt time.Time SentAt time.Time
ProviderSentTo string
FirstSeen time.Time FirstSeen time.Time
......
...@@ -5,9 +5,6 @@ import ( ...@@ -5,9 +5,6 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/ethereum/go-ethereum/log"
"github.com/pkg/errors"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/rs/cors" "github.com/rs/cors"
) )
...@@ -17,8 +14,7 @@ type Healthz struct { ...@@ -17,8 +14,7 @@ type Healthz struct {
server *http.Server server *http.Server
} }
func (h *Healthz) Start(ctx context.Context, host string, port int) { func (h *Healthz) Start(ctx context.Context, host string, port int) error {
go func() {
hdlr := mux.NewRouter() hdlr := mux.NewRouter()
hdlr.HandleFunc("/healthz", h.Handle).Methods("GET") hdlr.HandleFunc("/healthz", h.Handle).Methods("GET")
addr := fmt.Sprintf("%s:%d", host, port) addr := fmt.Sprintf("%s:%d", host, port)
...@@ -31,11 +27,7 @@ func (h *Healthz) Start(ctx context.Context, host string, port int) { ...@@ -31,11 +27,7 @@ func (h *Healthz) Start(ctx context.Context, host string, port int) {
} }
h.server = server h.server = server
h.ctx = ctx h.ctx = ctx
err := h.server.ListenAndServe() return h.server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Crit("error starting healthz server", "err", err)
}
}()
} }
func (h *Healthz) Shutdown() error { func (h *Healthz) Shutdown() error {
......
...@@ -2,10 +2,13 @@ package service ...@@ -2,10 +2,13 @@ package service
import ( import (
"context" "context"
"fmt"
"net/http"
"op-ufm/pkg/config" "op-ufm/pkg/config"
"op-ufm/pkg/provider" "op-ufm/pkg/provider"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
) )
type Service struct { type Service struct {
...@@ -26,8 +29,23 @@ func New(cfg *config.Config) *Service { ...@@ -26,8 +29,23 @@ func New(cfg *config.Config) *Service {
func (s *Service) Start(ctx context.Context) { func (s *Service) Start(ctx context.Context) {
log.Info("service starting") log.Info("service starting")
if s.Config.Healthz.Enabled { if s.Config.Healthz.Enabled {
s.Healthz.Start(ctx, s.Config.Healthz.Host, s.Config.Healthz.Port) addr := fmt.Sprintf("%s:%d", s.Config.Healthz.Host, s.Config.Healthz.Port)
log.Info("healthz started") log.Info("starting healthz server", "addr", addr)
go func() {
if err := s.Healthz.Start(ctx, s.Config.Healthz.Host, s.Config.Healthz.Port); err != nil {
log.Error("error starting healthz server", "err", err)
}
}()
}
if s.Config.Metrics.Enabled {
addr := fmt.Sprintf("%s:%d", s.Config.Metrics.Host, s.Config.Metrics.Port)
log.Info("starting metrics server", "addr", addr)
go func() {
if err := http.ListenAndServe(addr, promhttp.Handler()); err != nil {
log.Error("error starting metrics server", "err", err)
}
}()
} }
// map networks to its providers // map networks to its providers
...@@ -46,7 +64,9 @@ func (s *Service) Start(ctx context.Context) { ...@@ -46,7 +64,9 @@ func (s *Service) Start(ctx context.Context) {
} }
(*txpool)[name] = &provider.NetworkTransactionPool{} (*txpool)[name] = &provider.NetworkTransactionPool{}
(*txpool)[name].Transactions = make(map[string]*provider.TransactionState) (*txpool)[name].Transactions = make(map[string]*provider.TransactionState)
(*txpool)[name].Expected = len(providers) // set expected number of providers for this network
// -1 since we don't wait for acking from the same provider
(*txpool)[name].Expected = len(providers) - 1
} }
for name, providerConfig := range s.Config.Providers { for name, providerConfig := range s.Config.Providers {
...@@ -62,6 +82,7 @@ func (s *Service) Start(ctx context.Context) { ...@@ -62,6 +82,7 @@ func (s *Service) Start(ctx context.Context) {
s.Providers[name].Start(ctx) s.Providers[name].Start(ctx)
log.Info("provider started", "provider", name) log.Info("provider started", "provider", name)
} }
log.Info("service started") log.Info("service started")
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment