Commit 0e4364f7 authored by Matthew Slipper's avatar Matthew Slipper

code review updates

parent b2d7a73e
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/libp2p/go-libp2p-core/metrics" libp2pmetrics "github.com/libp2p/go-libp2p-core/metrics"
pb "github.com/libp2p/go-libp2p-pubsub/pb" pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/collectors"
...@@ -389,7 +389,23 @@ func (m *Metrics) RecordGossipEvent(evType int32) { ...@@ -389,7 +389,23 @@ func (m *Metrics) RecordGossipEvent(evType int32) {
m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc() m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc()
} }
func (m *Metrics) RecordBandwidth(ctx context.Context, bwc *metrics.BandwidthCounter) { func (m *Metrics) IncPeerCount() {
m.PeerCount.Inc()
}
func (m *Metrics) DecPeerCount() {
m.PeerCount.Dec()
}
func (m *Metrics) IncStreamCount() {
m.StreamCount.Inc()
}
func (m *Metrics) DecStreamCount() {
m.StreamCount.Dec()
}
func (m *Metrics) RecordBandwidth(ctx context.Context, bwc *libp2pmetrics.BandwidthCounter) {
tick := time.NewTicker(10 * time.Second) tick := time.NewTicker(10 * time.Second)
defer tick.Stop() defer tick.Stop()
......
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/golang/snappy" "github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
...@@ -47,6 +46,10 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0} ...@@ -47,6 +46,10 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
const MaxGossipSize = 1 << 20 const MaxGossipSize = 1 << 20
type GossipMetricer interface {
RecordGossipEvent(evType int32)
}
func blocksTopicV1(cfg *rollup.Config) string { func blocksTopicV1(cfg *rollup.Config) string {
return fmt.Sprintf("/optimism/%s/0/blocks", cfg.L2ChainID.String()) return fmt.Sprintf("/optimism/%s/0/blocks", cfg.L2ChainID.String())
} }
...@@ -116,7 +119,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams { ...@@ -116,7 +119,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
return params return params
} }
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, metrics *metrics.Metrics) (*pubsub.PubSub, error) { func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, r gossipMetrics) (*pubsub.PubSub, error) {
denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second) denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -133,7 +136,7 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, metri ...@@ -133,7 +136,7 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, metri
pubsub.WithPeerExchange(false), pubsub.WithPeerExchange(false),
pubsub.WithBlacklist(denyList), pubsub.WithBlacklist(denyList),
pubsub.WithGossipSubParams(BuildGlobalGossipParams(cfg)), pubsub.WithGossipSubParams(BuildGlobalGossipParams(cfg)),
pubsub.WithEventTracer(&gossipTracer{m: metrics}), pubsub.WithEventTracer(&gossipTracer{r: r}),
) )
// TODO: pubsub.WithPeerScoreInspect(inspect, InspectInterval) to update peerstore scores with gossip scores // TODO: pubsub.WithPeerScoreInspect(inspect, InspectInterval) to update peerstore scores with gossip scores
} }
...@@ -445,9 +448,11 @@ func LogTopicEvents(ctx context.Context, log log.Logger, evHandler *pubsub.Topic ...@@ -445,9 +448,11 @@ func LogTopicEvents(ctx context.Context, log log.Logger, evHandler *pubsub.Topic
} }
type gossipTracer struct { type gossipTracer struct {
m *metrics.Metrics r GossipMetricer
} }
func (g *gossipTracer) Trace(evt *pb.TraceEvent) { func (g *gossipTracer) Trace(evt *pb.TraceEvent) {
g.m.RecordGossipEvent(int32(*evt.Type)) if g.r != nil {
g.r.RecordGossipEvent(int32(*evt.Type))
}
} }
...@@ -8,11 +8,16 @@ import ( ...@@ -8,11 +8,16 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
// TODO: add metrics here as well type NotificationsMetricer interface {
IncPeerCount()
DecPeerCount()
IncStreamCount()
DecStreamCount()
}
type notifications struct { type notifications struct {
log log.Logger log log.Logger
m *metrics.Metrics m NotificationsMetricer
} }
func (notif *notifications) Listen(n network.Network, a ma.Multiaddr) { func (notif *notifications) Listen(n network.Network, a ma.Multiaddr) {
...@@ -22,20 +27,28 @@ func (notif *notifications) ListenClose(n network.Network, a ma.Multiaddr) { ...@@ -22,20 +27,28 @@ func (notif *notifications) ListenClose(n network.Network, a ma.Multiaddr) {
notif.log.Info("stopped listening network address", "addr", a) notif.log.Info("stopped listening network address", "addr", a)
} }
func (notif *notifications) Connected(n network.Network, v network.Conn) { func (notif *notifications) Connected(n network.Network, v network.Conn) {
notif.m.PeerCount.Inc() if notif.m != nil {
notif.m.IncPeerCount()
}
notif.log.Info("connected to peer", "peer", v.RemotePeer(), "addr", v.RemoteMultiaddr()) notif.log.Info("connected to peer", "peer", v.RemotePeer(), "addr", v.RemoteMultiaddr())
} }
func (notif *notifications) Disconnected(n network.Network, v network.Conn) { func (notif *notifications) Disconnected(n network.Network, v network.Conn) {
notif.m.PeerCount.Dec() if notif.m != nil {
notif.m.DecPeerCount()
}
notif.log.Info("disconnected from peer", "peer", v.RemotePeer(), "addr", v.RemoteMultiaddr()) notif.log.Info("disconnected from peer", "peer", v.RemotePeer(), "addr", v.RemoteMultiaddr())
} }
func (notif *notifications) OpenedStream(n network.Network, v network.Stream) { func (notif *notifications) OpenedStream(n network.Network, v network.Stream) {
notif.m.StreamCount.Inc() if notif.m != nil {
notif.m.IncStreamCount()
}
c := v.Conn() c := v.Conn()
notif.log.Trace("opened stream", "protocol", v.Protocol(), "peer", c.RemotePeer(), "addr", c.RemoteMultiaddr()) notif.log.Trace("opened stream", "protocol", v.Protocol(), "peer", c.RemotePeer(), "addr", c.RemoteMultiaddr())
} }
func (notif *notifications) ClosedStream(n network.Network, v network.Stream) { func (notif *notifications) ClosedStream(n network.Network, v network.Stream) {
notif.m.StreamCount.Dec() if notif.m != nil {
notif.m.DecStreamCount()
}
c := v.Conn() c := v.Conn()
notif.log.Trace("opened stream", "protocol", v.Protocol(), "peer", c.RemotePeer(), "addr", c.RemoteMultiaddr()) notif.log.Trace("opened stream", "protocol", v.Protocol(), "peer", c.RemotePeer(), "addr", c.RemoteMultiaddr())
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment