Commit 5fb8cd84 authored by Zahoor Mohamed, committed by GitHub

[bee #118] Receipts for pushsync (#122)

* pushsync: add receipt forwarding logic
Co-authored-by: acud <12988138+acud@users.noreply.github.com>
parent 8e958f02
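At a high level, the commit adds a receipt round trip to push-sync: the sender delivers a chunk, the receiving (closest) node stores it and answers with a Receipt carrying the chunk address, and the sender only marks the chunk as synced once a matching receipt arrives. The following is a minimal sketch of the sender side of that exchange, written against the types and helpers introduced in the diff below (pb.Delivery, pb.Receipt, protobuf.Writer/Reader, WriteMsgWithTimeout); it is an illustration under those assumptions, not code from the commit itself.

package pushsyncsketch // hypothetical package, illustration only

import (
	"errors"
	"time"

	"github.com/ethersphere/bee/pkg/p2p/protobuf"
	"github.com/ethersphere/bee/pkg/pushsync/pb"
	"github.com/ethersphere/bee/pkg/swarm"
)

// pushWithReceipt sketches the new exchange: deliver the chunk on an already
// opened stream, then wait for a Receipt and check that it acknowledges the
// same chunk address that was pushed.
func pushWithReceipt(w protobuf.Writer, r protobuf.Reader, ch swarm.Chunk) error {
	const receiptTimeout = 3 * time.Second // mirrors timeToWaitForReceipt introduced below

	// send the delivery, bounded by the receipt timeout
	if err := w.WriteMsgWithTimeout(receiptTimeout, &pb.Delivery{
		Address: ch.Address().Bytes(),
		Data:    ch.Data(),
	}); err != nil {
		return err
	}

	// wait for the receipt and validate it
	var receipt pb.Receipt
	if err := r.ReadMsg(&receipt); err != nil {
		return err
	}
	if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
		return errors.New("invalid receipt")
	}
	return nil
}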
@@ -14,29 +14,58 @@ type metrics struct {
	// to be able to return them by Metrics()
	// using reflection

	TotalChunksToBeSentCounter prometheus.Counter
	TotalChunksSynced          prometheus.Counter
	TotalChunksStoredInDB      prometheus.Counter
	ChunksSentCounter          prometheus.Counter
	ChunksReceivedCounter      prometheus.Counter
	SendChunkErrorCounter      prometheus.Counter
	ReceivedChunkErrorCounter  prometheus.Counter
	ReceiptsReceivedCounter    prometheus.Counter
	ReceiptsSentCounter        prometheus.Counter
	SendReceiptErrorCounter    prometheus.Counter
	ReceiveReceiptErrorCounter prometheus.Counter
	ErrorSettingChunkToSynced  prometheus.Counter
	RetriesExhaustedCounter    prometheus.Counter
	InvalidReceiptReceived     prometheus.Counter
	SendChunkTimer             prometheus.Histogram
	ReceiptRTT                 prometheus.Histogram
	MarkAndSweepTimer          prometheus.Histogram
}

func newMetrics() metrics {
	subsystem := "pushsync"

	return metrics{
		TotalChunksToBeSentCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_chunk_to_be_sent",
			Help:      "Total chunks to be sent.",
		}),
		TotalChunksSynced: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_chunk_synced",
			Help:      "Total chunks synced successfully with valid receipts.",
		}),
		TotalChunksStoredInDB: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_chunk_stored_in_DB",
			Help:      "Total chunks stored successfully in local store.",
		}),
		ChunksSentCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "sent_chunk",
			Help:      "Total chunks sent.",
		}),
		ChunksReceivedCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "received_chunk",
			Help:      "Total chunks received.",
		}),
		SendChunkErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
@@ -44,18 +73,75 @@ func newMetrics() metrics {
			Name:      "send_chunk_error",
			Help:      "Total no of time error received while sending chunk.",
		}),
		ReceivedChunkErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "received_chunk_error",
			Help:      "Total no of time error received while receiving chunk.",
		}),
		ReceiptsReceivedCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "received_receipts",
			Help:      "Total no of times receipts received.",
		}),
		ReceiptsSentCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "sent_receipts",
			Help:      "Total no of times receipts are sent.",
		}),
		SendReceiptErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "sent_receipts_error",
			Help:      "Total no of times receipts were sent and error was encountered.",
		}),
		ReceiveReceiptErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "receive_receipt_error",
			Help:      "Total no of time error received while receiving receipt.",
		}),
		ErrorSettingChunkToSynced: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "cannot_set_chunk_sync_in_DB",
			Help:      "Total no of times the chunk cannot be synced in DB.",
		}),
		RetriesExhaustedCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "chunk_retries_exhausted",
			Help:      "Chunk retries exhausted.",
		}),
		InvalidReceiptReceived: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "invalid_receipt_receipt",
			Help:      "Invalid receipt received from peer.",
		}),
		SendChunkTimer: prometheus.NewHistogram(prometheus.HistogramOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "send_chunk_time_histogram",
			Help:      "Histogram for time taken to send a chunk.",
			Buckets:   []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
		}),
		MarkAndSweepTimer: prometheus.NewHistogram(prometheus.HistogramOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "mark_and_sweep_time_histogram",
			Help:      "Histogram of time spent in mark and sweep.",
			Buckets:   []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
		}),
		ReceiptRTT: prometheus.NewHistogram(prometheus.HistogramOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "receipt_rtt_histogram",
			Help:      "Histogram of RTT for receiving receipt for a pushed chunk.",
			Buckets:   []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
		}),
	}
}
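The struct comment above notes that the metric fields are exported so that they can be returned by Metrics() via reflection. As a hypothetical sketch of how such reflection-based collection is commonly done (the repository's actual helper may differ), assuming nothing beyond the prometheus client library:

package metricssketch // hypothetical package, illustration only

import (
	"reflect"

	"github.com/prometheus/client_golang/prometheus"
)

// collectors gathers every exported prometheus.Collector field of a metrics
// struct via reflection, which is the pattern the struct comment above hints at.
func collectors(m interface{}) (cs []prometheus.Collector) {
	v := reflect.Indirect(reflect.ValueOf(m))
	for i := 0; i < v.NumField(); i++ {
		if !v.Field(i).CanInterface() {
			continue // skip unexported fields
		}
		if c, ok := v.Field(i).Interface().(prometheus.Collector); ok {
			cs = append(cs, c)
		}
	}
	return cs
}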
@@ -74,22 +74,68 @@ func (m *Delivery) GetData() []byte {
return nil
}
type Receipt struct {
Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
}
func (m *Receipt) Reset() { *m = Receipt{} }
func (m *Receipt) String() string { return proto.CompactTextString(m) }
func (*Receipt) ProtoMessage() {}
func (*Receipt) Descriptor() ([]byte, []int) {
return fileDescriptor_723cf31bfc02bfd6, []int{1}
}
func (m *Receipt) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Receipt) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Receipt.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Receipt) XXX_Merge(src proto.Message) {
xxx_messageInfo_Receipt.Merge(m, src)
}
func (m *Receipt) XXX_Size() int {
return m.Size()
}
func (m *Receipt) XXX_DiscardUnknown() {
xxx_messageInfo_Receipt.DiscardUnknown(m)
}
var xxx_messageInfo_Receipt proto.InternalMessageInfo
func (m *Receipt) GetAddress() []byte {
if m != nil {
return m.Address
}
return nil
}
func init() {
proto.RegisterType((*Delivery)(nil), "pb.Delivery")
proto.RegisterType((*Receipt)(nil), "pb.Receipt")
}

func init() { proto.RegisterFile("pushsync.proto", fileDescriptor_723cf31bfc02bfd6) }

var fileDescriptor_723cf31bfc02bfd6 = []byte{
// 135 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x28, 0x2d, 0xce,
0x28, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0xb2,
0xe0, 0xe2, 0x70, 0x49, 0xcd, 0xc9, 0x2c, 0x4b, 0x2d, 0xaa, 0x14, 0x92, 0xe0, 0x62, 0x77, 0x4c,
0x49, 0x29, 0x4a, 0x2d, 0x2e, 0x96, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x82, 0x71, 0x85, 0x84,
0xb8, 0x58, 0x5c, 0x12, 0x4b, 0x12, 0x25, 0x98, 0xc0, 0xc2, 0x60, 0xb6, 0x92, 0x32, 0x17, 0x7b,
0x50, 0x6a, 0x72, 0x6a, 0x66, 0x41, 0x09, 0x6e, 0x8d, 0x4e, 0x12, 0x27, 0x1e, 0xc9, 0x31, 0x5e,
0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, 0x31,
0xdc, 0x78, 0x2c, 0xc7, 0x90, 0xc4, 0x06, 0x76, 0x83, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x9d,
0x84, 0x4d, 0xb6, 0x95, 0x00, 0x00, 0x00,
}
func (m *Delivery) Marshal() (dAtA []byte, err error) {
@@ -129,6 +175,36 @@ func (m *Delivery) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *Receipt) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Receipt) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Receipt) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Address) > 0 {
i -= len(m.Address)
copy(dAtA[i:], m.Address)
i = encodeVarintPushsync(dAtA, i, uint64(len(m.Address)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintPushsync(dAtA []byte, offset int, v uint64) int {
offset -= sovPushsync(v)
base := offset
@@ -157,6 +233,19 @@ func (m *Delivery) Size() (n int) {
return n
}
func (m *Receipt) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Address)
if l > 0 {
n += 1 + l + sovPushsync(uint64(l))
}
return n
}
func sovPushsync(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
@@ -284,6 +373,93 @@ func (m *Delivery) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *Receipt) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPushsync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Receipt: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Receipt: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Address", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPushsync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthPushsync
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthPushsync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Address = append(m.Address[:0], dAtA[iNdEx:postIndex]...)
if m.Address == nil {
m.Address = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPushsync(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPushsync
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPushsync
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipPushsync(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
@@ -10,3 +10,7 @@ message Delivery {
bytes Address = 1;
bytes Data = 2;
}
message Receipt {
bytes Address = 1;
}
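The new Receipt message carries a single length-delimited Address field (field number 1). A short illustration of the generated Go API for it, using only the Marshal, Unmarshal and GetAddress methods shown in pushsync.pb.go above; the wrapping function and its name are hypothetical:

package pushsyncsketch // hypothetical package, illustration only

import (
	"fmt"

	"github.com/ethersphere/bee/pkg/pushsync/pb"
)

// receiptRoundTrip encodes a Receipt for a chunk address and decodes it again,
// showing the wire shape of the new message.
func receiptRoundTrip(addr []byte) error {
	in := &pb.Receipt{Address: addr}
	buf, err := in.Marshal()
	if err != nil {
		return err
	}
	var out pb.Receipt
	if err := out.Unmarshal(buf); err != nil {
		return err
	}
	fmt.Printf("receipt acknowledges %x\n", out.GetAddress())
	return nil
}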
@@ -8,7 +8,6 @@ import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/ethersphere/bee/pkg/logging"
@@ -30,9 +29,10 @@ type PushSync struct {
	streamer          p2p.Streamer
	storer            storage.Storer
	peerSuggester     topology.ClosestPeerer
	logger            logging.Logger
	metrics           metrics
	quit              chan struct{}
	chunksWorkerQuitC chan struct{}
}
type Options struct {
@@ -42,7 +42,10 @@ type Options struct {
	Logger logging.Logger
}

var (
	retryInterval        = 10 * time.Second // time interval between retries
	timeToWaitForReceipt = 3 * time.Second  // time to wait to get a receipt for a chunk
)

func New(o Options) *PushSync {
	ps := &PushSync{
@@ -52,10 +55,10 @@ func New(o Options) *PushSync {
		logger:            o.Logger,
		metrics:           newMetrics(),
		quit:              make(chan struct{}),
		chunksWorkerQuitC: make(chan struct{}),
	}

	go ps.chunksWorker()

	return ps
}
@@ -74,62 +77,164 @@ func (s *PushSync) Protocol() p2p.ProtocolSpec {
func (ps *PushSync) Close() error {
	close(ps.quit)

	// Wait for the chunks worker to finish
	select {
	case <-ps.chunksWorkerQuitC:
	case <-time.After(3 * time.Second):
	}
	return nil
}
// handler handles chunk delivery from another node and forwards it to its destination node.
// If the current node is the destination, it stores the chunk in the local store and sends a receipt.
func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
	w, r := protobuf.NewWriterAndReader(stream)
	defer stream.Close()

	// Get the delivery
	chunk, err := ps.getChunkDelivery(r)
	if err != nil {
		return fmt.Errorf("chunk delivery: %w", err)
	}

	// Select the closest peer to forward the chunk
	peer, err := ps.peerSuggester.ClosestPeer(chunk.Address())
	if err != nil {
		// If I am the closest peer then store the chunk and send a receipt
		if errors.Is(err, topology.ErrWantSelf) {
			// Store the chunk in the local store
			_, err := ps.storer.Put(ctx, storage.ModePutSync, chunk)
			if err != nil {
				return fmt.Errorf("chunk store: %w", err)
			}
			ps.metrics.TotalChunksStoredInDB.Inc()

			// Send a receipt immediately once the storage of the chunk is successful
			receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
			err = ps.sendReceipt(w, receipt)
			if err != nil {
				return fmt.Errorf("send receipt: %w", err)
			}
			return nil
		}
		return err
	}

	// This is a special situation in that the other peer thinks that we are the closest node
	// and we think that the sending peer is the closest to the chunk.
	if p.Address.Equal(peer) {
		// Store the chunk in the local store
		_, err := ps.storer.Put(ctx, storage.ModePutSync, chunk)
		if err != nil {
			return fmt.Errorf("chunk store: %w", err)
		}
		ps.metrics.TotalChunksStoredInDB.Inc()

		// Send a receipt immediately once the storage of the chunk is successful
		receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
		return ps.sendReceipt(w, receipt)
	}

	// Forward the chunk to the closest peer
	streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
	if err != nil {
		return fmt.Errorf("new stream: %w", err)
	}
	defer streamer.Close()

	wc, rc := protobuf.NewWriterAndReader(streamer)
	if err := ps.sendChunkDelivery(wc, chunk); err != nil {
		return fmt.Errorf("forward chunk: %w", err)
	}
	receiptRTTTimer := time.Now()

	receipt, err := ps.receiveReceipt(rc)
	if err != nil {
		return fmt.Errorf("receive receipt: %w", err)
	}
	ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())

	// Check if the receipt is valid
	if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) {
		ps.metrics.InvalidReceiptReceived.Inc()
		return errors.New("invalid receipt")
	}

	// pass back the received receipt in the previously received stream
	err = ps.sendReceipt(w, &receipt)
	if err != nil {
		return fmt.Errorf("send receipt: %w", err)
	}
	ps.metrics.ReceiptsSentCounter.Inc()

	return nil
}

func (ps *PushSync) getChunkDelivery(r protobuf.Reader) (chunk swarm.Chunk, err error) {
	var ch pb.Delivery
	if err = r.ReadMsg(&ch); err != nil {
		ps.metrics.ReceivedChunkErrorCounter.Inc()
		return nil, err
	}
	ps.metrics.ChunksSentCounter.Inc()

	// create chunk
	addr := swarm.NewAddress(ch.Address)
	chunk = swarm.NewChunk(addr, ch.Data)

	return chunk, nil
}

func (ps *PushSync) sendChunkDelivery(w protobuf.Writer, chunk swarm.Chunk) (err error) {
	startTimer := time.Now()
	if err = w.WriteMsgWithTimeout(timeToWaitForReceipt, &pb.Delivery{
		Address: chunk.Address().Bytes(),
		Data:    chunk.Data(),
	}); err != nil {
		ps.metrics.SendChunkErrorCounter.Inc()
		return err
	}
	ps.metrics.SendChunkTimer.Observe(time.Since(startTimer).Seconds())
	ps.metrics.ChunksSentCounter.Inc()
	return nil
}

func (ps *PushSync) sendReceipt(w protobuf.Writer, receipt *pb.Receipt) (err error) {
	if err := w.WriteMsg(receipt); err != nil {
		ps.metrics.SendReceiptErrorCounter.Inc()
		return err
	}
	ps.metrics.ReceiptsSentCounter.Inc()
	return nil
}

func (ps *PushSync) receiveReceipt(r protobuf.Reader) (receipt pb.Receipt, err error) {
	if err := r.ReadMsg(&receipt); err != nil {
		ps.metrics.ReceiveReceiptErrorCounter.Inc()
		return receipt, err
	}
	ps.metrics.ReceiptsReceivedCounter.Inc()
	return receipt, nil
}
// chunksWorker is a loop that keeps looking for chunks that are locally uploaded (by monitoring pushIndex)
// and pushes them to the closest peer to get a receipt.
func (ps *PushSync) chunksWorker() {
	var chunks <-chan swarm.Chunk
	var unsubscribe func()

	// timer, initially set to 0 to fall through select case on timer.C for initialisation
	timer := time.NewTimer(0)
	defer timer.Stop()
	defer close(ps.chunksWorkerQuitC)

	chunksInBatch := -1
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-ps.quit
		cancel()
	}()

	for {
		select {
		// handle incoming chunks
@@ -146,25 +251,21 @@ func (ps *PushSync) chunksWorker() {
			}

			chunksInBatch++
			ps.metrics.TotalChunksToBeSentCounter.Inc()

			peer, err := ps.peerSuggester.ClosestPeer(ch.Address())
			if err != nil {
				if errors.Is(err, topology.ErrWantSelf) {
					if err := ps.storer.Set(ctx, storage.ModeSetSyncPush, ch.Address()); err != nil {
						ps.logger.Errorf("pushsync: error setting chunks to synced: %v", err)
					}
					continue
				}
			}

			// TODO: run this as a goroutine and process several chunks in parallel
			if err := ps.SendChunkAndReceiveReceipt(ctx, peer, ch); err != nil {
				ps.logger.Errorf("pushsync: error while sending chunk or receiving receipt: %v", err)
				continue
			}

			// retry interval timer triggers starting from new
@@ -182,9 +283,7 @@ func (ps *PushSync) chunksWorker() {
			// reset timer to go off after retryInterval
			timer.Reset(retryInterval)
			ps.metrics.MarkAndSweepTimer.Observe(time.Since(startTime).Seconds())

		case <-ps.quit:
			if unsubscribe != nil {
@@ -195,25 +294,41 @@ func (ps *PushSync) chunksWorker() {
		}
	}
// SendChunkAndReceiveReceipt sends a chunk to a given peer by opening a stream.
// It then waits for a receipt from that peer. Once the receipt is received
// within the given time frame, it marks this chunk as synced in the localstore.
func (ps *PushSync) SendChunkAndReceiveReceipt(ctx context.Context, peer swarm.Address, ch swarm.Chunk) error {
	streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
	if err != nil {
		return fmt.Errorf("new stream: %w", err)
	}
	defer streamer.Close()

	w, r := protobuf.NewWriterAndReader(streamer)
	if err := ps.sendChunkDelivery(w, ch); err != nil {
		return fmt.Errorf("chunk deliver: %w", err)
	}
	receiptRTTTimer := time.Now()

	receipt, err := ps.receiveReceipt(r)
	if err != nil {
		return fmt.Errorf("receive receipt: %w", err)
	}
	ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())

	// Check if the receipt is valid
	if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
		ps.metrics.InvalidReceiptReceived.Inc()
		return errors.New("invalid receipt")
	}

	// set chunk status to synced, insert to db GC index
	if err := ps.storer.Set(ctx, storage.ModeSetSyncPush, ch.Address()); err != nil {
		ps.metrics.ErrorSettingChunkToSynced.Inc()
		return fmt.Errorf("chunk store: %w", err)
	}
	ps.metrics.TotalChunksSynced.Inc()

	return nil
}
@@ -8,223 +8,204 @@ import (
	"bytes"
	"context"
	"io/ioutil"
	"testing"

	"github.com/ethersphere/bee/pkg/localstore"
	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/p2p"
	"github.com/ethersphere/bee/pkg/p2p/protobuf"
	"github.com/ethersphere/bee/pkg/p2p/streamtest"
	"github.com/ethersphere/bee/pkg/pushsync"
	"github.com/ethersphere/bee/pkg/pushsync/pb"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/topology"
	"github.com/ethersphere/bee/pkg/topology/mock"
)
// TestSendChunkAndReceiveReceipt inserts a chunk as an uploaded chunk in the db. This triggers sending the chunk
// to the closest node and expects a receipt. The messages are intercepted in the outgoing stream to check for correctness.
func TestSendChunkAndReceiveReceipt(t *testing.T) {
	// chunk data to upload
	chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
	chunkData := []byte("1234")
	chunk := swarm.NewChunk(chunkAddress, chunkData)

	// create a pivot node and a mocked closest node
	pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")   // base is 0000
	closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110 -> po 1

	// peer is the node responding to the chunk receipt message
	// mock should return ErrWantSelf since there's no one to forward to
	psPeer, storerPeer := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
	defer storerPeer.Close()
	defer psPeer.Close()

	recorder := streamtest.New(
		streamtest.WithProtocols(psPeer.Protocol()),
		streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
			return f
		}),
	)

	// pivot node needs the streamer since the chunk is intercepted by
	// the chunk worker, then gets sent by opening a new stream
	psPivot, storerPivot := createPushSyncNode(t, pivotNode, recorder, mock.WithClosestPeer(closestPeer))
	defer storerPivot.Close()

	// upload the chunk to the pivot node
	_, err := storerPivot.Put(context.Background(), storage.ModePutUpload, chunk)
	if err != nil {
		t.Fatal(err)
	}

	// this intercepts the outgoing delivery message
	waitOnRecordAndTest(t, closestPeer, recorder, chunkAddress, chunkData)

	// this intercepts the incoming receipt message
	waitOnRecordAndTest(t, closestPeer, recorder, chunkAddress, nil)

	// Close the pushsync and then the DB
	psPivot.Close()
}
// TestHandler expects a chunk from a node on a stream. It then stores the chunk in the local store and
// sends back a receipt. This is tested by intercepting the incoming stream for proper messages.
// It also sends the chunk to the closest peer and receives a receipt.
//
// Chunk moves from TriggerPeer -> PivotPeer -> ClosestPeer
//
func TestHandler(t *testing.T) {
	// chunk data to upload
	chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
	chunkData := []byte("1234")
	chunk := swarm.NewChunk(chunkAddress, chunkData)

	// create a pivot node and a mocked closest node
	pivotPeer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
	triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
	closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")

	// Create the closest peer
	psClosestPeer, closestStorerPeerDB := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
	defer closestStorerPeerDB.Close()

	closestRecorder := streamtest.New(
		streamtest.WithProtocols(psClosestPeer.Protocol()),
		streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
			return f
		}),
	)

	// creating the pivot peer
	psPivot, storerPivotDB := createPushSyncNode(t, pivotPeer, closestRecorder, mock.WithClosestPeer(closestPeer))
	defer storerPivotDB.Close()

	pivotRecorder := streamtest.New(
		streamtest.WithProtocols(psPivot.Protocol()),
		streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
			return f
		}),
	)

	// Creating the trigger peer
	psTriggerPeer, triggerStorerDB := createPushSyncNode(t, triggerPeer, pivotRecorder, mock.WithClosestPeer(pivotPeer))
	defer triggerStorerDB.Close()

	// upload the chunk to the trigger node DB which triggers the chain reaction of sending this chunk
	// from trigger peer to pivot peer to closest peer.
	_, err := triggerStorerDB.Put(context.Background(), storage.ModePutUpload, chunk)
	if err != nil {
		t.Fatal(err)
	}

	// In the pivot peer, intercept the incoming delivery chunk from the trigger peer and check for correctness
	waitOnRecordAndTest(t, pivotPeer, pivotRecorder, chunkAddress, chunkData)

	// Pivot peer will forward the chunk to its closest peer. Intercept the incoming stream from the pivot node
	// and check for the correctness of the chunk
	waitOnRecordAndTest(t, closestPeer, closestRecorder, chunkAddress, chunkData)

	// Similarly intercept the same incoming stream to see if the closest peer is sending a proper receipt
	waitOnRecordAndTest(t, closestPeer, closestRecorder, chunkAddress, nil)

	// In the received stream, check if a receipt is sent from the pivot peer and check for its correctness.
	waitOnRecordAndTest(t, pivotPeer, pivotRecorder, chunkAddress, nil)

	// close push sync before the storers are closed to avoid data race
	psClosestPeer.Close()
	psPivot.Close()
	psTriggerPeer.Close()
}
func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.Recorder, mockOpts ...mock.Option) (*pushsync.PushSync, *localstore.DB) {
	logger := logging.New(ioutil.Discard, 0)

	storer, err := localstore.New("", addr.Bytes(), nil, logger)
	if err != nil {
		t.Fatal(err)
	}

	mockTopology := mock.NewTopologyDriver(mockOpts...)

	ps := pushsync.New(pushsync.Options{
		Streamer:      recorder,
		Storer:        storer,
		ClosestPeerer: mockTopology,
		Logger:        logger,
	})

	return ps, storer
}

func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
	t.Helper()
	records := recorder.WaitRecords(t, peer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5)

	if data != nil {
		messages, err := protobuf.ReadMessages(
			bytes.NewReader(records[0].In()),
			func() protobuf.Message { return new(pb.Delivery) },
		)
		if err != nil {
			t.Fatal(err)
		}
		if messages == nil {
			t.Fatal("nil rcvd. for message")
		}
		if len(messages) > 1 {
			t.Fatal("too many messages")
		}
		delivery := messages[0].(*pb.Delivery)

		if !bytes.Equal(delivery.Address, add.Bytes()) {
			t.Fatalf("chunk address mismatch")
		}
		if !bytes.Equal(delivery.Data, data) {
			t.Fatalf("chunk data mismatch")
		}
	} else {
		messages, err := protobuf.ReadMessages(
			bytes.NewReader(records[0].In()),
			func() protobuf.Message { return new(pb.Receipt) },
		)
		if err != nil {
			t.Fatal(err)
		}
		if messages == nil {
			t.Fatal("nil rcvd. for message")
		}
		if len(messages) > 1 {
			t.Fatal("too many messages")
		}
		receipt := messages[0].(*pb.Receipt)
		receiptAddress := swarm.NewAddress(receipt.Address)

		if !receiptAddress.Equal(add) {
			t.Fatalf("receipt address mismatch")
		}
	}
}