host.go 8.45 KB
Newer Older
1 2 3 4 5 6
package p2p

import (
	"context"
	"fmt"
	"net"
7
	"sync"
8 9
	"time"

10 11 12
	//nolint:all
	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"

13
	libp2p "github.com/libp2p/go-libp2p"
14
	mplex "github.com/libp2p/go-libp2p-mplex"
15
	lconf "github.com/libp2p/go-libp2p/config"
16 17 18
	"github.com/libp2p/go-libp2p/core/connmgr"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/metrics"
19
	"github.com/libp2p/go-libp2p/core/network"
20
	"github.com/libp2p/go-libp2p/core/peer"
21
	"github.com/libp2p/go-libp2p/core/sec/insecure"
22
	basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
23 24 25
	"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
	"github.com/libp2p/go-libp2p/p2p/security/noise"
	tls "github.com/libp2p/go-libp2p/p2p/security/tls"
26
	"github.com/libp2p/go-libp2p/p2p/transport/tcp"
27 28
	ma "github.com/multiformats/go-multiaddr"
	madns "github.com/multiformats/go-multiaddr-dns"
29 30

	"github.com/ethereum/go-ethereum/log"
31 32 33 34

	"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
	"github.com/ethereum-optimism/optimism/op-node/p2p/store"
	"github.com/ethereum-optimism/optimism/op-service/clock"
35 36
)

37
const (
	// staticPeerTag is the tag under which static peers are protected in the
	// connection manager, so protects/unprotects under other tags do not
	// affect that protection.
	staticPeerTag = "static"
)

41 42
type ExtraHostFeatures interface {
	host.Host
43
	ConnectionGater() gating.BlockingConnectionGater
44 45 46 47 48
	ConnectionManager() connmgr.ConnManager
}

type extraHost struct {
	host.Host
49
	gater   gating.BlockingConnectionGater
50
	connMgr connmgr.ConnManager
51 52 53 54 55
	log     log.Logger

	staticPeers []*peer.AddrInfo

	quitC chan struct{}
56 57
}

58
func (e *extraHost) ConnectionGater() gating.BlockingConnectionGater {
59 60 61 62 63 64 65
	return e.gater
}

func (e *extraHost) ConnectionManager() connmgr.ConnManager {
	return e.connMgr
}

66 67 68 69 70 71 72 73 74 75
func (e *extraHost) Close() error {
	close(e.quitC)
	return e.Host.Close()
}

func (e *extraHost) initStaticPeers() {
	for _, addr := range e.staticPeers {
		e.Peerstore().AddAddrs(addr.ID, addr.Addrs, time.Hour*24*7)
		// We protect the peer, so the connection manager doesn't decide to prune it.
		// We tag it with "static" so other protects/unprotects with different tags don't affect this protection.
76
		e.connMgr.Protect(addr.ID, staticPeerTag)
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
		// Try to dial the node in the background
		go func(addr *peer.AddrInfo) {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
			defer cancel()
			if err := e.dialStaticPeer(ctx, addr); err != nil {
				e.log.Warn("error dialing static peer", "peer", addr.ID, "err", err)
			}
		}(addr)
	}
}

// dialStaticPeer logs the attempt and dials the peer by its ID through the
// host's network, returning the dial error (nil on success). The addresses in
// addr are only logged here; the dial call itself receives just the peer ID.
func (e *extraHost) dialStaticPeer(ctx context.Context, addr *peer.AddrInfo) error {
	e.log.Info("dialing static peer", "peer", addr.ID, "addrs", addr.Addrs)
	_, dialErr := e.Network().DialPeer(ctx, addr.ID)
	return dialErr
}

func (e *extraHost) monitorStaticPeers() {
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
			var wg sync.WaitGroup

106
			e.log.Debug("polling static peers", "peers", len(e.staticPeers))
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
			for _, addr := range e.staticPeers {
				connectedness := e.Network().Connectedness(addr.ID)
				e.log.Trace("static peer connectedness", "peer", addr.ID, "connectedness", connectedness)

				if connectedness == network.Connected {
					continue
				}

				wg.Add(1)
				go func(addr *peer.AddrInfo) {
					e.log.Warn("static peer disconnected, reconnecting", "peer", addr.ID)
					if err := e.dialStaticPeer(ctx, addr); err != nil {
						e.log.Warn("error reconnecting to static peer", "peer", addr.ID, "err", err)
					}
					wg.Done()
				}(addr)
			}

			wg.Wait()
			cancel()
		case <-e.quitC:
			return
		}
	}
}

133 134
// Compile-time check that *extraHost implements ExtraHostFeatures.
var _ ExtraHostFeatures = (*extraHost)(nil)

135
func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) {
136 137 138
	if conf.DisableP2P {
		return nil, nil
	}
139
	pub := conf.Priv.GetPublic()
140 141
	pid, err := peer.IDFromPublicKey(pub)
	if err != nil {
142
		return nil, fmt.Errorf("failed to derive pubkey from network priv key: %w", err)
143 144
	}

145
	basePs, err := pstoreds.NewPeerstore(context.Background(), conf.Store, pstoreds.DefaultOpts())
146
	if err != nil {
147
		return nil, fmt.Errorf("failed to open peerstore: %w", err)
148 149
	}

150
	peerScoreParams := conf.PeerScoringParams()
151 152 153
	var scoreRetention time.Duration
	if peerScoreParams != nil {
		// Use the same retention period as gossip will if available
154
		scoreRetention = peerScoreParams.PeerScoring.RetainScore
155 156 157 158 159
	} else {
		// Disable score GC if peer scoring is disabled
		scoreRetention = 0
	}
	ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, scoreRetention)
160 161 162 163
	if err != nil {
		return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
	}

164
	if err := ps.AddPrivKey(pid, conf.Priv); err != nil {
165
		return nil, fmt.Errorf("failed to set up peerstore with priv key: %w", err)
166 167
	}
	if err := ps.AddPubKey(pid, pub); err != nil {
168
		return nil, fmt.Errorf("failed to set up peerstore with pub key: %w", err)
169 170
	}

171 172
	var connGtr gating.BlockingConnectionGater
	connGtr, err = gating.NewBlockingConnectionGater(conf.Store)
173
	if err != nil {
174
		return nil, fmt.Errorf("failed to open connection gater: %w", err)
175
	}
176
	connGtr = gating.AddBanExpiry(connGtr, ps, log, clock.SystemClock, metrics)
177
	connGtr = gating.AddMetering(connGtr, metrics)
178

179
	connMngr, err := DefaultConnManager(conf)
180
	if err != nil {
181
		return nil, fmt.Errorf("failed to open connection manager: %w", err)
182 183 184 185
	}

	listenAddr, err := addrFromIPAndPort(conf.ListenIP, conf.ListenTCPPort)
	if err != nil {
186
		return nil, fmt.Errorf("failed to make listen addr: %w", err)
187
	}
188
	tcpTransport := libp2p.Transport(
189 190 191 192 193 194 195 196 197
		tcp.NewTCPTransport,
		tcp.WithConnectionTimeout(time.Minute*60)) // break unused connections
	// TODO: technically we can also run the node on websocket and QUIC transports. Maybe in the future?

	var nat lconf.NATManagerC // disabled if nil
	if conf.NAT {
		nat = basichost.NewNATManager
	}

198 199
	opts := []libp2p.Option{
		libp2p.Identity(conf.Priv),
200
		// Explicitly set the user-agent, so we can differentiate from other Go libp2p users.
201 202 203
		libp2p.UserAgent(conf.UserAgent),
		tcpTransport,
		libp2p.WithDialTimeout(conf.TimeoutDial),
204
		// No relay services, direct connections between peers only.
205
		libp2p.DisableRelay(),
206
		// host will start and listen to network directly after construction from config.
207 208 209 210 211 212 213 214
		libp2p.ListenAddrs(listenAddr),
		libp2p.ConnectionGater(connGtr),
		libp2p.ConnectionManager(connMngr),
		//libp2p.ResourceManager(nil), // TODO use resource manager interface to manage resources per peer better.
		libp2p.NATManager(nat),
		libp2p.Peerstore(ps),
		libp2p.BandwidthReporter(reporter), // may be nil if disabled
		libp2p.MultiaddrResolver(madns.DefaultResolver),
215
		// Ping is a small built-in libp2p protocol that helps us check/debug latency between peers.
216
		libp2p.Ping(true),
217
		// Help peers with their NAT reachability status, but throttle to avoid too much work.
218 219
		libp2p.EnableNATService(),
		libp2p.AutoNATServiceRateLimit(10, 5, time.Second*60),
220
	}
221 222 223 224 225 226 227
	opts = append(opts, conf.HostMux...)
	if conf.NoTransportSecurity {
		opts = append(opts, libp2p.Security(insecure.ID, insecure.NewWithIdentity))
	} else {
		opts = append(opts, conf.HostSecurity...)
	}
	h, err := libp2p.New(opts...)
228 229 230
	if err != nil {
		return nil, err
	}
231

232 233
	staticPeers := make([]*peer.AddrInfo, 0, len(conf.StaticPeers))
	for _, peerAddr := range conf.StaticPeers {
234 235
		addr, err := peer.AddrInfoFromP2pAddr(peerAddr)
		if err != nil {
236
			return nil, fmt.Errorf("bad peer address: %w", err)
237
		}
238
		if addr.ID == h.ID() {
239
			log.Info("Static-peer list contains address of local peer, ignoring the address.", "peer_id", addr.ID, "addrs", addr.Addrs)
240 241 242
			continue
		}
		staticPeers = append(staticPeers, addr)
243
	}
244 245 246 247 248 249 250 251 252 253 254 255 256

	out := &extraHost{
		Host:        h,
		connMgr:     connMngr,
		log:         log,
		staticPeers: staticPeers,
		quitC:       make(chan struct{}),
	}
	out.initStaticPeers()
	if len(conf.StaticPeers) > 0 {
		go out.monitorStaticPeers()
	}

257
	out.gater = connGtr
258 259 260 261 262 263 264 265 266 267 268 269 270
	return out, nil
}

// addrFromIPAndPort builds the TCP multi-address to bind to for the given IP
// and port. An IP that has a 4-byte form is emitted as "/ip4/...", anything
// else as "/ip6/...". The result carries no PeerID component (external peers
// need one to dial us). Any error from multi-address parsing is returned.
func addrFromIPAndPort(ip net.IP, port uint16) (ma.Multiaddr, error) {
	if v4 := ip.To4(); v4 != nil {
		return ma.NewMultiaddr(fmt.Sprintf("/%s/%s/tcp/%d", "ip4", v4.String(), port))
	}
	return ma.NewMultiaddr(fmt.Sprintf("/%s/%s/tcp/%d", "ip6", ip.String(), port))
}
271

272 273
func YamuxC() libp2p.Option {
	return libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport)
274 275
}

276 277
func MplexC() libp2p.Option {
	return libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport)
278 279
}

280 281
func NoiseC() libp2p.Option {
	return libp2p.Security(noise.ID, noise.New)
282 283
}

284 285
// TlsC returns the libp2p option that registers the TLS security transport
// under tls.ID.
func TlsC() libp2p.Option {
	return libp2p.Security(tls.ID, tls.New)
}