node.go 9.7 KB
Newer Older
1 2 3 4 5 6
package p2p

import (
	"context"
	"errors"
	"fmt"
7
	"net"
8
	"strconv"
9 10
	"time"

11 12
	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum-optimism/optimism/op-node/metrics"
13
	"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
14
	"github.com/ethereum-optimism/optimism/op-node/p2p/monitor"
15 16
	"github.com/ethereum-optimism/optimism/op-node/p2p/store"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
17
	"github.com/ethereum-optimism/optimism/op-service/clock"
18 19 20
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/enode"
21 22
	"github.com/hashicorp/go-multierror"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
23 24 25
	"github.com/libp2p/go-libp2p/core/connmgr"
	"github.com/libp2p/go-libp2p/core/host"
	p2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
26
	"github.com/libp2p/go-libp2p/core/network"
27
	"github.com/libp2p/go-libp2p/core/peer"
28
	ma "github.com/multiformats/go-multiaddr"
29
	manet "github.com/multiformats/go-multiaddr/net"
30 31
)

Andreas Bigger's avatar
Andreas Bigger committed
32
// NodeP2P is a p2p node, which can be used to gossip messages.
33
type NodeP2P struct {
34 35 36 37
	host        host.Host                      // p2p host (optional, may be nil)
	gater       gating.BlockingConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
	scorer      Scorer                         // writes score-updates to the peerstore and keeps metrics of score changes
	connMgr     connmgr.ConnManager            // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
38
	peerMonitor *monitor.PeerMonitor           // peer monitor to disconnect bad peers, may be nil even with p2p enabled
39
	store       store.ExtendedPeerstore        // peerstore of host, with extra bindings for scoring and banning
40
	appScorer   ApplicationScorer
41
	log         log.Logger
42 43 44 45 46
	// the below components are all optional, and may be nil. They require the host to not be nil.
	dv5Local *enode.LocalNode // p2p discovery identity
	dv5Udp   *discover.UDPv5  // p2p discovery service
	gs       *pubsub.PubSub   // p2p gossip router
	gsOut    GossipOut        // p2p gossip application interface for publishing
47 48
	syncCl   *SyncClient
	syncSrv  *ReqRespServer
49 50
}

Andreas Bigger's avatar
Andreas Bigger committed
51 52
// NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil.
// If metrics are configured, a bandwidth monitor will be spawned in a goroutine.
53
func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) (*NodeP2P, error) {
54 55 56 57
	if setup == nil {
		return nil, errors.New("p2p node cannot be created without setup")
	}
	var n NodeP2P
58
	if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, l2Chain, runCfg, metrics); err != nil {
59 60 61 62 63 64 65 66 67 68 69 70
		closeErr := n.Close()
		if closeErr != nil {
			log.Error("failed to close p2p after starting with err", "closeErr", closeErr, "err", err)
		}
		return nil, err
	}
	if n.host == nil {
		return nil, nil
	}
	return &n, nil
}

71
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
72 73
	bwc := p2pmetrics.NewBandwidthCounter()

74 75
	n.log = log

76 77
	var err error
	// nil if disabled.
78
	n.host, err = setup.Host(log, bwc, metrics)
79 80 81 82
	if err != nil {
		if n.dv5Udp != nil {
			n.dv5Udp.Close()
		}
83
		return fmt.Errorf("failed to start p2p host: %w", err)
84 85
	}

86
	// TODO(CLI-4016): host is not optional, NodeP2P as a whole is. This if statement is wrong
87 88 89 90 91 92
	if n.host != nil {
		// Enable extra features, if any. During testing we don't setup the most advanced host all the time.
		if extra, ok := n.host.(ExtraHostFeatures); ok {
			n.gater = extra.ConnectionGater()
			n.connMgr = extra.ConnectionManager()
		}
93 94 95 96 97 98 99 100 101 102
		eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
		if !ok {
			return fmt.Errorf("cannot init without extended peerstore: %w", err)
		}
		n.store = eps
		scoreParams := setup.PeerScoringParams()

		if scoreParams != nil {
			n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers)
		} else {
103
			n.appScorer = &NoopApplicationScorer{}
104
		}
105 106
		// Activate the P2P req-resp sync if enabled by feature-flag.
		if setup.ReqRespSyncEnabled() {
107
			n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
			n.host.Network().Notify(&network.NotifyBundle{
				ConnectedF: func(nw network.Network, conn network.Conn) {
					n.syncCl.AddPeer(conn.RemotePeer())
				},
				DisconnectedF: func(nw network.Network, conn network.Conn) {
					n.syncCl.RemovePeer(conn.RemotePeer())
				},
			})
			n.syncCl.Start()
			// the host may already be connected to peers, add them all to the sync client
			for _, peerID := range n.host.Network().Peers() {
				n.syncCl.AddPeer(peerID)
			}
			if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy
				n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
				// register the sync protocol with libp2p host
				payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
				n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
			}
		}
128
		n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log)
129
		// notify of any new connections/streams/etc.
130
		n.host.Network().Notify(NewNetworkNotifier(log, metrics))
131
		// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
132
		n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
133
		if err != nil {
134
			return fmt.Errorf("failed to start gossipsub router: %w", err)
135
		}
136
		n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
137
		if err != nil {
138
			return fmt.Errorf("failed to join blocks gossip topic: %w", err)
139 140
		}
		log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().Pretty())
141 142 143 144 145 146 147 148 149

		tcpPort, err := FindActiveTCPPort(n.host)
		if err != nil {
			log.Warn("failed to find what TCP port p2p is binded to", "err", err)
		}

		// All nil if disabled.
		n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort)
		if err != nil {
150
			return fmt.Errorf("failed to start discv5: %w", err)
151
		}
152

153 154 155
		if metrics != nil {
			go metrics.RecordBandwidth(resourcesCtx, bwc)
		}
156

157
		if setup.BanPeers() {
158
			n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration())
159 160
			n.peerMonitor.Start()
		}
161
		n.appScorer.start()
162 163 164 165
	}
	return nil
}

166 167 168 169 170 171 172 173 174 175 176
// AltSyncEnabled reports whether the req-resp sync client is set on this node.
func (n *NodeP2P) AltSyncEnabled() bool {
	return n.syncCl != nil
}

// RequestL2Range forwards a request for the L2 blocks between start and end to the
// req-resp sync client. It returns an error if no sync client is set on this node.
func (n *NodeP2P) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
	// Without a sync client there is nothing to forward the request to.
	if n.syncCl == nil {
		return fmt.Errorf("cannot request range %s - %s, req-resp sync is not enabled", start, end)
	}
	return n.syncCl.RequestL2Range(ctx, start, end)
}

177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
// Host returns the p2p host held by this node.
func (n *NodeP2P) Host() host.Host {
	return n.host
}

// Dv5Local returns the p2p discovery identity of this node. It is nil if discovery is disabled.
func (n *NodeP2P) Dv5Local() *enode.LocalNode {
	return n.dv5Local
}

// Dv5Udp returns the p2p discovery service of this node. It is nil if discovery is disabled.
func (n *NodeP2P) Dv5Udp() *discover.UDPv5 {
	return n.dv5Udp
}

// GossipSub returns the p2p gossip router of this node.
func (n *NodeP2P) GossipSub() *pubsub.PubSub {
	return n.gs
}

// GossipOut returns the p2p gossip application interface used for publishing.
func (n *NodeP2P) GossipOut() GossipOut {
	return n.gsOut
}

197
func (n *NodeP2P) ConnectionGater() gating.BlockingConnectionGater {
198 199 200 201 202 203 204
	return n.gater
}

// ConnectionManager returns the p2p conn manager of this node.
// It may be nil even with p2p enabled.
func (n *NodeP2P) ConnectionManager() connmgr.ConnManager {
	return n.connMgr
}

205 206 207 208 209
// Peers returns the peer IDs reported by the network of the host.
func (n *NodeP2P) Peers() []peer.ID {
	nw := n.host.Network()
	return nw.Peers()
}

func (n *NodeP2P) GetPeerScore(id peer.ID) (float64, error) {
210
	return n.store.GetPeerScore(id)
211 212 213
}

func (n *NodeP2P) IsStatic(id peer.ID) bool {
214
	return n.connMgr != nil && n.connMgr.IsProtected(id, staticPeerTag)
215 216
}

217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
// BanPeer records a ban for the given peer until expiration in the peerstore,
// then closes the connection to that peer. The first failing step is returned
// as a wrapped error; the connection is not closed if the ban could not be stored.
func (n *NodeP2P) BanPeer(id peer.ID, expiration time.Time) error {
	banErr := n.store.SetPeerBanExpiration(id, expiration)
	if banErr != nil {
		return fmt.Errorf("failed to set peer ban expiry: %w", banErr)
	}

	closeErr := n.host.Network().ClosePeer(id)
	if closeErr != nil {
		return fmt.Errorf("failed to close peer connection: %w", closeErr)
	}
	return nil
}

// BanIP records a ban for the given IP until expiration in the peerstore, then
// closes every open connection whose remote address resolves to that IP.
//
// An error is returned only if the ban could not be stored. Failures to close
// individual connections are logged (including the cause) and do not abort the loop.
func (n *NodeP2P) BanIP(ip net.IP, expiration time.Time) error {
	if err := n.store.SetIPBanExpiration(ip, expiration); err != nil {
		return fmt.Errorf("failed to set IP ban expiry: %w", err)
	}
	// kick all peers that match this IP
	for _, conn := range n.host.Network().Conns() {
		addr := conn.RemoteMultiaddr()
		remoteIP, err := manet.ToIP(addr)
		if err != nil {
			// remote address cannot be converted to an IP, so it cannot match
			continue
		}
		if remoteIP.Equal(ip) {
			if err := conn.Close(); err != nil {
				n.log.Error("failed to close connection to peer with banned IP", "peer", conn.RemotePeer(), "ip", ip, "err", err)
			}
		}
	}
	return nil
}

247 248
func (n *NodeP2P) Close() error {
	var result *multierror.Error
249 250 251
	if n.peerMonitor != nil {
		n.peerMonitor.Stop()
	}
252 253 254 255 256
	if n.dv5Udp != nil {
		n.dv5Udp.Close()
	}
	if n.gsOut != nil {
		if err := n.gsOut.Close(); err != nil {
257
			result = multierror.Append(result, fmt.Errorf("failed to close gossip cleanly: %w", err))
258 259 260 261
		}
	}
	if n.host != nil {
		if err := n.host.Close(); err != nil {
262
			result = multierror.Append(result, fmt.Errorf("failed to close p2p host cleanly: %w", err))
263
		}
264 265 266 267 268
		if n.syncCl != nil {
			if err := n.syncCl.Close(); err != nil {
				result = multierror.Append(result, fmt.Errorf("failed to close p2p sync client cleanly: %w", err))
			}
		}
269
	}
270 271 272
	if n.appScorer != nil {
		n.appScorer.stop()
	}
273 274
	return result.ErrorOrNil()
}
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291

// FindActiveTCPPort scans the addresses of the host and returns the TCP port of
// the first address that has a TCP component with a port that parses as a 16-bit number.
// If no such address exists it returns port 0.
// NOTE(review): the returned error is always nil, also when no port was found.
func FindActiveTCPPort(h host.Host) (uint16, error) {
	for _, listenAddr := range h.Addrs() {
		portStr, err := listenAddr.ValueForProtocol(ma.P_TCP)
		if err != nil {
			// no TCP component in this address
			continue
		}
		port, err := strconv.ParseUint(portStr, 10, 16)
		if err != nil {
			continue
		}
		return uint16(port), nil
	}
	return 0, nil
}