Commit 1b43cbd8 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #6272 from ethereum-optimism/felipe/ufm

feat(ufm): user facing monitoring service
parents a88cd430 4e2e48ba
bin
tls
config.toml
# Build stage: compiles the ufm binary with the Go toolchain.
FROM golang:1.20.4-alpine3.18 as builder

# Version metadata; these build args are visible to `make ufm` as environment
# variables and end up in the binary through the Makefile's -ldflags.
ARG GITCOMMIT=docker
ARG GITDATE=docker
ARG GITVERSION=docker

# --no-cache avoids leaving the apk package index in the layer.
RUN apk add --no-cache make jq git gcc musl-dev linux-headers

COPY ./op-ufm /app

WORKDIR /app

RUN make ufm

# Runtime stage: only the binary, the entrypoint script and CA certificates.
FROM alpine:3.18

COPY --from=builder /app/entrypoint.sh /bin/entrypoint.sh
COPY --from=builder /app/bin/ufm /bin/ufm

# --no-cache fetches a fresh index and does not store it, so a separate
# `apk update` is not needed.
RUN apk add --no-cache ca-certificates && \
    chmod +x /bin/entrypoint.sh

# The default command reads its configuration from this volume.
VOLUME /etc/ufm

EXPOSE 8080

ENTRYPOINT ["/bin/entrypoint.sh"]
CMD ["/bin/ufm", "/etc/ufm/config.toml"]
# Linker flags that stamp version metadata into the main package variables
# GitCommit, GitDate and GitVersion.
LDFLAGSSTRING +=-X main.GitCommit=$(GITCOMMIT)
LDFLAGSSTRING +=-X main.GitDate=$(GITDATE)
LDFLAGSSTRING +=-X main.GitVersion=$(GITVERSION)
LDFLAGS := -ldflags "$(LDFLAGSSTRING)"

# Build the service binary into ./bin/ufm.
ufm:
	go build -v $(LDFLAGS) -o ./bin/ufm ./cmd/ufm
.PHONY: ufm

# Tidy module dependencies and format all Go sources in place.
fmt:
	go mod tidy
	gofmt -w .
.PHONY: fmt

# Run all tests with the race detector enabled.
test:
	go test -race -v ./...
.PHONY: test
# Run static analysis over all packages.
lint:
	go vet ./...
# Declare the lint target itself as phony (previously this re-declared `test`),
# so a file or directory named `lint` cannot shadow it.
.PHONY: lint
# OP User Facing Monitoring
This project simulates a synthetic user interacting with an OP Stack chain.
It is intended to be used as a tool for monitoring
the health of the network by measuring end-to-end transaction latency.
## Metrics
* Round-trip duration time to get transaction receipt (from creation timestamp)
* First-seen duration time (from creation timestamp)
## Usage
Run `make ufm` to build the binary. No additional dependencies are necessary.
Copy `example.config.toml` to `config.toml` and edit the file to configure the service.
Start the service with `ufm config.toml`.
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"syscall"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/config"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/service"
"github.com/ethereum/go-ethereum/log"
)
// Build metadata logged at startup. The values are empty unless overridden at
// link time (the Makefile passes -X main.GitVersion / GitCommit / GitDate).
var (
	GitVersion = ""
	GitCommit  = ""
	GitDate    = ""
)
// main installs a bootstrap JSON logger, loads the configuration file named by
// the first command line argument, starts the service and then blocks until
// SIGINT or SIGTERM is received, at which point the service is shut down.
func main() {
	// Bootstrap logger at info level; initConfig replaces it afterwards.
	bootstrap := log.LvlFilterHandler(
		log.LvlInfo,
		log.StreamHandler(os.Stdout, log.JSONFormat()),
	)
	log.Root().SetHandler(bootstrap)

	log.Info("initializing", "version", GitVersion, "commit", GitCommit, "date", GitDate)

	if argc := len(os.Args); argc < 2 {
		log.Crit("must specify a config file on the command line")
	}
	cfg := initConfig(os.Args[1])

	svc := service.New(cfg)
	svc.Start(context.Background())

	// Wait for a termination signal.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	received := <-signals
	log.Info("caught signal, shutting down", "signal", received)

	svc.Shutdown()
}
// initConfig loads the configuration from cfgFile, prints the parsed result,
// validates it and reconfigures the root logger with the configured level.
//
// Any failure to read or validate the configuration is reported via log.Crit.
// The log level is resolved only after Validate has run, because Validate is
// what fills in the default level when log_level is not set; resolving it
// earlier made the default fall back to info.
//
// NOTE(review): the dump below serializes every JSON-visible field of the
// config, so secrets stay out of stdout only if the config types exclude them
// from JSON.
func initConfig(cfgFile string) *config.Config {
	cfg, err := config.New(cfgFile)
	if err != nil {
		log.Crit("error reading config file", "file", cfgFile, "err", err)
	}

	// readable parsed config, printed before validation so that an invalid
	// config can still be inspected
	jsonCfg, err := json.MarshalIndent(cfg, "", " ")
	if err != nil {
		log.Warn("cant render parsed config", "err", err)
	} else {
		// trailing newline keeps the next JSON log line on its own line
		fmt.Printf("%s\n", string(jsonCfg))
	}

	err = cfg.Validate()
	if err != nil {
		log.Crit("invalid config", "err", err)
	}

	// update log level from config
	logLevel, err := log.LvlFromString(cfg.LogLevel)
	if err != nil {
		logLevel = log.LvlInfo
		if cfg.LogLevel != "" {
			log.Warn("invalid server.log_level set: " + cfg.LogLevel)
		}
	}
	log.Root().SetHandler(
		log.LvlFilterHandler(
			logLevel,
			log.StreamHandler(os.Stdout, log.JSONFormat()),
		),
	)

	return cfg
}
#!/bin/sh
# Container entrypoint: refreshes the CA certificate store, then replaces this
# shell with the command passed as arguments.

echo "Updating CA certificates."
update-ca-certificates
echo "Running CMD."
# exec hands the process over to the command given in "$@".
exec "$@"
\ No newline at end of file
# Log level.
# Possible values: trace | debug | info | warn | error | crit
# Default: debug
log_level = "debug"
[signer_service]
# URL to the signer service
url = "http://localhost:1234"
tls_ca_cert = "tls/ca.crt"
tls_cert = "tls/tls.crt"
tls_key = "tls/tls.key"
[healthz]
# Whether or not to enable healthz endpoint
enabled = true
# Host for the healthz endpoint to listen on
host = "0.0.0.0"
# Port for the above.
port = "8080"
[metrics]
# Whether or not to enable Prometheus metrics
enabled = true
# Host for the Prometheus metrics endpoint to listen on.
host = "0.0.0.0"
# Port for the above.
port = "9761"
[wallets.default]
# OP Stack Chain ID
# see https://community.optimism.io/docs/useful-tools/networks/
chain_id = 420
# Signer method to use
# Possible values: signer | static
signer_method = "static"
# Address used to send transactions
address = "0x0000000000000000000000000000000000000000"
# For static signer method, the private key to use
private_key = "0000000000000000000000000000000000000000000000000000000000000000"
# Transaction value in wei
tx_value = 100000000000000
# Gas limit
gas_limit = 21000
# Gas tip cap
gas_tip_cap = 2000000000
# Fee cap
gas_fee_cap = 20000000000
[providers.p1]
# URL to the RPC provider
url = "http://localhost:8551"
# Read only providers are only used to check for transactions
read_only = true
# Interval to poll the provider for expected transactions
read_interval = "1s"
# Interval to submit new transactions to the provider
send_interval = "5s"
# Interval to wait before resending after an "already known" / "nonce too low" error
# (required by config validation)
send_transaction_retry_interval = "3s"
# Max time to keep retrying to send a transaction
# (required by config validation)
send_transaction_retry_timeout = "5s"
# Wallet to be used for sending transactions
wallet = "default"
# Network to pool transactions, i.e. providers in the same network will check transactions from each other
network = "op-goerli"
# Interval between receipt retrieval
receipt_retrieval_interval = "500ms"
# Max time to check for receipt
receipt_retrieval_timeout = "2m"
[providers.p2]
# Uncomment to disable this provider
# disabled=true
# URL to the RPC provider
url = "http://localhost:8552"
# Read only providers are only used to check for transactions
read_only = false
# Interval to poll the provider for expected transactions
read_interval = "2s"
# Interval to submit new transactions to the provider
send_interval = "3s"
# Interval to wait before resending after an "already known" / "nonce too low" error
# (required by config validation)
send_transaction_retry_interval = "3s"
# Max time to keep retrying to send a transaction
# (required by config validation)
send_transaction_retry_timeout = "5s"
# Wallet to be used for sending transactions
wallet = "default"
# Network to pool transactions, i.e. providers in the same network will check transactions from each other
network = "op-goerli"
# Interval between receipt retrieval
receipt_retrieval_interval = "500ms"
# Max time to check for receipt
receipt_retrieval_timeout = "2m"
module github.com/ethereum-optimism/optimism/op-ufm
go 1.20
require (
github.com/BurntSushi/toml v1.3.2
github.com/ethereum-optimism/optimism/op-service v0.10.14
github.com/ethereum-optimism/optimism/op-signer v0.1.1
github.com/ethereum/go-ethereum v1.12.0
github.com/gorilla/mux v1.8.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/rs/cors v1.9.0
)
require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/VictoriaMetrics/fastcache v1.10.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rivo/uniseg v0.3.4 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.5.0 // indirect
github.com/urfave/cli v1.22.9 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
)
This diff is collapsed.
package config
import (
"math/big"
"github.com/BurntSushi/toml"
"github.com/pkg/errors"
)
// Config is the root of the service configuration as decoded from the TOML
// file. Wallets and Providers are keyed by the name used in their TOML table
// header; providers refer to wallets by that name.
type Config struct {
	LogLevel string `toml:"log_level"`

	Signer  SignerServiceConfig `toml:"signer_service"`
	Metrics MetricsConfig       `toml:"metrics"`
	Healthz HealthzConfig       `toml:"healthz"`

	Wallets   map[string]*WalletConfig   `toml:"wallets"`
	Providers map[string]*ProviderConfig `toml:"providers"`
}
// SignerServiceConfig holds the signer service endpoint and its TLS material.
// Validate requires all four fields when any wallet uses the "signer" method.
type SignerServiceConfig struct {
	URL       string `toml:"url"`
	TLSCaCert string `toml:"tls_ca_cert"`
	TLSCert   string `toml:"tls_cert"`
	TLSKey    string `toml:"tls_key"`
}
// MetricsConfig configures the metrics endpoint. Host and Port are required
// by Validate when Enabled is true.
type MetricsConfig struct {
	Enabled bool   `toml:"enabled"`
	Debug   bool   `toml:"debug"`
	Host    string `toml:"host"`
	Port    string `toml:"port"`
}
// HealthzConfig configures the healthz endpoint. Host and Port are required
// by Validate when Enabled is true.
type HealthzConfig struct {
	Enabled bool   `toml:"enabled"`
	Host    string `toml:"host"`
	Port    string `toml:"port"`
}
// WalletConfig describes a wallet used to send transactions, together with
// the parameters of the transactions it sends.
type WalletConfig struct {
	ChainID big.Int `toml:"chain_id"`

	// signer | static
	SignerMethod string `toml:"signer_method"`
	Address      string `toml:"address"`
	// private key is used for static signing.
	// It is excluded from JSON output so that dumping the parsed config
	// never reveals the key.
	PrivateKey string `toml:"private_key" json:"-"`

	// transaction parameters
	TxValue   big.Int `toml:"tx_value"`
	GasLimit  uint64  `toml:"gas_limit"`
	GasTipCap big.Int `toml:"gas_tip_cap"`
	GasFeeCap big.Int `toml:"gas_fee_cap"`
}
// ProviderConfig describes one RPC provider. Validate requires URL, Wallet and
// every interval/timeout below to be set, and Wallet must name an entry of
// Config.Wallets.
type ProviderConfig struct {
	Network string `toml:"network"`
	URL     string `toml:"url"`

	ReadOnly     bool         `toml:"read_only"`
	ReadInterval TOMLDuration `toml:"read_interval"`

	SendInterval                 TOMLDuration `toml:"send_interval"`
	SendTransactionRetryInterval TOMLDuration `toml:"send_transaction_retry_interval"`
	SendTransactionRetryTimeout  TOMLDuration `toml:"send_transaction_retry_timeout"`

	ReceiptRetrievalInterval TOMLDuration `toml:"receipt_retrieval_interval"`
	ReceiptRetrievalTimeout  TOMLDuration `toml:"receipt_retrieval_timeout"`

	Wallet string `toml:"wallet"`
}
// New decodes the TOML file at the given path into a fresh Config.
// The result is not validated; call Validate on it before use.
func New(file string) (*Config, error) {
	var parsed Config
	_, err := toml.DecodeFile(file, &parsed)
	if err != nil {
		return nil, err
	}
	return &parsed, nil
}
// Validate checks that the configuration is complete and consistent and
// returns the first problem found. On success it also defaults LogLevel to
// "debug" when it was left empty.
func (c *Config) Validate() error {
	if c.Metrics.Enabled && (c.Metrics.Host == "" || c.Metrics.Port == "") {
		return errors.New("metrics is enabled but host or port are missing")
	}
	if c.Healthz.Enabled && (c.Healthz.Host == "" || c.Healthz.Port == "") {
		return errors.New("healthz is enabled but host or port are missing")
	}

	if len(c.Wallets) == 0 {
		return errors.New("at least one wallet must be set")
	}
	if len(c.Providers) == 0 {
		return errors.New("at least one provider must be set")
	}

	for walletName, w := range c.Wallets {
		if w.ChainID.BitLen() == 0 {
			return errors.Errorf("wallet [%s] chain_id is missing", walletName)
		}

		// The signing method decides which credentials must be present.
		switch w.SignerMethod {
		case "signer":
			switch {
			case c.Signer.URL == "":
				return errors.New("signer url is missing")
			case c.Signer.TLSCaCert == "":
				return errors.New("signer tls_ca_cert is missing")
			case c.Signer.TLSCert == "":
				return errors.New("signer tls_cert is missing")
			case c.Signer.TLSKey == "":
				return errors.New("signer tls_key is missing")
			}
		case "static":
			if w.PrivateKey == "" {
				return errors.Errorf("wallet [%s] private_key is missing", walletName)
			}
		default:
			return errors.Errorf("wallet [%s] signer_method is invalid", walletName)
		}

		switch {
		case w.Address == "":
			return errors.Errorf("wallet [%s] address is missing", walletName)
		case w.TxValue.BitLen() == 0:
			return errors.Errorf("wallet [%s] tx_value is missing", walletName)
		case w.GasLimit == 0:
			return errors.Errorf("wallet [%s] gas_limit is missing", walletName)
		case w.GasFeeCap.BitLen() == 0:
			return errors.Errorf("wallet [%s] gas_fee_cap is missing", walletName)
		}
	}

	for providerName, pc := range c.Providers {
		switch {
		case pc.URL == "":
			return errors.Errorf("provider [%s] url is missing", providerName)
		case pc.ReadInterval == 0:
			return errors.Errorf("provider [%s] read_interval is missing", providerName)
		case pc.SendInterval == 0:
			return errors.Errorf("provider [%s] send_interval is missing", providerName)
		case pc.SendTransactionRetryInterval == 0:
			return errors.Errorf("provider [%s] send_transaction_retry_interval is missing", providerName)
		case pc.SendTransactionRetryTimeout == 0:
			return errors.Errorf("provider [%s] send_transaction_retry_timeout is missing", providerName)
		case pc.ReceiptRetrievalInterval == 0:
			return errors.Errorf("provider [%s] receipt_retrieval_interval is missing", providerName)
		case pc.ReceiptRetrievalTimeout == 0:
			return errors.Errorf("provider [%s] receipt_retrieval_timeout is missing", providerName)
		case pc.Wallet == "":
			return errors.Errorf("provider [%s] wallet is missing", providerName)
		}
		// The referenced wallet must be one of the configured wallets.
		if _, known := c.Wallets[pc.Wallet]; !known {
			return errors.Errorf("provider [%s] has an invalid wallet [%s]", providerName, pc.Wallet)
		}
	}

	if c.LogLevel == "" {
		c.LogLevel = "debug"
	}

	return nil
}
package config
import "time"
// TOMLDuration is a time.Duration that can be decoded from its textual form
// (for example "500ms" or "2m").
type TOMLDuration time.Duration

// UnmarshalText parses b with time.ParseDuration and stores the result in t.
// On a parse error the error is returned and t is left unchanged.
func (t *TOMLDuration) UnmarshalText(b []byte) error {
	parsed, err := time.ParseDuration(string(b))
	if err == nil {
		*t = TOMLDuration(parsed)
	}
	return err
}
package clients
import (
"context"
"time"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
// InstrumentedEthClient wraps an ethclient.Client and reports per-provider
// error and latency metrics for the calls made through it.
type InstrumentedEthClient struct {
	c            *ethclient.Client
	providerName string // label attached to every metric recorded by this client
}
// Dial connects to the RPC endpoint at url and returns an instrumented client
// labelled with providerName. A failed dial is recorded as an error metric; a
// successful one records the time the dial took.
func Dial(providerName string, url string) (*InstrumentedEthClient, error) {
	began := time.Now()

	inner, err := ethclient.Dial(url)
	if err != nil {
		metrics.RecordError(providerName, "ethclient.Dial")
		return nil, err
	}

	elapsed := time.Since(began)
	metrics.RecordRPCLatency(providerName, "ethclient", "Dial", elapsed)

	client := &InstrumentedEthClient{
		providerName: providerName,
		c:            inner,
	}
	return client, nil
}
// TransactionByHash looks up a transaction by hash on the wrapped client,
// logging the request and the response at debug level.
// On error it returns (nil, false, err) and counts the error unless it is one
// of the ignorable ones; on success it records the call latency.
func (i *InstrumentedEthClient) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) {
	began := time.Now()

	log.Debug(">> TransactionByHash", "hash", hash, "provider", i.providerName)
	tx, isPending, err := i.c.TransactionByHash(ctx, hash)
	log.Debug("<< TransactionByHash", "tx", tx, "isPending", isPending, "err", err, "hash", hash, "provider", i.providerName)

	if err == nil {
		metrics.RecordRPCLatency(i.providerName, "ethclient", "TransactionByHash", time.Since(began))
		return tx, isPending, nil
	}

	if !i.ignorableErrors(err) {
		metrics.RecordError(i.providerName, "ethclient.TransactionByHash")
	}
	return nil, false, err
}
// PendingNonceAt returns the pending nonce of the given hex address.
// Every error is counted as an error metric; success records the call latency.
func (i *InstrumentedEthClient) PendingNonceAt(ctx context.Context, address string) (uint64, error) {
	began := time.Now()
	account := common.HexToAddress(address)

	nonce, err := i.c.PendingNonceAt(ctx, account)
	if err == nil {
		metrics.RecordRPCLatency(i.providerName, "ethclient", "PendingNonceAt", time.Since(began))
		return nonce, nil
	}

	metrics.RecordError(i.providerName, "ethclient.PendingNonceAt")
	return 0, err
}
// TransactionReceipt fetches the receipt for txHash from the wrapped client.
// On error it returns (nil, err) and counts the error unless it is one of the
// ignorable ones; on success it records the call latency.
func (i *InstrumentedEthClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
	began := time.Now()

	receipt, err := i.c.TransactionReceipt(ctx, txHash)
	if err == nil {
		metrics.RecordRPCLatency(i.providerName, "ethclient", "TransactionReceipt", time.Since(began))
		return receipt, nil
	}

	if !i.ignorableErrors(err) {
		metrics.RecordError(i.providerName, "ethclient.TransactionReceipt")
	}
	return nil, err
}
// SendTransaction submits tx through the wrapped client.
// A failure is returned to the caller and counted as an error metric unless it
// is one of the ignorable ones; success records the call latency.
func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.Transaction) error {
	began := time.Now()

	if err := i.c.SendTransaction(ctx, tx); err != nil {
		if !i.ignorableErrors(err) {
			metrics.RecordError(i.providerName, "ethclient.SendTransaction")
		}
		return err
	}

	metrics.RecordRPCLatency(i.providerName, "ethclient", "SendTransaction", time.Since(began))
	return nil
}
// ignorableErrors reports whether err carries the message of one of the
// errors that are not counted as failures: not found, already known, or nonce
// too low.
func (i *InstrumentedEthClient) ignorableErrors(err error) bool {
	// we dont use errors.Is because eth client actually uses errors.New,
	// therefore creating an incomparable instance :(
	switch err.Error() {
	case ethereum.NotFound.Error(),
		txpool.ErrAlreadyKnown.Error(),
		core.ErrNonceTooLow.Error():
		return true
	}
	return false
}
package clients
import (
"context"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
optls "github.com/ethereum-optimism/optimism/op-service/tls"
signer "github.com/ethereum-optimism/optimism/op-signer/client"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// InstrumentedSignerClient wraps a signer.SignerClient and reports
// per-provider error and latency metrics for the calls made through it.
type InstrumentedSignerClient struct {
	c            *signer.SignerClient
	providerName string // label attached to every metric recorded by this client
}
// NewSignerClient creates a signer client for endpoint using tlsConfig and
// wraps it with metrics labelled with providerName. A failure is counted as an
// error metric; success records how long the construction took.
func NewSignerClient(providerName string, logger log.Logger, endpoint string, tlsConfig optls.CLIConfig) (*InstrumentedSignerClient, error) {
	began := time.Now()

	inner, err := signer.NewSignerClient(logger, endpoint, tlsConfig)
	if err != nil {
		metrics.RecordError(providerName, "signer.NewSignerClient")
		return nil, err
	}

	elapsed := time.Since(began)
	metrics.RecordRPCLatency(providerName, "signer", "NewSignerClient", elapsed)

	client := &InstrumentedSignerClient{
		providerName: providerName,
		c:            inner,
	}
	return client, nil
}
// SignTransaction asks the wrapped signer client to sign tx for chainId and
// returns the signed transaction. A failure is counted as an error metric;
// success records the call latency.
func (i *InstrumentedSignerClient) SignTransaction(ctx context.Context, chainId *big.Int, tx *types.Transaction) (*types.Transaction, error) {
	began := time.Now()

	signed, err := i.c.SignTransaction(ctx, chainId, tx)
	if err == nil {
		metrics.RecordRPCLatency(i.providerName, "signer", "SignTransaction", time.Since(began))
		return signed, nil
	}

	metrics.RecordError(i.providerName, "signer.SignTransaction")
	return nil, err
}
package metrics
import (
fmt "fmt"
"regexp"
"strings"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const (
	// MetricsNamespace is the namespace under which every metric of this
	// package is registered.
	MetricsNamespace = "ufm"
)
var (
	// Debug enables a debug log line for every metric update.
	Debug bool

	// errorsTotal counts errors per provider and error label. It is only ever
	// incremented, so it is registered as a counter (matching its "_total"
	// name) rather than a gauge.
	errorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: MetricsNamespace,
		Name:      "errors_total",
		Help:      "Count of errors",
	}, []string{
		"provider",
		"error",
	})

	// rpcLatency holds the latency of the most recent call per provider,
	// client and method, in milliseconds.
	rpcLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: MetricsNamespace,
		Name:      "rpc_latency",
		Help:      "RPC latency per provider, client and method (ms)",
	}, []string{
		"provider",
		"client",
		"method",
	})

	// roundTripLatency holds the most recent round trip latency per provider,
	// in milliseconds.
	roundTripLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: MetricsNamespace,
		Name:      "roundtrip_latency",
		Help:      "Round trip latency per provider (ms)",
	}, []string{
		"provider",
	})

	// gasUsed holds the most recently reported gas usage per provider.
	gasUsed = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: MetricsNamespace,
		Name:      "gas_used",
		Help:      "Gas used per provider",
	}, []string{
		"provider",
	})

	// firstSeenLatency holds the most recent first-seen latency per
	// source/seen provider pair, in milliseconds.
	firstSeenLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: MetricsNamespace,
		Name:      "first_seen_latency",
		Help:      "First seen latency latency per provider (ms)",
	}, []string{
		"provider_source",
		"provider_seen",
	})

	// providerToProviderLatency holds the most recent latency per source/seen
	// provider pair, in milliseconds.
	providerToProviderLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: MetricsNamespace,
		Name:      "provider_to_provider_latency",
		Help:      "Provider to provider latency (ms)",
	}, []string{
		"provider_source",
		"provider_seen",
	})

	// networkTransactionsInFlight holds the most recently reported number of
	// in-flight transactions per network.
	networkTransactionsInFlight = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: MetricsNamespace,
		Name:      "transactions_inflight",
		Help:      "Transactions in flight, per network",
	}, []string{
		"network",
	})
)
// nonAlphanumericRegex matches runs of characters that are neither ASCII
// letters nor spaces. Note that, despite the name, digits are matched too.
var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z ]+`)
// RecordError increments the error count for the given provider and error
// label. With Debug enabled the update is also logged.
func RecordError(provider string, errorLabel string) {
	if Debug {
		log.Debug("metric inc", "m", "errors_total",
			"provider", provider, "error", errorLabel)
	}
	series := errorsTotal.WithLabelValues(provider, errorLabel)
	series.Inc()
}
// RecordErrorDetails records an error for provider under "<label>.<details>",
// where details is the error message with every character other than ASCII
// letters and spaces removed and the remaining spaces replaced by underscores.
func RecordErrorDetails(provider string, label string, err error) {
	errClean := nonAlphanumericRegex.ReplaceAllString(err.Error(), "")
	errClean = strings.ReplaceAll(errClean, " ", "_")
	// The cleaned message must be passed as the second operand; without it the
	// label ended in "%!s(MISSING)" and the details were lost.
	label = fmt.Sprintf("%s.%s", label, errClean)
	RecordError(provider, label)
}
// RecordRPCLatency sets the RPC latency gauge for the given provider, client
// and method to latency in whole milliseconds. With Debug enabled the update
// is also logged.
func RecordRPCLatency(provider string, client string, method string, latency time.Duration) {
	if Debug {
		log.Debug("metric set", "m", "rpc_latency",
			"provider", provider, "client", client, "method", method, "latency", latency)
	}
	millis := float64(latency.Milliseconds())
	rpcLatency.WithLabelValues(provider, client, method).Set(millis)
}
// RecordRoundTripLatency sets the round trip latency gauge of provider to
// latency in whole milliseconds. With Debug enabled the update is also logged.
func RecordRoundTripLatency(provider string, latency time.Duration) {
	if Debug {
		log.Debug("metric set", "m", "roundtrip_latency",
			"provider", provider, "latency", latency)
	}
	millis := float64(latency.Milliseconds())
	roundTripLatency.WithLabelValues(provider).Set(millis)
}
// RecordGasUsed sets the gas used gauge of provider to val (the previous value
// is replaced, not added to). With Debug enabled the update is also logged.
func RecordGasUsed(provider string, val uint64) {
	if Debug {
		log.Debug("metric add", "m", "gas_used",
			"provider", provider, "val", val)
	}
	gauge := gasUsed.WithLabelValues(provider)
	gauge.Set(float64(val))
}
// RecordFirstSeenLatency sets the first-seen latency gauge for the
// (providerSource, providerSeen) pair to latency in whole milliseconds.
// With Debug enabled the update is also logged.
func RecordFirstSeenLatency(providerSource string, providerSeen string, latency time.Duration) {
	if Debug {
		log.Debug("metric set", "m", "first_seen_latency",
			"provider_source", providerSource, "provider_seen", providerSeen, "latency", latency)
	}
	millis := float64(latency.Milliseconds())
	firstSeenLatency.WithLabelValues(providerSource, providerSeen).Set(millis)
}
// RecordProviderToProviderLatency sets the provider-to-provider latency gauge
// for the (providerSource, providerSeen) pair to latency in whole
// milliseconds. With Debug enabled the update is also logged.
func RecordProviderToProviderLatency(providerSource string, providerSeen string, latency time.Duration) {
	if Debug {
		log.Debug("metric set", "m", "provider_to_provider_latency",
			"provider_source", providerSource, "provider_seen", providerSeen, "latency", latency)
	}
	millis := float64(latency.Milliseconds())
	providerToProviderLatency.WithLabelValues(providerSource, providerSeen).Set(millis)
}
// RecordTransactionsInFlight sets the in-flight transactions gauge of network
// to count. With Debug enabled the update is also logged.
func RecordTransactionsInFlight(network string, count int) {
	if Debug {
		log.Debug("metric set", "m", "transactions_inflight",
			"network", network, "count", count)
	}
	gauge := networkTransactionsInFlight.WithLabelValues(network)
	gauge.Set(float64(count))
}
package provider
import (
"context"
"time"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics/clients"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/pkg/errors"
)
// Heartbeat polls for expected in-flight transactions.
//
// It reports the number of in-flight transactions of the network, then checks
// with this provider every pooled transaction that originated from another
// provider and that this provider has not been marked as having seen yet.
// Transactions that pass the check are marked as seen (recording first-seen
// and provider-to-provider latencies) and are removed from the pool once the
// expected number of providers have seen them.
//
// The pool map is shared with other providers' goroutines, so it is only read
// while holding the pool mutex, and each transaction's SeenBy map is only read
// while holding that transaction's mutex.
func (p *Provider) Heartbeat(ctx context.Context) {
	// Snapshot the candidates under the pool lock. Lock order is always
	// pool mutex first, then transaction mutex.
	p.txPool.M.Lock()
	inflight := len(p.txPool.Transactions)
	// let's exclude transactions already seen by this provider, or originated by it
	expectedTransactions := make([]*TransactionState, 0, inflight)
	alreadySeen := 0
	for _, st := range p.txPool.Transactions {
		if st.ProviderSource == p.name {
			continue
		}
		st.M.Lock()
		_, exist := st.SeenBy[p.name]
		st.M.Unlock()
		if exist {
			alreadySeen++
			continue
		}
		expectedTransactions = append(expectedTransactions, st)
	}
	p.txPool.M.Unlock()

	log.Debug("heartbeat", "provider", p.name, "inflight", inflight)
	metrics.RecordTransactionsInFlight(p.config.Network, inflight)

	if len(expectedTransactions) == 0 {
		log.Debug("no expected txs", "count", inflight, "provider", p.name, "alreadySeen", alreadySeen)
		return
	}

	client, err := clients.Dial(p.name, p.config.URL)
	if err != nil {
		log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err)
		// Without a client there is nothing to check; continuing would
		// dereference a nil client.
		return
	}

	log.Debug("checking in-flight tx", "count", inflight, "provider", p.name, "alreadySeen", alreadySeen)
	for _, st := range expectedTransactions {
		hash := st.Hash.Hex()

		// NOTE(review): a NotFound result is not treated as an error here, so
		// the transaction is still marked as seen below — confirm this is
		// intended.
		_, isPending, err := client.TransactionByHash(ctx, st.Hash)
		if err != nil && !errors.Is(err, ethereum.NotFound) {
			log.Error("cant check transaction", "provider", p.name, "hash", hash, "url", p.config.URL, "err", err)
			continue
		}

		log.Debug("got transaction", "provider", p.name, "hash", hash, "isPending", isPending)

		// mark transaction as seen by this provider
		st.M.Lock()
		latency := time.Since(st.SentAt)
		if st.FirstSeen.IsZero() {
			st.FirstSeen = time.Now()
			metrics.RecordFirstSeenLatency(st.ProviderSource, p.name, latency)
			log.Info("transaction first seen",
				"hash", hash,
				"firstSeenLatency", latency,
				"providerSource", st.ProviderSource,
				"providerSeen", p.name)
		}
		if _, exist := st.SeenBy[p.name]; !exist {
			st.SeenBy[p.name] = time.Now()
			metrics.RecordProviderToProviderLatency(st.ProviderSource, p.name, latency)
		}
		// read the count while still holding the transaction lock
		seenBy := len(st.SeenBy)
		st.M.Unlock()

		// check if transaction have been seen by all providers
		p.txPool.M.Lock()
		if seenBy == p.txPool.Expected {
			log.Debug("transaction seen by all", "hash", hash, "expected", p.txPool.Expected, "seenBy", seenBy)
			delete(p.txPool.Transactions, hash)
		}
		p.txPool.M.Unlock()
	}
}
package provider
import (
"context"
"time"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/config"
)
// Provider runs the periodic checks (Heartbeat, and RoundTrip unless the
// provider is read-only) against a single RPC provider.
type Provider struct {
	name         string
	config       *config.ProviderConfig
	signerConfig *config.SignerServiceConfig
	walletConfig *config.WalletConfig
	txPool       *NetworkTransactionPool
	cancelFunc   context.CancelFunc // set by Start; cancels the scheduled jobs
}
// New builds a Provider from its name, its own configuration, the signer and
// wallet configuration it uses, and the transaction pool it shares with other
// providers. The provider does nothing until Start is called.
func New(name string, cfg *config.ProviderConfig,
	signerConfig *config.SignerServiceConfig,
	walletConfig *config.WalletConfig,
	txPool *NetworkTransactionPool) *Provider {
	return &Provider{
		name:         name,
		config:       cfg,
		signerConfig: signerConfig,
		walletConfig: walletConfig,
		txPool:       txPool,
	}
}
// Start schedules Heartbeat every ReadInterval and, unless the provider is
// read-only, RoundTrip every SendInterval. Both jobs run on a context derived
// from ctx that Shutdown cancels.
func (p *Provider) Start(ctx context.Context) {
	jobsCtx, cancel := context.WithCancel(ctx)
	p.cancelFunc = cancel

	readEvery := time.Duration(p.config.ReadInterval)
	schedule(jobsCtx, readEvery, p.Heartbeat)

	if p.config.ReadOnly {
		return
	}
	sendEvery := time.Duration(p.config.SendInterval)
	schedule(jobsCtx, sendEvery, p.RoundTrip)
}
// Shutdown cancels the context of the scheduled jobs. It does nothing when
// the provider was never started.
func (p *Provider) Shutdown() {
	cancel := p.cancelFunc
	if cancel == nil {
		return
	}
	cancel()
}
// Name returns the name the provider was created with.
func (p *Provider) Name() string {
	return p.name
}
// URL returns the URL from the provider's configuration.
func (p *Provider) URL() string {
	return p.config.URL
}
// schedule runs handler repeatedly in a new goroutine until ctx is done.
//
// The interval timer is started before each handler call, so a run that takes
// less than interval is followed by a wait for the remainder, while a longer
// run is followed immediately by the next one. The handler is invoked right
// away on the first iteration.
func schedule(ctx context.Context, interval time.Duration, handler func(ctx context.Context)) {
	loop := func() {
		for {
			tick := time.NewTimer(interval)
			handler(ctx)

			select {
			case <-ctx.Done():
				tick.Stop()
				return
			case <-tick.C:
				// interval elapsed, run the handler again
			}
		}
	}
	go loop()
}
package provider
import (
"context"
"time"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
iclients "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics/clients"
"github.com/ethereum-optimism/optimism/op-service/tls"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// RoundTrip sends a new transaction to measure round trip latency.
//
// It dials the provider, fetches the pending nonce of the wallet address and
// sends a signed transaction. When the provider answers "already known" or
// "nonce too low", the nonce is incremented and the send is retried after
// SendTransactionRetryInterval, until SendTransactionRetryTimeout has elapsed
// since the first attempt. Once sent, the transaction is added to the shared
// pool and the receipt is polled every ReceiptRetrievalInterval for at most
// ReceiptRetrievalTimeout. On success the round trip latency (measured from
// the last send attempt) and the gas used are recorded.
//
// All failures are logged and end the run; nothing is returned.
func (p *Provider) RoundTrip(ctx context.Context) {
	log.Debug("roundTripLatency", "provider", p.name)

	client, err := iclients.Dial(p.name, p.config.URL)
	if err != nil {
		log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err)
		return
	}

	nonce, err := client.PendingNonceAt(ctx, p.walletConfig.Address)
	if err != nil {
		log.Error("cant get nounce", "provider", p.name, "err", err)
		return
	}

	var txHash common.Hash
	attempt := 0

	// used for timeout
	firstAttemptAt := time.Now()
	// used for actual round trip time (disregard retry time)
	var roundTripStartedAt time.Time
	for {
		tx := p.createTx(nonce)

		signedTx, err := p.sign(ctx, tx)
		if err != nil {
			log.Error("cant sign tx", "provider", p.name, "tx", tx, "err", err)
			return
		}

		// the hash of the signed transaction is the one the network knows
		txHash = signedTx.Hash()

		roundTripStartedAt = time.Now()
		err = client.SendTransaction(ctx, signedTx)
		if err == nil {
			break
		}

		// compared by message, the same way the instrumented client does
		retryable := err.Error() == txpool.ErrAlreadyKnown.Error() ||
			err.Error() == core.ErrNonceTooLow.Error()
		if !retryable {
			log.Error("cant send transaction", "provider", p.name, "err", err)
			metrics.RecordErrorDetails(p.name, "ethclient.SendTransaction", err)
			return
		}

		if time.Since(firstAttemptAt) >= time.Duration(p.config.SendTransactionRetryTimeout) {
			log.Error("send transaction timed out (known already)", "provider", p.name, "hash", txHash.Hex(), "elapsed", time.Since(firstAttemptAt), "attempt", attempt, "nonce", nonce)
			metrics.RecordError(p.name, "ethclient.SendTransaction.nonce")
			return
		}

		log.Warn("tx already known, incrementing nonce and trying again", "provider", p.name, "nonce", nonce)
		time.Sleep(time.Duration(p.config.SendTransactionRetryInterval))

		nonce++
		attempt++
		if attempt%10 == 0 {
			log.Debug("retrying send transaction...", "provider", p.name, "attempt", attempt, "nonce", nonce, "elapsed", time.Since(firstAttemptAt))
		}
	}

	log.Info("transaction sent", "provider", p.name, "hash", txHash.Hex(), "nonce", nonce)

	// add to pool
	sentAt := time.Now()
	p.txPool.M.Lock()
	p.txPool.Transactions[txHash.Hex()] = &TransactionState{
		Hash:           txHash,
		ProviderSource: p.name,
		SentAt:         sentAt,
		SeenBy:         make(map[string]time.Time),
	}
	p.txPool.M.Unlock()

	var receipt *types.Receipt
	attempt = 0
	for receipt == nil {
		if time.Since(sentAt) >= time.Duration(p.config.ReceiptRetrievalTimeout) {
			// the "hash" key previously had no value, which shifted every
			// following key/value pair of this log entry
			log.Error("receipt retrieval timed out", "provider", p.name, "hash", txHash.Hex(), "elapsed", time.Since(sentAt))
			return
		}
		time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval))
		if attempt%10 == 0 {
			log.Debug("checking for receipt...", "provider", p.name, "attempt", attempt, "elapsed", time.Since(sentAt))
		}
		// a not-found receipt is not a failure: keep polling until the timeout
		receipt, err = client.TransactionReceipt(ctx, txHash)
		if err != nil && !errors.Is(err, ethereum.NotFound) {
			log.Error("cant get receipt for transaction", "provider", p.name, "hash", txHash.Hex(), "err", err)
			return
		}
		attempt++
	}

	roundTripLatency := time.Since(roundTripStartedAt)

	metrics.RecordRoundTripLatency(p.name, roundTripLatency)
	metrics.RecordGasUsed(p.name, receipt.GasUsed)

	log.Info("got transaction receipt", "hash", txHash.Hex(),
		"roundTripLatency", roundTripLatency,
		"provider", p.name,
		"blockNumber", receipt.BlockNumber,
		"blockHash", receipt.BlockHash,
		"gasUsed", receipt.GasUsed)
}
// createTx builds an unsigned dynamic-fee transaction with the given nonce.
// The recipient is the wallet's own address; value, gas limit, tip cap and fee
// cap come from the wallet configuration, and the transaction carries no data.
func (p *Provider) createTx(nonce uint64) *types.Transaction {
	wallet := p.walletConfig
	recipient := common.HexToAddress(wallet.Address)

	return types.NewTx(&types.DynamicFeeTx{
		ChainID:   &wallet.ChainID,
		Nonce:     nonce,
		GasFeeCap: &wallet.GasFeeCap,
		GasTipCap: &wallet.GasTipCap,
		Gas:       wallet.GasLimit,
		To:        &recipient,
		Value:     &wallet.TxValue,
		Data:      nil,
	})
}
// sign signs tx according to the wallet's signer method.
//
// "static" signs locally with the configured private key; "signer" creates a
// signer service client from the signer configuration and asks it to sign.
// Any other method yields an error.
func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
	switch p.walletConfig.SignerMethod {
	case "static":
		log.Debug("using static signer")
		key, err := crypto.HexToECDSA(p.walletConfig.PrivateKey)
		if err != nil {
			return nil, err
		}
		chainSigner := types.LatestSignerForChainID(&p.walletConfig.ChainID)
		return types.SignTx(tx, chainSigner, key)

	case "signer":
		signerClient, err := iclients.NewSignerClient(p.name, log.Root(), p.signerConfig.URL, tls.CLIConfig{
			TLSCaCert: p.signerConfig.TLSCaCert,
			TLSCert:   p.signerConfig.TLSCert,
			TLSKey:    p.signerConfig.TLSKey,
		})
		log.Debug("signerclient", "client", signerClient, "err", err)
		if err != nil {
			return nil, err
		}
		if signerClient == nil {
			return nil, errors.New("could not initialize signer client")
		}

		signed, err := signerClient.SignTransaction(ctx, &p.walletConfig.ChainID, tx)
		if err != nil {
			return nil, err
		}
		return signed, nil

	default:
		return nil, errors.New("invalid signer method")
	}
}
package provider
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
)
// TransactionPool is used locally to share transactions between providers under the same pool.
// It maps a network name to the pool of that network.
type TransactionPool map[string]*NetworkTransactionPool
// NetworkTransactionPool is used locally to share transactions between providers under the same network
type NetworkTransactionPool struct {
	// M is taken by the providers around writes to and deletes from Transactions.
	M sync.Mutex
	// Transactions holds the in-flight transactions keyed by their hex hash.
	Transactions map[string]*TransactionState
	// Expected is the number of providers that must have seen a transaction
	// before it is removed from the pool.
	Expected int
}
// TransactionState tracks one in-flight transaction and which providers have
// seen it so far.
type TransactionState struct {
	// Transaction hash
	Hash common.Hash

	// Mutex taken by the providers while updating FirstSeen and SeenBy
	M sync.Mutex

	// Time recorded right after the transaction was sent
	SentAt time.Time
	// Name of the provider that sent the transaction
	ProviderSource string

	// Time at which a provider first marked the transaction as seen; zero until then
	FirstSeen time.Time

	// Map of providers that have seen this transaction, and when
	// Once all providers have seen the transaction it is removed from the pool
	SeenBy map[string]time.Time
}
package service
import (
"context"
"net/http"
"github.com/gorilla/mux"
"github.com/rs/cors"
)
// HealthzServer serves the /healthz endpoint. Both fields are set by Start.
type HealthzServer struct {
	ctx    context.Context // context handed to Start; reused by Shutdown
	server *http.Server
}
// Start serves GET /healthz on addr, allowing requests from any origin, and
// blocks until the server stops.
//
// It returns nil when the server was stopped through Shutdown (the listener
// then reports http.ErrServerClosed, which is not a failure) and the
// underlying error otherwise, e.g. when addr cannot be bound.
func (h *HealthzServer) Start(ctx context.Context, addr string) error {
	hdlr := mux.NewRouter()
	hdlr.HandleFunc("/healthz", h.Handle).Methods("GET")
	c := cors.New(cors.Options{
		AllowedOrigins: []string{"*"},
	})
	server := &http.Server{
		Handler: c.Handler(hdlr),
		Addr:    addr,
	}
	h.server = server
	h.ctx = ctx

	// ListenAndServe returns http.ErrServerClosed itself after Shutdown, so a
	// direct comparison is sufficient.
	if err := h.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		return err
	}
	return nil
}
// Shutdown gracefully stops the server using the context given to Start.
// It is a no-op returning nil when Start has not set up the server yet.
func (h *HealthzServer) Shutdown() error {
	if h.server == nil {
		return nil
	}
	return h.server.Shutdown(h.ctx)
}
// Handle answers every request with the body "OK".
func (h *HealthzServer) Handle(w http.ResponseWriter, r *http.Request) {
	body := []byte("OK")
	// the write result is intentionally discarded
	_, _ = w.Write(body)
}
package service
import (
"context"
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// MetricsServer serves the Prometheus metrics handler. Both fields are set by
// Start.
type MetricsServer struct {
	ctx    context.Context // context handed to Start; reused by Shutdown
	server *http.Server
}
// Start serves the Prometheus metrics handler on addr and blocks until the
// server stops.
//
// It returns nil when the server was stopped through Shutdown (the listener
// then reports http.ErrServerClosed, which is not a failure) and the
// underlying error otherwise, e.g. when addr cannot be bound.
func (m *MetricsServer) Start(ctx context.Context, addr string) error {
	server := &http.Server{
		Handler: promhttp.Handler(),
		Addr:    addr,
	}
	m.server = server
	m.ctx = ctx

	// ListenAndServe returns http.ErrServerClosed itself after Shutdown, so a
	// direct comparison is sufficient.
	if err := m.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		return err
	}
	return nil
}
// Shutdown gracefully stops the server using the context given to Start.
// It is a no-op returning nil when Start has not set up the server yet.
func (m *MetricsServer) Shutdown() error {
	if m.server == nil {
		return nil
	}
	return m.server.Shutdown(m.ctx)
}
package service
import (
"context"
"net"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/config"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/provider"
"github.com/ethereum/go-ethereum/log"
)
// Service ties together the configuration, the healthz and metrics servers,
// and the providers created from the configuration (keyed by provider name).
type Service struct {
	Config    *config.Config
	Healthz   *HealthzServer
	Metrics   *MetricsServer
	Providers map[string]*provider.Provider
}
// New creates a Service for cfg with unstarted healthz and metrics servers and
// an empty provider map sized for the configured providers.
func New(cfg *config.Config) *Service {
	providers := make(map[string]*provider.Provider, len(cfg.Providers))
	return &Service{
		Config:    cfg,
		Healthz:   &HealthzServer{},
		Metrics:   &MetricsServer{},
		Providers: providers,
	}
}
// Start launches the healthz and metrics servers (when enabled) in background
// goroutines, creates one transaction pool per network, and creates and
// starts a provider for every configured provider entry.
func (s *Service) Start(ctx context.Context) {
	log.Info("service starting")

	if hz := s.Config.Healthz; hz.Enabled {
		addr := net.JoinHostPort(hz.Host, hz.Port)
		log.Info("starting healthz server", "addr", addr)
		go func() {
			err := s.Healthz.Start(ctx, addr)
			if err != nil {
				log.Error("error starting healthz server", "err", err)
			}
		}()
	}

	metrics.Debug = s.Config.Metrics.Debug
	if mc := s.Config.Metrics; mc.Enabled {
		addr := net.JoinHostPort(mc.Host, mc.Port)
		log.Info("starting metrics server", "addr", addr)
		go func() {
			err := s.Metrics.Start(ctx, addr)
			if err != nil {
				log.Error("error starting metrics server", "err", err)
			}
		}()
	}

	// map networks to its providers
	networks := make(map[string][]string)
	for providerName, pc := range s.Config.Providers {
		network := pc.Network
		networks[network] = append(networks[network], providerName)
	}

	// one shared pool per network
	pools := provider.TransactionPool{}
	for network, members := range networks {
		if len(members) == 1 {
			log.Warn("can't measure first seen for network, please another provider", "network", network)
		}
		pools[network] = &provider.NetworkTransactionPool{
			Transactions: make(map[string]*provider.TransactionState),
			// set expected number of providers for this network
			// -1 since we don't wait for acking from the same provider
			Expected: len(members) - 1,
		}
	}

	for providerName, pc := range s.Config.Providers {
		started := provider.New(providerName,
			pc,
			&s.Config.Signer,
			s.Config.Wallets[pc.Wallet],
			pools[pc.Network])
		s.Providers[providerName] = started
		started.Start(ctx)
		log.Info("provider started", "provider", providerName)
	}

	log.Info("service started")
}
// Shutdown stops the healthz and metrics servers (when enabled) and then every
// provider. A server that fails to stop is logged and does not prevent the
// remaining components from being stopped.
func (s *Service) Shutdown() {
	log.Info("service shutting down")

	if s.Config.Healthz.Enabled {
		if err := s.Healthz.Shutdown(); err != nil {
			log.Error("error stopping healthz server", "err", err)
		}
		log.Info("healthz stopped")
	}

	if s.Config.Metrics.Enabled {
		if err := s.Metrics.Shutdown(); err != nil {
			log.Error("error stopping metrics server", "err", err)
		}
		log.Info("metrics stopped")
	}

	// p rather than provider, so the imported package name is not shadowed
	for name, p := range s.Providers {
		p.Shutdown()
		log.Info("provider stopped", "provider", name)
	}

	log.Info("service stopped")
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment