Commit 48aa3d14 authored by Zach Howard's avatar Zach Howard Committed by GitHub

ENG-2835 websocket monitoring daemon (endpoint-monitor) (#3581)

parent 14591ed8
export ENDPOINT_MONITOR_LOG_LEVEL=debug
export ENDPOINT_MONITOR_PROVIDERS=goerli,mainnet
export ENDPOINT_MONITOR_GOERLI_URL=wss://ws-goerli.optimism.io
export ENDPOINT_MONITOR_MAINNET_URL=wss://ws-mainnet.optimism.io
FROM golang:1.18.0-alpine3.15 as builder
COPY ./endpoint-monitor /app
WORKDIR /app
RUN apk --no-cache add make jq bash git alpine-sdk
RUN make build
FROM alpine:3.15
RUN apk --no-cache add ca-certificates
RUN addgroup -S app && adduser -S app -G app
USER app
WORKDIR /app
COPY --from=builder /app/bin/endpoint-monitor /app
ENTRYPOINT ["/app/endpoint-monitor"]
GITCOMMIT := $(shell git rev-parse HEAD)
GITDATE := $(shell git show -s --format='%ct')
VERSION := v0.0.0
LDFLAGSSTRING +=-X main.GitCommit=$(GITCOMMIT)
LDFLAGSSTRING +=-X main.GitDate=$(GITDATE)
LDFLAGSSTRING +=-X main.Version=$(VERSION)
LDFLAGS := -ldflags "$(LDFLAGSSTRING)"
all: build
build:
env GO111MODULE=on go build -v $(LDFLAGS) -o ./bin/endpoint-monitor ./cmd
clean:
rm ./bin/endpoint-monitor
test:
go test -v ./...
lint:
golangci-lint run ./...
.PHONY: \
build \
clean \
test \
lint
# @eth-optimism/endpoint-monitor
The endpoint-monitor runs websocket checks on edge-proxyd endpoints and downstream infra provider endpoints.
## Setup
Install go1.18
```bash
make build
source .env.example # (or copy to .envrc if using direnv)
./bin/endpoint-monitor
```
package main
import (
"fmt"
"os"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"
endpointMonitor "github.com/ethereum-optimism/optimism/endpoint-monitor"
)
var (
Version = ""
GitCommit = ""
GitDate = ""
)
func main() {
oplog.SetupDefaults()
app := cli.NewApp()
app.Flags = endpointMonitor.CLIFlags("ENDPOINT_MONITOR")
app.Version = fmt.Sprintf("%s-%s-%s", Version, GitCommit, GitDate)
app.Name = "endpoint-monitor"
app.Usage = "Endpoint Monitoring Service"
app.Description = ""
app.Action = endpointMonitor.Main(Version)
err := app.Run(os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
}
package app
import (
"fmt"
"os"
"strings"
"time"
opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/urfave/cli"
)
type ProviderConfig struct {
Name string
Url string
}
const (
ProvidersFlagName = "providers"
CheckIntervalFlagName = "check-interval"
CheckDurationFlagName = "check-duration"
)
func CLIFlags(envPrefix string) []cli.Flag {
flags := []cli.Flag{
cli.StringSliceFlag{
Name: ProvidersFlagName,
Usage: "List of providers",
Required: true,
EnvVar: opservice.PrefixEnvVar(envPrefix, "PROVIDERS"),
},
cli.DurationFlag{
Name: CheckIntervalFlagName,
Usage: "Check interval duration",
Value: 5 * time.Minute,
EnvVar: opservice.PrefixEnvVar(envPrefix, "CHECK_INTERVAL"),
},
cli.DurationFlag{
Name: CheckDurationFlagName,
Usage: "Check duration",
Value: 4 * time.Minute,
EnvVar: opservice.PrefixEnvVar(envPrefix, "CHECK_DURATION"),
},
}
flags = append(flags, opmetrics.CLIFlags(envPrefix)...)
flags = append(flags, oplog.CLIFlags(envPrefix)...)
return flags
}
type Config struct {
Providers []string `envconfig:"PROVIDERS" required:"true"`
CheckInterval time.Duration `envconfig:"CHECK_INTERVAL" default:"5m"`
CheckDuration time.Duration `envconfig:"CHECK_DURATION" default:"4m"`
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
}
func (c Config) Check() error {
if c.CheckDuration >= c.CheckInterval {
return fmt.Errorf("%s must be less than %s", CheckDurationFlagName, CheckIntervalFlagName)
}
if err := c.LogConfig.Check(); err != nil {
return err
}
if err := c.MetricsConfig.Check(); err != nil {
return err
}
return nil
}
func NewConfig(ctx *cli.Context) Config {
return Config{
Providers: ctx.GlobalStringSlice(ProvidersFlagName),
CheckInterval: ctx.GlobalDuration(CheckIntervalFlagName),
CheckDuration: ctx.GlobalDuration(CheckDurationFlagName),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
}
}
// GetProviderConfigs fetches endpoint provider configurations from the environment
// Each provider should have a corresponding env var with the url, ex: PROVIDER1_URL=<provider-url>
func (c Config) GetProviderConfigs() []ProviderConfig {
result := make([]ProviderConfig, 0)
for _, provider := range c.Providers {
envKey := fmt.Sprintf("ENDPOINT_MONITOR_%s_URL", strings.ToUpper(provider))
url := os.Getenv(envKey)
if url == "" {
panic(fmt.Sprintf("%s is not set", envKey))
}
result = append(result, ProviderConfig{Name: provider, Url: url})
}
return result
}
package app
import (
"context"
"fmt"
"strings"
"time"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/l2geth/core/types"
"github.com/ethereum-optimism/optimism/l2geth/ethclient"
)
var (
MetricWsSubscribeStatus = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "ws_subscribe_status",
Help: "eth_subscribe over websocket check status"},
[]string{"status", "provider", "error"},
)
)
func Main(version string) func(cliCtx *cli.Context) error {
return func(cliCtx *cli.Context) error {
cfg := NewConfig(cliCtx)
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid CLI flags: %w", err)
}
l := oplog.NewLogger(cfg.LogConfig)
endpointMonitor := NewEndpointMonitor(cfg, l)
l.Info(fmt.Sprintf("starting endpoint monitor with checkInterval=%s checkDuration=%s", cfg.CheckInterval, cfg.CheckDuration))
endpointMonitor.Start()
ctx := context.Background()
registry := opmetrics.NewRegistry()
registry.MustRegister(MetricWsSubscribeStatus)
metricsCfg := cfg.MetricsConfig
l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
if err := opmetrics.ListenAndServe(ctx, registry, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
l.Error("error starting metrics server", err)
return err
}
return nil
}
}
type EndpointMonitor struct {
cfg Config
logger log.Logger
}
func NewEndpointMonitor(cfg Config, l log.Logger) EndpointMonitor {
return EndpointMonitor{cfg: cfg, logger: l}
}
func (e EndpointMonitor) Start() {
for _, providerConfig := range e.cfg.GetProviderConfigs() {
go e.runWebsocketCheckLoop(providerConfig, e.cfg.CheckInterval, e.cfg.CheckDuration)
}
}
// getWrappingErrorMsg returns the most recently wrapped error message
// it's used in this case to get the error type reported by runSubscribeCallCheck
func getWrappingErrorMsg(err error) string {
cause := errors.Cause(err)
return strings.TrimSuffix(err.Error(), fmt.Sprintf(": %s", cause.Error()))
}
// runWebsocketCheckLoop runs subscribe call checks every checkInterval and reports status metrics to prometheus
func (e EndpointMonitor) runWebsocketCheckLoop(p ProviderConfig, checkInterval, checkDuration time.Duration) {
ticker := time.NewTicker(checkInterval)
defer ticker.Stop()
for {
e.logger.Info("running websocket check", "provider", p.Name)
err := e.runWebsocketCheck(p, checkDuration)
if err != nil {
errType := getWrappingErrorMsg(err)
MetricWsSubscribeStatus.With(prometheus.Labels{"provider": p.Name, "status": "error", "error": errType}).Inc()
e.logger.Error("finished websocket check", "provider", p.Name, "error", errType)
} else {
MetricWsSubscribeStatus.With(prometheus.Labels{"provider": p.Name, "status": "success", "error": ""}).Inc()
e.logger.Info("finished websocket check", "provider", p.Name)
}
<-ticker.C
}
}
// runWebsocketCheck creates a client and subscribes to blockchain head notifications and returns any errors encountered for reporting
func (e EndpointMonitor) runWebsocketCheck(p ProviderConfig, duration time.Duration) error {
client, err := ethclient.Dial(p.Url)
if err != nil {
return errors.Wrap(err, "dial")
}
defer client.Close()
headers := make(chan *types.Header)
sub, err := client.SubscribeNewHead(context.Background(), headers)
if err != nil {
return errors.Wrap(err, "eth_subscribe_failed")
}
receivedData := false
ticker := time.NewTicker(duration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
sub.Unsubscribe()
if !receivedData {
return errors.New("nodata")
}
return nil
case err := <-sub.Err():
return errors.Wrap(err, "read")
case header := <-headers:
e.logger.Debug(header.Hash().Hex(), "provider", p.Name)
receivedData = true
}
}
}
module github.com/ethereum-optimism/optimism/endpoint-monitor
go 1.18
require (
github.com/ethereum-optimism/optimism/l2geth v0.0.0-20220923210602-7121648c1f26
github.com/ethereum-optimism/optimism/op-service v0.8.8
github.com/ethereum/go-ethereum v1.10.23
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
github.com/urfave/cli v1.22.9
)
require (
github.com/VictoriaMetrics/fastcache v1.9.0 // indirect
github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd v0.22.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
github.com/elastic/gosigar v0.12.0 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 // indirect
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/sys v0.0.0-20220701225701-179beb0bd1a1 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
)
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -3,6 +3,7 @@ go 1.18 ...@@ -3,6 +3,7 @@ go 1.18
use ( use (
./batch-submitter ./batch-submitter
./bss-core ./bss-core
./endpoint-monitor
./gas-oracle ./gas-oracle
./indexer ./indexer
./l2geth ./l2geth
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment