Commit 7a678883, authored by Francis Li, committed by GitHub

[op-conductor] sequencer health monitor (#8714)

* Implement sequencer health monitor

* Remove unnecessary state variables
parent c205f685
package health
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/dial"
)
// HealthMonitor is the contract for a component that periodically evaluates
// sequencer health and publishes each result to a subscriber.
type HealthMonitor interface {
	// Start begins the periodic health checks.
	Start() error

	// Stop ends the periodic health checks.
	Stop() error

	// Subscribe returns a receive-only channel on which the outcome of every
	// health check is delivered (true means healthy).
	Subscribe() <-chan bool
}
// NewSequencerHealthMonitor builds a HealthMonitor backed by a
// SequencerHealthMonitor. The monitor is returned idle; call Start to begin
// checking.
//
//   - interval: seconds between consecutive health checks.
//   - safeInterval: maximum age, in seconds, the safe head may reach before the
//     sequencer is reported unhealthy.
//   - minPeerCount: minimum number of connected peers required for the
//     sequencer to be reported healthy.
//   - rollupCfg: rollup configuration; its BlockTime feeds the unsafe head check.
//   - node: rollup client used to query the sync status.
//   - p2p: p2p API used to query peer statistics.
func NewSequencerHealthMonitor(log log.Logger, interval, safeInterval, minPeerCount uint64, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p p2p.API) HealthMonitor {
	monitor := new(SequencerHealthMonitor)

	// Dependencies.
	monitor.log = log
	monitor.node = node
	monitor.p2p = p2p
	monitor.rollupCfg = rollupCfg

	// Thresholds.
	monitor.interval = interval
	monitor.safeInterval = safeInterval
	monitor.minPeerCount = minPeerCount

	// Both channels are unbuffered: done is closed to signal shutdown,
	// healthUpdateCh carries one value per health check.
	monitor.done = make(chan struct{})
	monitor.healthUpdateCh = make(chan bool)

	return monitor
}
// SequencerHealthMonitor is the HealthMonitor implementation that polls a
// rollup node and the p2p API on a fixed interval and publishes a boolean
// health verdict for every poll.
type SequencerHealthMonitor struct {
	log log.Logger

	// Lifecycle: done is closed by Stop; wg tracks the background loop.
	done chan struct{}
	wg   sync.WaitGroup

	// Thresholds. interval and safeInterval are expressed in seconds.
	interval     uint64
	safeInterval uint64
	minPeerCount uint64
	rollupCfg    *rollup.Config

	// healthUpdateCh receives the result of each health check.
	healthUpdateCh chan bool

	// Data sources queried by every health check.
	node dial.RollupClientInterface
	p2p  p2p.API
}

// Compile-time proof that *SequencerHealthMonitor satisfies HealthMonitor.
var _ HealthMonitor = (*SequencerHealthMonitor)(nil)
// Start implements HealthMonitor. It launches the background check loop in
// its own goroutine, registered on the wait group so Stop can wait for it.
// It always returns nil.
func (hm *SequencerHealthMonitor) Start() error {
	hm.log.Info("starting health monitor")

	// Register before spawning so a concurrent Wait cannot miss the goroutine.
	hm.wg.Add(1)
	go hm.loop()

	hm.log.Info("health monitor started")
	return nil
}
// Stop implements HealthMonitor. It closes the done channel to tell the
// background loop to exit and then blocks until that goroutine has finished.
// It always returns nil.
//
// Stop closes a channel that is never re-created, so it must be called at
// most once per monitor: a second close would panic.
func (hm *SequencerHealthMonitor) Stop() error {
	hm.log.Info("stopping health monitor")

	close(hm.done) // shutdown signal observed by loop
	hm.wg.Wait()   // wait for loop to return

	hm.log.Info("health monitor stopped")
	return nil
}
// Subscribe implements HealthMonitor. Every call returns the same underlying
// channel, narrowed to receive-only; no per-subscriber channel is created.
func (hm *SequencerHealthMonitor) Subscribe() <-chan bool {
	var updates <-chan bool = hm.healthUpdateCh
	return updates
}
// loop is the background goroutine started by Start. Every hm.interval
// seconds it runs healthCheck and publishes the result on healthUpdateCh.
// It returns, marking the wait group done, once hm.done is closed.
//
// Note: time.NewTicker panics on a non-positive duration, so hm.interval
// must be greater than zero.
func (hm *SequencerHealthMonitor) loop() {
	defer hm.wg.Done()

	duration := time.Duration(hm.interval) * time.Second
	ticker := time.NewTicker(duration)
	defer ticker.Stop()

	for {
		select {
		case <-hm.done:
			return
		case <-ticker.C:
			healthy := hm.healthCheck()

			// healthUpdateCh is unbuffered, so the send blocks until a
			// subscriber receives. Also watch done here; otherwise Stop
			// would wait forever on the wait group when nobody is reading
			// from the subscription channel.
			select {
			case hm.healthUpdateCh <- healthy:
			case <-hm.done:
				return
			}
		}
	}
}
// healthCheck checks the health of the sequencer by 3 criteria:
// 1. unsafe head is progressing per block time
// 2. safe head is progressing every configured batch submission interval
// 3. peer count is above the configured minimum
//
// It returns true only when all three criteria hold. A failure to fetch the
// sync status or the peer stats is logged and reported as unhealthy. The
// checks run in the order listed and stop at the first failure, so peer stats
// are only queried when both head checks pass.
func (hm *SequencerHealthMonitor) healthCheck() bool {
	ctx := context.Background()
	status, err := hm.node.SyncStatus(ctx)
	if err != nil {
		hm.log.Error("health monitor failed to get sync status", "err", err)
		return false
	}

	now := uint64(time.Now().Unix())

	// age returns how many seconds ago blockTime was, clamped at zero.
	// Timestamps are unsigned, so a plain now-blockTime would wrap around to
	// a huge value whenever a head's timestamp is ahead of the local clock
	// (e.g. clock skew) and wrongly flag a healthy sequencer.
	age := func(blockTime uint64) uint64 {
		if blockTime >= now {
			return 0
		}
		return now - blockTime
	}

	// allow at most one block drift for unsafe head
	if age(status.UnsafeL2.Time) > hm.interval+hm.rollupCfg.BlockTime {
		hm.log.Error("unsafe head is not progressing", "lastSeenUnsafeBlock", status.UnsafeL2)
		return false
	}

	if age(status.SafeL2.Time) > hm.safeInterval {
		hm.log.Error("safe head is not progressing", "safe_head_time", status.SafeL2.Time, "now", now)
		return false
	}

	stats, err := hm.p2p.PeerStats(ctx)
	if err != nil {
		hm.log.Error("health monitor failed to get peer stats", "err", err)
		return false
	}

	if uint64(stats.Connected) < hm.minPeerCount {
		hm.log.Error("peer count is below minimum", "connected", stats.Connected, "minPeerCount", hm.minPeerCount)
		return false
	}

	return true
}
package health
import (
"context"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/suite"
"github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
// Fixture values shared by the health monitor tests.
const (
	// minPeerCount is the peer threshold the monitor under test is configured with.
	minPeerCount = 1
	// unhealthyPeerCount is one below the threshold.
	unhealthyPeerCount = minPeerCount - 1
	// healthyPeerCount is one above the threshold.
	healthyPeerCount = minPeerCount + 1

	// blockTime is the rollup block time, in seconds, used by the tests.
	blockTime = 2
)
// HealthMonitorTestSuite is the testify suite exercising the sequencer
// health monitor against mocked rollup and p2p clients.
type HealthMonitorTestSuite struct {
	suite.Suite

	log log.Logger

	// Mocked data sources handed to the monitor.
	rc *testutils.MockRollupClient
	pc *p2pMocks.API

	// Monitor configuration, populated in SetupSuite.
	interval     uint64
	safeInterval uint64
	minPeerCount uint64
	rollupCfg    *rollup.Config

	// monitor is the instance under test, rebuilt in SetupTest.
	monitor HealthMonitor
}
// SetupSuite runs once for the suite. It creates the logger and the mocks and
// fixes the monitor configuration: a 1s check interval, a 5s safe head
// interval, the minPeerCount threshold and a rollup config using blockTime.
func (s *HealthMonitorTestSuite) SetupSuite() {
	s.log = testlog.Logger(s.T(), log.LvlInfo)

	s.rc = new(testutils.MockRollupClient)
	s.pc = new(p2pMocks.API)

	s.interval, s.safeInterval = 1, 5
	s.minPeerCount = minPeerCount
	s.rollupCfg = &rollup.Config{BlockTime: blockTime}
}
// SetupTest builds a fresh monitor from the suite configuration and starts it,
// asserting that Start reports no error.
func (s *HealthMonitorTestSuite) SetupTest() {
	s.monitor = NewSequencerHealthMonitor(
		s.log,
		s.interval,
		s.safeInterval,
		s.minPeerCount,
		s.rollupCfg,
		s.rc,
		s.pc,
	)
	s.NoError(s.monitor.Start())
}
// TearDownTest stops the monitor started by SetupTest and asserts that Stop
// reports no error.
func (s *HealthMonitorTestSuite) TearDownTest() {
	s.NoError(s.monitor.Stop())
}
// TestUnhealthyLowPeerCount feeds the monitor recent unsafe and safe heads but
// a connected-peer count below the minimum, and expects the first health
// update to be false.
func (s *HealthMonitorTestSuite) TestUnhealthyLowPeerCount() {
	now := uint64(time.Now().Unix())

	// Both heads are recent, so only the peer count can fail the check.
	freshStatus := &eth.SyncStatus{
		UnsafeL2: eth.L2BlockRef{Time: now - 1},
		SafeL2:   eth.L2BlockRef{Time: now - 2},
	}
	s.rc.ExpectSyncStatus(freshStatus, nil)

	tooFewPeers := &p2p.PeerStats{Connected: unhealthyPeerCount}
	s.pc.EXPECT().PeerStats(context.Background()).Return(tooFewPeers, nil).Times(1)

	updates := s.monitor.Subscribe()
	s.False(<-updates)
}
// TestUnhealthyUnsafeHeadNotProgressing returns the same sync status for three
// consecutive checks. With a 1s interval and a 2s block time the unsafe head
// may be at most 3s old, so the first two updates are expected healthy and the
// third unhealthy.
func (s *HealthMonitorTestSuite) TestUnhealthyUnsafeHeadNotProgressing() {
	const checks = 3

	enoughPeers := &p2p.PeerStats{Connected: healthyPeerCount}
	s.pc.EXPECT().PeerStats(context.Background()).Return(enoughPeers, nil).Times(checks)

	now := uint64(time.Now().Unix())
	stalled := &eth.SyncStatus{
		UnsafeL2: eth.L2BlockRef{Time: now - 1},
		SafeL2:   eth.L2BlockRef{Time: now - 2},
	}
	// The very same status is served on every check: the head never moves.
	for n := 0; n < checks; n++ {
		s.rc.ExpectSyncStatus(stalled, nil)
	}

	updates := s.monitor.Subscribe()
	for n := 0; n < checks; n++ {
		healthy := <-updates
		if lastCheck := n == checks-1; lastCheck {
			s.False(healthy)
			continue
		}
		s.True(healthy)
	}
}
// TestUnhealthySafeHeadNotProgressing keeps the unsafe head advancing while
// the safe head stays pinned at the test's start time. With a 5s safe interval
// the first five updates are expected healthy and the sixth unhealthy.
func (s *HealthMonitorTestSuite) TestUnhealthySafeHeadNotProgressing() {
	enoughPeers := &p2p.PeerStats{Connected: healthyPeerCount}
	s.pc.EXPECT().PeerStats(context.Background()).Return(enoughPeers, nil).Times(6)

	now := uint64(time.Now().Unix())

	// statusWithUnsafeAt builds a status whose safe head never moves from now.
	statusWithUnsafeAt := func(unsafeTime uint64) *eth.SyncStatus {
		status := new(eth.SyncStatus)
		status.UnsafeL2 = eth.L2BlockRef{Time: unsafeTime}
		status.SafeL2 = eth.L2BlockRef{Time: now}
		return status
	}

	// The unsafe head advances by one 2s block every two checks.
	unsafeOffsets := []uint64{0, 0, 2, 2, 4, 4}
	for _, offset := range unsafeOffsets {
		s.rc.ExpectSyncStatus(statusWithUnsafeAt(now+offset), nil)
	}

	updates := s.monitor.Subscribe()
	for n := range unsafeOffsets {
		healthy := <-updates
		if n == len(unsafeOffsets)-1 {
			s.False(healthy)
			continue
		}
		s.True(healthy)
	}
}
func TestHealthMonitor(t *testing.T) {
suite.Run(t, new(HealthMonitorTestSuite))
}
This diff is collapsed.
package p2p
package p2p_test
import (
"context"
......@@ -8,15 +8,6 @@ import (
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
//nolint:all
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
testlog "github.com/ethereum-optimism/optimism/op-service/testlog"
log "github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
......@@ -26,11 +17,20 @@ import (
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
tswarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
//nolint:all
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
tswarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/clock"
testlog "github.com/ethereum-optimism/optimism/op-service/testlog"
)
// PeerScoresTestSuite tests peer parameterization.
......@@ -86,7 +86,7 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h
type discriminatingAppScorer struct {
badPeer peer.ID
NoopApplicationScorer
p2p.NoopApplicationScorer
}
func (d *discriminatingAppScorer) ApplicationScore(id peer.ID) float64 {
......@@ -112,11 +112,11 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore, 1*time.Hour)
require.NoError(testSuite.T(), err)
scorer := NewScorer(
scorer := p2p.NewScorer(
&rollup.Config{L2ChainID: big.NewInt(123)},
extPeerStore, testSuite.mockMetricer, &discriminatingAppScorer{badPeer: hosts[0].ID()}, logger)
opts = append(opts, ConfigurePeerScoring(&Config{
ScoringParams: &ScoringParams{
opts = append(opts, p2p.ConfigurePeerScoring(&p2p.Config{
ScoringParams: &p2p.ScoringParams{
PeerScoring: pubsub.PeerScoreParams{
AppSpecificWeight: 1,
DecayInterval: time.Second,
......
......@@ -5,11 +5,11 @@ import (
"net"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
)
type PeerInfo struct {
......@@ -20,8 +20,8 @@ type PeerInfo struct {
ENR string `json:"ENR"` // might not always be known, e.g. if the peer connected us instead of us discovering them
Addresses []string `json:"addresses"` // multi-addresses. may be mix of LAN / docker / external IPs. All of them are communicated.
Protocols []string `json:"protocols"` // negotiated protocols list
//GossipScore float64
//PeerScore float64
// GossipScore float64
// PeerScore float64
Connectedness network.Connectedness `json:"connectedness"` // "NotConnected", "Connected", "CanConnect" (gracefully disconnected), or "CannotConnect" (tried but failed)
Direction network.Direction `json:"direction"` // "Unknown", "Inbound" (if the peer contacted us), "Outbound" (if we connected to them)
Protected bool `json:"protected"` // Protected peers do not get
......@@ -41,6 +41,7 @@ type PeerDump struct {
BannedSubnets []*net.IPNet `json:"bannedSubnets"`
}
//go:generate mockery --name API --output mocks/ --with-expecter=true
type API interface {
Self(ctx context.Context) (*PeerInfo, error)
Peers(ctx context.Context, connected bool) (*PeerDump, error)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment