Commit cee753f4 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

op-node: Add heartbeater (#3391)

* op-node: Add heartbeater

* Update op-node/heartbeat/service.go
Co-authored-by: default avatarJoshua Gutow <jgutow@optimism.io>

* imports
Co-authored-by: default avatarJoshua Gutow <jgutow@optimism.io>
parent a28bfa56
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-node/cmd/genesis" "github.com/ethereum-optimism/optimism/op-node/cmd/genesis"
"github.com/ethereum-optimism/optimism/op-node/cmd/p2p" "github.com/ethereum-optimism/optimism/op-node/cmd/p2p"
...@@ -117,6 +119,30 @@ func RollupNodeMain(ctx *cli.Context) error { ...@@ -117,6 +119,30 @@ func RollupNodeMain(ctx *cli.Context) error {
m.RecordUp() m.RecordUp()
log.Info("Rollup node started") log.Info("Rollup node started")
if cfg.Heartbeat.Enabled {
var peerID string
if cfg.P2P == nil {
peerID = "disabled"
} else {
peerID = n.P2P().Host().ID().String()
}
beatCtx, beatCtxCancel := context.WithCancel(context.Background())
payload := &heartbeat.Payload{
Version: version.Version,
Meta: version.Meta,
Moniker: cfg.Heartbeat.Moniker,
PeerID: peerID,
ChainID: cfg.Rollup.L2ChainID.Uint64(),
}
go func() {
if err := heartbeat.Beat(beatCtx, log, cfg.Heartbeat.URL, payload); err != nil {
log.Error("heartbeat goroutine crashed", "err", err)
}
}()
defer beatCtxCancel()
}
if cfg.Pprof.Enabled { if cfg.Pprof.Enabled {
var srv http.Server var srv http.Server
srv.Addr = net.JoinHostPort(cfg.Pprof.ListenAddr, cfg.Pprof.ListenPort) srv.Addr = net.JoinHostPort(cfg.Pprof.ListenAddr, cfg.Pprof.ListenPort)
......
...@@ -140,12 +140,27 @@ var ( ...@@ -140,12 +140,27 @@ var (
Value: 6060, Value: 6060,
EnvVar: prefixEnvVar("PPROF_PORT"), EnvVar: prefixEnvVar("PPROF_PORT"),
} }
SnapshotLog = cli.StringFlag{ SnapshotLog = cli.StringFlag{
Name: "snapshotlog.file", Name: "snapshotlog.file",
Usage: "Path to the snapshot log file", Usage: "Path to the snapshot log file",
EnvVar: prefixEnvVar("SNAPSHOT_LOG"), EnvVar: prefixEnvVar("SNAPSHOT_LOG"),
} }
HeartbeatEnabledFlag = cli.BoolFlag{
Name: "heartbeat.enabled",
Usage: "Enables or disables heartbeating",
EnvVar: prefixEnvVar("HEARTBEAT_ENABLED"),
}
HeartbeatMonikerFlag = cli.StringFlag{
Name: "heartbeat.moniker",
Usage: "Sets a moniker for this node",
EnvVar: prefixEnvVar("HEARTBEAT_MONIKER"),
}
HeartbeatURLFlag = cli.StringFlag{
Name: "heartbeat.url",
Usage: "Sets the URL to heartbeat to",
EnvVar: prefixEnvVar("HEARTBEAT_URL"),
Value: "https://heartbeat.bedrock-goerli.optimism.io",
}
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -174,6 +189,9 @@ var optionalFlags = append([]cli.Flag{ ...@@ -174,6 +189,9 @@ var optionalFlags = append([]cli.Flag{
PprofAddrFlag, PprofAddrFlag,
PprofPortFlag, PprofPortFlag,
SnapshotLog, SnapshotLog,
HeartbeatEnabledFlag,
HeartbeatMonikerFlag,
HeartbeatURLFlag,
}, p2pFlags...) }, p2pFlags...)
// Flags contains the list of configuration options available to the binary. // Flags contains the list of configuration options available to the binary.
......
package heartbeat
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/ethereum/go-ethereum/log"
)
var SendInterval = 10 * time.Minute
type Payload struct {
Version string `json:"version"`
Meta string `json:"meta"`
Moniker string `json:"moniker"`
PeerID string `json:"peerID"`
ChainID uint64 `json:"chainID"`
}
func Beat(
ctx context.Context,
log log.Logger,
url string,
payload *Payload,
) error {
payloadJSON, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("telemetry crashed: %w", err)
}
client := &http.Client{
Timeout: 10 * time.Second,
}
send := func() {
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payloadJSON))
req.Header.Set("Content-Type", "application/json")
if err != nil {
log.Error("error creating heartbeat HTTP request", "err", err)
return
}
res, err := client.Do(req)
if err != nil {
log.Warn("error sending heartbeat", "err", err)
return
}
res.Body.Close()
if res.StatusCode < 200 || res.StatusCode > 204 {
log.Warn("heartbeat server returned non-200 status code", "status", res.StatusCode)
return
}
log.Info("sent heartbeat")
}
send()
tick := time.NewTicker(SendInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
send()
case <-ctx.Done():
return nil
}
}
}
package heartbeat
import (
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
const expHeartbeat = `{
"version": "v1.2.3",
"meta": "meta",
"moniker": "yeet",
"peerID": "1UiUfoobar",
"chainID": 1234
}`
func TestBeat(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
reqCh := make(chan string, 2)
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(204)
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
reqCh <- string(body)
r.Body.Close()
}))
defer s.Close()
doneCh := make(chan struct{})
go func() {
_ = Beat(ctx, log.Root(), s.URL, &Payload{
Version: "v1.2.3",
Meta: "meta",
Moniker: "yeet",
PeerID: "1UiUfoobar",
ChainID: 1234,
})
doneCh <- struct{}{}
}()
select {
case hb := <-reqCh:
require.JSONEq(t, expHeartbeat, hb)
cancel()
<-doneCh
case <-ctx.Done():
t.Fatalf("error: %v", ctx.Err())
}
}
...@@ -35,7 +35,8 @@ type Config struct { ...@@ -35,7 +35,8 @@ type Config struct {
L1EpochPollInterval time.Duration L1EpochPollInterval time.Duration
// Optional // Optional
Tracer Tracer Tracer Tracer
Heartbeat HeartbeatConfig
} }
type RPCConfig struct { type RPCConfig struct {
...@@ -76,6 +77,12 @@ func (p PprofConfig) Check() error { ...@@ -76,6 +77,12 @@ func (p PprofConfig) Check() error {
return nil return nil
} }
type HeartbeatConfig struct {
Enabled bool
Moniker string
URL string
}
// Check verifies that the given configuration makes sense // Check verifies that the given configuration makes sense
func (cfg *Config) Check() error { func (cfg *Config) Check() error {
if err := cfg.L2.Check(); err != nil { if err := cfg.L2.Check(); err != nil {
......
...@@ -81,6 +81,11 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -81,6 +81,11 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
P2P: p2pConfig, P2P: p2pConfig,
P2PSigner: p2pSignerSetup, P2PSigner: p2pSignerSetup,
L1EpochPollInterval: ctx.GlobalDuration(flags.L1EpochPollIntervalFlag.Name), L1EpochPollInterval: ctx.GlobalDuration(flags.L1EpochPollIntervalFlag.Name),
Heartbeat: node.HeartbeatConfig{
Enabled: ctx.GlobalBool(flags.HeartbeatEnabledFlag.Name),
Moniker: ctx.GlobalString(flags.HeartbeatMonikerFlag.Name),
URL: ctx.GlobalString(flags.HeartbeatURLFlag.Name),
},
} }
if err := cfg.Check(); err != nil { if err := cfg.Check(); err != nil {
return nil, err return nil, err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment