Commit 0da4ba1c authored by protolambda's avatar protolambda Committed by GitHub

op-node,op-heartbeat: remove stale node heartbeat monitoring service (#11622)

parent ced78897
......@@ -1517,10 +1517,6 @@ workflows:
on_changes: op-e2e,packages/contracts-bedrock/src
uses_artifacts: true
requires: ["go-mod-download", "pnpm-monorepo"]
- go-test:
name: op-heartbeat-tests
module: op-heartbeat
requires: [ "go-mod-download" ]
- go-test:
name: op-batcher-tests
module: op-batcher
......@@ -1607,7 +1603,6 @@ workflows:
- go-mod-download
- op-batcher-tests
- op-chain-ops-tests
- op-heartbeat-tests
- op-node-tests
- op-proposer-tests
- op-challenger-tests
......@@ -1657,11 +1652,6 @@ workflows:
docker_name: op-conductor
docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
# op-conductor is not part of the devnet, we don't save it.
- docker-build:
name: op-heartbeat-docker-build
docker_name: op-heartbeat
docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
# op-heartbeat is not part of the devnet, we don't save it.
- docker-build:
name: da-server-docker-build
docker_name: da-server
......@@ -1712,26 +1702,6 @@ workflows:
only: /^(da-server|ci-builder(-rust)?|ufm-[a-z0-9\-]*|op-[a-z0-9\-]*)\/v.*/
branches:
ignore: /.*/
- docker-build:
name: op-heartbeat-release
filters:
tags:
only: /^op-heartbeat\/v.*/
branches:
ignore: /.*/
docker_name: op-heartbeat
docker_tags: <<pipeline.git.revision>>
requires: ['hold']
platforms: "linux/amd64,linux/arm64"
publish: true
release: true
context:
- oplabs-gcr-release
- check-cross-platform:
name: op-heartbeat-cross-platform
op_component: op-heartbeat
requires:
- op-heartbeat-release
- docker-build:
name: op-node-docker-release
filters:
......@@ -2107,20 +2077,6 @@ workflows:
op_component: op-conductor
requires:
- op-conductor-docker-publish
- docker-build:
name: op-heartbeat-docker-publish
docker_name: op-heartbeat
docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
platforms: "linux/amd64,linux/arm64"
publish: true
context:
- oplabs-gcr
- slack
- check-cross-platform:
name: op-heartbeat-cross-platform
op_component: op-heartbeat
requires:
- op-heartbeat-docker-publish
- docker-build:
name: op-supervisor-docker-publish
docker_name: op-supervisor
......
......@@ -11,7 +11,6 @@
/op-challenger @ethereum-optimism/go-reviewers
/op-dispute-mon @ethereum-optimism/go-reviewers
/op-e2e @ethereum-optimism/go-reviewers
/op-heartbeat @ethereum-optimism/go-reviewers
/op-node @ethereum-optimism/go-reviewers
/op-node/rollup @protolambda @ajsutton
/op-alt-da @ethereum-optimism/go-reviewers
......
......@@ -20,7 +20,6 @@ on:
options:
- ci-builder
- ci-builder-rust
- op-heartbeat
- op-node
- op-batcher
- op-proposer
......
......@@ -69,7 +69,6 @@ The Optimism Immunefi program offers up to $2,000,042 for in-scope critical vuln
├── <a href="./op-chain-ops">op-chain-ops</a>: State surgery utilities
├── <a href="./op-challenger">op-challenger</a>: Dispute game challenge agent
├── <a href="./op-e2e">op-e2e</a>: End-to-End testing of all bedrock components in Go
├── <a href="./op-heartbeat">op-heartbeat</a>: Heartbeat monitor service
├── <a href="./op-node">op-node</a>: rollup consensus-layer client
├── <a href="./op-preimage">op-preimage</a>: Go bindings for Preimage Oracle
├── <a href="./op-program">op-program</a>: Fault proof program
......@@ -118,11 +117,8 @@ The full set of components that have releases are:
- `op-batcher`
- `op-contracts`
- `op-challenger`
- `op-heartbeat`
- `op-node`
- `op-proposer`
- `op-ufm`
- `proxyd`
All other components and packages should be considered development components only and do not have releases.
......
......@@ -53,10 +53,6 @@ variable "OP_DISPUTE_MON_VERSION" {
default = "${GIT_VERSION}"
}
variable "OP_HEARTBEAT_VERSION" {
default = "${GIT_VERSION}"
}
variable "OP_PROGRAM_VERSION" {
default = "${GIT_VERSION}"
}
......@@ -152,19 +148,6 @@ target "op-conductor" {
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-conductor:${tag}"]
}
target "op-heartbeat" {
dockerfile = "ops/docker/op-stack-go/Dockerfile"
context = "."
args = {
GIT_COMMIT = "${GIT_COMMIT}"
GIT_DATE = "${GIT_DATE}"
OP_HEARTBEAT_VERSION = "${OP_HEARTBEAT_VERSION}"
}
target = "op-heartbeat-target"
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-heartbeat:${tag}"]
}
target "da-server" {
dockerfile = "ops/docker/op-stack-go/Dockerfile"
context = "."
......
GITCOMMIT ?= $(shell git rev-parse HEAD)
GITDATE ?= $(shell git show -s --format='%ct')
VERSION ?= v0.0.0
LDFLAGSSTRING +=-X main.GitCommit=$(GITCOMMIT)
LDFLAGSSTRING +=-X main.GitDate=$(GITDATE)
LDFLAGSSTRING +=-X main.Version=$(VERSION)
LDFLAGS := -ldflags "$(LDFLAGSSTRING)"
op-heartbeat:
env GO111MODULE=on GOOS=$(TARGETOS) GOARCH=$(TARGETARCH) CGO_ENABLED=0 go build -v $(LDFLAGS) -o ./bin/op-heartbeat ./cmd
clean:
rm bin/op-heartbeat
test:
go test -v ./...
.PHONY: \
clean \
op-heartbeat \
test
package main
import (
"os"
heartbeat "github.com/ethereum-optimism/optimism/op-heartbeat"
"github.com/ethereum-optimism/optimism/op-heartbeat/flags"
opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
)
var (
Version = ""
GitCommit = ""
GitDate = ""
)
func main() {
oplog.SetupDefaults()
app := cli.NewApp()
app.Flags = flags.Flags
app.Version = opservice.FormatVersion(Version, GitCommit, GitDate, "")
app.Name = "op-heartbeat"
app.Usage = "Heartbeat recorder"
app.Description = "Service that records opt-in heartbeats from op nodes"
app.Action = heartbeat.Main(app.Version)
err := app.Run(os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
}
package op_heartbeat
import (
"errors"
"github.com/ethereum-optimism/optimism/op-heartbeat/flags"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
"github.com/urfave/cli/v2"
)
type Config struct {
HTTPAddr string
HTTPPort int
Log oplog.CLIConfig
Metrics opmetrics.CLIConfig
Pprof oppprof.CLIConfig
}
func (c Config) Check() error {
if c.HTTPAddr == "" {
return errors.New("must specify a valid HTTP address")
}
if c.HTTPPort <= 0 {
return errors.New("must specify a valid HTTP port")
}
if err := c.Metrics.Check(); err != nil {
return err
}
if err := c.Pprof.Check(); err != nil {
return err
}
return nil
}
func NewConfig(ctx *cli.Context) Config {
return Config{
HTTPAddr: ctx.String(flags.HTTPAddrFlag.Name),
HTTPPort: ctx.Int(flags.HTTPPortFlag.Name),
Log: oplog.ReadCLIConfig(ctx),
Metrics: opmetrics.ReadCLIConfig(ctx),
Pprof: oppprof.ReadCLIConfig(ctx),
}
}
package flags
import (
opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/urfave/cli/v2"
)
const envPrefix = "OP_HEARTBEAT"
func prefixEnvVars(name string) []string {
return opservice.PrefixEnvVar(envPrefix, name)
}
const (
HTTPAddrFlagName = "http.addr"
HTTPPortFlagName = "http.port"
)
var (
HTTPAddrFlag = &cli.StringFlag{
Name: HTTPAddrFlagName,
Usage: "Address the server should listen on",
Value: "0.0.0.0",
EnvVars: prefixEnvVars("HTTP_ADDR"),
}
HTTPPortFlag = &cli.IntFlag{
Name: HTTPPortFlagName,
Usage: "Port the server should listen on",
Value: 8080,
EnvVars: prefixEnvVars("HTTP_PORT"),
}
)
var Flags []cli.Flag
func init() {
Flags = []cli.Flag{
HTTPAddrFlag,
HTTPPortFlag,
}
Flags = append(Flags, oplog.CLIFlags(envPrefix)...)
Flags = append(Flags, opmetrics.CLIFlags(envPrefix)...)
}
package op_heartbeat
import (
"fmt"
"strconv"
"sync/atomic"
"time"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
)
const (
MetricsNamespace = "op_heartbeat"
MinHeartbeatInterval = 10*time.Minute - 10*time.Second
UsersCacheSize = 10_000
)
type Metrics interface {
RecordHeartbeat(payload heartbeat.Payload, ip string)
RecordVersion(version string)
}
type metrics struct {
heartbeats *prometheus.CounterVec
version *prometheus.GaugeVec
sameIP *prometheus.HistogramVec
// Groups heartbeats per unique IP, version and chain ID combination.
// string(IP ++ version ++ chainID) -> *heartbeatEntry
heartbeatUsers *lru.Cache[string, *heartbeatEntry]
}
type heartbeatEntry struct {
// Count number of heartbeats per interval, atomically updated
Count uint64
// Changes once per heartbeat interval
Time time.Time
}
func NewMetrics(r *prometheus.Registry) Metrics {
lruCache, _ := lru.New[string, *heartbeatEntry](UsersCacheSize)
m := &metrics{
heartbeats: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "heartbeats",
Help: "Counts number of heartbeats by chain ID, version and filtered to unique IPs",
}, []string{
"chain_id",
"version",
}),
version: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "version",
Help: "version pseudo-metrics",
}, []string{
"version",
}),
sameIP: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "heartbeat_same_ip",
Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
Help: "Histogram of events within same heartbeat interval per unique IP, by chain ID and version",
}, []string{
"chain_id",
"version",
}),
heartbeatUsers: lruCache,
}
return m
}
func (m *metrics) RecordHeartbeat(payload heartbeat.Payload, ip string) {
var chainID string
if AllowedChainIDs[payload.ChainID] {
chainID = strconv.FormatUint(payload.ChainID, 10)
} else {
chainID = "unknown"
}
var version string
if AllowedVersions[payload.Version] {
version = payload.Version
} else {
version = "unknown"
}
key := fmt.Sprintf("%s;%s;%s", ip, version, chainID)
now := time.Now()
entry, ok, _ := m.heartbeatUsers.PeekOrAdd(key, &heartbeatEntry{Time: now, Count: 1})
if !ok {
// if it's a new entry, observe it and exit.
m.sameIP.WithLabelValues(chainID, version).Observe(1)
m.heartbeats.WithLabelValues(chainID, version).Inc()
return
}
if now.Sub(entry.Time) < MinHeartbeatInterval {
// if the span is still going, then add it up
atomic.AddUint64(&entry.Count, 1)
} else {
// if the span ended, then meter it, and reset it
m.sameIP.WithLabelValues(chainID, version).Observe(float64(atomic.LoadUint64(&entry.Count)))
entry.Time = now
atomic.StoreUint64(&entry.Count, 1)
m.heartbeats.WithLabelValues(chainID, version).Inc()
}
// always add, to keep LRU accurate
m.heartbeatUsers.Add(key, entry)
}
func (m *metrics) RecordVersion(version string) {
m.version.WithLabelValues(version).Set(1)
}
package op_heartbeat
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-service/httputil"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
)
const (
HTTPMaxHeaderSize = 10 * 1024
HTTPMaxBodySize = 1024 * 1024
)
func Main(version string) func(ctx *cli.Context) error {
return func(cliCtx *cli.Context) error {
cfg := NewConfig(cliCtx)
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid CLI flags: %w", err)
}
l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.Log)
oplog.SetGlobalLogHandler(l.Handler())
l.Info("starting heartbeat monitor", "version", version)
srv, err := Start(cliCtx.Context, l, cfg, version)
if err != nil {
l.Crit("error starting application", "err", err)
}
doneCh := make(chan os.Signal, 1)
signal.Notify(doneCh, []os.Signal{
os.Interrupt,
os.Kill,
syscall.SIGTERM,
syscall.SIGQUIT,
}...)
<-doneCh
return srv.Stop(context.Background())
}
}
type HeartbeatService struct {
metrics, http *httputil.HTTPServer
pprofService *oppprof.Service
}
func (hs *HeartbeatService) Stop(ctx context.Context) error {
var result error
if hs.pprofService != nil {
result = errors.Join(result, hs.pprofService.Stop(ctx))
}
if hs.metrics != nil {
result = errors.Join(result, hs.metrics.Stop(ctx))
}
if hs.http != nil {
result = errors.Join(result, hs.http.Stop(ctx))
}
return result
}
func Start(ctx context.Context, l log.Logger, cfg Config, version string) (*HeartbeatService, error) {
hs := &HeartbeatService{}
registry := opmetrics.NewRegistry()
metricsCfg := cfg.Metrics
if metricsCfg.Enabled {
l.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := opmetrics.StartServer(registry, metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start metrics server: %w", err), hs.Stop(ctx))
}
hs.metrics = metricsSrv
l.Info("started metrics server", "addr", metricsSrv.Addr())
}
pprofCfg := cfg.Pprof
hs.pprofService = oppprof.New(
pprofCfg.ListenEnabled,
pprofCfg.ListenAddr,
pprofCfg.ListenPort,
pprofCfg.ProfileType,
pprofCfg.ProfileDir,
pprofCfg.ProfileFilename,
)
if err := hs.pprofService.Start(); err != nil {
return nil, fmt.Errorf("failed to start pprof service: %w", err)
}
metrics := NewMetrics(registry)
metrics.RecordVersion(version)
mux := http.NewServeMux()
mux.HandleFunc("/healthz", HealthzHandler)
mux.Handle("/", Handler(l, metrics))
recorder := opmetrics.NewPromHTTPRecorder(registry, MetricsNamespace)
mw := opmetrics.NewHTTPRecordingMiddleware(recorder, mux)
srv, err := httputil.StartHTTPServer(
net.JoinHostPort(cfg.HTTPAddr, strconv.Itoa(cfg.HTTPPort)),
mw,
httputil.WithTimeouts(httputil.HTTPTimeouts{
ReadTimeout: 30 * time.Second,
ReadHeaderTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: time.Minute,
}),
httputil.WithMaxHeaderBytes(HTTPMaxHeaderSize))
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start HTTP server: %w", err), hs.Stop(ctx))
}
hs.http = srv
return hs, nil
}
func Handler(l log.Logger, metrics Metrics) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ipStr := r.Header.Get("X-Forwarded-For")
// XFF can be a comma-separated list. Left-most is the original client.
if i := strings.Index(ipStr, ","); i >= 0 {
ipStr = ipStr[:i]
}
innerL := l.New(
"ip", ipStr,
"user_agent", r.Header.Get("User-Agent"),
"remote_addr", r.RemoteAddr,
)
var payload heartbeat.Payload
dec := json.NewDecoder(io.LimitReader(r.Body, int64(HTTPMaxBodySize)))
if err := dec.Decode(&payload); err != nil {
innerL.Info("error decoding request payload", "err", err)
w.WriteHeader(400)
return
}
innerL.Info(
"got heartbeat",
"version", payload.Version,
"meta", payload.Meta,
"moniker", payload.Moniker,
"peer_id", payload.PeerID,
"chain_id", payload.ChainID,
)
metrics.RecordHeartbeat(payload, ipStr)
w.WriteHeader(204)
}
}
func HealthzHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(204)
}
package op_heartbeat
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"testing"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
)
func TestService(t *testing.T) {
httpPort := freePort(t)
metricsPort := freePort(t)
cfg := Config{
HTTPAddr: "127.0.0.1",
HTTPPort: httpPort,
Metrics: opmetrics.CLIConfig{
Enabled: true,
ListenAddr: "127.0.0.1",
ListenPort: metricsPort,
},
}
ctx, cancel := context.WithCancel(context.Background())
srv, err := Start(ctx, log.New(), cfg, "foobar")
// Make sure that the service properly starts
require.NoError(t, err)
defer cancel()
defer func() {
require.NoError(t, srv.Stop(ctx), "close heartbeat server")
}()
tests := []struct {
name string
hbs []heartbeat.Payload
metric string
ip string
}{
{
"no whitelisted version",
[]heartbeat.Payload{{
Version: "not_whitelisted",
Meta: "whatever",
Moniker: "whatever",
PeerID: "1X2398ug",
ChainID: 10,
}},
`op_heartbeat_heartbeats{chain_id="10",version="unknown"} 1`,
"1.2.3.100",
},
{
"no whitelisted chain",
[]heartbeat.Payload{{
Version: "v0.1.0-beta.1",
Meta: "whatever",
Moniker: "whatever",
PeerID: "1X2398ug",
ChainID: 999,
}},
`op_heartbeat_heartbeats{chain_id="unknown",version="v0.1.0-beta.1"} 1`,
"1.2.3.101",
},
{
"both whitelisted",
[]heartbeat.Payload{{
Version: "v0.1.0-beta.1",
Meta: "whatever",
Moniker: "whatever",
PeerID: "1X2398ug",
ChainID: 10,
}},
`op_heartbeat_heartbeats{chain_id="10",version="v0.1.0-beta.1"} 1`,
"1.2.3.102",
},
{
"spamming",
[]heartbeat.Payload{
{
Version: "v0.1.0-goerli-rehearsal.1",
Meta: "whatever",
Moniker: "alice",
PeerID: "1X2398ug",
ChainID: 10,
},
{
Version: "v0.1.0-goerli-rehearsal.1",
Meta: "whatever",
Moniker: "bob",
PeerID: "1X2398ug",
ChainID: 10,
},
},
`op_heartbeat_heartbeat_same_ip_bucket{chain_id="10",version="v0.1.0-goerli-rehearsal.1",le="32"} 1`,
"1.2.3.103",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for _, hb := range tt.hbs {
data, err := json.Marshal(hb)
require.NoError(t, err)
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://127.0.0.1:%d", httpPort), bytes.NewReader(data))
require.NoError(t, err)
req.Header.Set("X-Forwarded-For", tt.ip)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
res.Body.Close()
require.Equal(t, res.StatusCode, 204)
}
metricsRes, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d", metricsPort))
require.NoError(t, err)
defer metricsRes.Body.Close()
require.NoError(t, err)
metricsBody, err := io.ReadAll(metricsRes.Body)
require.NoError(t, err)
require.Contains(t, string(metricsBody), tt.metric)
})
}
}
func freePort(t *testing.T) int {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
require.NoError(t, err)
l, err := net.ListenTCP("tcp", addr)
require.NoError(t, err)
defer l.Close()
return l.Addr().(*net.TCPAddr).Port
}
package op_heartbeat
var AllowedChainIDs = map[uint64]bool{
420: true,
902: true,
10: true,
}
var AllowedVersions = map[string]bool{
"": true,
"v0.1.0-beta.1": true,
"v0.1.0-goerli-rehearsal.1": true,
"v0.10.9": true,
"v0.10.10": true,
"v0.10.11": true,
"v0.10.12": true,
"v0.10.13": true,
"v0.10.14": true,
"v0.11.0": true,
}
......@@ -279,22 +279,24 @@ var (
}
HeartbeatEnabledFlag = &cli.BoolFlag{
Name: "heartbeat.enabled",
Usage: "Enables or disables heartbeating",
Usage: "Deprecated, no-op flag.",
EnvVars: prefixEnvVars("HEARTBEAT_ENABLED"),
Category: OperationsCategory,
Hidden: true,
}
HeartbeatMonikerFlag = &cli.StringFlag{
Name: "heartbeat.moniker",
Usage: "Sets a moniker for this node",
Usage: "Deprecated, no-op flag.",
EnvVars: prefixEnvVars("HEARTBEAT_MONIKER"),
Category: OperationsCategory,
Hidden: true,
}
HeartbeatURLFlag = &cli.StringFlag{
Name: "heartbeat.url",
Usage: "Sets the URL to heartbeat to",
Usage: "Deprecated, no-op flag.",
EnvVars: prefixEnvVars("HEARTBEAT_URL"),
Value: "https://heartbeat.optimism.io",
Category: OperationsCategory,
Hidden: true,
}
RollupHalt = &cli.StringFlag{
Name: "rollup.halt",
......
// Package heartbeat provides a service for sending heartbeats to a server.
package heartbeat
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/ethereum/go-ethereum/log"
)
// SendInterval determines the delay between requests. This must be larger than the MinHeartbeatInterval in the server.
const SendInterval = 10 * time.Minute
type Payload struct {
Version string `json:"version"`
Meta string `json:"meta"`
Moniker string `json:"moniker"`
PeerID string `json:"peerID"`
ChainID uint64 `json:"chainID"`
}
// Beat sends a heartbeat to the server at the given URL. It will send a heartbeat immediately, and then every SendInterval.
// Beat spawns a goroutine that will send heartbeats until the context is canceled.
func Beat(
ctx context.Context,
log log.Logger,
url string,
payload *Payload,
) error {
payloadJSON, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("telemetry crashed: %w", err)
}
client := &http.Client{
Timeout: 10 * time.Second,
}
send := func() {
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payloadJSON))
req.Header.Set("User-Agent", fmt.Sprintf("op-node/%s", payload.Version))
req.Header.Set("Content-Type", "application/json")
if err != nil {
log.Error("error creating heartbeat HTTP request", "err", err)
return
}
res, err := client.Do(req)
if err != nil {
log.Warn("error sending heartbeat", "err", err)
return
}
res.Body.Close()
if res.StatusCode < 200 || res.StatusCode > 204 {
log.Warn("heartbeat server returned non-200 status code", "status", res.StatusCode)
return
}
log.Info("sent heartbeat")
}
send()
tick := time.NewTicker(SendInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
send()
case <-ctx.Done():
return nil
}
}
}
package heartbeat
import (
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
)
const expHeartbeat = `{
"version": "v1.2.3",
"meta": "meta",
"moniker": "yeet",
"peerID": "1UiUfoobar",
"chainID": 1234
}`
func TestBeat(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
reqCh := make(chan string, 2)
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(204)
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
reqCh <- string(body)
r.Body.Close()
}))
defer s.Close()
doneCh := make(chan struct{})
go func() {
_ = Beat(ctx, log.Root(), s.URL, &Payload{
Version: "v1.2.3",
Meta: "meta",
Moniker: "yeet",
PeerID: "1UiUfoobar",
ChainID: 1234,
})
doneCh <- struct{}{}
}()
select {
case hb := <-reqCh:
require.JSONEq(t, expHeartbeat, hb)
cancel()
<-doneCh
case <-ctx.Done():
t.Fatalf("error: %v", ctx.Err())
}
}
......@@ -54,8 +54,7 @@ type Config struct {
RuntimeConfigReloadInterval time.Duration
// Optional
Tracer Tracer
Heartbeat HeartbeatConfig
Tracer Tracer
Sync sync.Config
......@@ -106,12 +105,6 @@ func (m MetricsConfig) Check() error {
return nil
}
type HeartbeatConfig struct {
Enabled bool
Moniker string
URL string
}
func (cfg *Config) LoadPersisted(log log.Logger) error {
if !cfg.Driver.SequencerEnabled {
return nil
......
......@@ -16,7 +16,6 @@ import (
"github.com/ethereum/go-ethereum/log"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/p2p"
......@@ -24,7 +23,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-node/version"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
......@@ -148,7 +146,6 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
}
n.metrics.RecordInfo(n.appVersion)
n.metrics.RecordUp()
n.initHeartbeat(cfg)
if err := n.initPProf(cfg); err != nil {
return fmt.Errorf("failed to init profiling: %w", err)
}
......@@ -440,32 +437,6 @@ func (n *OpNode) initMetricsServer(cfg *Config) error {
return nil
}
func (n *OpNode) initHeartbeat(cfg *Config) {
if !cfg.Heartbeat.Enabled {
return
}
var peerID string
if cfg.P2P.Disabled() {
peerID = "disabled"
} else {
peerID = n.P2P().Host().ID().String()
}
payload := &heartbeat.Payload{
Version: version.Version,
Meta: version.Meta,
Moniker: cfg.Heartbeat.Moniker,
PeerID: peerID,
ChainID: cfg.Rollup.L2ChainID.Uint64(),
}
go func(url string) {
if err := heartbeat.Beat(n.resourcesCtx, n.log, url, payload); err != nil {
log.Error("heartbeat goroutine crashed", "err", err)
}
}(cfg.Heartbeat.URL)
}
func (n *OpNode) initPProf(cfg *Config) error {
n.pprofService = oppprof.New(
cfg.Pprof.ListenEnabled,
......
......@@ -75,6 +75,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
haltOption = ""
}
if ctx.IsSet(flags.HeartbeatEnabledFlag.Name) ||
ctx.IsSet(flags.HeartbeatMonikerFlag.Name) ||
ctx.IsSet(flags.HeartbeatURLFlag.Name) {
log.Warn("Heartbeat functionality is not supported anymore, CLI flags will be removed in following release.")
}
cfg := &node.Config{
L1: l1Endpoint,
L2: l2Endpoint,
......@@ -96,16 +102,11 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
P2PSigner: p2pSignerSetup,
L1EpochPollInterval: ctx.Duration(flags.L1EpochPollIntervalFlag.Name),
RuntimeConfigReloadInterval: ctx.Duration(flags.RuntimeConfigReloadIntervalFlag.Name),
Heartbeat: node.HeartbeatConfig{
Enabled: ctx.Bool(flags.HeartbeatEnabledFlag.Name),
Moniker: ctx.String(flags.HeartbeatMonikerFlag.Name),
URL: ctx.String(flags.HeartbeatURLFlag.Name),
},
ConfigPersistence: configPersistence,
SafeDBPath: ctx.String(flags.SafeDBPath.Name),
Sync: *syncConfig,
RollupHalt: haltOption,
RethDBPath: ctx.String(flags.L1RethDBPath.Name),
ConfigPersistence: configPersistence,
SafeDBPath: ctx.String(flags.SafeDBPath.Name),
Sync: *syncConfig,
RollupHalt: haltOption,
RethDBPath: ctx.String(flags.L1RethDBPath.Name),
ConductorEnabled: ctx.Bool(flags.ConductorEnabledFlag.Name),
ConductorRpc: ctx.String(flags.ConductorRpcFlag.Name),
......
......@@ -57,11 +57,6 @@ ARG OP_PROGRAM_VERSION=v0.0.0
RUN --mount=type=cache,target=/go/pkg/mod --mount=type=cache,target=/root/.cache/go-build cd op-program && make op-program-host \
GOOS=$TARGETOS GOARCH=$TARGETARCH GITCOMMIT=$GIT_COMMIT GITDATE=$GIT_DATE VERSION="$OP_PROGRAM_VERSION"
FROM --platform=$BUILDPLATFORM builder AS op-heartbeat-builder
ARG OP_HEARTBEAT_VERSION=v0.0.0
RUN --mount=type=cache,target=/go/pkg/mod --mount=type=cache,target=/root/.cache/go-build cd op-heartbeat && make op-heartbeat \
GOOS=$TARGETOS GOARCH=$TARGETARCH GITCOMMIT=$GIT_COMMIT GITDATE=$GIT_DATE VERSION="$OP_HEARTBEAT_VERSION"
FROM --platform=$BUILDPLATFORM builder AS op-wheel-builder
ARG OP_WHEEL_VERSION=v0.0.0
RUN --mount=type=cache,target=/go/pkg/mod --mount=type=cache,target=/root/.cache/go-build cd op-wheel && make op-wheel \
......@@ -114,10 +109,6 @@ FROM --platform=$TARGETPLATFORM $TARGET_BASE_IMAGE AS op-program-target
COPY --from=op-program-builder /app/op-program/bin/op-program /usr/local/bin/
CMD ["op-program"]
FROM --platform=$TARGETPLATFORM $TARGET_BASE_IMAGE AS op-heartbeat-target
COPY --from=op-heartbeat-builder /app/op-heartbeat/bin/op-heartbeat /usr/local/bin/
CMD ["op-heartbeat"]
FROM --platform=$TARGETPLATFORM $TARGET_BASE_IMAGE AS op-wheel-target
COPY --from=op-wheel-builder /app/op-wheel/bin/op-wheel /usr/local/bin/
CMD ["op-wheel"]
......
......@@ -10,7 +10,6 @@
!/packages/contracts-bedrock/snapshots
!/op-dispute-mon
!/op-conductor
!/op-heartbeat
!/op-node
!/op-preimage
!/op-program
......
......@@ -18,7 +18,6 @@ MIN_VERSIONS = {
'op-program': '0.0.0',
'op-dispute-mon': '0.0.0',
'op-proposer': '0.10.14',
'op-heartbeat': '0.1.0',
'op-contracts': '1.0.0',
'op-conductor': '0.0.0',
}
......
......@@ -11,7 +11,6 @@ SERVICES = [
'op-dispute-mon',
'op-proposer',
'da-server',
'op-heartbeat',
'op-contracts',
'test',
'op-stack', # special case for tagging op-node, op-batcher, and op-proposer together
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment