pushsync.go 7.25 KB
Newer Older
1 2 3 4 5
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package pushsync
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/p2p"
	"github.com/ethersphere/bee/pkg/p2p/protobuf"
	"github.com/ethersphere/bee/pkg/pushsync/pb"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/topology"
)

// Identifiers of the pushsync protocol and its single stream. They are used
// both when registering the stream handler in Protocol and when opening
// outbound streams with NewStream.
const (
	protocolName    = "pushsync"
	protocolVersion = "1.0.0"
	streamName      = "pushsync"
)

28 29 30 31 32 33 34 35
// PushSyncer is the interface for pushing a chunk towards the peer that is
// closest to the chunk address.
type PushSyncer interface {
	// PushChunkToClosest pushes ch to the closest peer and returns the
	// receipt obtained for it, or an error.
	PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error)
}

// Receipt is the result of a successful PushChunkToClosest call.
type Receipt struct {
	// Address is the address of the chunk the receipt was issued for.
	Address swarm.Address
}

36
type PushSync struct {
37 38 39 40 41
	streamer      p2p.Streamer
	storer        storage.Putter
	peerSuggester topology.ClosestPeerer
	logger        logging.Logger
	metrics       metrics
42 43 44 45
}

type Options struct {
	Streamer      p2p.Streamer
46
	Storer        storage.Putter
47 48 49 50
	ClosestPeerer topology.ClosestPeerer
	Logger        logging.Logger
}

51
// timeToWaitForReceipt is the time to wait to get a receipt for a chunk.
// In this file it is also the timeout passed to WriteMsgWithTimeout when a
// chunk delivery is written in sendChunkDelivery.
var timeToWaitForReceipt = 3 * time.Second
52 53 54

func New(o Options) *PushSync {
	ps := &PushSync{
55 56 57 58 59
		streamer:      o.Streamer,
		storer:        o.Storer,
		peerSuggester: o.ClosestPeerer,
		logger:        o.Logger,
		metrics:       newMetrics(),
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
	}
	return ps
}

// Protocol returns the p2p protocol specification for pushsync: a single
// stream, named streamName, whose incoming connections are served by handler.
func (ps *PushSync) Protocol() p2p.ProtocolSpec {
	streams := []p2p.StreamSpec{
		{
			Name:    streamName,
			Handler: ps.handler,
		},
	}

	return p2p.ProtocolSpec{
		Name:        protocolName,
		Version:     protocolVersion,
		StreamSpecs: streams,
	}
}

77 78
// handler handles chunk delivery from other node and forwards to its destination node.
// If the current node is the destination, it stores in the local store and sends a receipt.
79
func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
80
	w, r := protobuf.NewWriterAndReader(stream)
81 82
	defer stream.Close()

83 84 85
	// Get the delivery
	chunk, err := ps.getChunkDelivery(r)
	if err != nil {
86
		return fmt.Errorf("chunk delivery from peer %s: %w", p.Address.String(), err)
87 88 89 90 91 92 93
	}

	// Select the closest peer to forward the chunk
	peer, err := ps.peerSuggester.ClosestPeer(chunk.Address())
	if err != nil {
		// If i am the closest peer then store the chunk and send receipt
		if errors.Is(err, topology.ErrWantSelf) {
94

95 96 97 98 99 100 101
			// Store the chunk in the local store
			_, err := ps.storer.Put(ctx, storage.ModePutSync, chunk)
			if err != nil {
				return fmt.Errorf("chunk store: %w", err)
			}
			ps.metrics.TotalChunksStoredInDB.Inc()

102
			// Send a receipt immediately once the storage of the chunk is successfully
103 104 105
			receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
			err = ps.sendReceipt(w, receipt)
			if err != nil {
106
				return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
107
			}
108 109 110 111 112
			return nil
		}
		return err
	}

113 114 115 116 117 118 119 120 121 122 123
	// This is a special situation in that the other peer thinks thats we are the closest node
	// and we think that the sending peer
	if p.Address.Equal(peer) {

		// Store the chunk in the local store
		_, err := ps.storer.Put(ctx, storage.ModePutSync, chunk)
		if err != nil {
			return fmt.Errorf("chunk store: %w", err)
		}
		ps.metrics.TotalChunksStoredInDB.Inc()

124
		// Send a receipt immediately once the storage of the chunk is successfully
125 126 127 128 129 130
		receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
		return ps.sendReceipt(w, receipt)
	}

	// Forward chunk to closest peer
	streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
131
	if err != nil {
132
		return fmt.Errorf("new stream peer %s: %w", peer.String(), err)
133 134 135 136 137 138
	}
	defer streamer.Close()

	wc, rc := protobuf.NewWriterAndReader(streamer)

	if err := ps.sendChunkDelivery(wc, chunk); err != nil {
139
		return fmt.Errorf("forward chunk to peer %s: %w", peer.String(), err)
140
	}
141
	receiptRTTTimer := time.Now()
142

143
	receipt, err := ps.receiveReceipt(rc)
144
	if err != nil {
145
		return fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
146 147 148 149 150 151
	}
	ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())

	// Check if the receipt is valid
	if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) {
		ps.metrics.InvalidReceiptReceived.Inc()
152
		return fmt.Errorf("invalid receipt from peer %s", peer.String())
153 154 155 156 157
	}

	// pass back the received receipt in the previously received stream
	err = ps.sendReceipt(w, &receipt)
	if err != nil {
158
		return fmt.Errorf("send receipt to peer %s: %w", peer.String(), err)
159
	}
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
	ps.metrics.ReceiptsSentCounter.Inc()

	return nil
}

// getChunkDelivery reads one pb.Delivery message from r and converts it into
// a swarm.Chunk built from the delivered address and data. A read failure is
// counted in ReceivedChunkErrorCounter and returned with a nil chunk.
func (ps *PushSync) getChunkDelivery(r protobuf.Reader) (chunk swarm.Chunk, err error) {
	var delivery pb.Delivery
	err = r.ReadMsg(&delivery)
	if err != nil {
		ps.metrics.ReceivedChunkErrorCounter.Inc()
		return nil, err
	}
	// NOTE(review): this is the receive path, yet the counter incremented is
	// ChunksSentCounter (the same one sendChunkDelivery uses). Looks like a
	// received-chunks counter was intended; confirm against the metrics
	// definition.
	ps.metrics.ChunksSentCounter.Inc()

	// create chunk
	return swarm.NewChunk(swarm.NewAddress(delivery.Address), delivery.Data), nil
}

func (ps *PushSync) sendChunkDelivery(w protobuf.Writer, chunk swarm.Chunk) (err error) {
	startTimer := time.Now()
	if err = w.WriteMsgWithTimeout(timeToWaitForReceipt, &pb.Delivery{
		Address: chunk.Address().Bytes(),
		Data:    chunk.Data(),
	}); err != nil {
185 186 187
		ps.metrics.SendChunkErrorCounter.Inc()
		return err
	}
188 189 190 191 192 193 194 195 196 197 198 199 200
	ps.metrics.SendChunkTimer.Observe(time.Since(startTimer).Seconds())
	ps.metrics.ChunksSentCounter.Inc()
	return nil
}

// sendReceipt writes receipt to w. A failed write is counted in
// SendReceiptErrorCounter and returned; a successful one is counted in
// ReceiptsSentCounter.
func (ps *PushSync) sendReceipt(w protobuf.Writer, receipt *pb.Receipt) (err error) {
	err = w.WriteMsg(receipt)
	if err != nil {
		ps.metrics.SendReceiptErrorCounter.Inc()
		return err
	}

	ps.metrics.ReceiptsSentCounter.Inc()
	return nil
}
201

202 203 204 205 206 207 208
func (ps *PushSync) receiveReceipt(r protobuf.Reader) (receipt pb.Receipt, err error) {
	if err := r.ReadMsg(&receipt); err != nil {
		ps.metrics.ReceiveReceiptErrorCounter.Inc()
		return receipt, err
	}
	ps.metrics.ReceiptsReceivedCounter.Inc()
	return receipt, nil
209 210
}

211 212 213 214 215 216 217 218 219 220 221
// PushChunkToClosest sends chunk to the closest peer by opening a stream. It then waits for
// a receipt from that peer and returns error or nil based on the receiving and
// the validity of the receipt.
func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
	peer, err := ps.peerSuggester.ClosestPeer(ch.Address())
	if err != nil {
		if errors.Is(err, topology.ErrWantSelf) {
			// if you are the closest node return a receipt immediately
			return &Receipt{
				Address: ch.Address(),
			}, nil
222
		}
223
		return nil, fmt.Errorf("closest peer: %w", err)
224 225 226 227
	}

	streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
	if err != nil {
228
		return nil, fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
229 230 231
	}
	defer streamer.Close()

232 233
	w, r := protobuf.NewWriterAndReader(streamer)
	if err := ps.sendChunkDelivery(w, ch); err != nil {
234
		return nil, fmt.Errorf("chunk deliver to peer %s: %w", peer.String(), err)
235 236
	}
	receiptRTTTimer := time.Now()
237

238 239
	receipt, err := ps.receiveReceipt(r)
	if err != nil {
240
		return nil, fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
241
	}
242
	ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())
243

244 245 246
	// Check if the receipt is valid
	if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
		ps.metrics.InvalidReceiptReceived.Inc()
247
		return nil, fmt.Errorf("invalid receipt. peer %s", peer.String())
248 249
	}

250 251
	rec := &Receipt{
		Address: swarm.NewAddress(receipt.Address),
252 253
	}

254
	return rec, nil
255
}