Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
cc738b3d
Unverified
Commit
cc738b3d
authored
Oct 18, 2022
by
mergify[bot]
Committed by
GitHub
Oct 18, 2022
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3707 from ethereum-optimism/feat/p2p-metrics
op-node: Add P2P metrics
parents
149d4f01
65f9aaec
Changes
10
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
242 additions
and
28 deletions
+242
-28
metrics.go
op-node/metrics/metrics.go
+174
-0
node.go
op-node/node/node.go
+1
-1
config.go
op-node/p2p/config.go
+1
-3
gossip.go
op-node/p2p/gossip.go
+16
-1
host.go
op-node/p2p/host.go
+3
-2
host_test.go
op-node/p2p/host_test.go
+8
-9
node.go
op-node/p2p/node.go
+14
-6
notifications.go
op-node/p2p/notifications.go
+17
-3
prepared.go
op-node/p2p/prepared.go
+2
-1
rpc_server.go
op-node/p2p/rpc_server.go
+6
-2
No files found.
op-node/metrics/metrics.go
View file @
cc738b3d
...
@@ -10,6 +10,8 @@ import (
...
@@ -10,6 +10,8 @@ import (
"strconv"
"strconv"
"time"
"time"
libp2pmetrics
"github.com/libp2p/go-libp2p-core/metrics"
pb
"github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promauto"
...
@@ -30,6 +32,32 @@ const (
...
@@ -30,6 +32,32 @@ const (
BatchMethod
=
"<batch>"
BatchMethod
=
"<batch>"
)
)
type
Metricer
interface
{
RecordInfo
(
version
string
)
RecordUp
()
RecordRPCServerRequest
(
method
string
)
func
()
RecordRPCClientRequest
(
method
string
)
func
(
err
error
)
RecordRPCClientResponse
(
method
string
,
err
error
)
SetDerivationIdle
(
status
bool
)
RecordPipelineReset
()
RecordSequencingError
()
RecordPublishingError
()
RecordDerivationError
()
RecordReceivedUnsafePayload
(
payload
*
eth
.
ExecutionPayload
)
recordRef
(
layer
string
,
name
string
,
num
uint64
,
timestamp
uint64
,
h
common
.
Hash
)
RecordL1Ref
(
name
string
,
ref
eth
.
L1BlockRef
)
RecordL2Ref
(
name
string
,
ref
eth
.
L2BlockRef
)
RecordUnsafePayloadsBuffer
(
length
uint64
,
memSize
uint64
,
next
eth
.
BlockID
)
CountSequencedTxs
(
count
int
)
RecordL1ReorgDepth
(
d
uint64
)
RecordGossipEvent
(
evType
int32
)
IncPeerCount
()
DecPeerCount
()
IncStreamCount
()
DecStreamCount
()
RecordBandwidth
(
ctx
context
.
Context
,
bwc
*
libp2pmetrics
.
BandwidthCounter
)
}
type
Metrics
struct
{
type
Metrics
struct
{
Info
*
prometheus
.
GaugeVec
Info
*
prometheus
.
GaugeVec
Up
prometheus
.
Gauge
Up
prometheus
.
Gauge
...
@@ -67,6 +95,12 @@ type Metrics struct {
...
@@ -67,6 +95,12 @@ type Metrics struct {
TransactionsSequencedTotal
prometheus
.
Counter
TransactionsSequencedTotal
prometheus
.
Counter
// P2P Metrics
PeerCount
prometheus
.
Gauge
StreamCount
prometheus
.
Gauge
GossipEventsTotal
*
prometheus
.
CounterVec
BandwidthTotal
*
prometheus
.
GaugeVec
registry
*
prometheus
.
Registry
registry
*
prometheus
.
Registry
}
}
...
@@ -217,6 +251,35 @@ func NewMetrics(procName string) *Metrics {
...
@@ -217,6 +251,35 @@ func NewMetrics(procName string) *Metrics {
Help
:
"Count of total transactions sequenced"
,
Help
:
"Count of total transactions sequenced"
,
}),
}),
PeerCount
:
promauto
.
With
(
registry
)
.
NewGauge
(
prometheus
.
GaugeOpts
{
Namespace
:
ns
,
Subsystem
:
"p2p"
,
Name
:
"peer_count"
,
Help
:
"Count of currently connected p2p peers"
,
}),
StreamCount
:
promauto
.
With
(
registry
)
.
NewGauge
(
prometheus
.
GaugeOpts
{
Namespace
:
ns
,
Subsystem
:
"p2p"
,
Name
:
"stream_count"
,
Help
:
"Count of currently connected p2p streams"
,
}),
GossipEventsTotal
:
promauto
.
With
(
registry
)
.
NewCounterVec
(
prometheus
.
CounterOpts
{
Namespace
:
ns
,
Subsystem
:
"p2p"
,
Name
:
"gossip_events_total"
,
Help
:
"Count of gossip events by type"
,
},
[]
string
{
"type"
,
}),
BandwidthTotal
:
promauto
.
With
(
registry
)
.
NewGaugeVec
(
prometheus
.
GaugeOpts
{
Namespace
:
ns
,
Subsystem
:
"p2p"
,
Name
:
"bandwidth_bytes_total"
,
Help
:
"P2P bandwidth by direction"
,
},
[]
string
{
"direction"
,
}),
registry
:
registry
,
registry
:
registry
,
}
}
}
}
...
@@ -348,6 +411,42 @@ func (m *Metrics) RecordL1ReorgDepth(d uint64) {
...
@@ -348,6 +411,42 @@ func (m *Metrics) RecordL1ReorgDepth(d uint64) {
m
.
L1ReorgDepth
.
Observe
(
float64
(
d
))
m
.
L1ReorgDepth
.
Observe
(
float64
(
d
))
}
}
func
(
m
*
Metrics
)
RecordGossipEvent
(
evType
int32
)
{
m
.
GossipEventsTotal
.
WithLabelValues
(
pb
.
TraceEvent_Type_name
[
evType
])
.
Inc
()
}
func
(
m
*
Metrics
)
IncPeerCount
()
{
m
.
PeerCount
.
Inc
()
}
func
(
m
*
Metrics
)
DecPeerCount
()
{
m
.
PeerCount
.
Dec
()
}
func
(
m
*
Metrics
)
IncStreamCount
()
{
m
.
StreamCount
.
Inc
()
}
func
(
m
*
Metrics
)
DecStreamCount
()
{
m
.
StreamCount
.
Dec
()
}
func
(
m
*
Metrics
)
RecordBandwidth
(
ctx
context
.
Context
,
bwc
*
libp2pmetrics
.
BandwidthCounter
)
{
tick
:=
time
.
NewTicker
(
10
*
time
.
Second
)
defer
tick
.
Stop
()
for
{
select
{
case
<-
tick
.
C
:
bwTotals
:=
bwc
.
GetBandwidthTotals
()
m
.
BandwidthTotal
.
WithLabelValues
(
"in"
)
.
Set
(
float64
(
bwTotals
.
TotalIn
))
m
.
BandwidthTotal
.
WithLabelValues
(
"out"
)
.
Set
(
float64
(
bwTotals
.
TotalOut
))
case
<-
ctx
.
Done
()
:
return
}
}
}
// Serve starts the metrics server on the given hostname and port.
// Serve starts the metrics server on the given hostname and port.
// The server will be closed when the passed-in context is cancelled.
// The server will be closed when the passed-in context is cancelled.
func
(
m
*
Metrics
)
Serve
(
ctx
context
.
Context
,
hostname
string
,
port
int
)
error
{
func
(
m
*
Metrics
)
Serve
(
ctx
context
.
Context
,
hostname
string
,
port
int
)
error
{
...
@@ -364,3 +463,78 @@ func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error {
...
@@ -364,3 +463,78 @@ func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error {
}()
}()
return
server
.
ListenAndServe
()
return
server
.
ListenAndServe
()
}
}
type
noopMetricer
struct
{}
var
NoopMetrics
=
new
(
noopMetricer
)
func
(
n
*
noopMetricer
)
RecordInfo
(
version
string
)
{
}
func
(
n
*
noopMetricer
)
RecordUp
()
{
}
func
(
n
*
noopMetricer
)
RecordRPCServerRequest
(
method
string
)
func
()
{
return
func
()
{}
}
func
(
n
*
noopMetricer
)
RecordRPCClientRequest
(
method
string
)
func
(
err
error
)
{
return
func
(
err
error
)
{}
}
func
(
n
*
noopMetricer
)
RecordRPCClientResponse
(
method
string
,
err
error
)
{
}
func
(
n
*
noopMetricer
)
SetDerivationIdle
(
status
bool
)
{
}
func
(
n
*
noopMetricer
)
RecordPipelineReset
()
{
}
func
(
n
*
noopMetricer
)
RecordSequencingError
()
{
}
func
(
n
*
noopMetricer
)
RecordPublishingError
()
{
}
func
(
n
*
noopMetricer
)
RecordDerivationError
()
{
}
func
(
n
*
noopMetricer
)
RecordReceivedUnsafePayload
(
payload
*
eth
.
ExecutionPayload
)
{
}
func
(
n
*
noopMetricer
)
recordRef
(
layer
string
,
name
string
,
num
uint64
,
timestamp
uint64
,
h
common
.
Hash
)
{
}
func
(
n
*
noopMetricer
)
RecordL1Ref
(
name
string
,
ref
eth
.
L1BlockRef
)
{
}
func
(
n
*
noopMetricer
)
RecordL2Ref
(
name
string
,
ref
eth
.
L2BlockRef
)
{
}
func
(
n
*
noopMetricer
)
RecordUnsafePayloadsBuffer
(
length
uint64
,
memSize
uint64
,
next
eth
.
BlockID
)
{
}
func
(
n
*
noopMetricer
)
CountSequencedTxs
(
count
int
)
{
}
func
(
n
*
noopMetricer
)
RecordL1ReorgDepth
(
d
uint64
)
{
}
func
(
n
*
noopMetricer
)
RecordGossipEvent
(
evType
int32
)
{
}
func
(
n
*
noopMetricer
)
IncPeerCount
()
{
}
func
(
n
*
noopMetricer
)
DecPeerCount
()
{
}
func
(
n
*
noopMetricer
)
IncStreamCount
()
{
}
func
(
n
*
noopMetricer
)
DecStreamCount
()
{
}
func
(
n
*
noopMetricer
)
RecordBandwidth
(
ctx
context
.
Context
,
bwc
*
libp2pmetrics
.
BandwidthCounter
)
{
}
op-node/node/node.go
View file @
cc738b3d
...
@@ -196,7 +196,7 @@ func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
...
@@ -196,7 +196,7 @@ func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
func
(
n
*
OpNode
)
initP2P
(
ctx
context
.
Context
,
cfg
*
Config
)
error
{
func
(
n
*
OpNode
)
initP2P
(
ctx
context
.
Context
,
cfg
*
Config
)
error
{
if
cfg
.
P2P
!=
nil
{
if
cfg
.
P2P
!=
nil
{
p2pNode
,
err
:=
p2p
.
NewNodeP2P
(
n
.
resourcesCtx
,
&
cfg
.
Rollup
,
n
.
log
,
cfg
.
P2P
,
n
)
p2pNode
,
err
:=
p2p
.
NewNodeP2P
(
n
.
resourcesCtx
,
&
cfg
.
Rollup
,
n
.
log
,
cfg
.
P2P
,
n
,
n
.
metrics
)
if
err
!=
nil
||
p2pNode
==
nil
{
if
err
!=
nil
||
p2pNode
==
nil
{
return
err
return
err
}
}
...
...
op-node/p2p/config.go
View file @
cc738b3d
...
@@ -41,7 +41,7 @@ import (
...
@@ -41,7 +41,7 @@ import (
type
SetupP2P
interface
{
type
SetupP2P
interface
{
Check
()
error
Check
()
error
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host
(
log
log
.
Logger
)
(
host
.
Host
,
error
)
Host
(
log
log
.
Logger
,
reporter
metrics
.
Reporter
)
(
host
.
Host
,
error
)
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery
(
log
log
.
Logger
,
rollupCfg
*
rollup
.
Config
,
tcpPort
uint16
)
(
*
enode
.
LocalNode
,
*
discover
.
UDPv5
,
error
)
Discovery
(
log
log
.
Logger
,
rollupCfg
*
rollup
.
Config
,
tcpPort
uint16
)
(
*
enode
.
LocalNode
,
*
discover
.
UDPv5
,
error
)
TargetPeers
()
uint
TargetPeers
()
uint
...
@@ -91,8 +91,6 @@ type Config struct {
...
@@ -91,8 +91,6 @@ type Config struct {
ConnGater
func
(
conf
*
Config
)
(
connmgr
.
ConnectionGater
,
error
)
ConnGater
func
(
conf
*
Config
)
(
connmgr
.
ConnectionGater
,
error
)
ConnMngr
func
(
conf
*
Config
)
(
connmgr
.
ConnManager
,
error
)
ConnMngr
func
(
conf
*
Config
)
(
connmgr
.
ConnManager
,
error
)
// nil to disable bandwidth metrics
BandwidthMetrics
metrics
.
Reporter
}
}
type
ConnectionGater
interface
{
type
ConnectionGater
interface
{
...
...
op-node/p2p/gossip.go
View file @
cc738b3d
...
@@ -46,6 +46,10 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
...
@@ -46,6 +46,10 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
const
MaxGossipSize
=
1
<<
20
const
MaxGossipSize
=
1
<<
20
type
GossipMetricer
interface
{
RecordGossipEvent
(
evType
int32
)
}
func
blocksTopicV1
(
cfg
*
rollup
.
Config
)
string
{
func
blocksTopicV1
(
cfg
*
rollup
.
Config
)
string
{
return
fmt
.
Sprintf
(
"/optimism/%s/0/blocks"
,
cfg
.
L2ChainID
.
String
())
return
fmt
.
Sprintf
(
"/optimism/%s/0/blocks"
,
cfg
.
L2ChainID
.
String
())
}
}
...
@@ -115,7 +119,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
...
@@ -115,7 +119,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
return
params
return
params
}
}
func
NewGossipSub
(
p2pCtx
context
.
Context
,
h
host
.
Host
,
cfg
*
rollup
.
Config
)
(
*
pubsub
.
PubSub
,
error
)
{
func
NewGossipSub
(
p2pCtx
context
.
Context
,
h
host
.
Host
,
cfg
*
rollup
.
Config
,
m
GossipMetricer
)
(
*
pubsub
.
PubSub
,
error
)
{
denyList
,
err
:=
pubsub
.
NewTimeCachedBlacklist
(
30
*
time
.
Second
)
denyList
,
err
:=
pubsub
.
NewTimeCachedBlacklist
(
30
*
time
.
Second
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
...
@@ -132,6 +136,7 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config) (*pub
...
@@ -132,6 +136,7 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config) (*pub
pubsub
.
WithPeerExchange
(
false
),
pubsub
.
WithPeerExchange
(
false
),
pubsub
.
WithBlacklist
(
denyList
),
pubsub
.
WithBlacklist
(
denyList
),
pubsub
.
WithGossipSubParams
(
BuildGlobalGossipParams
(
cfg
)),
pubsub
.
WithGossipSubParams
(
BuildGlobalGossipParams
(
cfg
)),
pubsub
.
WithEventTracer
(
&
gossipTracer
{
m
:
m
}),
)
)
// TODO: pubsub.WithPeerScoreInspect(inspect, InspectInterval) to update peerstore scores with gossip scores
// TODO: pubsub.WithPeerScoreInspect(inspect, InspectInterval) to update peerstore scores with gossip scores
}
}
...
@@ -441,3 +446,13 @@ func LogTopicEvents(ctx context.Context, log log.Logger, evHandler *pubsub.Topic
...
@@ -441,3 +446,13 @@ func LogTopicEvents(ctx context.Context, log log.Logger, evHandler *pubsub.Topic
}
}
}
}
}
}
type
gossipTracer
struct
{
m
GossipMetricer
}
func
(
g
*
gossipTracer
)
Trace
(
evt
*
pb
.
TraceEvent
)
{
if
g
.
m
!=
nil
{
g
.
m
.
RecordGossipEvent
(
int32
(
*
evt
.
Type
))
}
}
op-node/p2p/host.go
View file @
cc738b3d
...
@@ -8,6 +8,7 @@ import (
...
@@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
lconf
"github.com/libp2p/go-libp2p/config"
lconf
"github.com/libp2p/go-libp2p/config"
...
@@ -41,7 +42,7 @@ func (e *extraHost) ConnectionManager() connmgr.ConnManager {
...
@@ -41,7 +42,7 @@ func (e *extraHost) ConnectionManager() connmgr.ConnManager {
var
_
ExtraHostFeatures
=
(
*
extraHost
)(
nil
)
var
_
ExtraHostFeatures
=
(
*
extraHost
)(
nil
)
func
(
conf
*
Config
)
Host
(
log
log
.
Logger
)
(
host
.
Host
,
error
)
{
func
(
conf
*
Config
)
Host
(
log
log
.
Logger
,
reporter
metrics
.
Reporter
)
(
host
.
Host
,
error
)
{
if
conf
.
DisableP2P
{
if
conf
.
DisableP2P
{
return
nil
,
nil
return
nil
,
nil
}
}
...
@@ -115,7 +116,7 @@ func (conf *Config) Host(log log.Logger) (host.Host, error) {
...
@@ -115,7 +116,7 @@ func (conf *Config) Host(log log.Logger) (host.Host, error) {
ResourceManager
:
nil
,
// TODO use resource manager interface to manage resources per peer better.
ResourceManager
:
nil
,
// TODO use resource manager interface to manage resources per peer better.
NATManager
:
nat
,
NATManager
:
nat
,
Peerstore
:
ps
,
Peerstore
:
ps
,
Reporter
:
conf
.
BandwidthMetrics
,
// may be nil if disabled
Reporter
:
reporter
,
// may be nil if disabled
MultiaddrResolver
:
madns
.
DefaultResolver
,
MultiaddrResolver
:
madns
.
DefaultResolver
,
// Ping is a small built-in libp2p protocol that helps us check/debug latency between peers.
// Ping is a small built-in libp2p protocol that helps us check/debug latency between peers.
DisablePing
:
false
,
DisablePing
:
false
,
...
...
op-node/p2p/host_test.go
View file @
cc738b3d
...
@@ -21,7 +21,6 @@ import (
...
@@ -21,7 +21,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
...
@@ -65,10 +64,10 @@ func TestingConfig(t *testing.T) *Config {
...
@@ -65,10 +64,10 @@ func TestingConfig(t *testing.T) *Config {
func
TestP2PSimple
(
t
*
testing
.
T
)
{
func
TestP2PSimple
(
t
*
testing
.
T
)
{
confA
:=
TestingConfig
(
t
)
confA
:=
TestingConfig
(
t
)
confB
:=
TestingConfig
(
t
)
confB
:=
TestingConfig
(
t
)
hostA
,
err
:=
confA
.
Host
(
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"A"
))
hostA
,
err
:=
confA
.
Host
(
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"A"
)
,
nil
)
require
.
NoError
(
t
,
err
,
"failed to launch host A"
)
require
.
NoError
(
t
,
err
,
"failed to launch host A"
)
defer
hostA
.
Close
()
defer
hostA
.
Close
()
hostB
,
err
:=
confB
.
Host
(
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"B"
))
hostB
,
err
:=
confB
.
Host
(
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"B"
)
,
nil
)
require
.
NoError
(
t
,
err
,
"failed to launch host B"
)
require
.
NoError
(
t
,
err
,
"failed to launch host B"
)
defer
hostB
.
Close
()
defer
hostB
.
Close
()
err
=
hostA
.
Connect
(
context
.
Background
(),
peer
.
AddrInfo
{
ID
:
hostB
.
ID
(),
Addrs
:
hostB
.
Addrs
()})
err
=
hostA
.
Connect
(
context
.
Background
(),
peer
.
AddrInfo
{
ID
:
hostB
.
ID
(),
Addrs
:
hostB
.
Addrs
()})
...
@@ -132,7 +131,7 @@ func TestP2PFull(t *testing.T) {
...
@@ -132,7 +131,7 @@ func TestP2PFull(t *testing.T) {
// TODO: maybe swap the order of sec/mux preferences, to test that negotiation works
// TODO: maybe swap the order of sec/mux preferences, to test that negotiation works
logA
:=
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"A"
)
logA
:=
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"A"
)
nodeA
,
err
:=
NewNodeP2P
(
context
.
Background
(),
&
rollup
.
Config
{},
logA
,
&
confA
,
&
mockGossipIn
{})
nodeA
,
err
:=
NewNodeP2P
(
context
.
Background
(),
&
rollup
.
Config
{},
logA
,
&
confA
,
&
mockGossipIn
{}
,
nil
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
defer
nodeA
.
Close
()
defer
nodeA
.
Close
()
...
@@ -143,7 +142,7 @@ func TestP2PFull(t *testing.T) {
...
@@ -143,7 +142,7 @@ func TestP2PFull(t *testing.T) {
conns
<-
conn
conns
<-
conn
}})
}})
backend
:=
NewP2PAPIBackend
(
nodeA
,
logA
,
metrics
.
NewMetrics
(
""
)
)
backend
:=
NewP2PAPIBackend
(
nodeA
,
logA
,
nil
)
srv
:=
rpc
.
NewServer
()
srv
:=
rpc
.
NewServer
()
require
.
NoError
(
t
,
srv
.
RegisterName
(
"opp2p"
,
backend
))
require
.
NoError
(
t
,
srv
.
RegisterName
(
"opp2p"
,
backend
))
client
:=
rpc
.
DialInProc
(
srv
)
client
:=
rpc
.
DialInProc
(
srv
)
...
@@ -155,7 +154,7 @@ func TestP2PFull(t *testing.T) {
...
@@ -155,7 +154,7 @@ func TestP2PFull(t *testing.T) {
logB
:=
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"B"
)
logB
:=
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"B"
)
nodeB
,
err
:=
NewNodeP2P
(
context
.
Background
(),
&
rollup
.
Config
{},
logB
,
&
confB
,
&
mockGossipIn
{})
nodeB
,
err
:=
NewNodeP2P
(
context
.
Background
(),
&
rollup
.
Config
{},
logB
,
&
confB
,
&
mockGossipIn
{}
,
nil
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
defer
nodeB
.
Close
()
defer
nodeB
.
Close
()
hostB
:=
nodeB
.
Host
()
hostB
:=
nodeB
.
Host
()
...
@@ -289,7 +288,7 @@ func TestDiscovery(t *testing.T) {
...
@@ -289,7 +288,7 @@ func TestDiscovery(t *testing.T) {
resourcesCtx
,
resourcesCancel
:=
context
.
WithCancel
(
context
.
Background
())
resourcesCtx
,
resourcesCancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
resourcesCancel
()
defer
resourcesCancel
()
nodeA
,
err
:=
NewNodeP2P
(
context
.
Background
(),
rollupCfg
,
logA
,
&
confA
,
&
mockGossipIn
{})
nodeA
,
err
:=
NewNodeP2P
(
context
.
Background
(),
rollupCfg
,
logA
,
&
confA
,
&
mockGossipIn
{}
,
nil
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
defer
nodeA
.
Close
()
defer
nodeA
.
Close
()
hostA
:=
nodeA
.
Host
()
hostA
:=
nodeA
.
Host
()
...
@@ -304,7 +303,7 @@ func TestDiscovery(t *testing.T) {
...
@@ -304,7 +303,7 @@ func TestDiscovery(t *testing.T) {
confB
.
DiscoveryDB
=
discDBC
confB
.
DiscoveryDB
=
discDBC
// Start B
// Start B
nodeB
,
err
:=
NewNodeP2P
(
context
.
Background
(),
rollupCfg
,
logB
,
&
confB
,
&
mockGossipIn
{})
nodeB
,
err
:=
NewNodeP2P
(
context
.
Background
(),
rollupCfg
,
logB
,
&
confB
,
&
mockGossipIn
{}
,
nil
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
defer
nodeB
.
Close
()
defer
nodeB
.
Close
()
hostB
:=
nodeB
.
Host
()
hostB
:=
nodeB
.
Host
()
...
@@ -319,7 +318,7 @@ func TestDiscovery(t *testing.T) {
...
@@ -319,7 +318,7 @@ func TestDiscovery(t *testing.T) {
}})
}})
// Start C
// Start C
nodeC
,
err
:=
NewNodeP2P
(
context
.
Background
(),
rollupCfg
,
logC
,
&
confC
,
&
mockGossipIn
{})
nodeC
,
err
:=
NewNodeP2P
(
context
.
Background
(),
rollupCfg
,
logC
,
&
confC
,
&
mockGossipIn
{}
,
nil
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
defer
nodeC
.
Close
()
defer
nodeC
.
Close
()
hostC
:=
nodeC
.
Host
()
hostC
:=
nodeC
.
Host
()
...
...
op-node/p2p/node.go
View file @
cc738b3d
...
@@ -6,9 +6,11 @@ import (
...
@@ -6,9 +6,11 @@ import (
"fmt"
"fmt"
"strconv"
"strconv"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/host"
p2pmetrics
"github.com/libp2p/go-libp2p-core/metrics"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
ma
"github.com/multiformats/go-multiaddr"
ma
"github.com/multiformats/go-multiaddr"
...
@@ -30,12 +32,12 @@ type NodeP2P struct {
...
@@ -30,12 +32,12 @@ type NodeP2P struct {
gsOut
GossipOut
// p2p gossip application interface for publishing
gsOut
GossipOut
// p2p gossip application interface for publishing
}
}
func
NewNodeP2P
(
resourcesCtx
context
.
Context
,
rollupCfg
*
rollup
.
Config
,
log
log
.
Logger
,
setup
SetupP2P
,
gossipIn
GossipIn
)
(
*
NodeP2P
,
error
)
{
func
NewNodeP2P
(
resourcesCtx
context
.
Context
,
rollupCfg
*
rollup
.
Config
,
log
log
.
Logger
,
setup
SetupP2P
,
gossipIn
GossipIn
,
metrics
metrics
.
Metricer
)
(
*
NodeP2P
,
error
)
{
if
setup
==
nil
{
if
setup
==
nil
{
return
nil
,
errors
.
New
(
"p2p node cannot be created without setup"
)
return
nil
,
errors
.
New
(
"p2p node cannot be created without setup"
)
}
}
var
n
NodeP2P
var
n
NodeP2P
if
err
:=
n
.
init
(
resourcesCtx
,
rollupCfg
,
log
,
setup
,
gossipIn
);
err
!=
nil
{
if
err
:=
n
.
init
(
resourcesCtx
,
rollupCfg
,
log
,
setup
,
gossipIn
,
metrics
);
err
!=
nil
{
closeErr
:=
n
.
Close
()
closeErr
:=
n
.
Close
()
if
closeErr
!=
nil
{
if
closeErr
!=
nil
{
log
.
Error
(
"failed to close p2p after starting with err"
,
"closeErr"
,
closeErr
,
"err"
,
err
)
log
.
Error
(
"failed to close p2p after starting with err"
,
"closeErr"
,
closeErr
,
"err"
,
err
)
...
@@ -48,10 +50,12 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
...
@@ -48,10 +50,12 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
return
&
n
,
nil
return
&
n
,
nil
}
}
func
(
n
*
NodeP2P
)
init
(
resourcesCtx
context
.
Context
,
rollupCfg
*
rollup
.
Config
,
log
log
.
Logger
,
setup
SetupP2P
,
gossipIn
GossipIn
)
error
{
func
(
n
*
NodeP2P
)
init
(
resourcesCtx
context
.
Context
,
rollupCfg
*
rollup
.
Config
,
log
log
.
Logger
,
setup
SetupP2P
,
gossipIn
GossipIn
,
metrics
metrics
.
Metricer
)
error
{
bwc
:=
p2pmetrics
.
NewBandwidthCounter
()
var
err
error
var
err
error
// nil if disabled.
// nil if disabled.
n
.
host
,
err
=
setup
.
Host
(
log
)
n
.
host
,
err
=
setup
.
Host
(
log
,
bwc
)
if
err
!=
nil
{
if
err
!=
nil
{
if
n
.
dv5Udp
!=
nil
{
if
n
.
dv5Udp
!=
nil
{
n
.
dv5Udp
.
Close
()
n
.
dv5Udp
.
Close
()
...
@@ -66,10 +70,10 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
...
@@ -66,10 +70,10 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n
.
connMgr
=
extra
.
ConnectionManager
()
n
.
connMgr
=
extra
.
ConnectionManager
()
}
}
// notify of any new connections/streams/etc.
// notify of any new connections/streams/etc.
n
.
host
.
Network
()
.
Notify
(
NewNetworkNotifier
(
log
))
n
.
host
.
Network
()
.
Notify
(
NewNetworkNotifier
(
log
,
metrics
))
// unregister identify-push handler. Only identifying on dial is fine, and more robust against spam
// unregister identify-push handler. Only identifying on dial is fine, and more robust against spam
n
.
host
.
RemoveStreamHandler
(
identify
.
IDDelta
)
n
.
host
.
RemoveStreamHandler
(
identify
.
IDDelta
)
n
.
gs
,
err
=
NewGossipSub
(
resourcesCtx
,
n
.
host
,
rollupCfg
)
n
.
gs
,
err
=
NewGossipSub
(
resourcesCtx
,
n
.
host
,
rollupCfg
,
metrics
)
if
err
!=
nil
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to start gossipsub router: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to start gossipsub router: %w"
,
err
)
}
}
...
@@ -90,6 +94,10 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
...
@@ -90,6 +94,10 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if
err
!=
nil
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to start discv5: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to start discv5: %w"
,
err
)
}
}
if
metrics
!=
nil
{
go
metrics
.
RecordBandwidth
(
resourcesCtx
,
bwc
)
}
}
}
return
nil
return
nil
}
}
...
...
op-node/p2p/notifications.go
View file @
cc738b3d
package
p2p
package
p2p
import
(
import
(
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/network"
ma
"github.com/multiformats/go-multiaddr"
ma
"github.com/multiformats/go-multiaddr"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
)
)
// TODO: add metrics here as well
type
NotificationsMetricer
interface
{
IncPeerCount
()
DecPeerCount
()
IncStreamCount
()
DecStreamCount
()
}
type
notifications
struct
{
type
notifications
struct
{
log
log
.
Logger
log
log
.
Logger
m
NotificationsMetricer
}
}
func
(
notif
*
notifications
)
Listen
(
n
network
.
Network
,
a
ma
.
Multiaddr
)
{
func
(
notif
*
notifications
)
Listen
(
n
network
.
Network
,
a
ma
.
Multiaddr
)
{
...
@@ -20,20 +27,27 @@ func (notif *notifications) ListenClose(n network.Network, a ma.Multiaddr) {
...
@@ -20,20 +27,27 @@ func (notif *notifications) ListenClose(n network.Network, a ma.Multiaddr) {
notif
.
log
.
Info
(
"stopped listening network address"
,
"addr"
,
a
)
notif
.
log
.
Info
(
"stopped listening network address"
,
"addr"
,
a
)
}
}
func
(
notif
*
notifications
)
Connected
(
n
network
.
Network
,
v
network
.
Conn
)
{
func
(
notif
*
notifications
)
Connected
(
n
network
.
Network
,
v
network
.
Conn
)
{
notif
.
m
.
IncPeerCount
()
notif
.
log
.
Info
(
"connected to peer"
,
"peer"
,
v
.
RemotePeer
(),
"addr"
,
v
.
RemoteMultiaddr
())
notif
.
log
.
Info
(
"connected to peer"
,
"peer"
,
v
.
RemotePeer
(),
"addr"
,
v
.
RemoteMultiaddr
())
}
}
func
(
notif
*
notifications
)
Disconnected
(
n
network
.
Network
,
v
network
.
Conn
)
{
func
(
notif
*
notifications
)
Disconnected
(
n
network
.
Network
,
v
network
.
Conn
)
{
notif
.
m
.
DecPeerCount
()
notif
.
log
.
Info
(
"disconnected from peer"
,
"peer"
,
v
.
RemotePeer
(),
"addr"
,
v
.
RemoteMultiaddr
())
notif
.
log
.
Info
(
"disconnected from peer"
,
"peer"
,
v
.
RemotePeer
(),
"addr"
,
v
.
RemoteMultiaddr
())
}
}
func
(
notif
*
notifications
)
OpenedStream
(
n
network
.
Network
,
v
network
.
Stream
)
{
func
(
notif
*
notifications
)
OpenedStream
(
n
network
.
Network
,
v
network
.
Stream
)
{
notif
.
m
.
IncStreamCount
()
c
:=
v
.
Conn
()
c
:=
v
.
Conn
()
notif
.
log
.
Trace
(
"opened stream"
,
"protocol"
,
v
.
Protocol
(),
"peer"
,
c
.
RemotePeer
(),
"addr"
,
c
.
RemoteMultiaddr
())
notif
.
log
.
Trace
(
"opened stream"
,
"protocol"
,
v
.
Protocol
(),
"peer"
,
c
.
RemotePeer
(),
"addr"
,
c
.
RemoteMultiaddr
())
}
}
func
(
notif
*
notifications
)
ClosedStream
(
n
network
.
Network
,
v
network
.
Stream
)
{
func
(
notif
*
notifications
)
ClosedStream
(
n
network
.
Network
,
v
network
.
Stream
)
{
notif
.
m
.
DecStreamCount
()
c
:=
v
.
Conn
()
c
:=
v
.
Conn
()
notif
.
log
.
Trace
(
"opened stream"
,
"protocol"
,
v
.
Protocol
(),
"peer"
,
c
.
RemotePeer
(),
"addr"
,
c
.
RemoteMultiaddr
())
notif
.
log
.
Trace
(
"opened stream"
,
"protocol"
,
v
.
Protocol
(),
"peer"
,
c
.
RemotePeer
(),
"addr"
,
c
.
RemoteMultiaddr
())
}
}
func
NewNetworkNotifier
(
log
log
.
Logger
)
network
.
Notifiee
{
func
NewNetworkNotifier
(
log
log
.
Logger
,
m
metrics
.
Metricer
)
network
.
Notifiee
{
return
&
notifications
{
log
:
log
}
if
m
==
nil
{
m
=
metrics
.
NoopMetrics
}
return
&
notifications
{
log
:
log
,
m
:
m
}
}
}
op-node/p2p/prepared.go
View file @
cc738b3d
...
@@ -5,6 +5,7 @@ import (
...
@@ -5,6 +5,7 @@ import (
"fmt"
"fmt"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
...
@@ -38,7 +39,7 @@ func (p *Prepared) Check() error {
...
@@ -38,7 +39,7 @@ func (p *Prepared) Check() error {
}
}
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
func
(
p
*
Prepared
)
Host
(
log
log
.
Logger
)
(
host
.
Host
,
error
)
{
func
(
p
*
Prepared
)
Host
(
log
log
.
Logger
,
reporter
metrics
.
Reporter
)
(
host
.
Host
,
error
)
{
return
p
.
HostP2P
,
nil
return
p
.
HostP2P
,
nil
}
}
...
...
op-node/p2p/rpc_server.go
View file @
cc738b3d
...
@@ -54,12 +54,16 @@ type Node interface {
...
@@ -54,12 +54,16 @@ type Node interface {
type
APIBackend
struct
{
type
APIBackend
struct
{
node
Node
node
Node
log
log
.
Logger
log
log
.
Logger
m
*
metrics
.
Metrics
m
metrics
.
Metricer
}
}
var
_
API
=
(
*
APIBackend
)(
nil
)
var
_
API
=
(
*
APIBackend
)(
nil
)
func
NewP2PAPIBackend
(
node
Node
,
log
log
.
Logger
,
m
*
metrics
.
Metrics
)
*
APIBackend
{
func
NewP2PAPIBackend
(
node
Node
,
log
log
.
Logger
,
m
metrics
.
Metricer
)
*
APIBackend
{
if
m
==
nil
{
m
=
metrics
.
NoopMetrics
}
return
&
APIBackend
{
return
&
APIBackend
{
node
:
node
,
node
:
node
,
log
:
log
,
log
:
log
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment