Commit 3a7b54ec authored by Matthew Slipper's avatar Matthew Slipper

op-heartbeat: Add implementation

parent 20c0474e
GITCOMMIT := $(shell git rev-parse HEAD)
GITDATE := $(shell git show -s --format='%ct')
VERSION := v0.0.0
LDFLAGSSTRING +=-X main.GitCommit=$(GITCOMMIT)
LDFLAGSSTRING +=-X main.GitDate=$(GITDATE)
LDFLAGSSTRING +=-X main.Version=$(VERSION)
LDFLAGS := -ldflags "$(LDFLAGSSTRING)"
op-heartbeat:
env GO111MODULE=on go build -v $(LDFLAGS) -o ./bin/op-heartbeat ./cmd
clean:
rm bin/op-heartbeat
test:
go test -v ./...
lint:
golangci-lint run -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint -e "errors.As" -e "errors.Is"
.PHONY: \
clean \
op-heartbeat \
test \
lint
package main
import (
"fmt"
"os"
heartbeat "github.com/ethereum-optimism/optimism/op-heartbeat"
"github.com/ethereum-optimism/optimism/op-heartbeat/flags"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"
)
var (
Version = ""
GitCommit = ""
GitDate = ""
)
func main() {
oplog.SetupDefaults()
app := cli.NewApp()
app.Flags = flags.Flags
app.Version = fmt.Sprintf("%s-%s-%s", Version, GitCommit, GitDate)
app.Name = "op-heartbeat"
app.Usage = "Heartbeat recorder"
app.Description = "Service that records opt-in heartbeats from op nodes"
app.Action = heartbeat.Main(app.Version)
err := app.Run(os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
}
package op_heartbeat
import (
"errors"
"github.com/ethereum-optimism/optimism/op-heartbeat/flags"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/urfave/cli"
)
type Config struct {
HTTPAddr string
HTTPPort int
Log oplog.CLIConfig
Metrics opmetrics.CLIConfig
Pprof oppprof.CLIConfig
}
func (c Config) Check() error {
if c.HTTPAddr == "" {
return errors.New("must specify a valid HTTP address")
}
if c.HTTPPort <= 0 {
return errors.New("must specify a valid HTTP port")
}
if err := c.Log.Check(); err != nil {
return err
}
if err := c.Metrics.Check(); err != nil {
return err
}
if err := c.Pprof.Check(); err != nil {
return err
}
return nil
}
func NewConfig(ctx *cli.Context) Config {
return Config{
HTTPAddr: ctx.GlobalString(flags.HTTPAddrFlag.Name),
HTTPPort: ctx.GlobalInt(flags.HTTPPortFlag.Name),
Log: oplog.ReadCLIConfig(ctx),
Metrics: opmetrics.ReadCLIConfig(ctx),
Pprof: oppprof.ReadCLIConfig(ctx),
}
}
package flags
import (
opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/urfave/cli"
)
const envPrefix = "OP_HEARTBEAT"
const (
HTTPAddrFlagName = "http.addr"
HTTPPortFlagName = "http.port"
HTTPMaxBodySizeFlagName = "http.max-body-size"
AllowedChainIDsFlagName = "allowed-chain-ids"
)
var (
HTTPAddrFlag = cli.StringFlag{
Name: HTTPAddrFlagName,
Usage: "Address the server should listen on",
Value: "0.0.0.0",
EnvVar: opservice.PrefixEnvVar(envPrefix, "HTTP_ADDR"),
}
HTTPPortFlag = cli.IntFlag{
Name: HTTPPortFlagName,
Usage: "Port the server should listen on",
Value: 8080,
EnvVar: opservice.PrefixEnvVar(envPrefix, "HTTP_PORT"),
}
)
var Flags []cli.Flag
func init() {
Flags = []cli.Flag{
HTTPAddrFlag,
HTTPPortFlag,
}
Flags = append(Flags, oplog.CLIFlags(envPrefix)...)
Flags = append(Flags, opmetrics.CLIFlags(envPrefix)...)
}
package op_heartbeat
import (
"strconv"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const MetricsNamespace = "op_heartbeat"
type Metrics interface {
RecordHeartbeat(payload heartbeat.Payload)
RecordVersion(version string)
}
type metrics struct {
heartbeats *prometheus.CounterVec
version *prometheus.GaugeVec
}
func NewMetrics(r *prometheus.Registry) Metrics {
m := &metrics{
heartbeats: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "heartbeats",
Help: "Counts number of heartbeats by chain ID",
}, []string{
"chain_id",
"version",
}),
version: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "version",
Help: "version pseudo-metrics",
}, []string{
"version",
}),
}
return m
}
func (m *metrics) RecordHeartbeat(payload heartbeat.Payload) {
var chainID string
if AllowedChainIDs[payload.ChainID] {
chainID = strconv.FormatUint(payload.ChainID, 10)
} else {
chainID = "unknown"
}
var version string
if AllowedVersions[payload.Version] {
version = payload.Version
} else {
version = "unknown"
}
m.heartbeats.WithLabelValues(chainID, version).Inc()
}
func (m *metrics) RecordVersion(version string) {
m.version.WithLabelValues(version).Set(1)
}
package op_heartbeat
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-service/httputil"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"
)
const HTTPMaxBodySize = 1024 * 1024
func Main(version string) func(ctx *cli.Context) error {
return func(cliCtx *cli.Context) error {
cfg := NewConfig(cliCtx)
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid CLI flags: %w", err)
}
l := oplog.NewLogger(cfg.Log)
l.Info("starting heartbeat monitor", "version", version)
ctx, cancel := context.WithCancel(context.Background())
go func() {
if err := Start(ctx, l, cfg, version); err != nil {
l.Crit("error starting application", "err", err)
}
}()
doneCh := make(chan os.Signal, 1)
signal.Notify(doneCh, []os.Signal{
os.Interrupt,
os.Kill,
syscall.SIGTERM,
syscall.SIGQUIT,
}...)
<-doneCh
cancel()
return nil
}
}
func Start(ctx context.Context, l log.Logger, cfg Config, version string) error {
registry := opmetrics.NewRegistry()
metricsCfg := cfg.Metrics
if metricsCfg.Enabled {
l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
go func() {
if err := opmetrics.ListenAndServe(ctx, registry, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
l.Error("error starting metrics server", err)
}
}()
}
pprofCfg := cfg.Pprof
if pprofCfg.Enabled {
l.Info("starting pprof server", "addr", pprofCfg.ListenAddr, "port", pprofCfg.ListenPort)
go func() {
if err := oppprof.ListenAndServe(ctx, pprofCfg.ListenAddr, pprofCfg.ListenPort); err != nil {
l.Error("error starting pprof server", err)
}
}()
}
metrics := NewMetrics(registry)
metrics.RecordVersion(version)
handler := Handler(l, metrics)
recorder := opmetrics.NewPromHTTPRecorder(registry, MetricsNamespace)
mw := opmetrics.NewHTTPRecordingMiddleware(recorder, handler)
server := &http.Server{
Addr: net.JoinHostPort(cfg.HTTPAddr, strconv.Itoa(cfg.HTTPPort)),
MaxHeaderBytes: HTTPMaxBodySize,
Handler: mw,
WriteTimeout: 30 * time.Second,
IdleTimeout: time.Minute,
ReadTimeout: 30 * time.Second,
}
return httputil.ListenAndServeContext(ctx, server)
}
func Handler(l log.Logger, metrics Metrics) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
innerL := l.New(
"xff", r.Header.Get("X-Forwarded-For"),
"user_agent", r.Header.Get("User-Agent"),
"remote_addr", r.RemoteAddr,
)
var payload heartbeat.Payload
dec := json.NewDecoder(io.LimitReader(r.Body, int64(HTTPMaxBodySize)))
if err := dec.Decode(&payload); err != nil {
innerL.Info("error decoding request payload", "err", err)
w.WriteHeader(400)
return
}
innerL.Info(
"got heartbeat",
"version", payload.Version,
"meta", payload.Meta,
"moniker", payload.Moniker,
"peer_id", payload.PeerID,
"chain_id", payload.ChainID,
)
metrics.RecordHeartbeat(payload)
w.WriteHeader(204)
}
}
package op_heartbeat
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"io"
"net/http"
"testing"
)
func TestService(t *testing.T) {
cfg := Config{
HTTPAddr: "127.0.0.1",
HTTPPort: 8080,
HTTPMaxBodySize: 1024 * 1024,
Metrics: opmetrics.CLIConfig{
Enabled: true,
ListenAddr: "127.0.0.1",
ListenPort: 7300,
},
}
ctx, cancel := context.WithCancel(context.Background())
exitC := make(chan error, 1)
go func() {
exitC <- Start(ctx, log.New(), cfg, "foobar")
}()
tests := []struct {
name string
hb heartbeat.Payload
metricName string
metricValue int
}{
{
"no whitelisted version",
heartbeat.Payload{
Version: "not_whitelisted",
Meta: "whatever",
Moniker: "whatever",
PeerID: "1X2398ug",
ChainID: 10,
},
`op_heartbeat_heartbeats{chain_id="10",version="unknown"}`,
1,
},
{
"no whitelisted chain",
heartbeat.Payload{
Version: "v0.1.0-beta.1",
Meta: "whatever",
Moniker: "whatever",
PeerID: "1X2398ug",
ChainID: 999,
},
`op_heartbeat_heartbeats{chain_id="unknown",version="v0.1.0-beta.1"}`,
1,
},
{
"both whitelisted",
heartbeat.Payload{
Version: "v0.1.0-beta.1",
Meta: "whatever",
Moniker: "whatever",
PeerID: "1X2398ug",
ChainID: 10,
},
`op_heartbeat_heartbeats{chain_id="10",version="v0.1.0-beta.1"}`,
1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data, err := json.Marshal(tt.hb)
require.NoError(t, err)
req, err := http.NewRequestWithContext(ctx, "POST", "http://127.0.0.1:8080", bytes.NewReader(data))
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, res.StatusCode, 204)
metricsRes, err := http.Get("http://127.0.0.1:7300")
defer metricsRes.Body.Close()
require.NoError(t, err)
metricsBody, err := io.ReadAll(metricsRes.Body)
require.NoError(t, err)
require.Contains(t, string(metricsBody), fmt.Sprintf("%s %d", tt.metricName, tt.metricValue))
})
}
cancel()
require.NoError(t, <-exitC)
}
package op_heartbeat
var AllowedChainIDs = map[uint64]bool{
420: true,
902: true,
10: true,
}
var AllowedVersions = map[string]bool{
"": true,
"v0.1.0-beta.1": true,
"v0.1.0-goerli-rehearsal.1": true,
}
......@@ -38,6 +38,7 @@ func Beat(
send := func() {
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payloadJSON))
req.Header.Set("User-Agent", fmt.Sprintf("op-node/%s", payload.Version))
req.Header.Set("Content-Type", "application/json")
if err != nil {
log.Error("error creating heartbeat HTTP request", "err", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment