hive.go 5.87 KB
Newer Older
Petar Radovic's avatar
Petar Radovic committed
1 2 3 4
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

5 6 7 8 9 10
// Package hive exposes the hive protocol implementation
// which is the discovery protocol used to inform and be
// informed about other peers in the network. It gossips
// about all peers by default and performs no specific
// prioritization about which peers are gossipped to
// others.
Petar Radovic's avatar
Petar Radovic committed
11 12 13 14
package hive

import (
	"context"
15
	"errors"
Petar Radovic's avatar
Petar Radovic committed
16
	"fmt"
17
	"sync"
Petar Radovic's avatar
Petar Radovic committed
18 19 20
	"time"

	"github.com/ethersphere/bee/pkg/addressbook"
21
	"github.com/ethersphere/bee/pkg/bzz"
Petar Radovic's avatar
Petar Radovic committed
22 23 24 25 26
	"github.com/ethersphere/bee/pkg/hive/pb"
	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/p2p"
	"github.com/ethersphere/bee/pkg/p2p/protobuf"
	"github.com/ethersphere/bee/pkg/swarm"
27
	ma "github.com/multiformats/go-multiaddr"
28 29

	"golang.org/x/time/rate"
Petar Radovic's avatar
Petar Radovic committed
30 31 32 33 34 35
)

const (
	protocolName    = "hive"
	protocolVersion = "1.0.0"
	peersStreamName = "peers"
36
	messageTimeout  = 1 * time.Minute // maximum allowed time for a message to be read or written.
37
	maxBatchSize    = 30
Petar Radovic's avatar
Petar Radovic committed
38 39
)

40 41 42 43 44 45
var (
	ErrRateLimitExceeded = errors.New("rate limit exceeded")
	limitBurst           = 4 * int(swarm.MaxBins)
	limitRate            = rate.Every(time.Minute)
)

Petar Radovic's avatar
Petar Radovic committed
46
type Service struct {
47 48
	streamer        p2p.Streamer
	addressBook     addressbook.GetPutter
49
	addPeersHandler func(...swarm.Address)
50 51
	networkID       uint64
	logger          logging.Logger
acud's avatar
acud committed
52
	metrics         metrics
53 54
	limiter         map[string]*rate.Limiter
	limiterLock     sync.Mutex
Petar Radovic's avatar
Petar Radovic committed
55 56
}

57
func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, logger logging.Logger) *Service {
Petar Radovic's avatar
Petar Radovic committed
58
	return &Service{
59 60 61 62
		streamer:    streamer,
		logger:      logger,
		addressBook: addressbook,
		networkID:   networkID,
acud's avatar
acud committed
63
		metrics:     newMetrics(),
64
		limiter:     make(map[string]*rate.Limiter),
Petar Radovic's avatar
Petar Radovic committed
65 66 67 68 69 70 71 72 73 74 75 76 77
	}
}

func (s *Service) Protocol() p2p.ProtocolSpec {
	return p2p.ProtocolSpec{
		Name:    protocolName,
		Version: protocolVersion,
		StreamSpecs: []p2p.StreamSpec{
			{
				Name:    peersStreamName,
				Handler: s.peersHandler,
			},
		},
78 79
		DisconnectIn:  s.disconnect,
		DisconnectOut: s.disconnect,
Petar Radovic's avatar
Petar Radovic committed
80 81 82
	}
}

83
func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) error {
Petar Radovic's avatar
Petar Radovic committed
84
	max := maxBatchSize
acud's avatar
acud committed
85 86 87
	s.metrics.BroadcastPeers.Inc()
	s.metrics.BroadcastPeersPeers.Add(float64(len(peers)))

Petar Radovic's avatar
Petar Radovic committed
88 89 90 91 92 93 94 95 96 97 98 99 100 101
	for len(peers) > 0 {
		if max > len(peers) {
			max = len(peers)
		}
		if err := s.sendPeers(ctx, addressee, peers[:max]); err != nil {
			return err
		}

		peers = peers[max:]
	}

	return nil
}

102
func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) {
103
	s.addPeersHandler = h
104 105
}

106
func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) (err error) {
acud's avatar
acud committed
107
	s.metrics.BroadcastPeersSends.Inc()
108
	stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, peersStreamName)
Petar Radovic's avatar
Petar Radovic committed
109 110 111
	if err != nil {
		return fmt.Errorf("new stream: %w", err)
	}
112 113 114 115
	defer func() {
		if err != nil {
			_ = stream.Reset()
		} else {
116 117 118
			// added this because Recorder (unit test) emits an unnecessary EOF when Close is called
			time.Sleep(time.Millisecond * 50)
			_ = stream.Close()
119 120
		}
	}()
Petar Radovic's avatar
Petar Radovic committed
121 122 123
	w, _ := protobuf.NewWriterAndReader(stream)
	var peersRequest pb.Peers
	for _, p := range peers {
124 125
		addr, err := s.addressBook.Get(p)
		if err != nil {
126 127
			if err == addressbook.ErrNotFound {
				s.logger.Debugf("hive broadcast peers: peer not found in the addressbook. Skipping peer %s", p)
128 129 130
				continue
			}
			return err
131 132
		}

Petar Radovic's avatar
Petar Radovic committed
133
		peersRequest.Peers = append(peersRequest.Peers, &pb.BzzAddress{
134 135 136 137
			Overlay:     addr.Overlay.Bytes(),
			Underlay:    addr.Underlay.Bytes(),
			Signature:   addr.Signature,
			Transaction: addr.Transaction,
Petar Radovic's avatar
Petar Radovic committed
138 139 140
		})
	}

141
	if err := w.WriteMsgWithContext(ctx, &peersRequest); err != nil {
Petar Radovic's avatar
Petar Radovic committed
142 143 144
		return fmt.Errorf("write Peers message: %w", err)
	}

145
	return nil
Petar Radovic's avatar
Petar Radovic committed
146 147
}

148
func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
acud's avatar
acud committed
149
	s.metrics.PeersHandler.Inc()
Petar Radovic's avatar
Petar Radovic committed
150
	_, r := protobuf.NewWriterAndReader(stream)
151 152
	ctx, cancel := context.WithTimeout(ctx, messageTimeout)
	defer cancel()
Petar Radovic's avatar
Petar Radovic committed
153
	var peersReq pb.Peers
154
	if err := r.ReadMsgWithContext(ctx, &peersReq); err != nil {
155
		_ = stream.Reset()
Petar Radovic's avatar
Petar Radovic committed
156 157 158
		return fmt.Errorf("read requestPeers message: %w", err)
	}

acud's avatar
acud committed
159 160
	s.metrics.PeersHandlerPeers.Add(float64(len(peersReq.Peers)))

161 162 163 164 165
	if err := s.rateLimitPeer(peer.Address, len(peersReq.Peers)); err != nil {
		_ = stream.Reset()
		return err
	}

166
	// close the stream before processing in order to unblock the sending side
Michelle Plur's avatar
Michelle Plur committed
167
	// fullclose is called async because there is no need to wait for confirmation,
168 169 170
	// but we still want to handle not closed stream from the other side to avoid zombie stream
	go stream.FullClose()

171
	var peers []swarm.Address
Petar Radovic's avatar
Petar Radovic committed
172
	for _, newPeer := range peersReq.Peers {
173 174

		multiUnderlay, err := ma.NewMultiaddrBytes(newPeer.Underlay)
Petar Radovic's avatar
Petar Radovic committed
175
		if err != nil {
176
			s.logger.Errorf("hive: multi address underlay err: %v", err)
Petar Radovic's avatar
Petar Radovic committed
177 178 179
			continue
		}

180 181 182 183 184 185 186 187
		bzzAddress := bzz.Address{
			Overlay:     swarm.NewAddress(newPeer.Overlay),
			Underlay:    multiUnderlay,
			Signature:   newPeer.Signature,
			Transaction: newPeer.Transaction,
		}

		err = s.addressBook.Put(bzzAddress.Overlay, bzzAddress)
188
		if err != nil {
189
			s.logger.Warningf("skipping peer in response %s: %v", newPeer.String(), err)
190
			continue
191
		}
192

193 194 195 196
		peers = append(peers, bzzAddress.Overlay)
	}

	if s.addPeersHandler != nil {
197
		s.addPeersHandler(peers...)
Petar Radovic's avatar
Petar Radovic committed
198 199 200 201
	}

	return nil
}
202 203 204 205 206 207

func (s *Service) rateLimitPeer(peer swarm.Address, count int) error {

	s.limiterLock.Lock()
	defer s.limiterLock.Unlock()

208
	addr := peer.ByteString()
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230

	limiter, ok := s.limiter[addr]
	if !ok {
		limiter = rate.NewLimiter(limitRate, limitBurst)
		s.limiter[addr] = limiter
	}

	if limiter.AllowN(time.Now(), count) {
		return nil
	}

	return ErrRateLimitExceeded
}

func (s *Service) disconnect(peer p2p.Peer) error {
	s.limiterLock.Lock()
	defer s.limiterLock.Unlock()

	delete(s.limiter, peer.Address.String())

	return nil
}