Commit 5bacaf55 authored by Anatolie Lupacescu's avatar Anatolie Lupacescu Committed by GitHub

Light nodes (#1242)

parent 83c47ca2
...@@ -58,6 +58,7 @@ const ( ...@@ -58,6 +58,7 @@ const (
optionNameSwapFactoryAddress = "swap-factory-address" optionNameSwapFactoryAddress = "swap-factory-address"
optionNameSwapInitialDeposit = "swap-initial-deposit" optionNameSwapInitialDeposit = "swap-initial-deposit"
optionNameSwapEnable = "swap-enable" optionNameSwapEnable = "swap-enable"
optionNameFullNode = "full-node"
) )
func init() { func init() {
...@@ -224,6 +225,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) { ...@@ -224,6 +225,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameSwapFactoryAddress, "", "swap factory address") cmd.Flags().String(optionNameSwapFactoryAddress, "", "swap factory address")
cmd.Flags().String(optionNameSwapInitialDeposit, "100000000000000000", "initial deposit if deploying a new chequebook") cmd.Flags().String(optionNameSwapInitialDeposit, "100000000000000000", "initial deposit if deploying a new chequebook")
cmd.Flags().Bool(optionNameSwapEnable, true, "enable swap") cmd.Flags().Bool(optionNameSwapEnable, true, "enable swap")
cmd.Flags().Bool(optionNameFullNode, true, "cause the node to start in full mode")
} }
func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) { func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) {
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"bytes" "bytes"
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
...@@ -107,6 +108,13 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz ...@@ -107,6 +108,13 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
return err return err
} }
bootNode := c.config.GetBool(optionNameBootnodeMode)
fullNode := c.config.GetBool(optionNameFullNode)
if bootNode && !fullNode {
return errors.New("boot node must be started as a full node")
}
b, err := node.NewBee(c.config.GetString(optionNameP2PAddr), signerConfig.address, *signerConfig.publicKey, signerConfig.signer, c.config.GetUint64(optionNameNetworkID), logger, signerConfig.libp2pPrivateKey, signerConfig.pssPrivateKey, node.Options{ b, err := node.NewBee(c.config.GetString(optionNameP2PAddr), signerConfig.address, *signerConfig.publicKey, signerConfig.signer, c.config.GetUint64(optionNameNetworkID), logger, signerConfig.libp2pPrivateKey, signerConfig.pssPrivateKey, node.Options{
DataDir: c.config.GetString(optionNameDataDir), DataDir: c.config.GetString(optionNameDataDir),
DBCapacity: c.config.GetUint64(optionNameDBCapacity), DBCapacity: c.config.GetUint64(optionNameDBCapacity),
...@@ -134,11 +142,12 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz ...@@ -134,11 +142,12 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
PaymentEarly: c.config.GetString(optionNamePaymentEarly), PaymentEarly: c.config.GetString(optionNamePaymentEarly),
ResolverConnectionCfgs: resolverCfgs, ResolverConnectionCfgs: resolverCfgs,
GatewayMode: c.config.GetBool(optionNameGatewayMode), GatewayMode: c.config.GetBool(optionNameGatewayMode),
BootnodeMode: c.config.GetBool(optionNameBootnodeMode), BootnodeMode: bootNode,
SwapEndpoint: c.config.GetString(optionNameSwapEndpoint), SwapEndpoint: c.config.GetString(optionNameSwapEndpoint),
SwapFactoryAddress: c.config.GetString(optionNameSwapFactoryAddress), SwapFactoryAddress: c.config.GetString(optionNameSwapFactoryAddress),
SwapInitialDeposit: c.config.GetString(optionNameSwapInitialDeposit), SwapInitialDeposit: c.config.GetString(optionNameSwapInitialDeposit),
SwapEnable: c.config.GetBool(optionNameSwapEnable), SwapEnable: c.config.GetBool(optionNameSwapEnable),
FullNodeMode: fullNode,
}) })
if err != nil { if err != nil {
return err return err
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing" "github.com/ethersphere/bee/pkg/tracing"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
...@@ -48,6 +49,7 @@ type Service struct { ...@@ -48,6 +49,7 @@ type Service struct {
swap swap.ApiInterface swap swap.ApiInterface
corsAllowedOrigins []string corsAllowedOrigins []string
metricsRegistry *prometheus.Registry metricsRegistry *prometheus.Registry
lightNodes *lightnode.Container
// handler is changed in the Configure method // handler is changed in the Configure method
handler http.Handler handler http.Handler
handlerMu sync.RWMutex handlerMu sync.RWMutex
...@@ -76,7 +78,7 @@ func New(overlay swarm.Address, publicKey, pssPublicKey ecdsa.PublicKey, ethereu ...@@ -76,7 +78,7 @@ func New(overlay swarm.Address, publicKey, pssPublicKey ecdsa.PublicKey, ethereu
// Configure injects required dependencies and configuration parameters and // Configure injects required dependencies and configuration parameters and
// constructs HTTP routes that depend on them. It is intended and safe to call // constructs HTTP routes that depend on them. It is intended and safe to call
// this method only once. // this method only once.
func (s *Service) Configure(p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.Driver, storer storage.Storer, tags *tags.Tags, accounting accounting.Interface, settlement settlement.Interface, chequebookEnabled bool, swap swap.ApiInterface, chequebook chequebook.Service) { func (s *Service) Configure(p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.Driver, lightNodes *lightnode.Container, storer storage.Storer, tags *tags.Tags, accounting accounting.Interface, settlement settlement.Interface, chequebookEnabled bool, swap swap.ApiInterface, chequebook chequebook.Service) {
s.p2p = p2p s.p2p = p2p
s.pingpong = pingpong s.pingpong = pingpong
s.topologyDriver = topologyDriver s.topologyDriver = topologyDriver
...@@ -87,6 +89,7 @@ func (s *Service) Configure(p2p p2p.DebugService, pingpong pingpong.Interface, t ...@@ -87,6 +89,7 @@ func (s *Service) Configure(p2p p2p.DebugService, pingpong pingpong.Interface, t
s.chequebookEnabled = chequebookEnabled s.chequebookEnabled = chequebookEnabled
s.chequebook = chequebook s.chequebook = chequebook
s.swap = swap s.swap = swap
s.lightNodes = lightNodes
s.setRouter(s.newRouter()) s.setRouter(s.newRouter())
} }
......
...@@ -21,7 +21,6 @@ import ( ...@@ -21,7 +21,6 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp" "github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/mock"
p2pmock "github.com/ethersphere/bee/pkg/p2p/mock" p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/resolver" "github.com/ethersphere/bee/pkg/resolver"
...@@ -30,6 +29,7 @@ import ( ...@@ -30,6 +29,7 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology/lightnode"
topologymock "github.com/ethersphere/bee/pkg/topology/mock" topologymock "github.com/ethersphere/bee/pkg/topology/mock"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"resenje.org/web" "resenje.org/web"
...@@ -64,8 +64,9 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer { ...@@ -64,8 +64,9 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
settlement := swapmock.New(o.SettlementOpts...) settlement := swapmock.New(o.SettlementOpts...)
chequebook := chequebookmock.NewChequebook(o.ChequebookOpts...) chequebook := chequebookmock.NewChequebook(o.ChequebookOpts...)
swapserv := swapmock.NewApiInterface(o.SwapOpts...) swapserv := swapmock.NewApiInterface(o.SwapOpts...)
ln := lightnode.NewContainer()
s := debugapi.New(o.Overlay, o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, o.CORSAllowedOrigins) s := debugapi.New(o.Overlay, o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, o.CORSAllowedOrigins)
s.Configure(o.P2P, o.Pingpong, topologyDriver, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook) s.Configure(o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook)
ts := httptest.NewServer(s) ts := httptest.NewServer(s)
t.Cleanup(ts.Close) t.Cleanup(ts.Close)
...@@ -121,7 +122,7 @@ func TestServer_Configure(t *testing.T) { ...@@ -121,7 +122,7 @@ func TestServer_Configure(t *testing.T) {
PSSPublicKey: pssPrivateKey.PublicKey, PSSPublicKey: pssPrivateKey.PublicKey,
Overlay: overlay, Overlay: overlay,
EthereumAddress: ethereumAddress, EthereumAddress: ethereumAddress,
P2P: mock.New(mock.WithAddressesFunc(func() ([]multiaddr.Multiaddr, error) { P2P: p2pmock.New(p2pmock.WithAddressesFunc(func() ([]multiaddr.Multiaddr, error) {
return addresses, nil return addresses, nil
})), })),
} }
...@@ -130,6 +131,7 @@ func TestServer_Configure(t *testing.T) { ...@@ -130,6 +131,7 @@ func TestServer_Configure(t *testing.T) {
settlement := swapmock.New(o.SettlementOpts...) settlement := swapmock.New(o.SettlementOpts...)
chequebook := chequebookmock.NewChequebook(o.ChequebookOpts...) chequebook := chequebookmock.NewChequebook(o.ChequebookOpts...)
swapserv := swapmock.NewApiInterface(o.SwapOpts...) swapserv := swapmock.NewApiInterface(o.SwapOpts...)
ln := lightnode.NewContainer()
s := debugapi.New(o.Overlay, o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, nil) s := debugapi.New(o.Overlay, o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, nil)
ts := httptest.NewServer(s) ts := httptest.NewServer(s)
t.Cleanup(ts.Close) t.Cleanup(ts.Close)
...@@ -162,7 +164,7 @@ func TestServer_Configure(t *testing.T) { ...@@ -162,7 +164,7 @@ func TestServer_Configure(t *testing.T) {
}), }),
) )
s.Configure(o.P2P, o.Pingpong, topologyDriver, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook) s.Configure(o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook)
testBasicRouter(t, client) testBasicRouter(t, client)
jsonhttptest.Request(t, client, http.MethodGet, "/readiness", http.StatusOK, jsonhttptest.Request(t, client, http.MethodGet, "/readiness", http.StatusOK,
......
...@@ -16,6 +16,8 @@ import ( ...@@ -16,6 +16,8 @@ import (
func (s *Service) topologyHandler(w http.ResponseWriter, r *http.Request) { func (s *Service) topologyHandler(w http.ResponseWriter, r *http.Request) {
params := s.topologyDriver.Snapshot() params := s.topologyDriver.Snapshot()
params.LightNodes = s.lightNodes.PeerInfo()
b, err := json.Marshal(params) b, err := json.Marshal(params)
if err != nil { if err != nil {
s.logger.Errorf("topology marshal to json: %v", err) s.logger.Errorf("topology marshal to json: %v", err)
......
...@@ -28,11 +28,11 @@ import ( ...@@ -28,11 +28,11 @@ import (
"github.com/ethersphere/bee/pkg/debugapi" "github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/feeds/factory" "github.com/ethersphere/bee/pkg/feeds/factory"
"github.com/ethersphere/bee/pkg/hive" "github.com/ethersphere/bee/pkg/hive"
"github.com/ethersphere/bee/pkg/kademlia"
"github.com/ethersphere/bee/pkg/localstore" "github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/metrics" "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/netstore" "github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/pinning" "github.com/ethersphere/bee/pkg/pinning"
...@@ -55,6 +55,8 @@ import ( ...@@ -55,6 +55,8 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology/kademlia"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing" "github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/traversal" "github.com/ethersphere/bee/pkg/traversal"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
...@@ -116,6 +118,7 @@ type Options struct { ...@@ -116,6 +118,7 @@ type Options struct {
SwapFactoryAddress string SwapFactoryAddress string
SwapInitialDeposit string SwapInitialDeposit string
SwapEnable bool SwapEnable bool
FullNodeMode bool
} }
func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (b *Bee, err error) { func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (b *Bee, err error) {
...@@ -255,13 +258,16 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -255,13 +258,16 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
) )
} }
p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{ lightNodes := lightnode.NewContainer()
p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, logger, tracer, libp2p.Options{
PrivateKey: libp2pPrivateKey, PrivateKey: libp2pPrivateKey,
NATAddr: o.NATAddr, NATAddr: o.NATAddr,
EnableWS: o.EnableWS, EnableWS: o.EnableWS,
EnableQUIC: o.EnableQUIC, EnableQUIC: o.EnableQUIC,
Standalone: o.Standalone, Standalone: o.Standalone,
WelcomeMessage: o.WelcomeMessage, WelcomeMessage: o.WelcomeMessage,
FullNode: o.FullNodeMode,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("p2p service: %w", err) return nil, fmt.Errorf("p2p service: %w", err)
...@@ -412,10 +418,6 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -412,10 +418,6 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
tagService := tags.NewTags(stateStore, logger) tagService := tags.NewTags(stateStore, logger)
b.tagsCloser = tagService b.tagsCloser = tagService
if err = p2ps.AddProtocol(retrieve.Protocol()); err != nil {
return nil, fmt.Errorf("retrieval service: %w", err)
}
pssService := pss.New(pssPrivateKey, logger) pssService := pss.New(pssPrivateKey, logger)
b.pssCloser = pssService b.pssCloser = pssService
...@@ -432,36 +434,50 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -432,36 +434,50 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
pinningService := pinning.NewService(storer, stateStore, traversalService) pinningService := pinning.NewService(storer, stateStore, traversalService)
pushSyncProtocol := pushsync.New(swarmAddress, p2ps, storer, kad, tagService, pssService.TryUnwrap, logger, acc, pricer, signer, tracer) pushSyncProtocol := pushsync.New(swarmAddress, p2ps, storer, kad, tagService, o.FullNodeMode, pssService.TryUnwrap, logger, acc, pricer, signer, tracer)
// set the pushSyncer in the PSS // set the pushSyncer in the PSS
pssService.SetPushSyncer(pushSyncProtocol) pssService.SetPushSyncer(pushSyncProtocol)
if err = p2ps.AddProtocol(pushSyncProtocol.Protocol()); err != nil {
return nil, fmt.Errorf("pushsync service: %w", err)
}
if o.GlobalPinningEnabled { if o.GlobalPinningEnabled {
// register function for chunk repair upon receiving a trojan message // register function for chunk repair upon receiving a trojan message
chunkRepairHandler := recovery.NewRepairHandler(ns, logger, pushSyncProtocol) chunkRepairHandler := recovery.NewRepairHandler(ns, logger, pushSyncProtocol)
b.recoveryHandleCleanup = pssService.Register(recovery.Topic, chunkRepairHandler) b.recoveryHandleCleanup = pssService.Register(recovery.Topic, chunkRepairHandler)
} }
pushSyncPusher := pusher.New(networkID, storer, kad, pushSyncProtocol, tagService, logger, tracer) pusherService := pusher.New(networkID, storer, kad, pushSyncProtocol, tagService, logger, tracer)
b.pusherCloser = pushSyncPusher b.pusherCloser = pusherService
pullStorage := pullstorage.New(storer) pullStorage := pullstorage.New(storer)
pullSync := pullsync.New(p2ps, pullStorage, pssService.TryUnwrap, logger) pullSyncProtocol := pullsync.New(p2ps, pullStorage, pssService.TryUnwrap, logger)
b.pullSyncCloser = pullSync b.pullSyncCloser = pullSyncProtocol
if err = p2ps.AddProtocol(pullSync.Protocol()); err != nil { pullerService := puller.New(stateStore, kad, pullSyncProtocol, logger, puller.Options{})
return nil, fmt.Errorf("pullsync protocol: %w", err) b.pullerCloser = pullerService
}
puller := puller.New(stateStore, kad, pullSync, logger, puller.Options{}) retrieveProtocolSpec := retrieve.Protocol()
pushSyncProtocolSpec := pushSyncProtocol.Protocol()
pullSyncProtocolSpec := pullSyncProtocol.Protocol()
b.pullerCloser = puller if o.FullNodeMode {
logger.Info("starting in full mode")
} else {
logger.Info("starting in light mode")
p2p.WithBlocklistStreams(p2p.DefaultBlocklistTime, retrieveProtocolSpec)
p2p.WithBlocklistStreams(p2p.DefaultBlocklistTime, pushSyncProtocolSpec)
p2p.WithBlocklistStreams(p2p.DefaultBlocklistTime, pullSyncProtocolSpec)
}
if err = p2ps.AddProtocol(retrieveProtocolSpec); err != nil {
return nil, fmt.Errorf("retrieval service: %w", err)
}
if err = p2ps.AddProtocol(pushSyncProtocolSpec); err != nil {
return nil, fmt.Errorf("pushsync service: %w", err)
}
if err = p2ps.AddProtocol(pullSyncProtocolSpec); err != nil {
return nil, fmt.Errorf("pullsync protocol: %w", err)
}
multiResolver := multiresolver.NewMultiResolver( multiResolver := multiresolver.NewMultiResolver(
multiresolver.WithConnectionConfigs(o.ResolverConnectionCfgs), multiresolver.WithConnectionConfigs(o.ResolverConnectionCfgs),
...@@ -509,10 +525,10 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -509,10 +525,10 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...) debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
debugAPIService.MustRegisterMetrics(acc.Metrics()...) debugAPIService.MustRegisterMetrics(acc.Metrics()...)
debugAPIService.MustRegisterMetrics(storer.Metrics()...) debugAPIService.MustRegisterMetrics(storer.Metrics()...)
debugAPIService.MustRegisterMetrics(puller.Metrics()...) debugAPIService.MustRegisterMetrics(pullerService.Metrics()...)
debugAPIService.MustRegisterMetrics(pushSyncProtocol.Metrics()...) debugAPIService.MustRegisterMetrics(pushSyncProtocol.Metrics()...)
debugAPIService.MustRegisterMetrics(pushSyncPusher.Metrics()...) debugAPIService.MustRegisterMetrics(pusherService.Metrics()...)
debugAPIService.MustRegisterMetrics(pullSync.Metrics()...) debugAPIService.MustRegisterMetrics(pullSyncProtocol.Metrics()...)
debugAPIService.MustRegisterMetrics(retrieve.Metrics()...) debugAPIService.MustRegisterMetrics(retrieve.Metrics()...)
if pssServiceMetrics, ok := pssService.(metrics.Collector); ok { if pssServiceMetrics, ok := pssService.(metrics.Collector); ok {
...@@ -531,7 +547,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -531,7 +547,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
} }
// inject dependencies and configure full debug api http path routes // inject dependencies and configure full debug api http path routes
debugAPIService.Configure(p2ps, pingPong, kad, storer, tagService, acc, settlement, o.SwapEnable, swapService, chequebookService) debugAPIService.Configure(p2ps, pingPong, kad, lightNodes, storer, tagService, acc, settlement, o.SwapEnable, swapService, chequebookService)
} }
if err := kad.Start(p2pCtx); err != nil { if err := kad.Start(p2pCtx); err != nil {
......
...@@ -18,6 +18,10 @@ var ( ...@@ -18,6 +18,10 @@ var (
ErrAlreadyConnected = errors.New("already connected") ErrAlreadyConnected = errors.New("already connected")
) )
const (
DefaultBlocklistTime = 1 * time.Minute
)
// ConnectionBackoffError indicates that connection calls will not be executed until `tryAfter` timetamp. // ConnectionBackoffError indicates that connection calls will not be executed until `tryAfter` timetamp.
// The reason is provided in the wrappped error. // The reason is provided in the wrappped error.
type ConnectionBackoffError struct { type ConnectionBackoffError struct {
......
...@@ -7,6 +7,7 @@ package libp2p_test ...@@ -7,6 +7,7 @@ package libp2p_test
import ( import (
"context" "context"
"errors" "errors"
"io"
"sync" "sync"
"testing" "testing"
"time" "time"
...@@ -17,6 +18,7 @@ import ( ...@@ -17,6 +18,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/statestore/mock" "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/libp2p/go-libp2p-core/mux"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer" libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
...@@ -200,7 +202,9 @@ func TestDoubleConnectOnAllAddresses(t *testing.T) { ...@@ -200,7 +202,9 @@ func TestDoubleConnectOnAllAddresses(t *testing.T) {
} }
for _, addr := range addrs { for _, addr := range addrs {
// creating new remote host for each address // creating new remote host for each address
s2, overlay2 := newService(t, 1, libp2pServiceOpts{}) s2, overlay2 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
if _, err := s2.Connect(ctx, addr); err != nil { if _, err := s2.Connect(ctx, addr); err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -253,6 +257,7 @@ func TestConnectWithEnabledQUICAndWSTransports(t *testing.T) { ...@@ -253,6 +257,7 @@ func TestConnectWithEnabledQUICAndWSTransports(t *testing.T) {
libp2pOpts: libp2p.Options{ libp2pOpts: libp2p.Options{
EnableQUIC: true, EnableQUIC: true,
EnableWS: true, EnableWS: true,
FullNode: true,
}, },
}) })
...@@ -260,6 +265,7 @@ func TestConnectWithEnabledQUICAndWSTransports(t *testing.T) { ...@@ -260,6 +265,7 @@ func TestConnectWithEnabledQUICAndWSTransports(t *testing.T) {
libp2pOpts: libp2p.Options{ libp2pOpts: libp2p.Options{
EnableQUIC: true, EnableQUIC: true,
EnableWS: true, EnableWS: true,
FullNode: true,
}, },
}) })
...@@ -389,11 +395,21 @@ func TestTopologyNotifier(t *testing.T) { ...@@ -389,11 +395,21 @@ func TestTopologyNotifier(t *testing.T) {
} }
) )
notifier1 := mockNotifier(n1c, n1d, true) notifier1 := mockNotifier(n1c, n1d, true)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{Addressbook: ab1}) s1, overlay1 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab1,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})
s1.SetPickyNotifier(notifier1) s1.SetPickyNotifier(notifier1)
notifier2 := mockNotifier(n2c, n2d, true) notifier2 := mockNotifier(n2c, n2d, true)
s2, overlay2 := newService(t, 1, libp2pServiceOpts{Addressbook: ab2}) s2, overlay2 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab2,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})
s2.SetPickyNotifier(notifier2) s2.SetPickyNotifier(notifier2)
addr := serviceUnderlayAddress(t, s1) addr := serviceUnderlayAddress(t, s1)
...@@ -511,6 +527,126 @@ func TestTopologyOverSaturated(t *testing.T) { ...@@ -511,6 +527,126 @@ func TestTopologyOverSaturated(t *testing.T) {
waitAddrSet(t, &n2disconnectedPeer.Address, &mtx, overlay1) waitAddrSet(t, &n2disconnectedPeer.Address, &mtx, overlay1)
} }
func TestWithDisconnectStreams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
testSpec := p2p.ProtocolSpec{
Name: testProtocolName,
Version: testProtocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: testStreamName,
Handler: func(c context.Context, p p2p.Peer, s p2p.Stream) error {
return nil
},
},
},
}
p2p.WithDisconnectStreams(testSpec)
_ = s1.AddProtocol(testSpec)
s1_underlay := serviceUnderlayAddress(t, s1)
expectPeers(t, s1)
expectPeers(t, s2)
if _, err := s2.Connect(ctx, s1_underlay); err != nil {
t.Fatal(err)
}
expectPeers(t, s1, overlay2)
expectPeers(t, s2, overlay1)
s, err := s2.NewStream(ctx, overlay1, nil, testProtocolName, testProtocolVersion, testStreamName)
expectStreamReset(t, s, err)
expectPeersEventually(t, s2)
expectPeersEventually(t, s1)
}
func TestWithBlocklistStreams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
testSpec := p2p.ProtocolSpec{
Name: testProtocolName,
Version: testProtocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: testStreamName,
Handler: func(c context.Context, p p2p.Peer, s p2p.Stream) error {
return nil
},
},
},
}
p2p.WithBlocklistStreams(1*time.Minute, testSpec)
_ = s1.AddProtocol(testSpec)
s1_underlay := serviceUnderlayAddress(t, s1)
if _, err := s2.Connect(ctx, s1_underlay); err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
s, err := s2.NewStream(ctx, overlay1, nil, testProtocolName, testProtocolVersion, testStreamName)
expectStreamReset(t, s, err)
expectPeersEventually(t, s2)
expectPeersEventually(t, s1)
if _, err := s2.Connect(ctx, s1_underlay); err == nil {
t.Fatal("expected error when connecting to blocklisted peer")
}
expectPeersEventually(t, s2)
expectPeersEventually(t, s1)
}
func expectStreamReset(t *testing.T, s io.ReadCloser, err error) {
t.Helper()
// due to the fact that disconnect method is asynchronous
// stream reset error should occur either on creation or on first read attempt
if err != nil && !errors.Is(err, mux.ErrReset) {
t.Fatalf("expected stream reset error, got %v", err)
}
if err == nil {
readErr := make(chan error)
go func() {
_, err := s.Read(make([]byte, 10))
readErr <- err
}()
select {
// because read could block without erroring we should also expect timeout
case <-time.After(2 * time.Second):
t.Error("expected stream reset error, got timeout reading")
case err := <-readErr:
if !errors.Is(err, mux.ErrReset) {
t.Errorf("expected stream reset error, got %v", err)
}
}
}
}
func expectZeroAddress(t *testing.T, addrs ...swarm.Address) { func expectZeroAddress(t *testing.T, addrs ...swarm.Address) {
t.Helper() t.Helper()
for i, a := range addrs { for i, a := range addrs {
...@@ -567,6 +703,10 @@ func (n *notifiee) Pick(p p2p.Peer) bool { ...@@ -567,6 +703,10 @@ func (n *notifiee) Pick(p p2p.Peer) bool {
return n.pick return n.pick
} }
func (n *notifiee) Announce(context.Context, swarm.Address) error {
return nil
}
func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier { func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier {
return &notifiee{connected: c, disconnected: d, pick: pick} return &notifiee{connected: c, disconnected: d, pick: pick}
} }
......
...@@ -64,7 +64,7 @@ type Service struct { ...@@ -64,7 +64,7 @@ type Service struct {
signer crypto.Signer signer crypto.Signer
advertisableAddresser AdvertisableAddressResolver advertisableAddresser AdvertisableAddressResolver
overlay swarm.Address overlay swarm.Address
lightNode bool fullNode bool
networkID uint64 networkID uint64
welcomeMessage atomic.Value welcomeMessage atomic.Value
receivedHandshakes map[libp2ppeer.ID]struct{} receivedHandshakes map[libp2ppeer.ID]struct{}
...@@ -77,11 +77,19 @@ type Service struct { ...@@ -77,11 +77,19 @@ type Service struct {
// Info contains the information received from the handshake. // Info contains the information received from the handshake.
type Info struct { type Info struct {
BzzAddress *bzz.Address BzzAddress *bzz.Address
Light bool FullNode bool
}
func (i *Info) LightString() string {
if i.FullNode {
return " (light)"
}
return ""
} }
// New creates a new handshake Service. // New creates a new handshake Service.
func New(signer crypto.Signer, advertisableAddresser AdvertisableAddressResolver, overlay swarm.Address, networkID uint64, lighNode bool, welcomeMessage string, logger logging.Logger) (*Service, error) { func New(signer crypto.Signer, advertisableAddresser AdvertisableAddressResolver, overlay swarm.Address, networkID uint64, fullNode bool, welcomeMessage string, logger logging.Logger) (*Service, error) {
if len(welcomeMessage) > MaxWelcomeMessageLength { if len(welcomeMessage) > MaxWelcomeMessageLength {
return nil, ErrWelcomeMessageLength return nil, ErrWelcomeMessageLength
} }
...@@ -91,7 +99,7 @@ func New(signer crypto.Signer, advertisableAddresser AdvertisableAddressResolver ...@@ -91,7 +99,7 @@ func New(signer crypto.Signer, advertisableAddresser AdvertisableAddressResolver
advertisableAddresser: advertisableAddresser, advertisableAddresser: advertisableAddresser,
overlay: overlay, overlay: overlay,
networkID: networkID, networkID: networkID,
lightNode: lighNode, fullNode: fullNode,
receivedHandshakes: make(map[libp2ppeer.ID]struct{}), receivedHandshakes: make(map[libp2ppeer.ID]struct{}),
logger: logger, logger: logger,
Notifiee: new(network.NoopNotifiee), Notifiee: new(network.NoopNotifiee),
...@@ -162,7 +170,7 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd ...@@ -162,7 +170,7 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd
Signature: bzzAddress.Signature, Signature: bzzAddress.Signature,
}, },
NetworkID: s.networkID, NetworkID: s.networkID,
Light: s.lightNode, FullNode: s.fullNode,
WelcomeMessage: welcomeMessage, WelcomeMessage: welcomeMessage,
}); err != nil { }); err != nil {
return nil, fmt.Errorf("write ack message: %w", err) return nil, fmt.Errorf("write ack message: %w", err)
...@@ -175,7 +183,7 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd ...@@ -175,7 +183,7 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd
return &Info{ return &Info{
BzzAddress: remoteBzzAddress, BzzAddress: remoteBzzAddress,
Light: resp.Ack.Light, FullNode: resp.Ack.FullNode,
}, nil }, nil
} }
...@@ -241,7 +249,7 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, remoteMultiaddr ...@@ -241,7 +249,7 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, remoteMultiaddr
Signature: bzzAddress.Signature, Signature: bzzAddress.Signature,
}, },
NetworkID: s.networkID, NetworkID: s.networkID,
Light: s.lightNode, FullNode: s.fullNode,
WelcomeMessage: welcomeMessage, WelcomeMessage: welcomeMessage,
}, },
}); err != nil { }); err != nil {
...@@ -265,7 +273,7 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, remoteMultiaddr ...@@ -265,7 +273,7 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, remoteMultiaddr
return &Info{ return &Info{
BzzAddress: remoteBzzAddress, BzzAddress: remoteBzzAddress,
Light: ack.Light, FullNode: ack.FullNode,
}, nil }, nil
} }
......
...@@ -83,16 +83,16 @@ func TestHandshake(t *testing.T) { ...@@ -83,16 +83,16 @@ func TestHandshake(t *testing.T) {
node1Info := handshake.Info{ node1Info := handshake.Info{
BzzAddress: node1BzzAddress, BzzAddress: node1BzzAddress,
Light: false, FullNode: true,
} }
node2Info := handshake.Info{ node2Info := handshake.Info{
BzzAddress: node2BzzAddress, BzzAddress: node2BzzAddress,
Light: false, FullNode: true,
} }
aaddresser := &AdvertisableAddresserMock{} aaddresser := &AdvertisableAddresserMock{}
handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, testWelcomeMessage, logger) handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, testWelcomeMessage, logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -115,7 +115,7 @@ func TestHandshake(t *testing.T) { ...@@ -115,7 +115,7 @@ func TestHandshake(t *testing.T) {
Signature: node2BzzAddress.Signature, Signature: node2BzzAddress.Signature,
}, },
NetworkID: networkID, NetworkID: networkID,
Light: false, FullNode: true,
WelcomeMessage: testWelcomeMessage, WelcomeMessage: testWelcomeMessage,
}, },
}); err != nil { }); err != nil {
...@@ -147,7 +147,7 @@ func TestHandshake(t *testing.T) { ...@@ -147,7 +147,7 @@ func TestHandshake(t *testing.T) {
!bytes.Equal(ack.Address.Underlay, node1maBinary) || !bytes.Equal(ack.Address.Underlay, node1maBinary) ||
!bytes.Equal(ack.Address.Signature, node1BzzAddress.Signature) || !bytes.Equal(ack.Address.Signature, node1BzzAddress.Signature) ||
ack.NetworkID != networkID || ack.NetworkID != networkID ||
ack.Light != false { ack.FullNode != true {
t.Fatal("bad ack") t.Fatal("bad ack")
} }
...@@ -160,7 +160,7 @@ func TestHandshake(t *testing.T) { ...@@ -160,7 +160,7 @@ func TestHandshake(t *testing.T) {
const LongMessage = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Morbi consectetur urna ut lorem sollicitudin posuere. Donec sagittis laoreet sapien." const LongMessage = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Morbi consectetur urna ut lorem sollicitudin posuere. Donec sagittis laoreet sapien."
expectedErr := handshake.ErrWelcomeMessageLength expectedErr := handshake.ErrWelcomeMessageLength
_, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, LongMessage, logger) _, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, LongMessage, logger)
if err == nil || err.Error() != expectedErr.Error() { if err == nil || err.Error() != expectedErr.Error() {
t.Fatal("expected:", expectedErr, "got:", err) t.Fatal("expected:", expectedErr, "got:", err)
} }
...@@ -240,7 +240,7 @@ func TestHandshake(t *testing.T) { ...@@ -240,7 +240,7 @@ func TestHandshake(t *testing.T) {
Signature: node2BzzAddress.Signature, Signature: node2BzzAddress.Signature,
}, },
NetworkID: networkID, NetworkID: networkID,
Light: false, FullNode: true,
}, },
}, },
); err != nil { ); err != nil {
...@@ -275,7 +275,7 @@ func TestHandshake(t *testing.T) { ...@@ -275,7 +275,7 @@ func TestHandshake(t *testing.T) {
Signature: node2BzzAddress.Signature, Signature: node2BzzAddress.Signature,
}, },
NetworkID: 5, NetworkID: 5,
Light: false, FullNode: true,
}, },
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -309,7 +309,7 @@ func TestHandshake(t *testing.T) { ...@@ -309,7 +309,7 @@ func TestHandshake(t *testing.T) {
Signature: node1BzzAddress.Signature, Signature: node1BzzAddress.Signature,
}, },
NetworkID: networkID, NetworkID: networkID,
Light: false, FullNode: true,
}, },
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -349,7 +349,7 @@ func TestHandshake(t *testing.T) { ...@@ -349,7 +349,7 @@ func TestHandshake(t *testing.T) {
Signature: node2BzzAddress.Signature, Signature: node2BzzAddress.Signature,
}, },
NetworkID: networkID, NetworkID: networkID,
Light: false, FullNode: true,
}, },
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -368,7 +368,7 @@ func TestHandshake(t *testing.T) { ...@@ -368,7 +368,7 @@ func TestHandshake(t *testing.T) {
}) })
t.Run("Handle - OK", func(t *testing.T) { t.Run("Handle - OK", func(t *testing.T) {
handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, "", logger) handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, "", logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -391,7 +391,7 @@ func TestHandshake(t *testing.T) { ...@@ -391,7 +391,7 @@ func TestHandshake(t *testing.T) {
Signature: node2BzzAddress.Signature, Signature: node2BzzAddress.Signature,
}, },
NetworkID: networkID, NetworkID: networkID,
Light: false, FullNode: true,
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -420,12 +420,12 @@ func TestHandshake(t *testing.T) { ...@@ -420,12 +420,12 @@ func TestHandshake(t *testing.T) {
testInfo(t, node1Info, handshake.Info{ testInfo(t, node1Info, handshake.Info{
BzzAddress: bzzAddress, BzzAddress: bzzAddress,
Light: got.Ack.Light, FullNode: got.Ack.FullNode,
}) })
}) })
t.Run("Handle - read error ", func(t *testing.T) { t.Run("Handle - read error ", func(t *testing.T) {
handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, "", logger) handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, "", logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -444,7 +444,7 @@ func TestHandshake(t *testing.T) { ...@@ -444,7 +444,7 @@ func TestHandshake(t *testing.T) {
}) })
t.Run("Handle - write error ", func(t *testing.T) { t.Run("Handle - write error ", func(t *testing.T) {
handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, "", logger) handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, "", logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -471,7 +471,7 @@ func TestHandshake(t *testing.T) { ...@@ -471,7 +471,7 @@ func TestHandshake(t *testing.T) {
}) })
t.Run("Handle - ack read error ", func(t *testing.T) { t.Run("Handle - ack read error ", func(t *testing.T) {
handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, "", logger) handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, "", logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -500,7 +500,7 @@ func TestHandshake(t *testing.T) { ...@@ -500,7 +500,7 @@ func TestHandshake(t *testing.T) {
}) })
t.Run("Handle - networkID mismatch ", func(t *testing.T) { t.Run("Handle - networkID mismatch ", func(t *testing.T) {
handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, "", logger) handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, "", logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -523,7 +523,7 @@ func TestHandshake(t *testing.T) { ...@@ -523,7 +523,7 @@ func TestHandshake(t *testing.T) {
Signature: node2BzzAddress.Signature, Signature: node2BzzAddress.Signature,
}, },
NetworkID: 5, NetworkID: 5,
Light: false, FullNode: true,
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -539,7 +539,7 @@ func TestHandshake(t *testing.T) { ...@@ -539,7 +539,7 @@ func TestHandshake(t *testing.T) {
}) })
t.Run("Handle - duplicate handshake", func(t *testing.T) { t.Run("Handle - duplicate handshake", func(t *testing.T) {
handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, "", logger) handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, "", logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -562,7 +562,7 @@ func TestHandshake(t *testing.T) { ...@@ -562,7 +562,7 @@ func TestHandshake(t *testing.T) {
Signature: node2BzzAddress.Signature, Signature: node2BzzAddress.Signature,
}, },
NetworkID: networkID, NetworkID: networkID,
Light: false, FullNode: true,
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -591,7 +591,7 @@ func TestHandshake(t *testing.T) { ...@@ -591,7 +591,7 @@ func TestHandshake(t *testing.T) {
testInfo(t, node1Info, handshake.Info{ testInfo(t, node1Info, handshake.Info{
BzzAddress: bzzAddress, BzzAddress: bzzAddress,
Light: got.Ack.Light, FullNode: got.Ack.FullNode,
}) })
_, err = handshakeService.Handle(context.Background(), stream1, node2AddrInfo.Addrs[0], node2AddrInfo.ID) _, err = handshakeService.Handle(context.Background(), stream1, node2AddrInfo.Addrs[0], node2AddrInfo.ID)
...@@ -601,7 +601,7 @@ func TestHandshake(t *testing.T) { ...@@ -601,7 +601,7 @@ func TestHandshake(t *testing.T) {
}) })
t.Run("Handle - invalid ack", func(t *testing.T) { t.Run("Handle - invalid ack", func(t *testing.T) {
handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, "", logger) handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, "", logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -624,7 +624,7 @@ func TestHandshake(t *testing.T) { ...@@ -624,7 +624,7 @@ func TestHandshake(t *testing.T) {
Signature: node1BzzAddress.Signature, Signature: node1BzzAddress.Signature,
}, },
NetworkID: networkID, NetworkID: networkID,
Light: false, FullNode: true,
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -636,7 +636,7 @@ func TestHandshake(t *testing.T) { ...@@ -636,7 +636,7 @@ func TestHandshake(t *testing.T) {
}) })
t.Run("Handle - advertisable error", func(t *testing.T) { t.Run("Handle - advertisable error", func(t *testing.T) {
handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, false, "", logger) handshakeService, err := handshake.New(signer1, aaddresser, node1Info.BzzAddress.Overlay, networkID, true, "", logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -672,7 +672,7 @@ func TestHandshake(t *testing.T) { ...@@ -672,7 +672,7 @@ func TestHandshake(t *testing.T) {
// testInfo validates if two Info instances are equal. // testInfo validates if two Info instances are equal.
func testInfo(t *testing.T, got, want handshake.Info) { func testInfo(t *testing.T, got, want handshake.Info) {
t.Helper() t.Helper()
if !got.BzzAddress.Equal(want.BzzAddress) || got.Light != want.Light { if !got.BzzAddress.Equal(want.BzzAddress) || got.FullNode != want.FullNode {
t.Fatalf("got info %+v, want %+v", got, want) t.Fatalf("got info %+v, want %+v", got, want)
} }
} }
......
...@@ -69,7 +69,7 @@ func (m *Syn) GetObservedUnderlay() []byte { ...@@ -69,7 +69,7 @@ func (m *Syn) GetObservedUnderlay() []byte {
type Ack struct { type Ack struct {
Address *BzzAddress `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"` Address *BzzAddress `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
NetworkID uint64 `protobuf:"varint,2,opt,name=NetworkID,proto3" json:"NetworkID,omitempty"` NetworkID uint64 `protobuf:"varint,2,opt,name=NetworkID,proto3" json:"NetworkID,omitempty"`
Light bool `protobuf:"varint,3,opt,name=Light,proto3" json:"Light,omitempty"` FullNode bool `protobuf:"varint,3,opt,name=FullNode,proto3" json:"FullNode,omitempty"`
WelcomeMessage string `protobuf:"bytes,99,opt,name=WelcomeMessage,proto3" json:"WelcomeMessage,omitempty"` WelcomeMessage string `protobuf:"bytes,99,opt,name=WelcomeMessage,proto3" json:"WelcomeMessage,omitempty"`
} }
...@@ -120,9 +120,9 @@ func (m *Ack) GetNetworkID() uint64 { ...@@ -120,9 +120,9 @@ func (m *Ack) GetNetworkID() uint64 {
return 0 return 0
} }
func (m *Ack) GetLight() bool { func (m *Ack) GetFullNode() bool {
if m != nil { if m != nil {
return m.Light return m.FullNode
} }
return false return false
} }
...@@ -256,26 +256,26 @@ func init() { ...@@ -256,26 +256,26 @@ func init() {
func init() { proto.RegisterFile("handshake.proto", fileDescriptor_a77305914d5d202f) } func init() { proto.RegisterFile("handshake.proto", fileDescriptor_a77305914d5d202f) }
var fileDescriptor_a77305914d5d202f = []byte{ var fileDescriptor_a77305914d5d202f = []byte{
// 302 bytes of a gzipped FileDescriptorProto // 303 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x91, 0x4d, 0x4b, 0xc3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0xcc, 0x4b,
0x1c, 0xc6, 0x97, 0x55, 0xf7, 0xf2, 0x77, 0x4c, 0x09, 0x0a, 0x45, 0x46, 0x08, 0x3d, 0x48, 0xf1, 0x29, 0xce, 0x48, 0xcc, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x0b, 0x28,
0x30, 0x51, 0x3f, 0xc1, 0x86, 0x17, 0x61, 0x3a, 0x48, 0x11, 0xc1, 0x93, 0x7d, 0x09, 0xed, 0xe8, 0x19, 0x72, 0x31, 0x07, 0x57, 0xe6, 0x09, 0x69, 0x71, 0x09, 0xf8, 0x27, 0x15, 0xa7, 0x16, 0x95,
0x4c, 0x47, 0x52, 0x27, 0xdd, 0xa7, 0x10, 0x3f, 0x95, 0xc7, 0x1d, 0x3d, 0x4a, 0xfb, 0x45, 0xa4, 0xa5, 0xa6, 0x84, 0xe6, 0xa5, 0xa4, 0x16, 0xe5, 0x24, 0x56, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0xf0,
0xd9, 0x4b, 0x75, 0x1e, 0x9f, 0x97, 0x36, 0xbf, 0x27, 0x81, 0xc3, 0xc8, 0x15, 0x81, 0x8a, 0xdc, 0x04, 0x61, 0x88, 0x2b, 0xcd, 0x60, 0xe4, 0x62, 0x76, 0x4c, 0xce, 0x16, 0xd2, 0xe7, 0x62, 0x77,
0x98, 0xf7, 0x67, 0x32, 0x49, 0x13, 0xdc, 0xde, 0x1a, 0xd6, 0x25, 0x18, 0x4e, 0x26, 0xf0, 0x39, 0x4c, 0x49, 0x29, 0x4a, 0x2d, 0x2e, 0x06, 0x2b, 0xe5, 0x36, 0x12, 0xd5, 0x43, 0x58, 0xe4, 0x54,
0x1c, 0x8d, 0x3d, 0xc5, 0xe5, 0x9c, 0x07, 0x0f, 0x22, 0xe0, 0x72, 0xea, 0x66, 0x26, 0xa2, 0xc8, 0x55, 0x05, 0x95, 0x0c, 0x82, 0xa9, 0x12, 0x92, 0xe1, 0xe2, 0xf4, 0x4b, 0x2d, 0x29, 0xcf, 0x2f,
0xee, 0xb0, 0x7f, 0xbe, 0xf5, 0x81, 0xc0, 0x18, 0xf8, 0x31, 0xbe, 0x80, 0xe6, 0x20, 0x08, 0x24, 0xca, 0xf6, 0x74, 0x91, 0x60, 0x52, 0x60, 0xd4, 0x60, 0x09, 0x42, 0x08, 0x08, 0x49, 0x71, 0x71,
0x57, 0x4a, 0x57, 0x0f, 0xae, 0x4e, 0xfa, 0xd5, 0x41, 0xc3, 0xc5, 0x62, 0x1d, 0xb2, 0x4d, 0x0b, 0xb8, 0x95, 0xe6, 0xe4, 0xf8, 0xe5, 0xa7, 0xa4, 0x4a, 0x30, 0x2b, 0x30, 0x6a, 0x70, 0x04, 0xc1,
0xf7, 0xa0, 0x7d, 0xcf, 0xd3, 0xb7, 0x44, 0xc6, 0xb7, 0x37, 0x66, 0x9d, 0x22, 0x7b, 0x8f, 0x55, 0xf9, 0x42, 0x6a, 0x5c, 0x7c, 0xe1, 0xa9, 0x39, 0xc9, 0xf9, 0xb9, 0xa9, 0xbe, 0xa9, 0xc5, 0xc5,
0x06, 0x3e, 0x86, 0xfd, 0xd1, 0x24, 0x8c, 0x52, 0xd3, 0xa0, 0xc8, 0x6e, 0xb1, 0x95, 0xc0, 0x67, 0x89, 0xe9, 0xa9, 0x12, 0xc9, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x68, 0xa2, 0x4a, 0x3e, 0x5c, 0x6c,
0xd0, 0x7d, 0xe4, 0x53, 0x3f, 0x79, 0xe1, 0x77, 0x5c, 0x29, 0x37, 0xe4, 0xa6, 0x4f, 0x91, 0xdd, 0xc1, 0x95, 0x79, 0x20, 0xc7, 0x29, 0x80, 0xfd, 0x05, 0x75, 0x18, 0x1f, 0x92, 0xc3, 0x82, 0x2b,
0x66, 0x3b, 0xae, 0x35, 0x82, 0x86, 0x93, 0x89, 0x12, 0x8b, 0xea, 0x45, 0x6b, 0xa4, 0xee, 0x2f, 0xf3, 0x82, 0xc0, 0x5e, 0x56, 0x00, 0xfb, 0x02, 0xec, 0x0e, 0x54, 0x15, 0x8e, 0xc9, 0xd9, 0x41,
0x24, 0x27, 0x13, 0x4c, 0x8f, 0xa5, 0x9a, 0x5f, 0x13, 0xfc, 0x6d, 0x0c, 0xfc, 0x98, 0x95, 0x91, 0x20, 0x29, 0xa5, 0x04, 0x2e, 0x2e, 0x84, 0x37, 0x40, 0xee, 0x43, 0x0b, 0x1a, 0x38, 0x1f, 0xe4,
0xf5, 0x0c, 0x50, 0x0d, 0xc0, 0xa7, 0xd0, 0xda, 0xb9, 0x94, 0xad, 0x2e, 0x37, 0x39, 0x93, 0x50, 0xb3, 0xe0, 0xcc, 0xf4, 0xbc, 0xc4, 0x92, 0xd2, 0xa2, 0x54, 0xb0, 0x89, 0x3c, 0x41, 0x08, 0x01,
0xb8, 0xe9, 0xab, 0xe4, 0xfa, 0x8f, 0x1d, 0x56, 0x19, 0xd8, 0x84, 0xe6, 0x78, 0xbe, 0xfa, 0xd0, 0x21, 0x09, 0x2e, 0x76, 0xff, 0x32, 0x88, 0x46, 0x66, 0xb0, 0x1c, 0x8c, 0xeb, 0x24, 0x73, 0xe2,
0xd0, 0xd9, 0x46, 0x0e, 0x7b, 0x9f, 0x39, 0x41, 0xcb, 0x9c, 0xa0, 0xef, 0x9c, 0xa0, 0xf7, 0x82, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x70,
0xd4, 0x96, 0x05, 0xa9, 0x7d, 0x15, 0xa4, 0xf6, 0x54, 0x9f, 0x79, 0x5e, 0x43, 0xbf, 0xd3, 0xf5, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, 0x0c, 0x51, 0x4c, 0x05, 0x49, 0x49, 0x6c, 0xe0, 0xd8,
0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x9d, 0x61, 0xea, 0x46, 0xba, 0x01, 0x00, 0x00, 0x32, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x30, 0x28, 0x02, 0x6f, 0xc0, 0x01, 0x00, 0x00,
} }
func (m *Syn) Marshal() (dAtA []byte, err error) { func (m *Syn) Marshal() (dAtA []byte, err error) {
...@@ -337,9 +337,9 @@ func (m *Ack) MarshalToSizedBuffer(dAtA []byte) (int, error) { ...@@ -337,9 +337,9 @@ func (m *Ack) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i-- i--
dAtA[i] = 0x9a dAtA[i] = 0x9a
} }
if m.Light { if m.FullNode {
i-- i--
if m.Light { if m.FullNode {
dAtA[i] = 1 dAtA[i] = 1
} else { } else {
dAtA[i] = 0 dAtA[i] = 0
...@@ -495,7 +495,7 @@ func (m *Ack) Size() (n int) { ...@@ -495,7 +495,7 @@ func (m *Ack) Size() (n int) {
if m.NetworkID != 0 { if m.NetworkID != 0 {
n += 1 + sovHandshake(uint64(m.NetworkID)) n += 1 + sovHandshake(uint64(m.NetworkID))
} }
if m.Light { if m.FullNode {
n += 2 n += 2
} }
l = len(m.WelcomeMessage) l = len(m.WelcomeMessage)
...@@ -722,7 +722,7 @@ func (m *Ack) Unmarshal(dAtA []byte) error { ...@@ -722,7 +722,7 @@ func (m *Ack) Unmarshal(dAtA []byte) error {
} }
case 3: case 3:
if wireType != 0 { if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Light", wireType) return fmt.Errorf("proto: wrong wireType = %d for field FullNode", wireType)
} }
var v int var v int
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
...@@ -739,7 +739,7 @@ func (m *Ack) Unmarshal(dAtA []byte) error { ...@@ -739,7 +739,7 @@ func (m *Ack) Unmarshal(dAtA []byte) error {
break break
} }
} }
m.Light = bool(v != 0) m.FullNode = bool(v != 0)
case 99: case 99:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field WelcomeMessage", wireType) return fmt.Errorf("proto: wrong wireType = %d for field WelcomeMessage", wireType)
......
...@@ -15,7 +15,7 @@ message Syn { ...@@ -15,7 +15,7 @@ message Syn {
message Ack { message Ack {
BzzAddress Address = 1; BzzAddress Address = 1;
uint64 NetworkID = 2; uint64 NetworkID = 2;
bool Light = 3; bool FullNode = 3;
string WelcomeMessage = 99; string WelcomeMessage = 99;
} }
......
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing" "github.com/ethersphere/bee/pkg/tracing"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat" autonat "github.com/libp2p/go-libp2p-autonat"
...@@ -65,8 +66,13 @@ type Service struct { ...@@ -65,8 +66,13 @@ type Service struct {
logger logging.Logger logger logging.Logger
tracer *tracing.Tracer tracer *tracing.Tracer
ready chan struct{} ready chan struct{}
lightNodes lightnodes
protocolsmu sync.RWMutex
}
protocolsmu sync.RWMutex type lightnodes interface {
Connected(context.Context, p2p.Peer)
Disconnected(p2p.Peer)
} }
type Options struct { type Options struct {
...@@ -75,11 +81,11 @@ type Options struct { ...@@ -75,11 +81,11 @@ type Options struct {
EnableWS bool EnableWS bool
EnableQUIC bool EnableQUIC bool
Standalone bool Standalone bool
LightNode bool FullNode bool
WelcomeMessage string WelcomeMessage string
} }
func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, logger logging.Logger, tracer *tracing.Tracer, o Options) (*Service, error) { func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, lightNodes *lightnode.Container, logger logging.Logger, tracer *tracing.Tracer, o Options) (*Service, error) {
host, port, err := net.SplitHostPort(addr) host, port, err := net.SplitHostPort(addr)
if err != nil { if err != nil {
return nil, fmt.Errorf("address: %w", err) return nil, fmt.Errorf("address: %w", err)
...@@ -202,7 +208,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -202,7 +208,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
advertisableAddresser = natAddrResolver advertisableAddresser = natAddrResolver
} }
handshakeService, err := handshake.New(signer, advertisableAddresser, overlay, networkID, o.LightNode, o.WelcomeMessage, logger) handshakeService, err := handshake.New(signer, advertisableAddresser, overlay, networkID, o.FullNode, o.WelcomeMessage, logger)
if err != nil { if err != nil {
return nil, fmt.Errorf("handshake service: %w", err) return nil, fmt.Errorf("handshake service: %w", err)
} }
...@@ -224,6 +230,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -224,6 +230,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
tracer: tracer, tracer: tracer,
connectionBreaker: breaker.NewBreaker(breaker.Options{}), // use default options connectionBreaker: breaker.NewBreaker(breaker.Options{}), // use default options
ready: make(chan struct{}), ready: make(chan struct{}),
lightNodes: lightNodes,
} }
peerRegistry.setDisconnecter(s) peerRegistry.setDisconnecter(s)
...@@ -294,12 +301,14 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -294,12 +301,14 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return return
} }
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress) if i.FullNode {
if err != nil { err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
s.logger.Debugf("handshake: addressbook put error %s: %v", peerID, err) if err != nil {
s.logger.Errorf("unable to persist peer %v", peerID) s.logger.Debugf("handshake: addressbook put error %s: %v", peerID, err)
_ = s.Disconnect(i.BzzAddress.Overlay) s.logger.Errorf("unable to persist peer %v", peerID)
return _ = s.Disconnect(i.BzzAddress.Overlay)
return
}
} }
peer := p2p.Peer{Address: i.BzzAddress.Overlay} peer := p2p.Peer{Address: i.BzzAddress.Overlay}
...@@ -315,7 +324,13 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -315,7 +324,13 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
s.protocolsmu.RUnlock() s.protocolsmu.RUnlock()
if s.notifier != nil { if s.notifier != nil {
if err := s.notifier.Connected(ctx, peer); err != nil { if !i.FullNode {
s.lightNodes.Connected(ctx, peer)
//light node announces explicitly
if err := s.notifier.Announce(ctx, peer.Address); err != nil {
s.logger.Debugf("notifier.Announce: %s: %v", peer.Address.String(), err)
}
} else if err := s.notifier.Connected(ctx, peer); err != nil { // full node announces implicitly
s.logger.Debugf("notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err) s.logger.Debugf("notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node // note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side // waiting on handshakeStream.FullClose() on the other side
...@@ -332,9 +347,8 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -332,9 +347,8 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
} }
s.metrics.HandledStreamCount.Inc() s.metrics.HandledStreamCount.Inc()
s.logger.Debugf("successfully connected to peer %s (inbound)", i.BzzAddress.ShortString()) s.logger.Debugf("successfully connected to peer %s%s (inbound)", i.BzzAddress.ShortString(), i.LightString())
s.logger.Infof("successfully connected to peer %s (inbound)", i.BzzAddress.Overlay) s.logger.Infof("successfully connected to peer %s%s (inbound)", i.BzzAddress.Overlay, i.LightString())
}) })
h.Network().SetConnHandler(func(_ network.Conn) { h.Network().SetConnHandler(func(_ network.Conn) {
...@@ -398,18 +412,23 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -398,18 +412,23 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
if err := ss.Handler(ctx, p2p.Peer{Address: overlay}, stream); err != nil { if err := ss.Handler(ctx, p2p.Peer{Address: overlay}, stream); err != nil {
var de *p2p.DisconnectError var de *p2p.DisconnectError
if errors.As(err, &de) { if errors.As(err, &de) {
_ = stream.Reset()
_ = s.Disconnect(overlay) _ = s.Disconnect(overlay)
} }
var bpe *p2p.BlockPeerError var bpe *p2p.BlockPeerError
if errors.As(err, &bpe) { if errors.As(err, &bpe) {
_ = stream.Reset()
if err := s.Blocklist(overlay, bpe.Duration()); err != nil { if err := s.Blocklist(overlay, bpe.Duration()); err != nil {
logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err) logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err)
logger.Errorf("unable to blocklist peer %v", peerID) logger.Errorf("unable to blocklist peer %v", peerID)
} }
logger.Tracef("blocklisted a peer %s", peerID) logger.Tracef("blocklisted a peer %s", peerID)
} }
// count unexpected requests
if errors.Is(err, p2p.ErrUnexpected) {
s.metrics.UnexpectedProtocolReqCount.Inc()
}
logger.Debugf("could not handle protocol %s/%s: stream %s: peer %s: error: %v", p.Name, p.Version, ss.Name, overlay, err) logger.Debugf("could not handle protocol %s/%s: stream %s: peer %s: error: %v", p.Name, p.Version, ss.Name, overlay, err)
return return
} }
...@@ -546,10 +565,12 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -546,10 +565,12 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("connect full close %w", err) return nil, fmt.Errorf("connect full close %w", err)
} }
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress) if i.FullNode {
if err != nil { err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
_ = s.Disconnect(i.BzzAddress.Overlay) if err != nil {
return nil, fmt.Errorf("storing bzz address: %w", err) _ = s.Disconnect(i.BzzAddress.Overlay)
return nil, fmt.Errorf("storing bzz address: %w", err)
}
} }
s.protocolsmu.RLock() s.protocolsmu.RLock()
...@@ -564,8 +585,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -564,8 +585,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
s.protocolsmu.RUnlock() s.protocolsmu.RUnlock()
s.metrics.CreatedConnectionCount.Inc() s.metrics.CreatedConnectionCount.Inc()
s.logger.Debugf("successfully connected to peer %s (outbound)", i.BzzAddress.ShortString())
s.logger.Infof("successfully connected to peer %s (outbound)", i.BzzAddress.Overlay) s.logger.Debugf("successfully connected to peer %s%s (outbound)", i.BzzAddress.ShortString(), i.LightString())
s.logger.Infof("successfully connected to peer %s%s (outbound)", i.BzzAddress.Overlay, i.LightString())
return i.BzzAddress, nil return i.BzzAddress, nil
} }
...@@ -593,6 +615,9 @@ func (s *Service) Disconnect(overlay swarm.Address) error { ...@@ -593,6 +615,9 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
if s.notifier != nil { if s.notifier != nil {
s.notifier.Disconnected(peer) s.notifier.Disconnected(peer)
} }
if s.lightNodes != nil {
s.lightNodes.Disconnected(peer)
}
return nil return nil
} }
...@@ -614,6 +639,9 @@ func (s *Service) disconnected(address swarm.Address) { ...@@ -614,6 +639,9 @@ func (s *Service) disconnected(address swarm.Address) {
if s.notifier != nil { if s.notifier != nil {
s.notifier.Disconnected(peer) s.notifier.Disconnected(peer)
} }
if s.lightNodes != nil {
s.lightNodes.Disconnected(peer)
}
} }
func (s *Service) Peers() []p2p.Peer { func (s *Service) Peers() []p2p.Peer {
......
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/statestore/mock" "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
) )
...@@ -65,7 +66,10 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p. ...@@ -65,7 +66,10 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p.
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s, err = libp2p.New(ctx, crypto.NewDefaultSigner(swarmKey), networkID, overlay, addr, o.Addressbook, statestore, o.Logger, nil, o.libp2pOpts)
lightnodes := lightnode.NewContainer()
s, err = libp2p.New(ctx, crypto.NewDefaultSigner(swarmKey), networkID, overlay, addr, o.Addressbook, statestore, lightnodes, o.Logger, nil, o.libp2pOpts)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -13,14 +13,15 @@ type metrics struct { ...@@ -13,14 +13,15 @@ type metrics struct {
// all metrics fields must be exported // all metrics fields must be exported
// to be able to return them by Metrics() // to be able to return them by Metrics()
// using reflection // using reflection
CreatedConnectionCount prometheus.Counter CreatedConnectionCount prometheus.Counter
HandledConnectionCount prometheus.Counter HandledConnectionCount prometheus.Counter
CreatedStreamCount prometheus.Counter CreatedStreamCount prometheus.Counter
HandledStreamCount prometheus.Counter HandledStreamCount prometheus.Counter
BlocklistedPeerCount prometheus.Counter BlocklistedPeerCount prometheus.Counter
BlocklistedPeerErrCount prometheus.Counter BlocklistedPeerErrCount prometheus.Counter
DisconnectCount prometheus.Counter DisconnectCount prometheus.Counter
ConnectBreakerCount prometheus.Counter ConnectBreakerCount prometheus.Counter
UnexpectedProtocolReqCount prometheus.Counter
} }
func newMetrics() metrics { func newMetrics() metrics {
...@@ -75,6 +76,12 @@ func newMetrics() metrics { ...@@ -75,6 +76,12 @@ func newMetrics() metrics {
Name: "connect_breaker_count", Name: "connect_breaker_count",
Help: "Number of times we got a closed breaker while connecting to another peer.", Help: "Number of times we got a closed breaker while connecting to another peer.",
}), }),
UnexpectedProtocolReqCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "unexpected_protocol_request_count",
Help: "Number of requests the peer is not expecting.",
}),
} }
} }
......
...@@ -44,6 +44,7 @@ type PickyNotifier interface { ...@@ -44,6 +44,7 @@ type PickyNotifier interface {
type Notifier interface { type Notifier interface {
Connected(context.Context, Peer) error Connected(context.Context, Peer) error
Disconnected(Peer) Disconnected(Peer)
Announce(context.Context, swarm.Address) error
} }
// DebugService extends the Service with method used for debugging. // DebugService extends the Service with method used for debugging.
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package p2p
import (
"context"
"errors"
"time"
)
var ErrUnexpected = errors.New("unexpected request while in light mode")
// WithDisconnectStreams will mutate the given spec and replace the handler with a always erroring one.
func WithDisconnectStreams(spec ProtocolSpec) {
for i := range spec.StreamSpecs {
spec.StreamSpecs[i].Handler = func(c context.Context, p Peer, s Stream) error {
return NewDisconnectError(ErrUnexpected)
}
}
}
// WithBlocklistStreams will mutate the given spec and replace the handler with a always erroring one.
func WithBlocklistStreams(dur time.Duration, spec ProtocolSpec) {
for i := range spec.StreamSpecs {
spec.StreamSpecs[i].Handler = func(c context.Context, p Peer, s Stream) error {
return NewBlockPeerError(dur, ErrUnexpected)
}
}
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package p2p_test
import (
"context"
"errors"
"testing"
"time"
"github.com/ethersphere/bee/pkg/p2p"
)
func newTestProtocol(h p2p.HandlerFunc) p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: "test",
Version: "1.0",
StreamSpecs: []p2p.StreamSpec{
{
Name: "testStreamName1",
Handler: h,
}, {
Name: "testStreamName2",
Handler: h,
},
},
}
}
func TestBlocklistError(t *testing.T) {
tp := newTestProtocol(func(context.Context, p2p.Peer, p2p.Stream) error {
return errors.New("test")
})
p2p.WithBlocklistStreams(1*time.Minute, tp)
for _, sp := range tp.StreamSpecs {
err := sp.Handler(context.Background(), p2p.Peer{}, nil)
var discErr *p2p.BlockPeerError
if !errors.As(err, &discErr) {
t.Error("unexpected error type")
}
if !errors.Is(err, p2p.ErrUnexpected) {
t.Error("unexpected wrapped error type")
}
}
}
func TestDisconnectError(t *testing.T) {
tp := newTestProtocol(func(context.Context, p2p.Peer, p2p.Stream) error {
return errors.New("test")
})
p2p.WithDisconnectStreams(tp)
for _, sp := range tp.StreamSpecs {
err := sp.Handler(context.Background(), p2p.Peer{}, nil)
var discErr *p2p.DisconnectError
if !errors.As(err, &discErr) {
t.Error("unexpected error type")
}
if !errors.Is(err, p2p.ErrUnexpected) {
t.Error("unexpected wrapped error type")
}
}
}
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
"time" "time"
"github.com/ethersphere/bee/pkg/intervalstore" "github.com/ethersphere/bee/pkg/intervalstore"
mockk "github.com/ethersphere/bee/pkg/kademlia/mock"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/puller" "github.com/ethersphere/bee/pkg/puller"
mockps "github.com/ethersphere/bee/pkg/pullsync/mock" mockps "github.com/ethersphere/bee/pkg/pullsync/mock"
...@@ -20,6 +19,7 @@ import ( ...@@ -20,6 +19,7 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test" "github.com/ethersphere/bee/pkg/swarm/test"
mockk "github.com/ethersphere/bee/pkg/topology/kademlia/mock"
) )
const max = math.MaxUint64 const max = math.MaxUint64
......
...@@ -65,19 +65,21 @@ type PushSync struct { ...@@ -65,19 +65,21 @@ type PushSync struct {
metrics metrics metrics metrics
tracer *tracing.Tracer tracer *tracing.Tracer
signer crypto.Signer signer crypto.Signer
isFullNode bool
} }
var timeToLive = 5 * time.Second // request time to live var timeToLive = 5 * time.Second // request time to live
var timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk var timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk
var nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream var nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream
func New(address swarm.Address, streamer p2p.StreamerDisconnecter, storer storage.Putter, topologyDriver topology.Driver, tagger *tags.Tags, unwrap func(swarm.Chunk), logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, signer crypto.Signer, tracer *tracing.Tracer) *PushSync { func New(address swarm.Address, streamer p2p.StreamerDisconnecter, storer storage.Putter, topologyDriver topology.Driver, tagger *tags.Tags, isFullNode bool, unwrap func(swarm.Chunk), logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, signer crypto.Signer, tracer *tracing.Tracer) *PushSync {
ps := &PushSync{ ps := &PushSync{
address: address, address: address,
streamer: streamer, streamer: streamer,
storer: storer, storer: storer,
topologyDriver: topologyDriver, topologyDriver: topologyDriver,
tagger: tagger, tagger: tagger,
isFullNode: isFullNode,
unwrap: unwrap, unwrap: unwrap,
logger: logger, logger: logger,
accounting: accounting, accounting: accounting,
...@@ -301,8 +303,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R ...@@ -301,8 +303,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R
defersFn() defersFn()
includeSelf := ps.isFullNode
// find the next closest peer // find the next closest peer
peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), skipPeers...) peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), includeSelf, skipPeers...)
if err != nil { if err != nil {
// ClosestPeer can return ErrNotFound in case we are not connected to any peers // ClosestPeer can return ErrNotFound in case we are not connected to any peers
// in which case we should return immediately. // in which case we should return immediately.
......
...@@ -570,7 +570,7 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameter ...@@ -570,7 +570,7 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameter
unwrap = func(swarm.Chunk) {} unwrap = func(swarm.Chunk) {}
} }
return pushsync.New(addr, recorderDisconnecter, storer, mockTopology, mtag, unwrap, logger, mockAccounting, mockPricer, signer, nil), storer, mtag, mockAccounting return pushsync.New(addr, recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, logger, mockAccounting, mockPricer, signer, nil), storer, mtag, mockAccounting
} }
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) { func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
......
...@@ -16,11 +16,11 @@ import ( ...@@ -16,11 +16,11 @@ import (
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/discovery" "github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/kademlia/pslice"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/pslice"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
...@@ -648,12 +648,12 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr, ...@@ -648,12 +648,12 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr,
return errOverlayMismatch return errOverlayMismatch
} }
return k.announce(ctx, peer) return k.Announce(ctx, peer)
} }
// announce a newly connected peer to our connected peers, but also // announce a newly connected peer to our connected peers, but also
// notify the peer about our already connected peers // notify the peer about our already connected peers
func (k *Kad) announce(ctx context.Context, peer swarm.Address) error { func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error {
addrs := []swarm.Address{} addrs := []swarm.Address{}
_ = k.connectedPeers.EachBinRev(func(connectedPeer swarm.Address, _ uint8) (bool, bool, error) { _ = k.connectedPeers.EachBinRev(func(connectedPeer swarm.Address, _ uint8) (bool, bool, error) {
...@@ -747,7 +747,7 @@ func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error { ...@@ -747,7 +747,7 @@ func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error {
} }
func (k *Kad) connected(ctx context.Context, addr swarm.Address) error { func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
if err := k.announce(ctx, addr); err != nil { if err := k.Announce(ctx, addr); err != nil {
return err return err
} }
...@@ -858,16 +858,24 @@ func isIn(a swarm.Address, addresses []p2p.Peer) bool { ...@@ -858,16 +858,24 @@ func isIn(a swarm.Address, addresses []p2p.Peer) bool {
} }
// ClosestPeer returns the closest peer to a given address. // ClosestPeer returns the closest peer to a given address.
func (k *Kad) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (swarm.Address, error) { func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, skipPeers ...swarm.Address) (swarm.Address, error) {
if k.connectedPeers.Length() == 0 { if k.connectedPeers.Length() == 0 {
return swarm.Address{}, topology.ErrNotFound return swarm.Address{}, topology.ErrNotFound
} }
peers := k.p2p.Peers() peers := k.p2p.Peers()
var peersToDisconnect []swarm.Address var peersToDisconnect []swarm.Address
closest := k.base var closest = swarm.ZeroAddress
if includeSelf {
closest = k.base
}
err := k.connectedPeers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) { err := k.connectedPeers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
if closest.IsZero() {
closest = peer
}
for _, a := range skipPeers { for _, a := range skipPeers {
if a.Equal(peer) { if a.Equal(peer) {
return false, false, nil return false, false, nil
...@@ -901,6 +909,10 @@ func (k *Kad) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (swarm ...@@ -901,6 +909,10 @@ func (k *Kad) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (swarm
return swarm.Address{}, err return swarm.Address{}, err
} }
if closest.IsZero() { //no peers
return swarm.Address{}, topology.ErrNotFound // only for light nodes
}
for _, v := range peersToDisconnect { for _, v := range peersToDisconnect {
k.Disconnected(p2p.Peer{Address: v}) k.Disconnected(p2p.Peer{Address: v})
} }
......
...@@ -19,11 +19,8 @@ import ( ...@@ -19,11 +19,8 @@ import (
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/bzz" "github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto"
beeCrypto "github.com/ethersphere/bee/pkg/crypto" beeCrypto "github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/discovery/mock" "github.com/ethersphere/bee/pkg/discovery/mock"
"github.com/ethersphere/bee/pkg/kademlia"
"github.com/ethersphere/bee/pkg/kademlia/pslice"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
p2pmock "github.com/ethersphere/bee/pkg/p2p/mock" p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
...@@ -31,6 +28,8 @@ import ( ...@@ -31,6 +28,8 @@ import (
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test" "github.com/ethersphere/bee/pkg/swarm/test"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/kademlia"
"github.com/ethersphere/bee/pkg/topology/pslice"
) )
func init() { func init() {
...@@ -536,7 +535,7 @@ func TestNotifierHooks(t *testing.T) { ...@@ -536,7 +535,7 @@ func TestNotifierHooks(t *testing.T) {
connectOne(t, signer, kad, ab, peer, nil) connectOne(t, signer, kad, ab, peer, nil)
p, err := kad.ClosestPeer(addr) p, err := kad.ClosestPeer(addr, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -547,7 +546,7 @@ func TestNotifierHooks(t *testing.T) { ...@@ -547,7 +546,7 @@ func TestNotifierHooks(t *testing.T) {
// disconnect the peer, expect error // disconnect the peer, expect error
kad.Disconnected(p2p.Peer{Address: peer}) kad.Disconnected(p2p.Peer{Address: peer})
_, err = kad.ClosestPeer(addr) _, err = kad.ClosestPeer(addr, true)
if !errors.Is(err, topology.ErrNotFound) { if !errors.Is(err, topology.ErrNotFound) {
t.Fatalf("expected topology.ErrNotFound but got %v", err) t.Fatalf("expected topology.ErrNotFound but got %v", err)
} }
...@@ -745,7 +744,7 @@ func TestClosestPeer(t *testing.T) { ...@@ -745,7 +744,7 @@ func TestClosestPeer(t *testing.T) {
} }
defer kad.Close() defer kad.Close()
pk, _ := crypto.GenerateSecp256k1Key() pk, _ := beeCrypto.GenerateSecp256k1Key()
for _, v := range connectedPeers { for _, v := range connectedPeers {
addOne(t, beeCrypto.NewDefaultSigner(pk), kad, ab, v.Address) addOne(t, beeCrypto.NewDefaultSigner(pk), kad, ab, v.Address)
} }
...@@ -755,37 +754,50 @@ func TestClosestPeer(t *testing.T) { ...@@ -755,37 +754,50 @@ func TestClosestPeer(t *testing.T) {
for _, tc := range []struct { for _, tc := range []struct {
chunkAddress swarm.Address // chunk address to test chunkAddress swarm.Address // chunk address to test
expectedPeer int // points to the index of the connectedPeers slice. -1 means self (baseOverlay) expectedPeer int // points to the index of the connectedPeers slice. -1 means self (baseOverlay)
includeSelf bool
}{ }{
{ {
chunkAddress: swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000"), // 0111, wants peer 2 chunkAddress: swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000"), // 0111, wants peer 2
expectedPeer: 2, expectedPeer: 2,
includeSelf: true,
}, },
{ {
chunkAddress: swarm.MustParseHexAddress("c000000000000000000000000000000000000000000000000000000000000000"), // 1100, want peer 0 chunkAddress: swarm.MustParseHexAddress("c000000000000000000000000000000000000000000000000000000000000000"), // 1100, want peer 0
expectedPeer: 0, expectedPeer: 0,
includeSelf: true,
}, },
{ {
chunkAddress: swarm.MustParseHexAddress("e000000000000000000000000000000000000000000000000000000000000000"), // 1110, want peer 0 chunkAddress: swarm.MustParseHexAddress("e000000000000000000000000000000000000000000000000000000000000000"), // 1110, want peer 0
expectedPeer: 0, expectedPeer: 0,
includeSelf: true,
}, },
{ {
chunkAddress: swarm.MustParseHexAddress("a000000000000000000000000000000000000000000000000000000000000000"), // 1010, want peer 0 chunkAddress: swarm.MustParseHexAddress("a000000000000000000000000000000000000000000000000000000000000000"), // 1010, want peer 0
expectedPeer: 0, expectedPeer: 0,
includeSelf: true,
}, },
{ {
chunkAddress: swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000"), // 0100, want peer 1 chunkAddress: swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000"), // 0100, want peer 1
expectedPeer: 1, expectedPeer: 1,
includeSelf: true,
}, },
{ {
chunkAddress: swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000"), // 0101, want peer 1 chunkAddress: swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000"), // 0101, want peer 1
expectedPeer: 1, expectedPeer: 1,
includeSelf: true,
}, },
{ {
chunkAddress: swarm.MustParseHexAddress("0000001000000000000000000000000000000000000000000000000000000000"), // want self chunkAddress: swarm.MustParseHexAddress("0000001000000000000000000000000000000000000000000000000000000000"), // 1000 want self
expectedPeer: -1, expectedPeer: -1,
includeSelf: true,
},
{
chunkAddress: swarm.MustParseHexAddress("0000001000000000000000000000000000000000000000000000000000000000"), // 1000 want peer 1
expectedPeer: 1, // smallest distance: 2894...
includeSelf: false,
}, },
} { } {
peer, err := kad.ClosestPeer(tc.chunkAddress) peer, err := kad.ClosestPeer(tc.chunkAddress, tc.includeSelf)
if err != nil { if err != nil {
if tc.expectedPeer == -1 && !errors.Is(err, topology.ErrWantSelf) { if tc.expectedPeer == -1 && !errors.Is(err, topology.ErrWantSelf) {
t.Fatalf("wanted %v but got %v", topology.ErrWantSelf, err) t.Fatalf("wanted %v but got %v", topology.ErrWantSelf, err)
...@@ -1010,7 +1022,7 @@ func TestStart(t *testing.T) { ...@@ -1010,7 +1022,7 @@ func TestStart(t *testing.T) {
func newTestKademlia(connCounter, failedConnCounter *int32, kadOpts kademlia.Options) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) { func newTestKademlia(connCounter, failedConnCounter *int32, kadOpts kademlia.Options) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
var ( var (
pk, _ = crypto.GenerateSecp256k1Key() // random private key pk, _ = beeCrypto.GenerateSecp256k1Key() // random private key
signer = beeCrypto.NewDefaultSigner(pk) // signer signer = beeCrypto.NewDefaultSigner(pk) // signer
base = test.RandomAddress() // base address base = test.RandomAddress() // base address
ab = addressbook.New(mockstate.NewStateStore()) // address book ab = addressbook.New(mockstate.NewStateStore()) // address book
......
...@@ -64,7 +64,7 @@ func (m *Mock) AddPeers(ctx context.Context, addr ...swarm.Address) error { ...@@ -64,7 +64,7 @@ func (m *Mock) AddPeers(ctx context.Context, addr ...swarm.Address) error {
panic("not implemented") // TODO: Implement panic("not implemented") // TODO: Implement
} }
func (m *Mock) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) { func (m *Mock) ClosestPeer(addr swarm.Address, _ bool, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) {
panic("not implemented") // TODO: Implement panic("not implemented") // TODO: Implement
} }
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package lightnode
import (
"context"
"sync"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/pslice"
)
type Container struct {
connectedPeers *pslice.PSlice
disconnectedPeers *pslice.PSlice
peerMu sync.Mutex
}
func NewContainer() *Container {
return &Container{
connectedPeers: pslice.New(1),
disconnectedPeers: pslice.New(1),
}
}
const defaultBin = uint8(0)
func (c *Container) Connected(ctx context.Context, peer p2p.Peer) {
c.peerMu.Lock()
defer c.peerMu.Unlock()
addr := peer.Address
c.connectedPeers.Add(addr, defaultBin)
c.disconnectedPeers.Remove(addr, defaultBin)
}
func (c *Container) Disconnected(peer p2p.Peer) {
c.peerMu.Lock()
defer c.peerMu.Unlock()
addr := peer.Address
if found := c.connectedPeers.Exists(addr); found {
c.connectedPeers.Remove(addr, defaultBin)
c.disconnectedPeers.Add(addr, defaultBin)
}
}
func (c *Container) PeerInfo() topology.BinInfo {
return topology.BinInfo{
BinPopulation: uint(c.connectedPeers.Length()),
BinConnected: uint(c.connectedPeers.Length()),
DisconnectedPeers: toAddrs(c.disconnectedPeers),
ConnectedPeers: toAddrs(c.connectedPeers),
}
}
func toAddrs(s *pslice.PSlice) (addrs []string) {
_ = s.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) {
addrs = append(addrs, addr.String())
return false, false, nil
})
return
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package lightnode_test
import (
"context"
"reflect"
"testing"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/lightnode"
)
func TestContainer(t *testing.T) {
t.Run("new container is empty container", func(t *testing.T) {
c := lightnode.NewContainer()
var empty topology.BinInfo
if !reflect.DeepEqual(empty, c.PeerInfo()) {
t.Errorf("expected %v, got %v", empty, c.PeerInfo())
}
})
t.Run("can add peers to container", func(t *testing.T) {
c := lightnode.NewContainer()
c.Connected(context.Background(), p2p.Peer{Address: swarm.NewAddress([]byte("123"))})
c.Connected(context.Background(), p2p.Peer{Address: swarm.NewAddress([]byte("456"))})
peerCount := len(c.PeerInfo().ConnectedPeers)
if peerCount != 2 {
t.Errorf("expected %d connected peer, got %d", 2, peerCount)
}
})
t.Run("empty container after peer disconnect", func(t *testing.T) {
c := lightnode.NewContainer()
peer := p2p.Peer{Address: swarm.NewAddress([]byte("123"))}
c.Connected(context.Background(), peer)
c.Disconnected(peer)
discPeerCount := len(c.PeerInfo().DisconnectedPeers)
if discPeerCount != 1 {
t.Errorf("expected %d connected peer, got %d", 1, discPeerCount)
}
connPeerCount := len(c.PeerInfo().ConnectedPeers)
if connPeerCount != 0 {
t.Errorf("expected %d connected peer, got %d", 0, connPeerCount)
}
})
}
...@@ -92,7 +92,7 @@ func (d *mock) Peers() []swarm.Address { ...@@ -92,7 +92,7 @@ func (d *mock) Peers() []swarm.Address {
return d.peers return d.peers
} }
func (d *mock) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) { func (d *mock) ClosestPeer(addr swarm.Address, _ bool, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) {
if len(skipPeers) == 0 { if len(skipPeers) == 0 {
if d.closestPeerErr != nil { if d.closestPeerErr != nil {
return d.closestPeer, d.closestPeerErr return d.closestPeer, d.closestPeerErr
......
...@@ -8,9 +8,9 @@ import ( ...@@ -8,9 +8,9 @@ import (
"errors" "errors"
"testing" "testing"
"github.com/ethersphere/bee/pkg/kademlia/pslice"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test" "github.com/ethersphere/bee/pkg/swarm/test"
"github.com/ethersphere/bee/pkg/topology/pslice"
) )
// TestShallowestEmpty tests that ShallowestEmpty functionality works correctly. // TestShallowestEmpty tests that ShallowestEmpty functionality works correctly.
......
...@@ -42,7 +42,7 @@ type ClosestPeerer interface { ...@@ -42,7 +42,7 @@ type ClosestPeerer interface {
// given chunk address. // given chunk address.
// This function will ignore peers with addresses provided in skipPeers. // This function will ignore peers with addresses provided in skipPeers.
// Returns topology.ErrWantSelf in case base is the closest to the address. // Returns topology.ErrWantSelf in case base is the closest to the address.
ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) ClosestPeer(addr swarm.Address, includeSelf bool, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error)
} }
type EachPeerer interface { type EachPeerer interface {
...@@ -114,4 +114,5 @@ type KadParams struct { ...@@ -114,4 +114,5 @@ type KadParams struct {
NNLowWatermark int `json:"nnLowWatermark"` // low watermark for depth calculation NNLowWatermark int `json:"nnLowWatermark"` // low watermark for depth calculation
Depth uint8 `json:"depth"` // current depth Depth uint8 `json:"depth"` // current depth
Bins KadBins `json:"bins"` // individual bin info Bins KadBins `json:"bins"` // individual bin info
LightNodes BinInfo `json:"lightNodes"` // light nodes bin info
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment