Commit f245cc88 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Block potentially misbehaving peers (#628)

- blocklisting in p2p layer and libp2p implementation
parent db47d88c
...@@ -138,7 +138,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service, ...@@ -138,7 +138,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
addressbook := addressbook.New(stateStore) addressbook := addressbook.New(stateStore)
signer := crypto.NewDefaultSigner(swarmPrivateKey) signer := crypto.NewDefaultSigner(swarmPrivateKey)
p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, logger, tracer, libp2p.Options{ p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{
PrivateKey: libp2pPrivateKey, PrivateKey: libp2pPrivateKey,
NATAddr: o.NATAddr, NATAddr: o.NATAddr,
EnableWS: o.EnableWS, EnableWS: o.EnableWS,
......
...@@ -65,6 +65,34 @@ func (e *DisconnectError) Error() string { ...@@ -65,6 +65,34 @@ func (e *DisconnectError) Error() string {
return e.err.Error() return e.err.Error()
} }
type BlockPeerError struct {
duration time.Duration
err error
}
// NewBlockPeerError wraps error and creates a special error that is treated specially
// by p2p. It causes peer to be disconnected and blocks any new connection for this peer for the provided duration.
func NewBlockPeerError(duration time.Duration, err error) error {
return &BlockPeerError{
duration: duration,
err: err,
}
}
// Unwrap returns an underlying error.
func (e *BlockPeerError) Unwrap() error { return e.err }
// Error implements function of the standard go error interface.
func (e *BlockPeerError) Error() string {
return e.err.Error()
}
// Duration represents the period for which the peer will be blocked.
// 0 duration is treated as infinity
func (e *BlockPeerError) Duration() time.Duration {
return e.duration
}
// IncompatibleStreamError is the error that should be returned by p2p service // IncompatibleStreamError is the error that should be returned by p2p service
// NewStream method when the stream or its version is not supported. // NewStream method when the stream or its version is not supported.
type IncompatibleStreamError struct { type IncompatibleStreamError struct {
......
...@@ -458,6 +458,48 @@ func TestTopologySupportMultipleNotifiers(t *testing.T) { ...@@ -458,6 +458,48 @@ func TestTopologySupportMultipleNotifiers(t *testing.T) {
waitAddrSet(t, &n22connectedAddr, &mtx, overlay2) waitAddrSet(t, &n22connectedAddr, &mtx, overlay2)
} }
func TestBlocklisting(t *testing.T) {
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
addr1 := serviceUnderlayAddress(t, s1)
addr2 := serviceUnderlayAddress(t, s2)
// s2 connects to s1, thus the notifier on s1 should be called on Connect
_, err := s2.Connect(context.Background(), addr1)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
if err := s2.Blocklist(overlay1, 0); err != nil {
t.Fatal(err)
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
// s2 connects to s1, thus the notifier on s1 should be called on Connect
_, err = s2.Connect(context.Background(), addr1)
if err == nil {
t.Fatal("expected error during connection, got nil")
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
// s2 connects to s1, thus the notifier on s1 should be called on Connect
_, err = s1.Connect(context.Background(), addr2)
if err == nil {
t.Fatal("expected error during connection, got nil")
}
expectPeers(t, s1)
expectPeersEventually(t, s2)
}
func waitAddrSet(t *testing.T, addr *swarm.Address, mtx *sync.Mutex, exp swarm.Address) { func waitAddrSet(t *testing.T, addr *swarm.Address, mtx *sync.Mutex, exp swarm.Address) {
t.Helper() t.Helper()
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package blocklist
import (
"time"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
var keyPrefix = "blocklist-"
// timeNow is used to deterministically mock time.Now() in tests.
var timeNow = time.Now
type Blocklist struct {
store storage.StateStorer
}
func NewBlocklist(store storage.StateStorer) *Blocklist {
return &Blocklist{store: store}
}
type entry struct {
Timestamp time.Time `json:"timestamp"`
Duration string `json:"duration"` // Duration is string because the time.Duration does not implement MarshalJSON/UnmarshalJSON methods.
}
func (b *Blocklist) Exists(overlay swarm.Address) (bool, error) {
key := generateKey(overlay)
timestamp, duration, err := b.get(key)
if err != nil {
if err == storage.ErrNotFound {
return false, nil
}
return false, err
}
// using timeNow.Sub() so it can be mocked in unit tests
if timeNow().Sub(timestamp) > duration && duration != 0 {
_ = b.store.Delete(key)
return false, nil
}
return true, nil
}
func (b *Blocklist) Add(overlay swarm.Address, duration time.Duration) (err error) {
key := generateKey(overlay)
_, d, err := b.get(key)
if err != nil {
if err != storage.ErrNotFound {
return err
}
}
// if peer is already blacklisted, blacklist it for the maximum amount of time
if duration < d && duration != 0 || d == 0 {
duration = d
}
return b.store.Put(key, &entry{
Timestamp: timeNow(),
Duration: duration.String(),
})
}
func (b *Blocklist) get(key string) (timestamp time.Time, duration time.Duration, err error) {
var e entry
if err := b.store.Get(key, &e); err != nil {
return time.Time{}, -1, err
}
duration, err = time.ParseDuration(e.Duration)
if err != nil {
return time.Time{}, -1, err
}
return e.Timestamp, duration, nil
}
func generateKey(overlay swarm.Address) string {
return keyPrefix + overlay.String()
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package blocklist_test
import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/blocklist"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
func TestExist(t *testing.T) {
addr1 := swarm.NewAddress([]byte{0, 1, 2, 3})
addr2 := swarm.NewAddress([]byte{4, 5, 6, 7})
bl := blocklist.NewBlocklist(mock.NewStateStore())
exists, err := bl.Exists(addr1)
if err != nil {
t.Fatal(err)
}
if exists {
t.Fatal("got exists, expected not exists")
}
// add forever
if err := bl.Add(addr1, 0); err != nil {
t.Fatal(err)
}
// add for 50 miliseconds
if err := bl.Add(addr2, time.Millisecond*50); err != nil {
t.Fatal(err)
}
blocklist.SetTimeNow(func() time.Time { return time.Now().Add(100 * time.Millisecond) })
exists, err = bl.Exists(addr1)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatal("got not exists, expected exists")
}
exists, err = bl.Exists(addr2)
if err != nil {
t.Fatal(err)
}
if exists {
t.Fatal("got exists, expected not exists")
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package blocklist
import "time"
func SetTimeNow(f func() time.Time) {
timeNow = f
}
...@@ -10,14 +10,17 @@ import ( ...@@ -10,14 +10,17 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"time"
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/bzz" "github.com/ethersphere/bee/pkg/bzz"
beecrypto "github.com/ethersphere/bee/pkg/crypto" beecrypto "github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/blocklist"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/breaker" "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/breaker"
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing" "github.com/ethersphere/bee/pkg/tracing"
...@@ -55,6 +58,7 @@ type Service struct { ...@@ -55,6 +58,7 @@ type Service struct {
peers *peerRegistry peers *peerRegistry
topologyNotifiers []topology.Notifier topologyNotifiers []topology.Notifier
connectionBreaker breaker.Interface connectionBreaker breaker.Interface
blocklist *blocklist.Blocklist
logger logging.Logger logger logging.Logger
tracer *tracing.Tracer tracer *tracing.Tracer
} }
...@@ -69,7 +73,7 @@ type Options struct { ...@@ -69,7 +73,7 @@ type Options struct {
WelcomeMessage string WelcomeMessage string
} }
func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, logger logging.Logger, tracer *tracing.Tracer, o Options) (*Service, error) { func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, logger logging.Logger, tracer *tracing.Tracer, o Options) (*Service, error) {
host, port, err := net.SplitHostPort(addr) host, port, err := net.SplitHostPort(addr)
if err != nil { if err != nil {
return nil, fmt.Errorf("address: %w", err) return nil, fmt.Errorf("address: %w", err)
...@@ -199,6 +203,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -199,6 +203,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
networkID: networkID, networkID: networkID,
peers: peerRegistry, peers: peerRegistry,
addressbook: ab, addressbook: ab,
blocklist: blocklist.NewBlocklist(storer),
logger: logger, logger: logger,
tracer: tracer, tracer: tracer,
connectionBreaker: breaker.NewBreaker(breaker.Options{}), // use default options connectionBreaker: breaker.NewBreaker(breaker.Options{}), // use default options
...@@ -223,6 +228,20 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -223,6 +228,20 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return return
} }
blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay)
if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", peerID, err)
s.logger.Errorf("internal error while connecting with peer %s", peerID)
_ = s.disconnect(peerID)
return
}
if blocked {
s.logger.Errorf("blocked connection from blocklisted peer %s", peerID)
_ = s.disconnect(peerID)
return
}
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if err = handshakeStream.FullClose(); err != nil { if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err) s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
...@@ -236,6 +255,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -236,6 +255,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err) s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID) s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = s.disconnect(peerID) _ = s.disconnect(peerID)
return
} }
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress) err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
...@@ -311,8 +331,20 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -311,8 +331,20 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
s.metrics.HandledStreamCount.Inc() s.metrics.HandledStreamCount.Inc()
if err := ss.Handler(ctx, p2p.Peer{Address: overlay}, stream); err != nil { if err := ss.Handler(ctx, p2p.Peer{Address: overlay}, stream); err != nil {
var e *p2p.DisconnectError var de *p2p.DisconnectError
if errors.As(err, &e) { if errors.As(err, &de) {
_ = s.Disconnect(overlay)
}
var bpe *p2p.BlockPeerError
if errors.As(err, &bpe) {
if err := s.blocklist.Add(overlay, bpe.Duration()); err != nil {
s.logger.Debugf("blocklist: could blocklist peer %s: %v", peerID, err)
s.logger.Errorf("unable to blocklist peer %v", peerID)
_ = s.Disconnect(overlay)
}
s.logger.Trace("blocklisted a peer %s", peerID)
_ = s.Disconnect(overlay) _ = s.Disconnect(overlay)
} }
...@@ -341,6 +373,17 @@ func (s *Service) NATManager() basichost.NATManager { ...@@ -341,6 +373,17 @@ func (s *Service) NATManager() basichost.NATManager {
return s.natManager return s.natManager
} }
func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration) error {
if err := s.blocklist.Add(overlay, duration); err != nil {
s.logger.Debugf("blocklist: blocklist peer %s: %v", overlay, err)
_ = s.Disconnect(overlay)
return err
}
_ = s.Disconnect(overlay)
return nil
}
func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr, error) { func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr, error) {
// Build host multiaddress // Build host multiaddress
hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty())) hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty()))
...@@ -383,6 +426,20 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -383,6 +426,20 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("handshake: %w", err) return nil, fmt.Errorf("handshake: %w", err)
} }
blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay)
if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", info.ID, err)
s.logger.Errorf("internal error while connecting with peer %s", info.ID)
_ = s.disconnect(info.ID)
return nil, fmt.Errorf("peer blocklisted")
}
if blocked {
s.logger.Errorf("blocked connection from blocklisted peer %s", info.ID)
_ = s.disconnect(info.ID)
return nil, fmt.Errorf("peer blocklisted")
}
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if err := handshakeStream.FullClose(); err != nil { if err := handshakeStream.FullClose(); err != nil {
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
......
...@@ -50,8 +50,8 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p. ...@@ -50,8 +50,8 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p.
o.Logger = logging.New(ioutil.Discard, 0) o.Logger = logging.New(ioutil.Discard, 0)
} }
statestore := mock.NewStateStore()
if o.Addressbook == nil { if o.Addressbook == nil {
statestore := mock.NewStateStore()
o.Addressbook = addressbook.New(statestore) o.Addressbook = addressbook.New(statestore)
} }
...@@ -65,7 +65,7 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p. ...@@ -65,7 +65,7 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p.
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s, err = libp2p.New(ctx, crypto.NewDefaultSigner(swarmKey), networkID, overlay, addr, o.Addressbook, o.Logger, nil, o.libp2pOpts) s, err = libp2p.New(ctx, crypto.NewDefaultSigner(swarmKey), networkID, overlay, addr, o.Addressbook, statestore, o.Logger, nil, o.libp2pOpts)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -7,6 +7,7 @@ package mock ...@@ -7,6 +7,7 @@ package mock
import ( import (
"context" "context"
"errors" "errors"
"time"
"github.com/ethersphere/bee/pkg/bzz" "github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
...@@ -25,6 +26,7 @@ type Service struct { ...@@ -25,6 +26,7 @@ type Service struct {
addressesFunc func() ([]ma.Multiaddr, error) addressesFunc func() ([]ma.Multiaddr, error)
setWelcomeMessageFunc func(string) error setWelcomeMessageFunc func(string) error
getWelcomeMessageFunc func() string getWelcomeMessageFunc func() string
blocklistFunc func(swarm.Address, time.Duration) error
welcomeMessage string welcomeMessage string
} }
...@@ -84,6 +86,12 @@ func WithSetWelcomeMessageFunc(f func(string) error) Option { ...@@ -84,6 +86,12 @@ func WithSetWelcomeMessageFunc(f func(string) error) Option {
}) })
} }
func WithBlocklistFunc(f func(swarm.Address, time.Duration) error) Option {
return optionFunc(func(s *Service) {
s.blocklistFunc = f
})
}
// New will create a new mock P2P Service with the given options // New will create a new mock P2P Service with the given options
func New(opts ...Option) *Service { func New(opts ...Option) *Service {
s := new(Service) s := new(Service)
...@@ -151,6 +159,13 @@ func (s *Service) GetWelcomeMessage() string { ...@@ -151,6 +159,13 @@ func (s *Service) GetWelcomeMessage() string {
return s.welcomeMessage return s.welcomeMessage
} }
func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration) error {
if s.blocklistFunc == nil {
return errors.New("function blocklist not configured")
}
return s.blocklistFunc(overlay, duration)
}
type Option interface { type Option interface {
apply(*Service) apply(*Service)
} }
......
...@@ -7,6 +7,7 @@ package p2p ...@@ -7,6 +7,7 @@ package p2p
import ( import (
"context" "context"
"io" "io"
"time"
"github.com/ethersphere/bee/pkg/bzz" "github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
...@@ -20,6 +21,9 @@ type Service interface { ...@@ -20,6 +21,9 @@ type Service interface {
// Connect to a peer but do not notify topology about the established connection. // Connect to a peer but do not notify topology about the established connection.
Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
Disconnect(overlay swarm.Address) error Disconnect(overlay swarm.Address) error
// Blocklist will disconnect a peer and put it on a blocklist (blocking in & out connections) for provided duration
// duration 0 is treated as an infinite duration
Blocklist(overlay swarm.Address, duration time.Duration) error
Peers() []Peer Peers() []Peer
AddNotifier(topology.Notifier) AddNotifier(topology.Notifier)
Addresses() ([]ma.Multiaddr, error) Addresses() ([]ma.Multiaddr, error)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment