Commit 0fe8681d authored by Ralph Pichler, committed by GitHub

accounting in push sync (#588)

parent 0e223cb1
@@ -276,7 +276,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
}
retrieve.SetStorer(ns)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagg, psss.TryUnwrap, logger)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagg, psss.TryUnwrap, logger, acc, accounting.NewFixedPricer(address, 10))
// set the pushSyncer in the PSS
psss.WithPushSyncer(pushSyncProtocol)
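Note: the node wiring now passes two extra dependencies into `pushsync.New`: the accounting instance `acc` and a fixed pricer seeded with the node's own overlay address and a base price of 10. `NewFixedPricer` itself is not part of this diff; a plausible proximity-based sketch (the exact formula is an assumption, not the verbatim source) looks like this:

```go
// Sketch only: a proximity-based fixed pricer. The real formula in
// pkg/accounting may differ; this illustrates the assumed shape.
package accounting

import "github.com/ethersphere/bee/pkg/swarm"

// FixedPricer quotes prices that grow with the distance between a peer
// and the chunk it is asked to handle.
type FixedPricer struct {
	overlay swarm.Address // this node's own overlay address
	poPrice uint64        // price per proximity-order step
}

func NewFixedPricer(overlay swarm.Address, poPrice uint64) *FixedPricer {
	return &FixedPricer{overlay: overlay, poPrice: poPrice}
}

// PeerPrice is the price when peer handles chunk: the fewer prefix bits
// peer shares with chunk, the more expensive the service (assumed).
func (f *FixedPricer) PeerPrice(peer, chunk swarm.Address) uint64 {
	return uint64(swarm.MaxPO-swarm.Proximity(peer.Bytes(), chunk.Bytes())) * f.poPrice
}

// Price is this node's own price for handling chunk.
func (f *FixedPricer) Price(chunk swarm.Address) uint64 {
	return f.PeerPrice(f.overlay, chunk)
}
```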
@@ -10,6 +10,7 @@ import (
"fmt"
"time"
"github.com/ethersphere/bee/pkg/accounting"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
@@ -41,12 +42,14 @@ type PushSync struct {
tagg *tags.Tags
deliveryCallback func(context.Context, swarm.Chunk) error // callback func to be invoked to deliver chunks to PSS
logger logging.Logger
accounting accounting.Interface
pricer accounting.Pricer
metrics metrics
}
var timeToWaitForReceipt = 3 * time.Second // time to wait to get a receipt for a chunk
func New(streamer p2p.Streamer, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, deliveryCallback func(context.Context, swarm.Chunk) error, logger logging.Logger) *PushSync {
func New(streamer p2p.Streamer, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, deliveryCallback func(context.Context, swarm.Chunk) error, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer) *PushSync {
ps := &PushSync{
streamer: streamer,
storer: storer,
@@ -54,6 +57,8 @@ func New(streamer p2p.Streamer, storer storage.Putter, closestPeerer topology.Cl
tagg: tagger,
deliveryCallback: deliveryCallback,
logger: logger,
accounting: accounting,
pricer: pricer,
metrics: newMetrics(),
}
return ps
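The constructor only needs a narrow accounting surface. Reconstructed from the calls made in this diff (`Reserve`, `Release`, `Credit`, `Debit` in the protocol code, `Balance` in the tests), the interfaces presumably look roughly like the following; parameter names are guesses:

```go
// Reconstruction from usage in this diff, not the verbatim source.
package accounting

import "github.com/ethersphere/bee/pkg/swarm"

// Interface is the accounting surface push sync depends on.
type Interface interface {
	// Reserve earmarks price against peer so a later Credit cannot
	// push the debt past the allowed threshold; Release undoes it.
	Reserve(peer swarm.Address, price uint64) error
	Release(peer swarm.Address, price uint64)
	// Credit records that we owe peer (we consumed a service).
	Credit(peer swarm.Address, price uint64) error
	// Debit records that peer owes us (we provided a service).
	Debit(peer swarm.Address, price uint64) error
	// Balance returns the current net balance with peer.
	Balance(peer swarm.Address) (int64, error)
}

// Pricer quotes prices for chunk deliveries.
type Pricer interface {
	Price(chunk swarm.Address) uint64
	PeerPrice(peer, chunk swarm.Address) uint64
}
```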
@@ -101,11 +106,19 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
// This is a special situation in that the other peer thinks that we are the closest node
// and we think that the sending peer
// and we think that the sending peer is the closest
if p.Address.Equal(peer) {
return ps.handleDeliveryResponse(ctx, w, p, chunk)
}
// compute the price we pay for this receipt and reserve it for the rest of this function
receiptPrice := ps.pricer.PeerPrice(peer, chunk.Address())
err = ps.accounting.Reserve(peer, receiptPrice)
if err != nil {
return err
}
defer ps.accounting.Release(peer, receiptPrice)
// Forward chunk to closest peer
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
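Reserving before forwarding is the point of this block: if the node already owes the downstream peer close to its limit, the handler fails fast instead of accruing debt it may not be able to settle, and the deferred `Release` returns the hold whether or not the later `Credit` happens. A minimal in-memory sketch of such a check (struct and field names are hypothetical, not from this diff):

```go
// Sketch of a threshold-enforcing Reserve; all names here are hypothetical.
package accounting

import (
	"errors"
	"sync"

	"github.com/ethersphere/bee/pkg/swarm"
)

type ledger struct {
	mu               sync.Mutex
	balances         map[string]int64  // negative: we owe the peer
	reserved         map[string]uint64 // amounts held by Reserve
	paymentThreshold uint64            // maximum debt we tolerate per peer
}

func (l *ledger) Reserve(peer swarm.Address, price uint64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	k := peer.String()
	// Would the balance, minus everything already on hold, exceed the threshold?
	if l.balances[k]-int64(l.reserved[k])-int64(price) < -int64(l.paymentThreshold) {
		return errors.New("reserve would exceed payment threshold")
	}
	l.reserved[k] += price
	return nil
}

func (l *ledger) Release(peer swarm.Address, price uint64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.reserved[peer.String()] -= price
}
```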
@@ -137,6 +150,11 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return fmt.Errorf("invalid receipt from peer %s", peer.String())
}
err = ps.accounting.Credit(peer, receiptPrice)
if err != nil {
return err
}
// pass back the received receipt in the previously received stream
err = ps.sendReceipt(w, &receipt)
if err != nil {
@@ -144,7 +162,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
ps.metrics.ReceiptsSentCounter.Inc()
return nil
return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address()))
}
func (ps *PushSync) getChunkDelivery(r protobuf.Reader) (chunk swarm.Chunk, err error) {
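Note the pricing asymmetry in the handler: it credits the downstream peer `PeerPrice(peer, chunk)` but debits the upstream peer its own `Price(chunk)`. With a proximity-based pricer the downstream peer sits closer to the chunk and is therefore cheaper, so each hop keeps a margin. Illustrative numbers only; the real values depend on the pricer:

```go
package main

import "fmt"

// Hypothetical per-hop margin with poPrice = 10: assume we are 3
// proximity-order steps from the chunk and the downstream peer only 2.
// The numbers are made up for illustration.
func main() {
	const poPrice uint64 = 10
	charged := 3 * poPrice // Price(chunk): debited from the upstream peer
	paid := 2 * poPrice    // PeerPrice(peer, chunk): credited to downstream
	fmt.Println("forwarder margin:", charged-paid) // prints 10
}
```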
@@ -214,6 +232,14 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
return nil, fmt.Errorf("closest peer: %w", err)
}
// compute the price we pay for this receipt and reserve it for the rest of this function
receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
err = ps.accounting.Reserve(peer, receiptPrice)
if err != nil {
return nil, err
}
defer ps.accounting.Release(peer, receiptPrice)
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return nil, fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
@@ -247,6 +273,11 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
return nil, fmt.Errorf("invalid receipt. peer %s", peer.String())
}
err = ps.accounting.Credit(peer, receiptPrice)
if err != nil {
return nil, err
}
rec := &Receipt{
Address: swarm.NewAddress(receipt.Address),
}
@@ -276,6 +307,12 @@ func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Write
if err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}
err = ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address()))
if err != nil {
return err
}
// since all PSS messages come through push sync, deliver them here if this node is the destination
return ps.deliverToPSS(ctx, chunk)
}
@@ -11,6 +11,8 @@ import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/accounting"
accountingmock "github.com/ethersphere/bee/pkg/accounting/mock"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
@@ -23,6 +25,10 @@ import (
"github.com/ethersphere/bee/pkg/topology/mock"
)
const (
fixedPrice = uint64(10)
)
// TestSendChunkAndReceiveReceipt inserts a chunk as an uploaded chunk in db. This triggers sending a chunk to the closest node
// and expects a receipt. The messages are intercepted in the outgoing stream to check for correctness.
func TestSendChunkAndReceiveReceipt(t *testing.T) {
@@ -37,14 +43,14 @@ func TestSendChunkAndReceiveReceipt(t *testing.T) {
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
psPeer, storerPeer, _ := createPushSyncNode(t, closestPeer, nil, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, nil, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()))
// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot, _ := createPushSyncNode(t, pivotNode, recorder, nil, mock.WithClosestPeer(closestPeer))
psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil, mock.WithClosestPeer(closestPeer))
defer storerPivot.Close()
// Trigger the sending of chunk to the closest node
@@ -63,6 +69,23 @@ func TestSendChunkAndReceiveReceipt(t *testing.T) {
// this intercepts the incoming receipt message
waitOnRecordAndTest(t, closestPeer, recorder, chunkAddress, nil)
balance, err := pivotAccounting.Balance(closestPeer)
if err != nil {
t.Fatal(err)
}
if balance != -int64(fixedPrice) {
t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
}
balance, err = peerAccounting.Balance(closestPeer)
if err != nil {
t.Fatal(err)
}
if balance != int64(fixedPrice) {
t.Fatalf("unexpected balance on peer. want %d got %d", int64(fixedPrice), balance)
}
}
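The two balance checks read as follows: the pivot credited the closest peer (so its balance with that peer went negative), and the closest peer debited the pivot (so its own ledger went positive by the same amount). Since this exact assertion pattern recurs six times across the tests, a small hypothetical helper (not part of this commit) would keep each check to one line:

```go
// checkBalance is a hypothetical test helper, not part of this diff.
func checkBalance(t *testing.T, acc accounting.Interface, peer swarm.Address, want int64) {
	t.Helper()
	got, err := acc.Balance(peer)
	if err != nil {
		t.Fatal(err)
	}
	if got != want {
		t.Fatalf("unexpected balance: want %d got %d", want, got)
	}
}
```

With it, the block above collapses to `checkBalance(t, pivotAccounting, closestPeer, -int64(fixedPrice))` followed by `checkBalance(t, peerAccounting, closestPeer, int64(fixedPrice))`.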
// TestPushChunkToClosest tests the sending of a chunk to the closest peer from the originating node's perspective.
@@ -78,14 +101,14 @@ func TestPushChunkToClosest(t *testing.T) {
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
psPeer, storerPeer, _ := createPushSyncNode(t, closestPeer, nil, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, nil, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()))
// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot, pivotTags := createPushSyncNode(t, pivotNode, recorder, nil, mock.WithClosestPeer(closestPeer))
psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil, mock.WithClosestPeer(closestPeer))
defer storerPivot.Close()
ta, err := pivotTags.Create("test", 1, false)
@@ -127,6 +150,23 @@ func TestPushChunkToClosest(t *testing.T) {
t.Fatalf("tags error")
}
balance, err := pivotAccounting.Balance(closestPeer)
if err != nil {
t.Fatal(err)
}
if balance != -int64(fixedPrice) {
t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
}
balance, err = peerAccounting.Balance(closestPeer)
if err != nil {
t.Fatal(err)
}
if balance != int64(fixedPrice) {
t.Fatalf("unexpected balance on peer. want %d got %d", int64(fixedPrice), balance)
}
}
// TestHandler expects a chunk from a node on a stream. It then stores the chunk in the local store and
@@ -154,19 +194,19 @@ func TestHandler(t *testing.T) {
}
// Create the closest peer
psClosestPeer, closestStorerPeerDB, _ := createPushSyncNode(t, closestPeer, nil, pssDeliver, mock.WithClosestPeerErr(topology.ErrWantSelf))
psClosestPeer, closestStorerPeerDB, _, closestAccounting := createPushSyncNode(t, closestPeer, nil, pssDeliver, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer closestStorerPeerDB.Close()
closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()))
// creating the pivot peer
psPivot, storerPivotDB, _ := createPushSyncNode(t, pivotPeer, closestRecorder, nil, mock.WithClosestPeer(closestPeer))
psPivot, storerPivotDB, _, pivotAccounting := createPushSyncNode(t, pivotPeer, closestRecorder, nil, mock.WithClosestPeer(closestPeer))
defer storerPivotDB.Close()
pivotRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()))
// Creating the trigger peer
psTriggerPeer, triggerStorerDB, _ := createPushSyncNode(t, triggerPeer, pivotRecorder, nil, mock.WithClosestPeer(pivotPeer))
psTriggerPeer, triggerStorerDB, _, triggerAccounting := createPushSyncNode(t, triggerPeer, pivotRecorder, nil, mock.WithClosestPeer(pivotPeer))
defer triggerStorerDB.Close()
receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
@@ -198,9 +238,46 @@ func TestHandler(t *testing.T) {
case <-time.After(100 * time.Millisecond):
t.Fatal("recovery hook was not called")
}
balance, err := triggerAccounting.Balance(pivotPeer)
if err != nil {
t.Fatal(err)
}
if balance != -int64(fixedPrice) {
t.Fatalf("unexpected balance on trigger. want %d got %d", -int64(fixedPrice), balance)
}
// we check pivotPeer here instead of triggerPeer because with streamtest the peer seen in the handler is actually the receiver
balance, err = pivotAccounting.Balance(pivotPeer)
if err != nil {
t.Fatal(err)
}
if balance != int64(fixedPrice) {
t.Fatalf("unexpected balance on pivot. want %d got %d", int64(fixedPrice), balance)
}
balance, err = pivotAccounting.Balance(closestPeer)
if err != nil {
t.Fatal(err)
}
if balance != -int64(fixedPrice) {
t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
}
balance, err = closestAccounting.Balance(closestPeer)
if err != nil {
t.Fatal(err)
}
if balance != int64(fixedPrice) {
t.Fatalf("unexpected balance on closest. want %d got %d", int64(fixedPrice), balance)
}
}
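Summarizing the three-node chain (trigger → pivot → closest): each hop moves `fixedPrice` in opposite directions on the two sides of the link, so the pivot breaks even, the originator pays 10, and the storer earns 10. In terms of the assertions above, with the streamtest quirk that a handler records the balance under the receiver's address:

```go
// Expected ledger after one chunk traverses trigger -> pivot -> closest:
// triggerAccounting.Balance(pivotPeer)   == -10 // trigger paid pivot to forward
// pivotAccounting.Balance(pivotPeer)     == +10 // pivot earned from trigger
// pivotAccounting.Balance(closestPeer)   == -10 // pivot paid closest to store
// closestAccounting.Balance(closestPeer) == +10 // closest earned from pivot
```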
func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.Recorder, pssDeliver func(context.Context, swarm.Chunk) error, mockOpts ...mock.Option) (*pushsync.PushSync, *localstore.DB, *tags.Tags) {
func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.Recorder, pssDeliver func(context.Context, swarm.Chunk) error, mockOpts ...mock.Option) (*pushsync.PushSync, *localstore.DB, *tags.Tags, accounting.Interface) {
logger := logging.New(ioutil.Discard, 0)
storer, err := localstore.New("", addr.Bytes(), nil, logger)
@@ -210,7 +287,11 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.R
mockTopology := mock.NewTopologyDriver(mockOpts...)
mtag := tags.NewTags()
return pushsync.New(recorder, storer, mockTopology, mtag, pssDeliver, logger), storer, mtag
mockAccounting := accountingmock.NewAccounting()
mockPricer := accountingmock.NewPricer(fixedPrice, fixedPrice)
return pushsync.New(recorder, storer, mockTopology, mtag, pssDeliver, logger, mockAccounting, mockPricer), storer, mtag, mockAccounting
}
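`accountingmock.NewPricer(fixedPrice, fixedPrice)` apparently takes a flat `Price` and a flat `PeerPrice`, ignoring proximity so the balance assertions stay deterministic. A sketch of what such a mock might look like (argument order is an assumption inferred from this call site):

```go
// Sketch of a flat-rate pricer mock; argument order is an assumption.
package mock

import "github.com/ethersphere/bee/pkg/swarm"

type Pricer struct {
	price     uint64 // flat value returned by Price
	peerPrice uint64 // flat value returned by PeerPrice
}

func NewPricer(price, peerPrice uint64) *Pricer {
	return &Pricer{price: price, peerPrice: peerPrice}
}

func (p *Pricer) Price(chunk swarm.Address) uint64           { return p.price }
func (p *Pricer) PeerPrice(peer, chunk swarm.Address) uint64 { return p.peerPrice }
```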
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {