Commit 2006a2f5 authored by protolambda's avatar protolambda

op-node: gossip flood-publish, cli gossip configuration

parent 805a9027
......@@ -4,6 +4,8 @@ import (
"time"
"github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-node/p2p"
)
func p2pEnv(v string) string {
......@@ -199,6 +201,45 @@ var (
Value: "",
EnvVar: p2pEnv("SEQUENCER_KEY"),
}
GossipMeshDFlag = cli.UintFlag{
Name: "p2p.gossip.mesh.d",
Usage: "Configure GossipSub topic stable mesh target count, a.k.a. desired outbound degree, number of peers to gossip to",
Required: false,
Hidden: true,
Value: p2p.DefaultMeshD,
EnvVar: p2pEnv("GOSSIP_MESH_D"),
}
GossipMeshDloFlag = cli.UintFlag{
Name: "p2p.gossip.mesh.lo",
Usage: "Configure GossipSub topic stable mesh low watermark, a.k.a. lower bound of outbound degree",
Required: false,
Hidden: true,
Value: p2p.DefaultMeshDlo,
EnvVar: p2pEnv("GOSSIP_MESH_DLO"),
}
GossipMeshDhiFlag = cli.UintFlag{
Name: "p2p.gossip.mesh.dhi",
Usage: "Configure GossipSub topic stable mesh high watermark, a.k.a. upper bound of outbound degree, additional peers will not receive gossip",
Required: false,
Hidden: true,
Value: p2p.DefaultMeshDhi,
EnvVar: p2pEnv("GOSSIP_MESH_DHI"),
}
GossipMeshDlazyFlag = cli.UintFlag{
Name: "p2p.gossip.mesh.dlazy",
Usage: "Configure GossipSub gossip target, a.k.a. target degree for publishing-only (not propagation like p2p.gossip.mesh.d)",
Required: false,
Hidden: true,
Value: p2p.DefaultMeshDlazy,
EnvVar: p2pEnv("GOSSIP_MESH_DLAZY"),
}
GossipFloodPublishFlag = cli.BoolFlag{
Name: "p2p.gossip.mesh.floodpublish",
Usage: "Configure GossipSub to publish messages to all known peers on the topic, outside of the mesh, also see Dlazy as less aggressive alternative.",
Required: false,
Hidden: true,
EnvVar: p2pEnv("GOSSIP_FLOOD_PUBLISH"),
}
)
// None of these flags are strictly required.
......@@ -229,4 +270,9 @@ var p2pFlags = []cli.Flag{
PeerstorePath,
DiscoveryPath,
SequencerP2PKeyFlag,
GossipMeshDFlag,
GossipMeshDloFlag,
GossipMeshDhiFlag,
GossipMeshDlazyFlag,
GossipFloodPublishFlag,
}
package cli
import (
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
leveldb "github.com/ipfs/go-ds-leveldb"
lconf "github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/multiformats/go-multiaddr"
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/urfave/cli"
"github.com/ethereum/go-ethereum/p2p/enode"
)
func NewConfig(ctx *cli.Context) (*p2p.Config, error) {
conf := &p2p.Config{}
if ctx.GlobalBool(flags.DisableP2P.Name) {
conf.DisableP2P = true
return conf, nil
}
p, err := loadNetworkPrivKey(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load p2p priv key: %w", err)
}
conf.Priv = p
if err := loadListenOpts(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p listen options: %w", err)
}
if err := loadDiscoveryOpts(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p discovery options: %w", err)
}
if err := loadLibp2pOpts(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p options: %w", err)
}
if err := loadGossipOptions(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p gossip options: %w", err)
}
conf.ConnGater = p2p.DefaultConnGater
conf.ConnMngr = p2p.DefaultConnManager
return conf, nil
}
func validatePort(p uint) (uint16, error) {
if p == 0 {
return 0, nil
}
if p >= (1 << 16) {
return 0, fmt.Errorf("port out of range: %d", p)
}
if p < 1024 {
return 0, fmt.Errorf("port is reserved for system: %d", p)
}
return uint16(p), nil
}
func loadListenOpts(conf *p2p.Config, ctx *cli.Context) error {
listenIP := ctx.GlobalString(flags.ListenIP.Name)
if listenIP != "" { // optional
conf.ListenIP = net.ParseIP(listenIP)
if conf.ListenIP == nil {
return fmt.Errorf("failed to parse IP %q", listenIP)
}
}
var err error
conf.ListenTCPPort, err = validatePort(ctx.GlobalUint(flags.ListenTCPPort.Name))
if err != nil {
return fmt.Errorf("bad listen TCP port: %w", err)
}
conf.ListenUDPPort, err = validatePort(ctx.GlobalUint(flags.ListenUDPPort.Name))
if err != nil {
return fmt.Errorf("bad listen UDP port: %w", err)
}
return nil
}
func loadDiscoveryOpts(conf *p2p.Config, ctx *cli.Context) error {
if ctx.GlobalBool(flags.NoDiscovery.Name) {
conf.NoDiscovery = true
}
var err error
conf.AdvertiseTCPPort, err = validatePort(ctx.GlobalUint(flags.AdvertiseTCPPort.Name))
if err != nil {
return fmt.Errorf("bad advertised TCP port: %w", err)
}
conf.AdvertiseUDPPort, err = validatePort(ctx.GlobalUint(flags.AdvertiseUDPPort.Name))
if err != nil {
return fmt.Errorf("bad advertised UDP port: %w", err)
}
adIP := ctx.GlobalString(flags.AdvertiseIP.Name)
if adIP != "" { // optional
ips, err := net.LookupIP(adIP)
if err != nil {
return fmt.Errorf("failed to lookup IP of %q to advertise in ENR: %w", adIP, err)
}
// Find the first v4 IP it resolves to
for _, ip := range ips {
if ipv4 := ip.To4(); ipv4 != nil {
conf.AdvertiseIP = ipv4
break
}
}
if conf.AdvertiseIP == nil {
return fmt.Errorf("failed to parse IP %q", adIP)
}
}
dbPath := ctx.GlobalString(flags.DiscoveryPath.Name)
if dbPath == "" {
dbPath = "opnode_discovery_db"
}
if dbPath == "memory" {
dbPath = ""
}
conf.DiscoveryDB, err = enode.OpenDB(dbPath)
if err != nil {
return fmt.Errorf("failed to open discovery db: %w", err)
}
conf.Bootnodes = p2p.DefaultBootnodes
records := strings.Split(ctx.GlobalString(flags.Bootnodes.Name), ",")
for i, recordB64 := range records {
recordB64 = strings.TrimSpace(recordB64)
if recordB64 == "" { // ignore empty records
continue
}
nodeRecord, err := enode.Parse(enode.ValidSchemes, recordB64)
if err != nil {
return fmt.Errorf("bootnode record %d (of %d) is invalid: %q err: %w", i, len(records), recordB64, err)
}
conf.Bootnodes = append(conf.Bootnodes, nodeRecord)
}
return nil
}
func loadLibp2pOpts(conf *p2p.Config, ctx *cli.Context) error {
addrs := strings.Split(ctx.GlobalString(flags.StaticPeers.Name), ",")
for i, addr := range addrs {
addr = strings.TrimSpace(addr)
if addr == "" {
continue // skip empty multi addrs
}
a, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return fmt.Errorf("failed to parse multi addr of static peer %d (out of %d): %q err: %w", i, len(addrs), addr, err)
}
conf.StaticPeers = append(conf.StaticPeers, a)
}
for _, v := range strings.Split(ctx.GlobalString(flags.HostMux.Name), ",") {
v = strings.ToLower(strings.TrimSpace(v))
var mc lconf.MsMuxC
var err error
switch v {
case "yamux":
mc, err = p2p.YamuxC()
case "mplex":
mc, err = p2p.MplexC()
default:
return fmt.Errorf("could not recognize mux %s", v)
}
if err != nil {
return fmt.Errorf("failed to make %s constructor: %w", v, err)
}
conf.HostMux = append(conf.HostMux, mc)
}
secArr := strings.Split(ctx.GlobalString(flags.HostSecurity.Name), ",")
for _, v := range secArr {
v = strings.ToLower(strings.TrimSpace(v))
var sc lconf.MsSecC
var err error
switch v {
case "none": // no security, for debugging etc.
if len(conf.HostSecurity) > 0 || len(secArr) > 1 {
return errors.New("cannot mix secure transport protocols with no-security")
}
conf.NoTransportSecurity = true
case "noise":
sc, err = p2p.NoiseC()
case "tls":
sc, err = p2p.TlsC()
default:
return fmt.Errorf("could not recognize security %s", v)
}
if err != nil {
return fmt.Errorf("failed to make %s constructor: %w", v, err)
}
conf.HostSecurity = append(conf.HostSecurity, sc)
}
conf.PeersLo = ctx.GlobalUint(flags.PeersLo.Name)
conf.PeersHi = ctx.GlobalUint(flags.PeersHi.Name)
conf.PeersGrace = ctx.GlobalDuration(flags.PeersGrace.Name)
conf.NAT = ctx.GlobalBool(flags.NAT.Name)
conf.UserAgent = ctx.GlobalString(flags.UserAgent.Name)
conf.TimeoutNegotiation = ctx.GlobalDuration(flags.TimeoutNegotiation.Name)
conf.TimeoutAccept = ctx.GlobalDuration(flags.TimeoutAccept.Name)
conf.TimeoutDial = ctx.GlobalDuration(flags.TimeoutDial.Name)
peerstorePath := ctx.GlobalString(flags.PeerstorePath.Name)
if peerstorePath == "" {
return errors.New("peerstore path must be specified, use 'memory' to explicitly not persist peer records")
}
var err error
var store ds.Batching
if peerstorePath == "memory" {
store = sync.MutexWrap(ds.NewMapDatastore())
} else {
store, err = leveldb.NewDatastore(peerstorePath, nil) // default leveldb options are fine
if err != nil {
return fmt.Errorf("failed to open leveldb db for peerstore: %w", err)
}
}
conf.Store = store
return nil
}
func loadNetworkPrivKey(ctx *cli.Context) (*crypto.Secp256k1PrivateKey, error) {
raw := ctx.GlobalString(flags.P2PPrivRaw.Name)
if raw != "" {
return parsePriv(raw)
}
keyPath := ctx.GlobalString(flags.P2PPrivPath.Name)
f, err := os.OpenFile(keyPath, os.O_RDONLY, 0600)
if os.IsNotExist(err) {
p, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
if err != nil {
return nil, fmt.Errorf("failed to generate new p2p priv key: %w", err)
}
b, err := p.Raw()
if err != nil {
return nil, fmt.Errorf("failed to encode new p2p priv key: %w", err)
}
f, err := os.OpenFile(keyPath, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return nil, fmt.Errorf("failed to store new p2p priv key: %w", err)
}
defer f.Close()
if _, err := f.WriteString(hex.EncodeToString(b)); err != nil {
return nil, fmt.Errorf("failed to write new p2p priv key: %w", err)
}
return (p).(*crypto.Secp256k1PrivateKey), nil
} else {
defer f.Close()
data, err := io.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("failed to read priv key file: %w", err)
}
return parsePriv(strings.TrimSpace(string(data)))
}
}
func parsePriv(data string) (*crypto.Secp256k1PrivateKey, error) {
if len(data) > 2 && data[:2] == "0x" {
data = data[2:]
}
b, err := hex.DecodeString(data)
if err != nil {
return nil, errors.New("p2p priv key is not formatted in hex chars")
}
p, err := crypto.UnmarshalSecp256k1PrivateKey(b)
if err != nil {
// avoid logging the priv key in the error, but hint at likely input length problem
return nil, fmt.Errorf("failed to parse priv key from %d bytes", len(b))
}
return (p).(*crypto.Secp256k1PrivateKey), nil
}
func loadGossipOptions(conf *p2p.Config, ctx *cli.Context) error {
conf.MeshD = ctx.GlobalInt(flags.GossipMeshDFlag.Name)
conf.MeshDLo = ctx.GlobalInt(flags.GossipMeshDloFlag.Name)
conf.MeshDHi = ctx.GlobalInt(flags.GossipMeshDhiFlag.Name)
conf.MeshDLazy = ctx.GlobalInt(flags.GossipMeshDlazyFlag.Name)
conf.FloodPublish = ctx.GlobalBool(flags.GossipFloodPublishFlag.Name)
return nil
}
package cli
import (
"fmt"
"github.com/ethereum/go-ethereum/crypto"
"github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/p2p"
)
// TODO: implement remote signer setup (config to authenticated endpoint)
// and remote signer itself (e.g. a open http client to make signing requests)
// LoadSignerSetup loads a configuration for a Signer to be set up later
func LoadSignerSetup(ctx *cli.Context) (p2p.SignerSetup, error) {
key := ctx.GlobalString(flags.SequencerP2PKeyFlag.Name)
if key != "" {
// Mnemonics are bad because they leak *all* keys when they leak.
// Unencrypted keys from file are bad because they are easy to leak (and we are not checking file permissions).
priv, err := crypto.HexToECDSA(key)
if err != nil {
return nil, fmt.Errorf("failed to read batch submitter key: %w", err)
}
return &p2p.PreparedSigner{Signer: p2p.NewLocalSigner(priv)}, nil
}
// TODO: create remote signer
return nil, nil
}
package p2p
import (
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
leveldb "github.com/ipfs/go-ds-leveldb"
lconf "github.com/libp2p/go-libp2p/config"
core "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
cmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
......@@ -52,6 +38,7 @@ type SetupP2P interface {
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error)
TargetPeers() uint
GossipSetupConfigurables
}
// Config sets up a p2p host and discv5 service from configuration.
......@@ -84,6 +71,14 @@ type Config struct {
PeersHi uint
PeersGrace time.Duration
MeshD int // topic stable mesh target count
MeshDLo int // topic stable mesh low watermark
MeshDHi int // topic stable mesh high watermark
MeshDLazy int // gossip target
// FloodPublish publishes messages from ourselves to peers outside of the gossip topic mesh but supporting the same topic.
FloodPublish bool
// If true a NAT manager will host a NAT port mapping that is updated with PMP and UPNP by libp2p/go-nat
NAT bool
......@@ -135,304 +130,11 @@ func DefaultConnManager(conf *Config) (connmgr.ConnManager, error) {
cmgr.WithEmergencyTrim(true))
}
func validatePort(p uint) (uint16, error) {
if p == 0 {
return 0, nil
}
if p >= (1 << 16) {
return 0, fmt.Errorf("port out of range: %d", p)
}
if p < 1024 {
return 0, fmt.Errorf("port is reserved for system: %d", p)
}
return uint16(p), nil
}
func NewConfig(ctx *cli.Context) (*Config, error) {
conf := &Config{}
if ctx.GlobalBool(flags.DisableP2P.Name) {
conf.DisableP2P = true
return conf, nil
}
p, err := loadNetworkPrivKey(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load p2p priv key: %w", err)
}
conf.Priv = p
if err := conf.loadListenOpts(ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p listen options: %w", err)
}
if err := conf.loadDiscoveryOpts(ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p discovery options: %w", err)
}
if err := conf.loadLibp2pOpts(ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p options: %w", err)
}
conf.ConnGater = DefaultConnGater
conf.ConnMngr = DefaultConnManager
return conf, nil
}
func (conf *Config) TargetPeers() uint {
return conf.PeersLo
}
func (conf *Config) loadListenOpts(ctx *cli.Context) error {
listenIP := ctx.GlobalString(flags.ListenIP.Name)
if listenIP != "" { // optional
conf.ListenIP = net.ParseIP(listenIP)
if conf.ListenIP == nil {
return fmt.Errorf("failed to parse IP %q", listenIP)
}
}
var err error
conf.ListenTCPPort, err = validatePort(ctx.GlobalUint(flags.ListenTCPPort.Name))
if err != nil {
return fmt.Errorf("bad listen TCP port: %w", err)
}
conf.ListenUDPPort, err = validatePort(ctx.GlobalUint(flags.ListenUDPPort.Name))
if err != nil {
return fmt.Errorf("bad listen UDP port: %w", err)
}
return nil
}
func (conf *Config) loadDiscoveryOpts(ctx *cli.Context) error {
if ctx.GlobalBool(flags.NoDiscovery.Name) {
conf.NoDiscovery = true
}
var err error
conf.AdvertiseTCPPort, err = validatePort(ctx.GlobalUint(flags.AdvertiseTCPPort.Name))
if err != nil {
return fmt.Errorf("bad advertised TCP port: %w", err)
}
conf.AdvertiseUDPPort, err = validatePort(ctx.GlobalUint(flags.AdvertiseUDPPort.Name))
if err != nil {
return fmt.Errorf("bad advertised UDP port: %w", err)
}
adIP := ctx.GlobalString(flags.AdvertiseIP.Name)
if adIP != "" { // optional
ips, err := net.LookupIP(adIP)
if err != nil {
return fmt.Errorf("failed to lookup IP of %q to advertise in ENR: %w", adIP, err)
}
// Find the first v4 IP it resolves to
for _, ip := range ips {
if ipv4 := ip.To4(); ipv4 != nil {
conf.AdvertiseIP = ipv4
break
}
}
if conf.AdvertiseIP == nil {
return fmt.Errorf("failed to parse IP %q", adIP)
}
}
dbPath := ctx.GlobalString(flags.DiscoveryPath.Name)
if dbPath == "" {
dbPath = "opnode_discovery_db"
}
if dbPath == "memory" {
dbPath = ""
}
conf.DiscoveryDB, err = enode.OpenDB(dbPath)
if err != nil {
return fmt.Errorf("failed to open discovery db: %w", err)
}
conf.Bootnodes = DefaultBootnodes
records := strings.Split(ctx.GlobalString(flags.Bootnodes.Name), ",")
for i, recordB64 := range records {
recordB64 = strings.TrimSpace(recordB64)
if recordB64 == "" { // ignore empty records
continue
}
nodeRecord, err := enode.Parse(enode.ValidSchemes, recordB64)
if err != nil {
return fmt.Errorf("bootnode record %d (of %d) is invalid: %q err: %w", i, len(records), recordB64, err)
}
conf.Bootnodes = append(conf.Bootnodes, nodeRecord)
}
return nil
}
func yamuxC() (lconf.MsMuxC, error) {
mtpt, err := lconf.MuxerConstructor(yamux.DefaultTransport)
if err != nil {
return lconf.MsMuxC{}, err
}
return lconf.MsMuxC{MuxC: mtpt, ID: "/yamux/1.0.0"}, nil
}
func mplexC() (lconf.MsMuxC, error) {
mtpt, err := lconf.MuxerConstructor(mplex.DefaultTransport)
if err != nil {
return lconf.MsMuxC{}, err
}
return lconf.MsMuxC{MuxC: mtpt, ID: "/mplex/6.7.0"}, nil
}
func noiseC() (lconf.MsSecC, error) {
stpt, err := lconf.SecurityConstructor(noise.New)
if err != nil {
return lconf.MsSecC{}, err
}
return lconf.MsSecC{SecC: stpt, ID: noise.ID}, nil
}
func tlsC() (lconf.MsSecC, error) {
stpt, err := lconf.SecurityConstructor(tls.New)
if err != nil {
return lconf.MsSecC{}, err
}
return lconf.MsSecC{SecC: stpt, ID: tls.ID}, nil
}
func (conf *Config) loadLibp2pOpts(ctx *cli.Context) error {
addrs := strings.Split(ctx.GlobalString(flags.StaticPeers.Name), ",")
for i, addr := range addrs {
addr = strings.TrimSpace(addr)
if addr == "" {
continue // skip empty multi addrs
}
a, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return fmt.Errorf("failed to parse multi addr of static peer %d (out of %d): %q err: %w", i, len(addrs), addr, err)
}
conf.StaticPeers = append(conf.StaticPeers, a)
}
for _, v := range strings.Split(ctx.GlobalString(flags.HostMux.Name), ",") {
v = strings.ToLower(strings.TrimSpace(v))
var mc lconf.MsMuxC
var err error
switch v {
case "yamux":
mc, err = yamuxC()
case "mplex":
mc, err = mplexC()
default:
return fmt.Errorf("could not recognize mux %s", v)
}
if err != nil {
return fmt.Errorf("failed to make %s constructor: %w", v, err)
}
conf.HostMux = append(conf.HostMux, mc)
}
secArr := strings.Split(ctx.GlobalString(flags.HostSecurity.Name), ",")
for _, v := range secArr {
v = strings.ToLower(strings.TrimSpace(v))
var sc lconf.MsSecC
var err error
switch v {
case "none": // no security, for debugging etc.
if len(conf.HostSecurity) > 0 || len(secArr) > 1 {
return errors.New("cannot mix secure transport protocols with no-security")
}
conf.NoTransportSecurity = true
case "noise":
sc, err = noiseC()
case "tls":
sc, err = tlsC()
default:
return fmt.Errorf("could not recognize security %s", v)
}
if err != nil {
return fmt.Errorf("failed to make %s constructor: %w", v, err)
}
conf.HostSecurity = append(conf.HostSecurity, sc)
}
conf.PeersLo = ctx.GlobalUint(flags.PeersLo.Name)
conf.PeersHi = ctx.GlobalUint(flags.PeersHi.Name)
conf.PeersGrace = ctx.GlobalDuration(flags.PeersGrace.Name)
conf.NAT = ctx.GlobalBool(flags.NAT.Name)
conf.UserAgent = ctx.GlobalString(flags.UserAgent.Name)
conf.TimeoutNegotiation = ctx.GlobalDuration(flags.TimeoutNegotiation.Name)
conf.TimeoutAccept = ctx.GlobalDuration(flags.TimeoutAccept.Name)
conf.TimeoutDial = ctx.GlobalDuration(flags.TimeoutDial.Name)
peerstorePath := ctx.GlobalString(flags.PeerstorePath.Name)
if peerstorePath == "" {
return errors.New("peerstore path must be specified, use 'memory' to explicitly not persist peer records")
}
var err error
var store ds.Batching
if peerstorePath == "memory" {
store = sync.MutexWrap(ds.NewMapDatastore())
} else {
store, err = leveldb.NewDatastore(peerstorePath, nil) // default leveldb options are fine
if err != nil {
return fmt.Errorf("failed to open leveldb db for peerstore: %w", err)
}
}
conf.Store = store
return nil
}
func loadNetworkPrivKey(ctx *cli.Context) (*crypto.Secp256k1PrivateKey, error) {
raw := ctx.GlobalString(flags.P2PPrivRaw.Name)
if raw != "" {
return parsePriv(raw)
}
keyPath := ctx.GlobalString(flags.P2PPrivPath.Name)
f, err := os.OpenFile(keyPath, os.O_RDONLY, 0600)
if os.IsNotExist(err) {
p, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
if err != nil {
return nil, fmt.Errorf("failed to generate new p2p priv key: %w", err)
}
b, err := p.Raw()
if err != nil {
return nil, fmt.Errorf("failed to encode new p2p priv key: %w", err)
}
f, err := os.OpenFile(keyPath, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return nil, fmt.Errorf("failed to store new p2p priv key: %w", err)
}
defer f.Close()
if _, err := f.WriteString(hex.EncodeToString(b)); err != nil {
return nil, fmt.Errorf("failed to write new p2p priv key: %w", err)
}
return (p).(*crypto.Secp256k1PrivateKey), nil
} else {
defer f.Close()
data, err := io.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("failed to read priv key file: %w", err)
}
return parsePriv(strings.TrimSpace(string(data)))
}
}
func parsePriv(data string) (*crypto.Secp256k1PrivateKey, error) {
if len(data) > 2 && data[:2] == "0x" {
data = data[2:]
}
b, err := hex.DecodeString(data)
if err != nil {
return nil, errors.New("p2p priv key is not formatted in hex chars")
}
p, err := crypto.UnmarshalSecp256k1PrivateKey(b)
if err != nil {
// avoid logging the priv key in the error, but hint at likely input length problem
return nil, fmt.Errorf("failed to parse priv key from %d bytes", len(b))
}
return (p).(*crypto.Secp256k1PrivateKey), nil
}
const maxMeshParam = 1000
func (conf *Config) Check() error {
if conf.DisableP2P {
......@@ -455,5 +157,17 @@ func (conf *Config) Check() error {
if conf.ConnGater == nil {
return errors.New("need a connection gater")
}
if conf.MeshD <= 0 || conf.MeshD > maxMeshParam {
return fmt.Errorf("mesh D param must not be 0 or exceed %d, but got %d", maxMeshParam, conf.MeshD)
}
if conf.MeshDLo <= 0 || conf.MeshDLo > maxMeshParam {
return fmt.Errorf("mesh Dlo param must not be 0 or exceed %d, but got %d", maxMeshParam, conf.MeshDLo)
}
if conf.MeshDHi <= 0 || conf.MeshDHi > maxMeshParam {
return fmt.Errorf("mesh Dhi param must not be 0 or exceed %d, but got %d", maxMeshParam, conf.MeshDHi)
}
if conf.MeshDLazy <= 0 || conf.MeshDLazy > maxMeshParam {
return fmt.Errorf("mesh Dlazy param must not be 0 or exceed %d, but got %d", maxMeshParam, conf.MeshDLazy)
}
return nil
}
......@@ -34,7 +34,11 @@ const (
globalValidateThrottle = 512
gossipHeartbeat = 500 * time.Millisecond
// seenMessagesTTL limits the duration that message IDs are remembered for gossip deduplication purposes
seenMessagesTTL = 80 * gossipHeartbeat
seenMessagesTTL = 80 * gossipHeartbeat
DefaultMeshD = 8 // topic stable mesh target count
DefaultMeshDlo = 6 // topic stable mesh low watermark
DefaultMeshDhi = 12 // topic stable mesh high watermark
DefaultMeshDlazy = 6 // gossip target
)
// Message domains, the msg id function uncompresses to keep data monomorphic,
......@@ -43,6 +47,10 @@ const (
var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
type GossipSetupConfigurables interface {
ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option
}
type GossipRuntimeConfig interface {
P2PSequencerAddress() common.Address
}
......@@ -106,12 +114,24 @@ func BuildMsgIdFn(cfg *rollup.Config) pubsub.MsgIdFunction {
}
}
func (p *Config) ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option {
params.D = p.MeshD
params.Dlo = p.MeshDLo
params.Dhi = p.MeshDHi
params.Dlazy = p.MeshDLazy
// in the future we may add more advanced options like scoring and PX / direct-mesh / episub
return []pubsub.Option{
pubsub.WithFloodPublish(p.FloodPublish),
}
}
func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
params := pubsub.DefaultGossipSubParams()
params.D = 8 // topic stable mesh target count
params.Dlo = 6 // topic stable mesh low watermark
params.Dhi = 12 // topic stable mesh high watermark
params.Dlazy = 6 // gossip target
params.D = DefaultMeshD // topic stable mesh target count
params.Dlo = DefaultMeshDlo // topic stable mesh low watermark
params.Dhi = DefaultMeshDhi // topic stable mesh high watermark
params.Dlazy = DefaultMeshDlazy // gossip target
params.HeartbeatInterval = gossipHeartbeat // interval of heartbeat
params.FanoutTTL = 24 * time.Second // ttl for fanout maps for topics we are not subscribed to but have published to
params.HistoryLength = 12 // number of windows to retain full messages in cache for IWANT responses
......@@ -120,12 +140,13 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
return params
}
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, m GossipMetricer) (*pubsub.PubSub, error) {
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossipConf GossipSetupConfigurables, m GossipMetricer) (*pubsub.PubSub, error) {
denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second)
if err != nil {
return nil, err
}
return pubsub.NewGossipSub(p2pCtx, h,
params := BuildGlobalGossipParams(cfg)
gossipOpts := []pubsub.Option{
pubsub.WithMaxMessageSize(maxGossipSize),
pubsub.WithMessageIdFn(BuildMsgIdFn(cfg)),
pubsub.WithNoAuthor(),
......@@ -137,9 +158,11 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, m Gos
pubsub.WithSeenMessagesTTL(seenMessagesTTL),
pubsub.WithPeerExchange(false),
pubsub.WithBlacklist(denyList),
pubsub.WithGossipSubParams(BuildGlobalGossipParams(cfg)),
pubsub.WithGossipSubParams(params),
pubsub.WithEventTracer(&gossipTracer{m: m}),
)
}
gossipOpts = append(gossipOpts, gossipConf.ConfigureGossip(&params)...)
return pubsub.NewGossipSub(p2pCtx, h, gossipOpts...)
// TODO: pubsub.WithPeerScoreInspect(inspect, InspectInterval) to update peerstore scores with gossip scores
}
......
......@@ -13,6 +13,10 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
......@@ -175,3 +179,35 @@ func addrFromIPAndPort(ip net.IP, port uint16) (ma.Multiaddr, error) {
}
return ma.NewMultiaddr(fmt.Sprintf("/%s/%s/tcp/%d", ipScheme, ip.String(), port))
}
func YamuxC() (lconf.MsMuxC, error) {
mtpt, err := lconf.MuxerConstructor(yamux.DefaultTransport)
if err != nil {
return lconf.MsMuxC{}, err
}
return lconf.MsMuxC{MuxC: mtpt, ID: "/yamux/1.0.0"}, nil
}
func MplexC() (lconf.MsMuxC, error) {
mtpt, err := lconf.MuxerConstructor(mplex.DefaultTransport)
if err != nil {
return lconf.MsMuxC{}, err
}
return lconf.MsMuxC{MuxC: mtpt, ID: "/mplex/6.7.0"}, nil
}
func NoiseC() (lconf.MsSecC, error) {
stpt, err := lconf.SecurityConstructor(noise.New)
if err != nil {
return lconf.MsSecC{}, err
}
return lconf.MsSecC{SecC: stpt, ID: noise.ID}, nil
}
func TlsC() (lconf.MsSecC, error) {
stpt, err := lconf.SecurityConstructor(tls.New)
if err != nil {
return lconf.MsSecC{}, err
}
return lconf.MsSecC{SecC: stpt, ID: tls.ID}, nil
}
......@@ -96,13 +96,13 @@ func TestP2PFull(t *testing.T) {
pB, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
require.NoError(t, err, "failed to generate new p2p priv key")
mplexC, err := mplexC()
mplexC, err := MplexC()
require.NoError(t, err)
yamuxC, err := yamuxC()
yamuxC, err := YamuxC()
require.NoError(t, err)
noiseC, err := noiseC()
noiseC, err := NoiseC()
require.NoError(t, err)
tlsC, err := tlsC()
tlsC, err := TlsC()
require.NoError(t, err)
confA := Config{
......@@ -242,13 +242,13 @@ func TestDiscovery(t *testing.T) {
logB := testlog.Logger(t, log.LvlError).New("host", "B")
logC := testlog.Logger(t, log.LvlError).New("host", "C")
mplexC, err := mplexC()
mplexC, err := MplexC()
require.NoError(t, err)
yamuxC, err := yamuxC()
yamuxC, err := YamuxC()
require.NoError(t, err)
noiseC, err := noiseC()
noiseC, err := NoiseC()
require.NoError(t, err)
tlsC, err := tlsC()
tlsC, err := TlsC()
require.NoError(t, err)
discDBA, err := enode.OpenDB("") // "" = memory db
......
......@@ -75,7 +75,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// unregister identify-push handler. Only identifying on dial is fine, and more robust against spam
n.host.RemoveStreamHandler(identify.IDDelta)
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, metrics)
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, metrics)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
......
......@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
......@@ -58,3 +59,7 @@ func (p *Prepared) Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort u
}
return p.LocalNode, p.UDPv5, nil
}
func (p *Prepared) ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option {
return nil
}
......@@ -4,16 +4,13 @@ import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"math/big"
"github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
var SigningDomainBlocksV1 = [32]byte{}
......@@ -79,28 +76,6 @@ func (p *PreparedSigner) SetupSigner(ctx context.Context) (Signer, error) {
return p.Signer, nil
}
// TODO: implement remote signer setup (config to authenticated endpoint)
// and remote signer itself (e.g. a open http client to make signing requests)
type SignerSetup interface {
SetupSigner(ctx context.Context) (Signer, error)
}
// LoadSignerSetup loads a configuration for a Signer to be set up later
func LoadSignerSetup(ctx *cli.Context) (SignerSetup, error) {
key := ctx.GlobalString(flags.SequencerP2PKeyFlag.Name)
if key != "" {
// Mnemonics are bad because they leak *all* keys when they leak.
// Unencrypted keys from file are bad because they are easy to leak (and we are not checking file permissions).
priv, err := crypto.HexToECDSA(key)
if err != nil {
return nil, fmt.Errorf("failed to read batch submitter key: %w", err)
}
return &PreparedSigner{Signer: NewLocalSigner(priv)}, nil
}
// TODO: create remote signer
return nil, nil
}
......@@ -19,7 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/p2p"
p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
)
......@@ -40,12 +40,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, err
}
p2pSignerSetup, err := p2p.LoadSignerSetup(ctx)
p2pSignerSetup, err := p2pcli.LoadSignerSetup(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load p2p signer: %w", err)
}
p2pConfig, err := p2p.NewConfig(ctx)
p2pConfig, err := p2pcli.NewConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load p2p config: %w", err)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment