Commit 0bc70f21 authored by acud's avatar acud Committed by GitHub

[bee #48] pushsync: initial pushsync implementation (#115)

* pushsync: initial pushsync implementation
Co-authored-by: Zahoor Mohamed <zahoor@ethswarm.org>
parent 338d88b7
......@@ -18,28 +18,30 @@ import (
mockstore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/mock"
"github.com/multiformats/go-multiaddr"
"resenje.org/web"
)
type testServerOptions struct {
Overlay swarm.Address
P2P p2p.Service
Storer storage.Storer
Overlay swarm.Address
P2P p2p.Service
Storer storage.Storer
TopologyOpts []mock.Option
}
type testServer struct {
Client *http.Client
Addressbook addressbook.GetPutter
TopologyDriver *mock.TopologyDriver
TopologyDriver topology.Driver
Cleanup func()
}
func newTestServer(t *testing.T, o testServerOptions) *testServer {
statestore := mockstore.NewStateStore()
addressbook := addressbook.New(statestore)
topologyDriver := mock.NewTopologyDriver()
topologyDriver := mock.NewTopologyDriver(o.TopologyOpts...)
s := debugapi.New(debugapi.Options{
Overlay: o.Overlay,
......
......@@ -17,6 +17,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
topmock "github.com/ethersphere/bee/pkg/topology/mock"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -73,9 +74,9 @@ func TestConnect(t *testing.T) {
disconnectCalled = true
return nil
})),
TopologyOpts: []topmock.Option{topmock.WithAddPeerErr(testErr)},
})
defer testServer.Cleanup()
testServer.TopologyDriver.SetAddPeerErr(testErr)
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Code: http.StatusInternalServerError,
......
......@@ -9,6 +9,7 @@ import (
"errors"
"io"
"sync"
"testing"
"time"
"github.com/ethersphere/bee/pkg/p2p"
......@@ -52,6 +53,10 @@ func New(opts ...Option) *Recorder {
return r
}
// SetProtocols registers additional protocol specs with the recorder after it
// has been constructed, so streams for those protocols can be recorded.
func (r *Recorder) SetProtocols(protocols ...p2p.ProtocolSpec) {
	r.protocols = append(r.protocols, protocols...)
}
func (r *Recorder) NewStream(ctx context.Context, addr swarm.Address, h p2p.Headers, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
recordIn := newRecord()
recordOut := newRecord()
......@@ -111,6 +116,32 @@ func (r *Recorder) Records(addr swarm.Address, protocolName, protocolVersio, str
return records, nil
}
// WaitRecords waits for some time for records to come into the recorder. If msgs is 0, the timeoutSec period is waited to verify
// that _no_ messages arrive during this time period.
func (r *Recorder) WaitRecords(t *testing.T, addr swarm.Address, proto, version, stream string, msgs int, timeoutSec int) []*Record {
	t.Helper()

	const poll = 10 * time.Millisecond
	attempts := int((time.Duration(timeoutSec) * time.Second) / poll)
	for attempt := 0; attempt < attempts; attempt++ {
		// The error is deliberately ignored: no records existing yet is not a
		// failure while we are still polling.
		got, _ := r.Records(addr, proto, version, stream)
		switch {
		case len(got) > msgs:
			t.Fatalf("too many records. want %d got %d", msgs, len(got))
		case msgs > 0 && len(got) == msgs:
			return got
		}
		// Either no records are expected at all (msgs == 0) or not enough have
		// arrived yet; keep polling until the attempts are exhausted.
		time.Sleep(poll)
	}
	if msgs > 0 {
		t.Fatal("timed out while waiting for records")
	}
	return nil
}
type Record struct {
in *record
out *record
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pushsync
// Re-exports of the internal protocol identifiers so that external tests can
// reference the pushsync protocol, version and stream names.
var (
	ProtocolName    = protocolName
	ProtocolVersion = protocolVersion
	StreamName      = streamName
)
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pushsync
import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type metrics struct {
	// all metrics fields must be exported
	// to be able to return them by Metrics()
	// using reflection

	// SendChunkCounter counts chunks handed to sendChunkMsg by chunksWorker.
	SendChunkCounter prometheus.Counter
	// SendChunkTimer accumulates nanoseconds spent sending chunks. It is a
	// plain counter, not a histogram.
	SendChunkTimer prometheus.Counter
	// SendChunkErrorCounter counts failed chunk sends.
	SendChunkErrorCounter prometheus.Counter
	// MarkAndSweepTimer accumulates nanoseconds spent re-subscribing to the
	// push index in chunksWorker.
	MarkAndSweepTimer prometheus.Counter
	// ChunksInBatch is meant to report the chunks in the current batch.
	// NOTE(review): it is never updated anywhere in this file — confirm.
	ChunksInBatch prometheus.Gauge
}
// newMetrics builds the pushsync metric collectors under the common namespace
// and the "pushsync" subsystem.
func newMetrics() metrics {
	const subsystem = "pushsync"

	// counter constructs a Counter with the shared namespace/subsystem so the
	// options struct is not repeated for every field.
	counter := func(name, help string) prometheus.Counter {
		return prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		})
	}

	return metrics{
		SendChunkCounter:      counter("send_chunk", "Total chunks to be sent."),
		SendChunkTimer:        counter("send_chunk_time_taken", "Total time taken to send a chunk."),
		SendChunkErrorCounter: counter("send_chunk_error", "Total no of time error received while sending chunk."),
		MarkAndSweepTimer:     counter("mark_and_sweep_time", "Total time spent in mark and sweep."),
		ChunksInBatch: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "chunks_in_batch",
			Help:      "Chunks in batch at a given time.",
		}),
	}
}
// Metrics returns the prometheus collectors tracked by this service so the
// caller can register them. Receiver renamed from s to ps for consistency
// with the other PushSync methods.
func (ps *PushSync) Metrics() []prometheus.Collector {
	return m.PrometheusCollectorsFromFields(ps.metrics)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gofast_out=. pushsync.proto"
package pb
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pushsync.proto
package pb
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Delivery struct {
Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Delivery) Reset() { *m = Delivery{} }
func (m *Delivery) String() string { return proto.CompactTextString(m) }
func (*Delivery) ProtoMessage() {}
func (*Delivery) Descriptor() ([]byte, []int) {
return fileDescriptor_723cf31bfc02bfd6, []int{0}
}
func (m *Delivery) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Delivery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Delivery.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Delivery) XXX_Merge(src proto.Message) {
xxx_messageInfo_Delivery.Merge(m, src)
}
func (m *Delivery) XXX_Size() int {
return m.Size()
}
func (m *Delivery) XXX_DiscardUnknown() {
xxx_messageInfo_Delivery.DiscardUnknown(m)
}
var xxx_messageInfo_Delivery proto.InternalMessageInfo
func (m *Delivery) GetAddress() []byte {
if m != nil {
return m.Address
}
return nil
}
func (m *Delivery) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func init() {
proto.RegisterType((*Delivery)(nil), "pb.Delivery")
}
func init() { proto.RegisterFile("pushsync.proto", fileDescriptor_723cf31bfc02bfd6) }
var fileDescriptor_723cf31bfc02bfd6 = []byte{
// 114 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x28, 0x2d, 0xce,
0x28, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0xb2,
0xe0, 0xe2, 0x70, 0x49, 0xcd, 0xc9, 0x2c, 0x4b, 0x2d, 0xaa, 0x14, 0x92, 0xe0, 0x62, 0x77, 0x4c,
0x49, 0x29, 0x4a, 0x2d, 0x2e, 0x96, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x82, 0x71, 0x85, 0x84,
0xb8, 0x58, 0x5c, 0x12, 0x4b, 0x12, 0x25, 0x98, 0xc0, 0xc2, 0x60, 0xb6, 0x93, 0xc0, 0x89, 0x47,
0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe3, 0xb1, 0x1c, 0x43, 0x12,
0x1b, 0xd8, 0x58, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x3f, 0x80, 0xe2, 0x4e, 0x68, 0x00,
0x00, 0x00,
}
func (m *Delivery) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Delivery) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Delivery) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Data) > 0 {
i -= len(m.Data)
copy(dAtA[i:], m.Data)
i = encodeVarintPushsync(dAtA, i, uint64(len(m.Data)))
i--
dAtA[i] = 0x12
}
if len(m.Address) > 0 {
i -= len(m.Address)
copy(dAtA[i:], m.Address)
i = encodeVarintPushsync(dAtA, i, uint64(len(m.Address)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintPushsync(dAtA []byte, offset int, v uint64) int {
offset -= sovPushsync(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *Delivery) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Address)
if l > 0 {
n += 1 + l + sovPushsync(uint64(l))
}
l = len(m.Data)
if l > 0 {
n += 1 + l + sovPushsync(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovPushsync(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozPushsync(x uint64) (n int) {
return sovPushsync(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Delivery) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPushsync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Delivery: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Delivery: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Address", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPushsync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthPushsync
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthPushsync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Address = append(m.Address[:0], dAtA[iNdEx:postIndex]...)
if m.Address == nil {
m.Address = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPushsync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthPushsync
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthPushsync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
if m.Data == nil {
m.Data = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPushsync(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPushsync
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPushsync
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipPushsync(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPushsync
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPushsync
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPushsync
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthPushsync
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupPushsync
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthPushsync
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthPushsync = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowPushsync = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupPushsync = fmt.Errorf("proto: unexpected end of group")
)
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
syntax = "proto3";
package pb;
// Delivery carries a single chunk: its swarm address and its raw payload.
message Delivery {
	bytes Address = 1; // swarm address of the chunk
	bytes Data = 2;    // chunk payload bytes
}
......@@ -3,3 +3,217 @@
// license that can be found in the LICENSE file.
package pushsync
import (
"context"
"errors"
"fmt"
"io"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/pushsync/pb"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
)
// Wire-protocol identifiers for pushsync; re-exported for tests via export_test.
const (
	protocolName    = "pushsync"
	protocolVersion = "1.0.0"
	streamName      = "pushsync"
)
// PushSync pushes locally stored chunks to their closest peer in the network
// and accepts chunk deliveries from other peers.
type PushSync struct {
	streamer      p2p.Streamer           // opens outgoing pushsync streams
	storer        storage.Storer         // local chunk store (push index source)
	peerSuggester topology.ClosestPeerer // picks the closest peer for a chunk address
	quit          chan struct{}          // closed by Close to stop chunksWorker
	logger        logging.Logger
	metrics       metrics
}
// Options bundles the dependencies required to construct a PushSync service.
type Options struct {
	Streamer      p2p.Streamer
	Storer        storage.Storer
	ClosestPeerer topology.ClosestPeerer
	Logger        logging.Logger
}
var retryInterval = 10 * time.Second // time interval between retries
// New constructs a PushSync service from the given options and starts the
// background worker that pushes locally stored chunks to their closest peers.
// The worker is stopped by calling Close.
func New(o Options) *PushSync {
	service := &PushSync{
		quit:          make(chan struct{}),
		metrics:       newMetrics(),
		logger:        o.Logger,
		peerSuggester: o.ClosestPeerer,
		storer:        o.Storer,
		streamer:      o.Streamer,
	}
	go service.chunksWorker()
	return service
}
// Protocol returns the pushsync protocol specification, exposing the single
// delivery stream served by handler. Receiver renamed from s to ps for
// consistency with the other PushSync methods.
func (ps *PushSync) Protocol() p2p.ProtocolSpec {
	return p2p.ProtocolSpec{
		Name:    protocolName,
		Version: protocolVersion,
		StreamSpecs: []p2p.StreamSpec{
			{
				Name:    streamName,
				Handler: ps.handler,
			},
		},
	}
}
// Close stops the background chunksWorker goroutine by closing the quit
// channel. It always returns nil.
func (ps *PushSync) Close() error {
	close(ps.quit)
	return nil
}
// handler handles chunk delivery from other node and inserts it to localstore.
// it also sends this chunk to the closest peer if one exists.
func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
	_, r := protobuf.NewWriterAndReader(stream)
	defer stream.Close()
	var ch pb.Delivery
	if err := r.ReadMsg(&ch); err != nil {
		// EOF means the peer closed the stream without a delivery; not an error.
		if err == io.EOF {
			return nil
		}
		return err
	}
	// create chunk and store it in the local store
	addr := swarm.NewAddress(ch.Address)
	chunk := swarm.NewChunk(addr, ch.Data)
	_, err := ps.storer.Put(ctx, storage.ModePutSync, chunk)
	if err != nil {
		return err
	}
	// push this to the closest node too
	// NOTE(review): the closest peer may be the peer we just received the chunk
	// from, which would bounce the chunk back — confirm this cannot loop.
	peer, err := ps.peerSuggester.ClosestPeer(addr)
	if err != nil {
		if errors.Is(err, topology.ErrWantSelf) {
			// i'm the closest - nothing to do
			return nil
		}
		return err
	}
	if err := ps.sendChunkMsg(ctx, peer, chunk); err != nil {
		ps.metrics.SendChunkErrorCounter.Inc()
		return err
	}
	// Mark the chunk as pushed/synced only after a successful forward.
	return ps.storer.Set(ctx, storage.ModeSetSyncPush, chunk.Address())
}
// chunksWorker polls the localstore push index and forwards each chunk to its
// closest peer. It re-subscribes to the push index every retryInterval and
// runs until Close is called (ps.quit is closed).
func (ps *PushSync) chunksWorker() {
	var (
		chunks      <-chan swarm.Chunk
		unsubscribe func()
		ctx         = context.Background()
	)
	// timer, initially set to 0 to fall through select case on timer.C for initialisation
	timer := time.NewTimer(0)
	defer timer.Stop()
	chunksInBatch := -1
	for {
		select {
		// handle incoming chunks; chunks is nil until the first subscription,
		// so this case blocks until timer.C fires once
		case ch, more := <-chunks:
			// if no more, set to nil, reset timer to 0 to finalise batch immediately
			if !more {
				chunks = nil
				var dur time.Duration
				if chunksInBatch == 0 {
					dur = 500 * time.Millisecond
				}
				timer.Reset(dur)
				break
			}
			chunksInBatch++
			ps.metrics.SendChunkCounter.Inc()
			peer, err := ps.peerSuggester.ClosestPeer(ch.Address())
			if err != nil {
				if errors.Is(err, topology.ErrWantSelf) {
					// we are the closest node: mark the chunk synced locally
					if err := ps.storer.Set(ctx, storage.ModeSetSyncPush, ch.Address()); err != nil {
						ps.logger.Errorf("pushsync: error setting chunk %s to synced: %v", ch.Address().String(), err)
					}
					continue
				}
				// BUGFIX: previously any other ClosestPeer error fell through and
				// the chunk was sent to the zero-value peer address and marked
				// synced. Skip it instead; it stays in the push index and is
				// retried on the next subscription round.
				ps.logger.Errorf("pushsync: closest peer for chunk %s: %v", ch.Address().String(), err)
				continue
			}
			if err := ps.sendChunkMsg(ctx, peer, ch); err != nil {
				ps.metrics.SendChunkErrorCounter.Inc()
				// BUGFIX: Errorf was called with a verb-less format string plus
				// stray key/value arguments, producing %!(EXTRA ...) output.
				ps.logger.Errorf("pushsync: error sending chunk %s: %v", ch.Address().String(), err)
			}
			// set chunk status to synced, insert to db GC index
			// NOTE(review): this marks the chunk synced even when the send above
			// failed, so a failed delivery is never retried — confirm intended.
			if err := ps.storer.Set(ctx, storage.ModeSetSyncPush, ch.Address()); err != nil {
				ps.logger.Errorf("pushsync: error setting chunk %s to synced: %v", ch.Address().String(), err)
			}
		// retry interval timer triggers starting from new
		case <-timer.C:
			// initially timer is set to go off as well as every time we hit the end of push index
			startTime := time.Now()
			// if subscribe was running, stop it
			if unsubscribe != nil {
				unsubscribe()
			}
			// and start iterating on Push index from the beginning
			chunks, unsubscribe = ps.storer.SubscribePush(ctx)
			// reset timer to go off after retryInterval
			timer.Reset(retryInterval)
			ps.metrics.MarkAndSweepTimer.Add(float64(time.Since(startTime)))
		case <-ps.quit:
			if unsubscribe != nil {
				unsubscribe()
			}
			return
		}
	}
}
// sendChunkMsg sends a chunk to a given peer over a fresh pushsync stream and
// accumulates the time spent into the SendChunkTimer metric.
func (ps *PushSync) sendChunkMsg(ctx context.Context, peer swarm.Address, ch swarm.Chunk) error {
	startTimer := time.Now()

	streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
	if err != nil {
		return fmt.Errorf("new stream: %w", err)
	}
	defer streamer.Close()

	w, _ := protobuf.NewWriterAndReader(streamer)
	if err := w.WriteMsg(&pb.Delivery{
		Address: ch.Address().Bytes(),
		Data:    ch.Data(),
	}); err != nil {
		return fmt.Errorf("write delivery: %w", err)
	}

	// SendChunkTimer is a plain counter accumulating nanoseconds spent sending.
	ps.metrics.SendChunkTimer.Add(float64(time.Since(startTimer)))
	// BUGFIX: previously returned the stale err variable left over from
	// NewStream; return nil explicitly on success.
	return nil
}
......@@ -2,4 +2,229 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pushsync
package pushsync_test
import (
"bytes"
"context"
"io/ioutil"
"sync"
"testing"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/pushsync/pb"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/mock"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestSendToClosest tests that a chunk that is uploaded to localstore is sent to the appropriate peer.
func TestSendToClosest(t *testing.T) {
	logger := logging.New(ioutil.Discard, 0)

	// chunk data to upload
	chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
	chunkData := []byte("1234")

	// create a pivot node and a mocked closest node
	pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")   // base is 0000
	closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110 -> po 1

	// Create a mock connectivity between the peers
	mockTopology := mock.NewTopologyDriver(mock.WithClosestPeer(closestPeer))

	storer, err := localstore.New("", pivotNode.Bytes(), nil, logger)
	if err != nil {
		t.Fatal(err)
	}
	// BUGFIX: release localstore resources when the test finishes.
	defer storer.Close()

	// setup the stream recorder to record stream data
	recorder := streamtest.New(
		streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
			return func(context.Context, p2p.Peer, p2p.Stream) error {
				// dont call any handlers
				return nil
			}
		}),
	)

	// instantiate a pushsync instance
	ps := pushsync.New(pushsync.Options{
		Streamer:      recorder,
		Logger:        logger,
		ClosestPeerer: mockTopology,
		Storer:        storer,
	})
	defer ps.Close()
	recorder.SetProtocols(ps.Protocol())

	// upload the chunk to the pivot node
	_, err = storer.Put(context.Background(), storage.ModePutUpload, swarm.NewChunk(chunkAddress, chunkData))
	if err != nil {
		t.Fatal(err)
	}

	records := recorder.WaitRecords(t, closestPeer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5)

	messages, err := protobuf.ReadMessages(
		bytes.NewReader(records[0].In()),
		func() protobuf.Message { return new(pb.Delivery) },
	)
	if err != nil {
		t.Fatal(err)
	}
	// BUGFIX: require exactly one message; the previous `> 1` check let an
	// empty slice through and the index below would panic.
	if len(messages) != 1 {
		t.Fatalf("expected 1 message, got %d", len(messages))
	}
	delivery := messages[0].(*pb.Delivery)
	chunk := swarm.NewChunk(swarm.NewAddress(delivery.Address), delivery.Data)

	if !bytes.Equal(chunk.Address().Bytes(), chunkAddress.Bytes()) {
		t.Fatalf("chunk address mismatch")
	}
	if !bytes.Equal(chunk.Data(), chunkData) {
		t.Fatalf("chunk data mismatch")
	}
}
// TestForwardChunk tests that when a closer node exists within the topology, we forward a received
// chunk to it.
func TestForwardChunk(t *testing.T) {
	logger := logging.New(ioutil.Discard, 0)

	// chunk data to upload
	chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
	chunkData := []byte("1234")

	// create a pivot node and a closest mocked closer node address
	pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")   // pivot is 0000
	closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110

	// Create a mock connectivity driver
	mockTopology := mock.NewTopologyDriver(mock.WithClosestPeer(closestPeer))
	storer, err := localstore.New("", pivotNode.Bytes(), nil, logger)
	if err != nil {
		t.Fatal(err)
	}
	defer storer.Close()

	targetCalled := false
	var mtx sync.Mutex

	// setup the stream recorder to record stream data
	recorder := streamtest.New(
		streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
			// this is a custom middleware that is needed because of the design of
			// the recorder. since we want to test only one unit, but the first message
			// is supposedly coming from another node, we don't want to execute the handler
			// when the peer address is the peer of `closestPeer`, since this will create an
			// unnecessary entry in the recorder
			return func(ctx context.Context, p p2p.Peer, s p2p.Stream) error {
				if p.Address.Equal(closestPeer) {
					mtx.Lock()
					defer mtx.Unlock()
					if targetCalled {
						// BUGFIX: this middleware may run on a non-test
						// goroutine, where t.Fatal is not allowed; use t.Error
						// and return instead.
						t.Error("target called more than once")
						return nil
					}
					targetCalled = true
					return nil
				}
				return f(ctx, p, s)
			}
		}),
	)
	ps := pushsync.New(pushsync.Options{
		Streamer:      recorder,
		Logger:        logger,
		ClosestPeerer: mockTopology,
		Storer:        storer,
	})
	defer ps.Close()
	recorder.SetProtocols(ps.Protocol())

	stream, err := recorder.NewStream(context.Background(), pivotNode, nil, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName)
	if err != nil {
		t.Fatal(err)
	}
	defer stream.Close()

	w := protobuf.NewWriter(stream)

	// this triggers the handler of the pivot with a delivery stream
	err = w.WriteMsg(&pb.Delivery{
		Address: chunkAddress.Bytes(),
		Data:    chunkData,
	})
	if err != nil {
		t.Fatal(err)
	}

	_ = recorder.WaitRecords(t, closestPeer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5)

	mtx.Lock()
	defer mtx.Unlock()
	if !targetCalled {
		t.Fatal("target not called")
	}
}
// TestNoForwardChunk tests that the closest node to a chunk doesn't forward it to other nodes.
func TestNoForwardChunk(t *testing.T) {
	logger := logging.New(ioutil.Discard, 0)

	// chunk data to upload
	chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000") // binary 0111
	chunkData := []byte("1234")

	// create a pivot node and a cluster of nodes
	pivotNode := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // pivot is 0110

	// Create a mock connectivity: ErrWantSelf makes the pivot believe it is the
	// closest node, so the handler must not forward the chunk anywhere.
	mockTopology := mock.NewTopologyDriver(mock.WithClosestPeerErr(topology.ErrWantSelf))
	storer, err := localstore.New("", pivotNode.Bytes(), nil, logger)
	if err != nil {
		t.Fatal(err)
	}
	// Pass-through middleware: the pivot's handler runs normally.
	recorder := streamtest.New(
		streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
			return f
		}),
	)
	ps := pushsync.New(pushsync.Options{
		Streamer:      recorder,
		Logger:        logger,
		ClosestPeerer: mockTopology,
		Storer:        storer,
	})
	defer ps.Close()
	recorder.SetProtocols(ps.Protocol())

	stream, err := recorder.NewStream(context.Background(), pivotNode, nil, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName)
	if err != nil {
		t.Fatal(err)
	}
	defer stream.Close()

	w := protobuf.NewWriter(stream)

	// this triggers the handler of the pivot with a delivery stream
	err = w.WriteMsg(&pb.Delivery{
		Address: chunkAddress.Bytes(),
		Data:    chunkData,
	})
	if err != nil {
		t.Fatal(err)
	}

	// Exactly one record is expected at the pivot: the delivery stream this
	// test opened itself. A second record would mean the chunk was forwarded.
	_ = recorder.WaitRecords(t, pivotNode, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5)
}
......@@ -73,6 +73,10 @@ func (m *mockStorer) SubscribePull(ctx context.Context, bin uint8, since uint64,
panic("not implemented") // TODO: Implement
}
// SubscribePush is a stub required to satisfy storage.Storer; it is not
// exercised by these tests.
func (m *mockStorer) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func()) {
	panic("not implemented") // TODO: Implement
}

// Close is a stub required to satisfy storage.Storer; it is not exercised by
// these tests.
func (m *mockStorer) Close() error {
	panic("not implemented") // TODO: Implement
}
......@@ -135,6 +135,7 @@ type Storer interface {
Set(ctx context.Context, mode ModeSet, addrs ...swarm.Address) (err error)
LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan Descriptor, stop func())
SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func())
io.Closer
}
......
......@@ -9,23 +9,44 @@ import (
"sync"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
)
type TopologyDriver struct {
peers []swarm.Address
addPeerErr error
mtx sync.Mutex
// mock is a canned topology driver for tests: it records added peers and
// returns pre-configured closest-peer answers.
type mock struct {
	peers          []swarm.Address // peers recorded by AddPeer
	closestPeer    swarm.Address   // canned ClosestPeer result
	closestPeerErr error           // canned ClosestPeer error
	addPeerErr     error           // canned AddPeer error
	mtx            sync.Mutex      // guards mutations of peers
}
func NewTopologyDriver() *TopologyDriver {
return &TopologyDriver{}
// WithAddPeerErr configures the mock so that AddPeer returns the given error.
func WithAddPeerErr(err error) Option {
	return optionFunc(func(d *mock) {
		d.addPeerErr = err
	})
}
func (d *TopologyDriver) SetAddPeerErr(err error) {
d.addPeerErr = err
// WithClosestPeer configures the mock so that ClosestPeer returns addr.
func WithClosestPeer(addr swarm.Address) Option {
	return optionFunc(func(d *mock) {
		d.closestPeer = addr
	})
}
func (d *TopologyDriver) AddPeer(_ context.Context, addr swarm.Address) error {
// WithClosestPeerErr configures the mock so that ClosestPeer returns the
// given error (e.g. topology.ErrWantSelf).
func WithClosestPeerErr(err error) Option {
	return optionFunc(func(d *mock) {
		d.closestPeerErr = err
	})
}
// NewTopologyDriver creates a new mock topology driver configured with the
// given options.
func NewTopologyDriver(opts ...Option) topology.Driver {
	driver := &mock{}
	for _, opt := range opts {
		opt.apply(driver)
	}
	return driver
}
func (d *mock) AddPeer(_ context.Context, addr swarm.Address) error {
if d.addPeerErr != nil {
return d.addPeerErr
}
......@@ -36,6 +57,18 @@ func (d *TopologyDriver) AddPeer(_ context.Context, addr swarm.Address) error {
return nil
}
func (d *TopologyDriver) Peers() []swarm.Address {
// Peers returns the peers recorded by AddPeer.
// BUGFIX: take the mutex that guards d.peers; reading the slice without it
// while AddPeer appends concurrently is a data race.
func (d *mock) Peers() []swarm.Address {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	return d.peers
}
// ClosestPeer returns the canned result configured via WithClosestPeer /
// WithClosestPeerErr; the addr argument is ignored.
func (d *mock) ClosestPeer(addr swarm.Address) (peerAddr swarm.Address, err error) {
	return d.closestPeer, d.closestPeerErr
}
// Option configures the mock topology driver when passed to NewTopologyDriver.
type Option interface {
	apply(*mock)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*mock)

func (f optionFunc) apply(r *mock) { f(r) }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment