Commit 5b402fbb authored by Diederik Loerakker's avatar Diederik Loerakker Committed by GitHub

mirror p2p-discover specs work (#2570)

* opnode/p2p,specs/p2p: discover+connect background process

* opnode/flags: simplify p2p useragent flag

* opnode/p2p: peer discovery background process

* p2p: review cleanup - define consts for numbers, fix imports
Co-authored-by: default avatarmergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent a655feab
...@@ -144,7 +144,7 @@ var ( ...@@ -144,7 +144,7 @@ var (
} }
UserAgent = cli.StringFlag{ UserAgent = cli.StringFlag{
Name: "p2p.useragent", Name: "p2p.useragent",
Usage: "User-agent string to share via LibP2P identify. If empty it defaults to 'optimism-VERSIONHERE'.", Usage: "User-agent string to share via LibP2P identify. If empty it defaults to 'optimism'.",
Hidden: true, Hidden: true,
Required: false, Required: false,
Value: "optimism", Value: "optimism",
......
...@@ -34,7 +34,7 @@ type OpNode struct { ...@@ -34,7 +34,7 @@ type OpNode struct {
l2Engines []*driver.Driver // engines to keep synced l2Engines []*driver.Driver // engines to keep synced
l2Nodes []*rpc.Client // L2 Execution Engines to close at shutdown l2Nodes []*rpc.Client // L2 Execution Engines to close at shutdown
server *rpcServer // RPC server hosting the rollup-node API server *rpcServer // RPC server hosting the rollup-node API
p2pNode p2p.Node // P2P node functionality p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging tracer Tracer // tracer to get events for testing/debugging
...@@ -221,6 +221,9 @@ func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error { ...@@ -221,6 +221,9 @@ func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
return err return err
} }
n.p2pNode = p2pNode n.p2pNode = p2pNode
if n.p2pNode.Dv5Udp() != nil {
go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers())
}
} }
return nil return nil
} }
......
...@@ -12,6 +12,8 @@ import ( ...@@ -12,6 +12,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -44,7 +46,8 @@ type SetupP2P interface { ...@@ -44,7 +46,8 @@ type SetupP2P interface {
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled. // Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host(log log.Logger) (host.Host, error) Host(log log.Logger) (host.Host, error)
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled. // Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5, error) Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error)
TargetPeers() uint
} }
// Config sets up a p2p host and discv5 service from configuration. // Config sets up a p2p host and discv5 service from configuration.
...@@ -175,6 +178,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) { ...@@ -175,6 +178,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
return conf, nil return conf, nil
} }
func (conf *Config) TargetPeers() uint {
return conf.PeersLo
}
func (conf *Config) loadListenOpts(ctx *cli.Context) error { func (conf *Config) loadListenOpts(ctx *cli.Context) error {
listenIP := ctx.GlobalString(flags.ListenIP.Name) listenIP := ctx.GlobalString(flags.ListenIP.Name)
if listenIP != "" { // optional if listenIP != "" { // optional
......
package p2p package p2p
import ( import (
"bytes"
"context"
secureRand "crypto/rand"
"encoding/binary"
"fmt"
"io"
"math/rand"
"net" "net"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
gcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
) )
func (conf *Config) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5, error) { const (
discoverIntervalFast = time.Second * 5
discoverIntervalSlow = time.Second * 20
connectionIntervalFast = time.Second * 5
connectionIntervalSlow = time.Second * 20
connectionWorkerCount = 4
connectionBufferSize = 10
discoveredNodesBuffer = 3
tableKickoffDelay = time.Second * 3
discoveredAddrTTL = time.Hour * 24
collectiveDialTimeout = time.Second * 30
)
func (conf *Config) Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error) {
if conf.NoDiscovery { if conf.NoDiscovery {
return nil, nil, nil return nil, nil, nil
} }
localNode := enode.NewLocalNode(conf.DiscoveryDB, conf.Priv) priv := *conf.Priv
// use the geth curve definition. Same crypto, but geth needs to detect it as *their* definition of the curve.
priv.Curve = gcrypto.S256()
localNode := enode.NewLocalNode(conf.DiscoveryDB, &priv)
if conf.AdvertiseIP != nil { if conf.AdvertiseIP != nil {
localNode.SetStaticIP(conf.AdvertiseIP) localNode.SetStaticIP(conf.AdvertiseIP)
} }
if conf.AdvertiseUDPPort != 0 { if conf.AdvertiseUDPPort != 0 {
localNode.SetFallbackUDP(int(conf.AdvertiseUDPPort)) localNode.SetFallbackUDP(int(conf.AdvertiseUDPPort))
} }
if conf.AdvertiseTCPPort != 0 { // explicitly advertised port gets priority
localNode.Set(enr.TCP(conf.AdvertiseTCPPort))
} else if tcpPort != 0 { // otherwise try to pick up whatever port LibP2P binded to (listen port, or dynamically picked)
localNode.Set(enr.TCP(tcpPort))
} else if conf.ListenTCPPort != 0 { // otherwise default to the port we configured it to listen on
localNode.Set(enr.TCP(conf.ListenTCPPort))
} else {
return nil, nil, fmt.Errorf("no TCP port to put in discovery record")
}
dat := OptimismENRData{
chainID: rollupCfg.L2ChainID.Uint64(),
version: 0,
}
localNode.Set(&dat)
udpAddr := &net.UDPAddr{ udpAddr := &net.UDPAddr{
IP: conf.ListenIP, IP: conf.ListenIP,
...@@ -29,9 +75,13 @@ func (conf *Config) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5 ...@@ -29,9 +75,13 @@ func (conf *Config) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if udpAddr.Port == 0 { // if we picked a port dynamically, then find the port we got, and update our node record
localUDPAddr := conn.LocalAddr().(*net.UDPAddr)
localNode.SetFallbackUDP(localUDPAddr.Port)
}
cfg := discover.Config{ cfg := discover.Config{
PrivateKey: conf.Priv, PrivateKey: &priv,
NetRestrict: nil, NetRestrict: nil,
Bootnodes: conf.Bootnodes, Bootnodes: conf.Bootnodes,
Unhandled: nil, // Not used in dv5 Unhandled: nil, // Not used in dv5
...@@ -43,8 +93,294 @@ func (conf *Config) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5 ...@@ -43,8 +93,294 @@ func (conf *Config) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5
return nil, nil, err return nil, nil, err
} }
// TODO: periodically we can pull the external IP from libp2p NAT service, log.Info("started discovery service", "enr", localNode.Node(), "id", localNode.ID())
// TODO: periodically we can pull the external IP and TCP port from libp2p NAT service,
// and add it as a statement to keep the localNode accurate (if we trust the NAT device more than the discv5 statements) // and add it as a statement to keep the localNode accurate (if we trust the NAT device more than the discv5 statements)
return localNode, udpV5, nil return localNode, udpV5, nil
} }
func enrToAddrInfo(r *enode.Node) (*peer.AddrInfo, error) {
ip := r.IP()
ipScheme := "ip4"
if ip4 := ip.To4(); ip4 == nil {
ipScheme = "ip6"
} else {
ip = ip4
}
mAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/%s/%s/tcp/%d", ipScheme, ip.String(), r.TCP()))
if err != nil {
return nil, fmt.Errorf("could not construct multi addr: %v", err)
}
pub := r.Pubkey()
peerID, err := peer.IDFromPublicKey((*crypto.Secp256k1PublicKey)(pub))
if err != nil {
return nil, fmt.Errorf("could not compute peer ID from pubkey for multi-addr: %v", err)
}
return &peer.AddrInfo{
ID: peerID,
Addrs: []multiaddr.Multiaddr{mAddr},
}, nil
}
// The discovery ENRs are just key-value lists, and we filter them by records tagged with the "optimism" key,
// and then check the chain ID and version.
type OptimismENRData struct {
chainID uint64
version uint64
}
func (o *OptimismENRData) ENRKey() string {
return "optimism"
}
func (o *OptimismENRData) EncodeRLP(w io.Writer) error {
out := make([]byte, 2*binary.MaxVarintLen64)
offset := binary.PutUvarint(out, o.chainID)
offset += binary.PutUvarint(out[offset:], o.version)
out = out[:offset]
// encode as byte-string
return rlp.Encode(w, out)
}
func (o *OptimismENRData) DecodeRLP(s *rlp.Stream) error {
b, err := s.Bytes()
if err != nil {
return fmt.Errorf("failed to decode outer ENR entry: %v", err)
}
// We don't check the byte length: the below readers are limited, and the ENR itself has size limits.
// Future "optimism" entries may contain additional data, and will be tagged with a newer version etc.
r := bytes.NewReader(b)
chainID, err := binary.ReadUvarint(r)
if err != nil {
return fmt.Errorf("failed to read chain ID var int: %v", err)
}
version, err := binary.ReadUvarint(r)
if err != nil {
return fmt.Errorf("failed to read version var int: %v", err)
}
o.chainID = chainID
o.version = version
return nil
}
var _ enr.Entry = (*OptimismENRData)(nil)
func FilterEnodes(log log.Logger, cfg *rollup.Config) func(node *enode.Node) bool {
return func(node *enode.Node) bool {
var dat OptimismENRData
err := node.Load(&dat)
// if the entry does not exist, or if it is invalid, then ignore the node
if err != nil {
log.Debug("discovered node record has no optimism info", "node", node.ID(), "err", err)
return false
}
// check chain ID matches
if cfg.L2ChainID.Uint64() != dat.chainID {
log.Debug("discovered node record has no matching chain ID", "node", node.ID(), "got", dat.chainID, "expected", cfg.L2ChainID.Uint64())
return false
}
// check version matches
if dat.version != 0 {
log.Debug("discovered node record has no matching version", "node", node.ID(), "got", dat.version, "expected", 0)
return false
}
return true
}
}
// DiscoveryProcess runs a discovery process that randomly walks the DHT to fill the peerstore,
// and connects to nodes in the peerstore that we are not already connected to.
// Nodes from the peerstore will be shuffled, unsuccessful connection attempts will cause peers to be avoided,
// and only nodes with addresses (under TTL) will be connected to.
func (n *NodeP2P) DiscoveryProcess(ctx context.Context, log log.Logger, cfg *rollup.Config, connectGoal uint) {
if n.dv5Udp == nil {
log.Warn("peer discovery is disabled")
return
}
filter := FilterEnodes(log, cfg)
// We pull nodes from discv5 DHT in random order to find new peers.
// Eventually we'll find a peer record that matches our filter.
randomNodeIter := n.dv5Udp.RandomNodes()
randomNodeIter = enode.Filter(randomNodeIter, filter)
defer randomNodeIter.Close()
// We pull from the DHT in a slow/fast interval, depending on the need to find more peers
discoverTicker := time.NewTicker(discoverIntervalFast)
defer discoverTicker.Stop()
// We connect to the peers we know of to maintain a target,
// but do so with polling to avoid scanning the connection count continuously
connectTicker := time.NewTicker(connectionIntervalFast)
defer connectTicker.Stop()
// We can go faster/slower depending on the need
slower := func() {
discoverTicker.Reset(discoverIntervalSlow)
connectTicker.Reset(connectionIntervalSlow)
}
faster := func() {
discoverTicker.Reset(discoverIntervalFast)
connectTicker.Reset(connectionIntervalFast)
}
// We try to connect to peers in parallel: some may be slow to respond
connAttempts := make(chan peer.ID, connectionBufferSize)
connectWorker := func(ctx context.Context) {
for {
id, ok := <-connAttempts
if !ok {
return
}
addrs := n.Host().Peerstore().Addrs(id)
log.Info("attempting connection", "peer", id)
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
err := n.Host().Connect(ctx, peer.AddrInfo{ID: id, Addrs: addrs})
cancel()
if err != nil {
log.Debug("failed connection attempt", "peer", id, "err", err)
}
}
}
// stops all the workers when we are done
defer close(connAttempts)
// start workers to try connect to peers
for i := 0; i < connectionWorkerCount; i++ {
go connectWorker(ctx)
}
// buffer discovered nodes, so don't stall on the dht iteration as much
randomNodesCh := make(chan *enode.Node, discoveredNodesBuffer)
defer close(randomNodesCh)
bufferNodes := func() {
for {
select {
case <-discoverTicker.C:
if !randomNodeIter.Next() {
log.Info("discv5 DHT iteration stopped, closing peer discovery now...")
return
}
found := randomNodeIter.Node()
select {
// block once we have found enough nodes
case randomNodesCh <- found:
continue
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
// Walk the DHT in parallel, the discv5 interface does not use channels for the iteration
go bufferNodes()
// Kick off by trying the nodes we have in our table (previous nodes from last run and/or bootnodes)
go func() {
<-time.After(tableKickoffDelay)
// At the start we might have trouble walking the DHT,
// but we do have a table with some nodes,
// so take the table and feed it into the discovery process
for _, rec := range n.dv5Udp.AllNodes() {
if filter(rec) {
select {
case randomNodesCh <- rec:
continue
case <-ctx.Done():
return
}
}
}
}()
pstore := n.Host().Peerstore()
for {
select {
case <-ctx.Done():
log.Info("stopped peer discovery")
return // no ctx error, expected close
case found := <-randomNodesCh:
var dat OptimismENRData
if err := found.Load(&dat); err != nil { // we already filtered on chain ID and version
continue
}
info, err := enrToAddrInfo(found)
if err != nil {
continue
}
// We add the addresses to the peerstore, and update the address TTL.
//After that we stop using the address, assuming it may not be valid anymore (until we rediscover the node)
pstore.AddAddrs(info.ID, info.Addrs, discoveredAddrTTL)
_ = pstore.AddPubKey(info.ID, (*crypto.Secp256k1PublicKey)(found.Pubkey()))
// Tag the peer, we'd rather have the connection manager prune away old peers,
// or peers on different chains, or anyone we have not seen via discovery.
// There is no tag score decay yet, so just set it to 42.
n.ConnectionManager().TagPeer(info.ID, fmt.Sprintf("optimism-%d-%d", dat.chainID, dat.version), 42)
log.Debug("discovered peer", "peer", info.ID, "nodeID", found.ID(), "addr", info.Addrs[0])
case <-connectTicker.C:
connected := n.Host().Network().Peers()
log.Debug("peering tick", "connected", len(connected),
"advertised_udp", n.dv5Local.Node().UDP(),
"advertised_tcp", n.dv5Local.Node().TCP(),
"advertised_ip", n.dv5Local.Node().IP())
if uint(len(connected)) < connectGoal {
// Start looking for more peers more actively again
faster()
peersWithAddrs := n.Host().Peerstore().PeersWithAddrs()
if err := shufflePeers(peersWithAddrs); err != nil {
continue
}
existing := make(map[peer.ID]struct{})
for _, p := range connected {
existing[p] = struct{}{}
}
// Keep using these peers, and don't try new discovery/connections.
// We don't need to search for more peers and try new connections if we already have plenty
ctx, cancel := context.WithTimeout(ctx, collectiveDialTimeout)
peerLoop:
for _, id := range peersWithAddrs {
// never dial ourselves
if n.Host().ID() == id {
continue
}
// skip peers that we are already connected to
if _, ok := existing[id]; ok {
continue
}
// skip peers that we were just connected to
if n.Host().Network().Connectedness(id) == network.CannotConnect {
continue
}
// schedule, if there is still space to schedule (this may block)
select {
case connAttempts <- id:
case <-ctx.Done():
break peerLoop
}
}
cancel()
} else {
// we have enough connections, slow down actively filling the peerstore
slower()
}
}
}
}
// shuffle the slice of peer IDs in-place with a RNG seeded by secure randomness.
func shufflePeers(ids peer.IDSlice) error {
var x [8]byte // shuffling is not critical, just need to avoid basic predictability by outside peers
if _, err := io.ReadFull(secureRand.Reader, x[:]); err != nil {
return err
}
rng := rand.New(rand.NewSource(int64(binary.LittleEndian.Uint64(x[:]))))
rng.Shuffle(len(ids), ids.Swap)
return nil
}
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"crypto/rand" "crypto/rand"
"math/big"
"net" "net"
"testing" "testing"
"time" "time"
...@@ -12,6 +13,7 @@ import ( ...@@ -12,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-datastore/sync"
...@@ -222,6 +224,123 @@ func TestP2PFull(t *testing.T) { ...@@ -222,6 +224,123 @@ func TestP2PFull(t *testing.T) {
require.NoError(t, p2pClientA.UnprotectPeer(ctx, hostB.ID())) require.NoError(t, p2pClientA.UnprotectPeer(ctx, hostB.ID()))
} }
func TestDiscovery(t *testing.T) {
pA, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
require.NoError(t, err, "failed to generate new p2p priv key")
pB, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
require.NoError(t, err, "failed to generate new p2p priv key")
pC, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
require.NoError(t, err, "failed to generate new p2p priv key")
logA := testlog.Logger(t, log.LvlError).New("host", "A")
logB := testlog.Logger(t, log.LvlError).New("host", "B")
logC := testlog.Logger(t, log.LvlError).New("host", "C")
mplexC, err := mplexC()
require.NoError(t, err)
yamuxC, err := yamuxC()
require.NoError(t, err)
noiseC, err := noiseC()
require.NoError(t, err)
tlsC, err := tlsC()
require.NoError(t, err)
discDBA, err := enode.OpenDB("") // "" = memory db
require.NoError(t, err)
discDBB, err := enode.OpenDB("")
require.NoError(t, err)
discDBC, err := enode.OpenDB("")
require.NoError(t, err)
rollupCfg := &rollup.Config{L2ChainID: big.NewInt(901)}
confA := Config{
Priv: (*ecdsa.PrivateKey)((pA).(*crypto.Secp256k1PrivateKey)),
DisableP2P: false,
NoDiscovery: false,
AdvertiseIP: net.IP{127, 0, 0, 1},
ListenUDPPort: 0, // bind to any available port
ListenIP: net.IP{127, 0, 0, 1},
ListenTCPPort: 0, // bind to any available port
StaticPeers: nil,
HostMux: []lconf.MsMuxC{yamuxC, mplexC},
HostSecurity: []lconf.MsSecC{noiseC, tlsC},
NoTransportSecurity: false,
PeersLo: 1,
PeersHi: 10,
PeersGrace: time.Second * 10,
NAT: false,
UserAgent: "optimism-testing",
TimeoutNegotiation: time.Second * 2,
TimeoutAccept: time.Second * 2,
TimeoutDial: time.Second * 2,
Store: sync.MutexWrap(ds.NewMapDatastore()),
DiscoveryDB: discDBA,
ConnGater: DefaultConnGater,
ConnMngr: DefaultConnManager,
}
// copy config A, and change the settings for B
confB := confA
confB.Priv = (*ecdsa.PrivateKey)((pB).(*crypto.Secp256k1PrivateKey))
confB.Store = sync.MutexWrap(ds.NewMapDatastore())
confB.DiscoveryDB = discDBB
resourcesCtx, resourcesCancel := context.WithCancel(context.Background())
defer resourcesCancel()
nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{})
require.NoError(t, err)
defer nodeA.Close()
hostA := nodeA.Host()
go nodeA.DiscoveryProcess(resourcesCtx, logA, rollupCfg, 10)
// Add A as bootnode to B
confB.Bootnodes = []*enode.Node{nodeA.Dv5Udp().Self()}
// Copy B config to C, and ensure they have a different priv / peerstore
confC := confB
confC.Priv = (*ecdsa.PrivateKey)((pC).(*crypto.Secp256k1PrivateKey))
confC.Store = sync.MutexWrap(ds.NewMapDatastore())
confB.DiscoveryDB = discDBC
// Start B
nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{})
require.NoError(t, err)
defer nodeB.Close()
hostB := nodeB.Host()
go nodeB.DiscoveryProcess(resourcesCtx, logB, rollupCfg, 10)
// Track connections to B
connsB := make(chan network.Conn, 2)
hostB.Network().Notify(&network.NotifyBundle{
ConnectedF: func(n network.Network, conn network.Conn) {
log.Info("connection to B", "peer", conn.RemotePeer())
connsB <- conn
}})
// Start C
nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{})
require.NoError(t, err)
defer nodeC.Close()
hostC := nodeC.Host()
go nodeC.DiscoveryProcess(resourcesCtx, logC, rollupCfg, 10)
// B and C don't know each other yet, but both have A as a bootnode.
// It should only be a matter of time for them to connect, if they discover each other via A.
var firstPeersOfB []peer.ID
for i := 0; i < 2; i++ {
select {
case <-time.After(time.Second * 30):
t.Fatal("failed to get connection to B in time")
case c := <-connsB:
firstPeersOfB = append(firstPeersOfB, c.RemotePeer())
}
}
// B should be connected to the bootnode it used (it's a valid optimism node to connect to here)
require.Contains(t, firstPeersOfB, hostA.ID())
// C should be connected, although this one might take more time to discover
require.Contains(t, firstPeersOfB, hostC.ID())
}
// Most tests should use mocknets instead of using the actual local host network // Most tests should use mocknets instead of using the actual local host network
func TestP2PMocknet(t *testing.T) { func TestP2PMocknet(t *testing.T) {
mnet, err := mocknet.FullMeshConnected(3) mnet, err := mocknet.FullMeshConnected(3)
......
...@@ -4,6 +4,9 @@ import ( ...@@ -4,6 +4,9 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strconv"
ma "github.com/multiformats/go-multiaddr"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -17,13 +20,14 @@ import ( ...@@ -17,13 +20,14 @@ import (
) )
type NodeP2P struct { type NodeP2P struct {
host host.Host // p2p host (optional, may be nil) host host.Host // p2p host (optional, may be nil)
gater ConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled gater ConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
dv5Local *enode.LocalNode // p2p discovery identity (optional, may be nil) // the below components are all optional, and may be nil. They require the host to not be nil.
dv5Udp *discover.UDPv5 // p2p discovery service (optional, may be nil) dv5Local *enode.LocalNode // p2p discovery identity
gs *pubsub.PubSub // p2p gossip router (optional, may be nil) dv5Udp *discover.UDPv5 // p2p discovery service
gsOut GossipOut // p2p gossip application interface for publishing (optional, may be nil) gs *pubsub.PubSub // p2p gossip router
gsOut GossipOut // p2p gossip application interface for publishing
} }
func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn) (*NodeP2P, error) { func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn) (*NodeP2P, error) {
...@@ -46,12 +50,6 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log. ...@@ -46,12 +50,6 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn) error { func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn) error {
var err error var err error
// All nil if disabled.
n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"))
if err != nil {
return fmt.Errorf("failed to start discv5: %v", err)
}
// nil if disabled. // nil if disabled.
n.host, err = setup.Host(log) n.host, err = setup.Host(log)
if err != nil { if err != nil {
...@@ -81,6 +79,17 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -81,6 +79,17 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
return fmt.Errorf("failed to join blocks gossip topic: %v", err) return fmt.Errorf("failed to join blocks gossip topic: %v", err)
} }
log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().Pretty()) log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().Pretty())
tcpPort, err := FindActiveTCPPort(n.host)
if err != nil {
log.Warn("failed to find what TCP port p2p is binded to", "err", err)
}
// All nil if disabled.
n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort)
if err != nil {
return fmt.Errorf("failed to start discv5: %v", err)
}
} }
return nil return nil
} }
...@@ -130,3 +139,20 @@ func (n *NodeP2P) Close() error { ...@@ -130,3 +139,20 @@ func (n *NodeP2P) Close() error {
} }
return result.ErrorOrNil() return result.ErrorOrNil()
} }
func FindActiveTCPPort(h host.Host) (uint16, error) {
var tcpPort uint16
for _, addr := range h.Addrs() {
tcpPortStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil {
continue
}
v, err := strconv.ParseUint(tcpPortStr, 10, 16)
if err != nil {
continue
}
tcpPort = uint16(v)
break
}
return tcpPort, nil
}
...@@ -4,6 +4,10 @@ import ( ...@@ -4,6 +4,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
...@@ -20,6 +24,10 @@ type Prepared struct { ...@@ -20,6 +24,10 @@ type Prepared struct {
var _ SetupP2P = (*Prepared)(nil) var _ SetupP2P = (*Prepared)(nil)
func (p *Prepared) TargetPeers() uint {
return 20
}
func (p *Prepared) Check() error { func (p *Prepared) Check() error {
if (p.LocalNode == nil) != (p.UDPv5 == nil) { if (p.LocalNode == nil) != (p.UDPv5 == nil) {
return fmt.Errorf("inconsistent discv5 setup: %v <> %v", p.LocalNode, p.UDPv5) return fmt.Errorf("inconsistent discv5 setup: %v <> %v", p.LocalNode, p.UDPv5)
...@@ -36,6 +44,16 @@ func (p *Prepared) Host(log log.Logger) (host.Host, error) { ...@@ -36,6 +44,16 @@ func (p *Prepared) Host(log log.Logger) (host.Host, error) {
} }
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled. // Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
func (p *Prepared) Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5, error) { func (p *Prepared) Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error) {
if p.LocalNode != nil {
dat := OptimismENRData{
chainID: rollupCfg.L2ChainID.Uint64(),
version: 0,
}
p.LocalNode.Set(&dat)
if tcpPort != 0 {
p.LocalNode.Set(enr.TCP(tcpPort))
}
}
return p.LocalNode, p.UDPv5, nil return p.LocalNode, p.UDPv5, nil
} }
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"time" "time"
...@@ -26,8 +25,6 @@ import ( ...@@ -26,8 +25,6 @@ import (
// TODO: dynamic peering // TODO: dynamic peering
// - req-resp protocol to ensure peers from a different chain learn they shouldn't be connected // - req-resp protocol to ensure peers from a different chain learn they shouldn't be connected
// - banning peers based on score // - banning peers based on score
// - store enode in peerstore in dynamic-peering background process
// - peers must be tagged with the "optimism" tag and marked with high value if the chain ID matches
var ( var (
DisabledDiscovery = errors.New("discovery disabled") DisabledDiscovery = errors.New("discovery disabled")
...@@ -50,7 +47,6 @@ type Node interface { ...@@ -50,7 +47,6 @@ type Node interface {
ConnectionGater() ConnectionGater ConnectionGater() ConnectionGater
// ConnectionManager returns the connection manager, to protect peers with, may be nil // ConnectionManager returns the connection manager, to protect peers with, may be nil
ConnectionManager() connmgr.ConnManager ConnectionManager() connmgr.ConnManager
io.Closer
} }
type APIBackend struct { type APIBackend struct {
......
...@@ -86,10 +86,10 @@ The Ethereum Node Record (ENR) for an Optimism rollup node must contain the foll ...@@ -86,10 +86,10 @@ The Ethereum Node Record (ENR) for an Optimism rollup node must contain the foll
- A UDP port (`udp` field) representing the local discv5 listening port. - A UDP port (`udp` field) representing the local discv5 listening port.
- An Optimism (`optimism` field) L2 network identifier - An Optimism (`optimism` field) L2 network identifier
The `optimism` value is encoded as the concatenation of: The `optimism` value is encoded as a single RLP `bytes` value, the concatenation of:
- chain ID (`varint`) - chain ID (`unsigned varint`)
- fork ID (`varint`) - fork ID (`unsigned varint`)
Note that DiscV5 is a shared DHT (Distributed Hash Table): the L1 consensus and execution nodes, Note that DiscV5 is a shared DHT (Distributed Hash Table): the L1 consensus and execution nodes,
as well as testnet nodes, and even external IOT nodes, all communicate records in this large common DHT. as well as testnet nodes, and even external IOT nodes, all communicate records in this large common DHT.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment