node.go 9.7 KB
Newer Older
1 2 3 4 5 6
package p2p

import (
	"context"
	"errors"
	"fmt"
7
	"net"
8
	"strconv"
9 10
	"time"

11 12
	"github.com/hashicorp/go-multierror"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
13 14 15
	"github.com/libp2p/go-libp2p/core/connmgr"
	"github.com/libp2p/go-libp2p/core/host"
	p2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
16
	"github.com/libp2p/go-libp2p/core/network"
17
	"github.com/libp2p/go-libp2p/core/peer"
18
	ma "github.com/multiformats/go-multiaddr"
19
	manet "github.com/multiformats/go-multiaddr/net"
20 21 22 23 24 25 26 27 28 29 30 31

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/enode"

	"github.com/ethereum-optimism/optimism/op-node/metrics"
	"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
	"github.com/ethereum-optimism/optimism/op-node/p2p/monitor"
	"github.com/ethereum-optimism/optimism/op-node/p2p/store"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-service/clock"
	"github.com/ethereum-optimism/optimism/op-service/eth"
32 33
)

Andreas Bigger's avatar
Andreas Bigger committed
34
// NodeP2P is a p2p node, which can be used to gossip messages.
35
type NodeP2P struct {
36 37 38 39
	host        host.Host                      // p2p host (optional, may be nil)
	gater       gating.BlockingConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
	scorer      Scorer                         // writes score-updates to the peerstore and keeps metrics of score changes
	connMgr     connmgr.ConnManager            // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
40
	peerMonitor *monitor.PeerMonitor           // peer monitor to disconnect bad peers, may be nil even with p2p enabled
41
	store       store.ExtendedPeerstore        // peerstore of host, with extra bindings for scoring and banning
42
	appScorer   ApplicationScorer
43
	log         log.Logger
44 45 46 47 48
	// the below components are all optional, and may be nil. They require the host to not be nil.
	dv5Local *enode.LocalNode // p2p discovery identity
	dv5Udp   *discover.UDPv5  // p2p discovery service
	gs       *pubsub.PubSub   // p2p gossip router
	gsOut    GossipOut        // p2p gossip application interface for publishing
49 50
	syncCl   *SyncClient
	syncSrv  *ReqRespServer
51 52
}

Andreas Bigger's avatar
Andreas Bigger committed
53 54
// NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil.
// If metrics are configured, a bandwidth monitor will be spawned in a goroutine.
55
func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) (*NodeP2P, error) {
56 57 58 59
	if setup == nil {
		return nil, errors.New("p2p node cannot be created without setup")
	}
	var n NodeP2P
60
	if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, l2Chain, runCfg, metrics); err != nil {
61 62 63 64 65 66 67 68 69 70 71 72
		closeErr := n.Close()
		if closeErr != nil {
			log.Error("failed to close p2p after starting with err", "closeErr", closeErr, "err", err)
		}
		return nil, err
	}
	if n.host == nil {
		return nil, nil
	}
	return &n, nil
}

73
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
74 75
	bwc := p2pmetrics.NewBandwidthCounter()

76 77
	n.log = log

78 79
	var err error
	// nil if disabled.
80
	n.host, err = setup.Host(log, bwc, metrics)
81 82 83 84
	if err != nil {
		if n.dv5Udp != nil {
			n.dv5Udp.Close()
		}
85
		return fmt.Errorf("failed to start p2p host: %w", err)
86 87
	}

88
	// TODO(CLI-4016): host is not optional, NodeP2P as a whole is. This if statement is wrong
89 90 91 92 93 94
	if n.host != nil {
		// Enable extra features, if any. During testing we don't setup the most advanced host all the time.
		if extra, ok := n.host.(ExtraHostFeatures); ok {
			n.gater = extra.ConnectionGater()
			n.connMgr = extra.ConnectionManager()
		}
95 96 97 98 99 100 101 102 103 104
		eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
		if !ok {
			return fmt.Errorf("cannot init without extended peerstore: %w", err)
		}
		n.store = eps
		scoreParams := setup.PeerScoringParams()

		if scoreParams != nil {
			n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers)
		} else {
105
			n.appScorer = &NoopApplicationScorer{}
106
		}
107 108
		// Activate the P2P req-resp sync if enabled by feature-flag.
		if setup.ReqRespSyncEnabled() {
109
			n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
			n.host.Network().Notify(&network.NotifyBundle{
				ConnectedF: func(nw network.Network, conn network.Conn) {
					n.syncCl.AddPeer(conn.RemotePeer())
				},
				DisconnectedF: func(nw network.Network, conn network.Conn) {
					n.syncCl.RemovePeer(conn.RemotePeer())
				},
			})
			n.syncCl.Start()
			// the host may already be connected to peers, add them all to the sync client
			for _, peerID := range n.host.Network().Peers() {
				n.syncCl.AddPeer(peerID)
			}
			if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy
				n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
				// register the sync protocol with libp2p host
				payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
				n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
			}
		}
130
		n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log)
131
		// notify of any new connections/streams/etc.
132
		n.host.Network().Notify(NewNetworkNotifier(log, metrics))
133
		// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
134
		n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
135
		if err != nil {
136
			return fmt.Errorf("failed to start gossipsub router: %w", err)
137
		}
138
		n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
139
		if err != nil {
140
			return fmt.Errorf("failed to join blocks gossip topic: %w", err)
141 142
		}
		log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().Pretty())
143 144 145 146 147 148 149 150 151

		tcpPort, err := FindActiveTCPPort(n.host)
		if err != nil {
			log.Warn("failed to find what TCP port p2p is binded to", "err", err)
		}

		// All nil if disabled.
		n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort)
		if err != nil {
152
			return fmt.Errorf("failed to start discv5: %w", err)
153
		}
154

155 156 157
		if metrics != nil {
			go metrics.RecordBandwidth(resourcesCtx, bwc)
		}
158

159
		if setup.BanPeers() {
160
			n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration())
161 162
			n.peerMonitor.Start()
		}
163
		n.appScorer.start()
164 165 166 167
	}
	return nil
}

168 169 170 171 172 173 174 175 176 177 178
func (n *NodeP2P) AltSyncEnabled() bool {
	return n.syncCl != nil
}

func (n *NodeP2P) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
	if !n.AltSyncEnabled() {
		return fmt.Errorf("cannot request range %s - %s, req-resp sync is not enabled", start, end)
	}
	return n.syncCl.RequestL2Range(ctx, start, end)
}

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
func (n *NodeP2P) Host() host.Host {
	return n.host
}

func (n *NodeP2P) Dv5Local() *enode.LocalNode {
	return n.dv5Local
}

func (n *NodeP2P) Dv5Udp() *discover.UDPv5 {
	return n.dv5Udp
}

func (n *NodeP2P) GossipSub() *pubsub.PubSub {
	return n.gs
}

func (n *NodeP2P) GossipOut() GossipOut {
	return n.gsOut
}

199
func (n *NodeP2P) ConnectionGater() gating.BlockingConnectionGater {
200 201 202 203 204 205 206
	return n.gater
}

func (n *NodeP2P) ConnectionManager() connmgr.ConnManager {
	return n.connMgr
}

207 208 209 210 211
func (n *NodeP2P) Peers() []peer.ID {
	return n.host.Network().Peers()
}

func (n *NodeP2P) GetPeerScore(id peer.ID) (float64, error) {
212
	return n.store.GetPeerScore(id)
213 214 215
}

func (n *NodeP2P) IsStatic(id peer.ID) bool {
216
	return n.connMgr != nil && n.connMgr.IsProtected(id, staticPeerTag)
217 218
}

219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
func (n *NodeP2P) BanPeer(id peer.ID, expiration time.Time) error {
	if err := n.store.SetPeerBanExpiration(id, expiration); err != nil {
		return fmt.Errorf("failed to set peer ban expiry: %w", err)
	}
	if err := n.host.Network().ClosePeer(id); err != nil {
		return fmt.Errorf("failed to close peer connection: %w", err)
	}
	return nil
}

func (n *NodeP2P) BanIP(ip net.IP, expiration time.Time) error {
	if err := n.store.SetIPBanExpiration(ip, expiration); err != nil {
		return fmt.Errorf("failed to set IP ban expiry: %w", err)
	}
	// kick all peers that match this IP
	for _, conn := range n.host.Network().Conns() {
		addr := conn.RemoteMultiaddr()
		remoteIP, err := manet.ToIP(addr)
		if err != nil {
			continue
		}
		if remoteIP.Equal(ip) {
			if err := conn.Close(); err != nil {
				n.log.Error("failed to close connection to peer with banned IP", "peer", conn.RemotePeer(), "ip", ip)
			}
		}
	}
	return nil
}

249 250
func (n *NodeP2P) Close() error {
	var result *multierror.Error
251 252 253
	if n.peerMonitor != nil {
		n.peerMonitor.Stop()
	}
254 255 256 257 258
	if n.dv5Udp != nil {
		n.dv5Udp.Close()
	}
	if n.gsOut != nil {
		if err := n.gsOut.Close(); err != nil {
259
			result = multierror.Append(result, fmt.Errorf("failed to close gossip cleanly: %w", err))
260 261 262 263
		}
	}
	if n.host != nil {
		if err := n.host.Close(); err != nil {
264
			result = multierror.Append(result, fmt.Errorf("failed to close p2p host cleanly: %w", err))
265
		}
266 267 268 269 270
		if n.syncCl != nil {
			if err := n.syncCl.Close(); err != nil {
				result = multierror.Append(result, fmt.Errorf("failed to close p2p sync client cleanly: %w", err))
			}
		}
271
	}
272 273 274
	if n.appScorer != nil {
		n.appScorer.stop()
	}
275 276
	return result.ErrorOrNil()
}
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293

func FindActiveTCPPort(h host.Host) (uint16, error) {
	var tcpPort uint16
	for _, addr := range h.Addrs() {
		tcpPortStr, err := addr.ValueForProtocol(ma.P_TCP)
		if err != nil {
			continue
		}
		v, err := strconv.ParseUint(tcpPortStr, 10, 16)
		if err != nil {
			continue
		}
		tcpPort = uint16(v)
		break
	}
	return tcpPort, nil
}