Commit 881f8251 authored by Ralph Pichler's avatar Ralph Pichler Committed by GitHub

add initial accounting for retrieval (#434)

parent 76e4f59a
......@@ -26,25 +26,26 @@ import (
func (c *command) initStartCmd() (err error) {
const (
optionNameDataDir = "data-dir"
optionNameDBCapacity = "db-capacity"
optionNamePassword = "password"
optionNamePasswordFile = "password-file"
optionNameAPIAddr = "api-addr"
optionNameP2PAddr = "p2p-addr"
optionNameNATAddr = "nat-addr"
optionNameP2PWSEnable = "p2p-ws-enable"
optionNameP2PQUICEnable = "p2p-quic-enable"
optionNameDebugAPIEnable = "debug-api-enable"
optionNameDebugAPIAddr = "debug-api-addr"
optionNameBootnodes = "bootnode"
optionNameNetworkID = "network-id"
optionWelcomeMessage = "welcome-message"
optionCORSAllowedOrigins = "cors-allowed-origins"
optionNameTracingEnabled = "tracing-enable"
optionNameTracingEndpoint = "tracing-endpoint"
optionNameTracingServiceName = "tracing-service-name"
optionNameVerbosity = "verbosity"
optionNameDataDir = "data-dir"
optionNameDBCapacity = "db-capacity"
optionNamePassword = "password"
optionNamePasswordFile = "password-file"
optionNameAPIAddr = "api-addr"
optionNameP2PAddr = "p2p-addr"
optionNameNATAddr = "nat-addr"
optionNameP2PWSEnable = "p2p-ws-enable"
optionNameP2PQUICEnable = "p2p-quic-enable"
optionNameDebugAPIEnable = "debug-api-enable"
optionNameDebugAPIAddr = "debug-api-addr"
optionNameBootnodes = "bootnode"
optionNameNetworkID = "network-id"
optionWelcomeMessage = "welcome-message"
optionCORSAllowedOrigins = "cors-allowed-origins"
optionNameTracingEnabled = "tracing-enable"
optionNameTracingEndpoint = "tracing-endpoint"
optionNameTracingServiceName = "tracing-service-name"
optionNameVerbosity = "verbosity"
optionNameDisconnectThreshold = "disconnect-threshold"
)
cmd := &cobra.Command{
......@@ -112,23 +113,24 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
}
b, err := node.NewBee(node.Options{
DataDir: c.config.GetString(optionNameDataDir),
DBCapacity: c.config.GetUint64(optionNameDBCapacity),
Password: password,
APIAddr: c.config.GetString(optionNameAPIAddr),
DebugAPIAddr: debugAPIAddr,
Addr: c.config.GetString(optionNameP2PAddr),
NATAddr: c.config.GetString(optionNameNATAddr),
EnableWS: c.config.GetBool(optionNameP2PWSEnable),
EnableQUIC: c.config.GetBool(optionNameP2PQUICEnable),
NetworkID: c.config.GetUint64(optionNameNetworkID),
WelcomeMessage: c.config.GetString(optionWelcomeMessage),
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
CORSAllowedOrigins: c.config.GetStringSlice(optionCORSAllowedOrigins),
TracingEnabled: c.config.GetBool(optionNameTracingEnabled),
TracingEndpoint: c.config.GetString(optionNameTracingEndpoint),
TracingServiceName: c.config.GetString(optionNameTracingServiceName),
Logger: logger,
DataDir: c.config.GetString(optionNameDataDir),
DBCapacity: c.config.GetUint64(optionNameDBCapacity),
Password: password,
APIAddr: c.config.GetString(optionNameAPIAddr),
DebugAPIAddr: debugAPIAddr,
Addr: c.config.GetString(optionNameP2PAddr),
NATAddr: c.config.GetString(optionNameNATAddr),
EnableWS: c.config.GetBool(optionNameP2PWSEnable),
EnableQUIC: c.config.GetBool(optionNameP2PQUICEnable),
NetworkID: c.config.GetUint64(optionNameNetworkID),
WelcomeMessage: c.config.GetString(optionWelcomeMessage),
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
CORSAllowedOrigins: c.config.GetStringSlice(optionCORSAllowedOrigins),
TracingEnabled: c.config.GetBool(optionNameTracingEnabled),
TracingEndpoint: c.config.GetString(optionNameTracingEndpoint),
TracingServiceName: c.config.GetString(optionNameTracingServiceName),
Logger: logger,
DisconnectThreshold: c.config.GetUint64(optionNameDisconnectThreshold),
})
if err != nil {
return err
......@@ -192,6 +194,7 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
cmd.Flags().String(optionNameTracingServiceName, "bee", "service name identifier for tracing")
cmd.Flags().String(optionNameVerbosity, "info", "log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace")
cmd.Flags().String(optionWelcomeMessage, "", "send a welcome message string during handshakes")
cmd.Flags().Uint64(optionNameDisconnectThreshold, 100000000000, "threshold in BZZ until which you allow peers to be indebted before disconnecting")
c.root.AddCommand(cmd)
return nil
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package accounting
import (
"errors"
"fmt"
"sync"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
var _ Interface = (*Accounting)(nil)
// Interface is the main interface for Accounting
type Interface interface {
// Reserve reserves a portion of the balance for peer
// It returns an error if the operation would risk exceeding the disconnect threshold
// This should be called (always in combination with Release) before a Credit action to prevent overspending in case of concurrent requests
Reserve(peer swarm.Address, price uint64) error
// Release releases reserved funds
Release(peer swarm.Address, price uint64)
// Credit increases the balance the peer has with us (we "pay" the peer)
Credit(peer swarm.Address, price uint64) error
// Debit increases the balance we have with the peer (we get "paid")
Debit(peer swarm.Address, price uint64) error
// Balance returns the current balance for the given peer
Balance(peer swarm.Address) (int64, error)
}
// PeerBalance holds all relevant accounting information for one peer
type PeerBalance struct {
lock sync.Mutex
balance int64 // amount that the peer owes us if positive, our debt if negative
reserved uint64 // amount currently reserved for active peer interaction
}
// Options for accounting
type Options struct {
DisconnectThreshold uint64
Logger logging.Logger
Store storage.StateStorer
}
// Accounting is the main implementation of the accounting interface
type Accounting struct {
balancesMu sync.Mutex // mutex for accessing the balances map
balances map[string]*PeerBalance
logger logging.Logger
store storage.StateStorer
disconnectThreshold uint64 // the debt threshold at which we will disconnect from a peer
}
var (
ErrOverdraft = errors.New("attempted overdraft")
)
// NewAccounting creates a new Accounting instance with the provided options
func NewAccounting(o Options) *Accounting {
return &Accounting{
balances: make(map[string]*PeerBalance),
disconnectThreshold: o.DisconnectThreshold,
logger: o.Logger,
store: o.Store,
}
}
// Reserve reserves a portion of the balance for peer
func (a *Accounting) Reserve(peer swarm.Address, price uint64) error {
balance, err := a.getPeerBalance(peer)
if err != nil {
return err
}
balance.lock.Lock()
defer balance.lock.Unlock()
// the previously reserved balance plus the new price is the maximum amount paid if all current operations are successful
// since we pay this we have to reduce this (positive quantity) from the balance
// the disconnectThreshold is stored as a positive value which is why it must be negated prior to comparison
if balance.freeBalance()-int64(price) < -int64(a.disconnectThreshold) {
return fmt.Errorf("%w with peer %v", ErrOverdraft, peer)
}
balance.reserved += price
return nil
}
// Release releases reserved funds
func (a *Accounting) Release(peer swarm.Address, price uint64) {
balance, err := a.getPeerBalance(peer)
if err != nil {
a.logger.Errorf("cannot release balance for peer: %v", err)
return
}
balance.lock.Lock()
defer balance.lock.Unlock()
if price > balance.reserved {
// If Reserve and Release calls are always paired this should never happen
a.logger.Error("attempting to release more balance than was reserved for peer")
balance.reserved = 0
} else {
balance.reserved -= price
}
}
// Credit increases the amount of credit we have with the given peer (and decreases existing debt).
func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
balance, err := a.getPeerBalance(peer)
if err != nil {
return err
}
balance.lock.Lock()
defer balance.lock.Unlock()
nextBalance := balance.balance - int64(price)
a.logger.Tracef("crediting peer %v with price %d, new balance is %d", peer, price, nextBalance)
err = a.store.Put(balanceKey(peer), nextBalance)
if err != nil {
return err
}
balance.balance = nextBalance
// TODO: try to initiate payment if payment threshold is reached
// if balance.balance < -int64(a.paymentThreshold) { }
return nil
}
// Debit increases the amount of debt we have with the given peer (and decreases existing credit)
func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
balance, err := a.getPeerBalance(peer)
if err != nil {
return err
}
balance.lock.Lock()
defer balance.lock.Unlock()
nextBalance := balance.balance + int64(price)
a.logger.Tracef("debiting peer %v with price %d, new balance is %d", peer, price, nextBalance)
err = a.store.Put(balanceKey(peer), nextBalance)
if err != nil {
return err
}
balance.balance = nextBalance
if nextBalance >= int64(a.disconnectThreshold) {
// peer to much in debt
return p2p.NewDisconnectError(fmt.Errorf("disconnect threshold exceeded for peer %s", peer.String()))
}
return nil
}
// Balance returns the current balance for the given peer
func (a *Accounting) Balance(peer swarm.Address) (int64, error) {
peerBalance, err := a.getPeerBalance(peer)
if err != nil {
return 0, err
}
return peerBalance.balance, nil
}
// get the balance storage key for the given peer
func balanceKey(peer swarm.Address) string {
return fmt.Sprintf("balance_%s", peer.String())
}
// getPeerBalance gets the PeerBalance for a given peer
// If not in memory it will try to load it from the state store
// if not found it will initialise it with 0 balance
func (a *Accounting) getPeerBalance(peer swarm.Address) (*PeerBalance, error) {
a.balancesMu.Lock()
defer a.balancesMu.Unlock()
peerBalance, ok := a.balances[peer.String()]
if !ok {
// balance not yet in memory, load from state store
var balance int64
err := a.store.Get(balanceKey(peer), &balance)
if err == nil {
peerBalance = &PeerBalance{
balance: balance,
reserved: 0,
}
} else if err == storage.ErrNotFound {
// no prior records in state store
peerBalance = &PeerBalance{
balance: 0,
reserved: 0,
}
} else {
// other error in state store
return nil, err
}
a.balances[peer.String()] = peerBalance
}
return peerBalance, nil
}
func (pb *PeerBalance) freeBalance() int64 {
return pb.balance - int64(pb.reserved)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package accounting_test
import (
"errors"
"github.com/ethersphere/bee/pkg/accounting"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
"io/ioutil"
"testing"
)
const (
testDisconnectThreshold = 10000
testPrice = uint64(10)
)
// booking represents an accounting action and the expected result afterwards
type booking struct {
peer swarm.Address
price int64 // Credit if <0, Debit otherwise
expectedBalance int64
}
// TestAccountingAddBalance does several accounting actions and verifies the balance after each steep
func TestAccountingAddBalance(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc := accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
})
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
peer2Addr, err := swarm.ParseHexAddress("00112244")
if err != nil {
t.Fatal(err)
}
bookings := []booking{
{peer: peer1Addr, price: 100, expectedBalance: 100},
{peer: peer2Addr, price: 200, expectedBalance: 200},
{peer: peer1Addr, price: 300, expectedBalance: 400},
{peer: peer1Addr, price: -100, expectedBalance: 300},
{peer: peer2Addr, price: -1000, expectedBalance: -800},
}
for i, booking := range bookings {
if booking.price < 0 {
err = acc.Reserve(booking.peer, uint64(booking.price))
if err != nil {
t.Fatal(err)
}
err = acc.Credit(booking.peer, uint64(-booking.price))
if err != nil {
t.Fatal(err)
}
} else {
err = acc.Debit(booking.peer, uint64(booking.price))
if err != nil {
t.Fatal(err)
}
}
balance, err := acc.Balance(booking.peer)
if err != nil {
t.Fatal(err)
}
if balance != booking.expectedBalance {
t.Fatalf("balance for peer %v not as expected after booking %d. got %d, wanted %d", booking.peer.String(), i, balance, booking.expectedBalance)
}
if booking.price < 0 {
acc.Release(booking.peer, uint64(booking.price))
}
}
}
// TestAccountingAdd_persistentBalances tests that balances are actually persisted
// It creates an accounting instance, does some accounting
// Then it creates a new accounting instance with the same store and verifies the balances
func TestAccountingAdd_persistentBalances(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc := accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
})
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
peer2Addr, err := swarm.ParseHexAddress("00112244")
if err != nil {
t.Fatal(err)
}
peer1DebitAmount := testPrice
err = acc.Debit(peer1Addr, peer1DebitAmount)
if err != nil {
t.Fatal(err)
}
peer2CreditAmount := 2 * testPrice
err = acc.Credit(peer2Addr, peer2CreditAmount)
if err != nil {
t.Fatal(err)
}
acc = accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
})
peer1Balance, err := acc.Balance(peer1Addr)
if err != nil {
t.Fatal(err)
}
if peer1Balance != int64(peer1DebitAmount) {
t.Fatalf("peer1Balance not loaded correctly. got %d, wanted %d", peer1Balance, peer1DebitAmount)
}
peer2Balance, err := acc.Balance(peer2Addr)
if err != nil {
t.Fatal(err)
}
if peer2Balance != -int64(peer2CreditAmount) {
t.Fatalf("peer2Balance not loaded correctly. got %d, wanted %d", peer2Balance, -int64(peer2CreditAmount))
}
}
// TestAccountingReserve tests that reserve returns an error if the disconnect threshold would be exceeded
func TestAccountingReserve(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc := accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
})
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
err = acc.Reserve(peer1Addr, testDisconnectThreshold-100)
if err != nil {
t.Fatal(err)
}
err = acc.Reserve(peer1Addr, 101)
if err == nil {
t.Fatal("expected error from reserve")
}
if !errors.Is(err, accounting.ErrOverdraft) {
t.Fatalf("expected overdraft error from reserve, got %v", err)
}
}
// TestAccountingDisconnect tests that exceeding the disconnect threshold with Debit returns a p2p.DisconnectError
func TestAccountingDisconnect(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc := accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
})
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
err = acc.Debit(peer1Addr, testDisconnectThreshold)
if err == nil {
t.Fatal("expected Add to return error")
}
var e *p2p.DisconnectError
if !errors.As(err, &e) {
t.Fatalf("expected DisconnectError, got %v", err)
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"github.com/ethersphere/bee/pkg/swarm"
"sync"
)
type MockAccounting struct {
lock sync.Mutex
balances map[string]int64
}
func (ma *MockAccounting) Reserve(peer swarm.Address, price uint64) error {
return nil
}
func (ma *MockAccounting) Release(peer swarm.Address, price uint64) {
}
func (ma *MockAccounting) Credit(peer swarm.Address, price uint64) error {
ma.lock.Lock()
defer ma.lock.Unlock()
ma.balances[peer.String()] -= int64(price)
return nil
}
func (ma *MockAccounting) Debit(peer swarm.Address, price uint64) error {
ma.lock.Lock()
defer ma.lock.Unlock()
ma.balances[peer.String()] += int64(price)
return nil
}
func (ma *MockAccounting) Balance(peer swarm.Address) (int64, error) {
ma.lock.Lock()
defer ma.lock.Unlock()
return ma.balances[peer.String()], nil
}
func NewAccounting() *MockAccounting {
return &MockAccounting{
balances: make(map[string]int64),
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"github.com/ethersphere/bee/pkg/swarm"
)
type MockPricer struct {
peerPrice uint64
price uint64
}
func NewPricer(price, peerPrice uint64) *MockPricer {
return &MockPricer{
peerPrice: peerPrice,
price: price,
}
}
func (pricer *MockPricer) PeerPrice(peer, chunk swarm.Address) uint64 {
return pricer.peerPrice
}
func (pricer *MockPricer) Price(chunk swarm.Address) uint64 {
return pricer.price
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package accounting
import (
"github.com/ethersphere/bee/pkg/swarm"
)
type Pricer interface {
// PeerPrice is the price the peer charges for a given chunk hash
PeerPrice(peer, chunk swarm.Address) uint64
// Price is the price we charge for a given chunk hash
Price(chunk swarm.Address) uint64
}
type FixedPricer struct {
overlay swarm.Address
poPrice uint64
}
func NewFixedPricer(overlay swarm.Address, poPrice uint64) *FixedPricer {
return &FixedPricer{
overlay: overlay,
poPrice: poPrice,
}
}
func (pricer *FixedPricer) PeerPrice(peer, chunk swarm.Address) uint64 {
return uint64(swarm.MaxPO-swarm.Proximity(peer.Bytes(), chunk.Bytes())) * pricer.poPrice
}
func (pricer *FixedPricer) Price(chunk swarm.Address) uint64 {
return pricer.PeerPrice(pricer.overlay, chunk)
}
......@@ -16,6 +16,7 @@ import (
"sync"
"time"
"github.com/ethersphere/bee/pkg/accounting"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/crypto"
......@@ -65,23 +66,24 @@ type Bee struct {
}
type Options struct {
DataDir string
DBCapacity uint64
Password string
APIAddr string
DebugAPIAddr string
Addr string
NATAddr string
EnableWS bool
EnableQUIC bool
NetworkID uint64
WelcomeMessage string
Bootnodes []string
CORSAllowedOrigins []string
Logger logging.Logger
TracingEnabled bool
TracingEndpoint string
TracingServiceName string
DataDir string
DBCapacity uint64
Password string
APIAddr string
DebugAPIAddr string
Addr string
NATAddr string
EnableWS bool
EnableQUIC bool
NetworkID uint64
WelcomeMessage string
Bootnodes []string
CORSAllowedOrigins []string
Logger logging.Logger
TracingEnabled bool
TracingEndpoint string
TracingServiceName string
DisconnectThreshold uint64
}
func NewBee(o Options) (*Bee, error) {
......@@ -232,10 +234,18 @@ func NewBee(o Options) (*Bee, error) {
}
b.localstoreCloser = storer
acc := accounting.NewAccounting(accounting.Options{
Logger: logger,
Store: stateStore,
DisconnectThreshold: o.DisconnectThreshold,
})
retrieve := retrieval.New(retrieval.Options{
Streamer: p2ps,
ChunkPeerer: topologyDriver,
Logger: logger,
Accounting: acc,
Pricer: accounting.NewFixedPricer(address, 10),
})
tagg := tags.NewTags()
......
......@@ -7,6 +7,7 @@ package retrieval
import (
"context"
"fmt"
"github.com/ethersphere/bee/pkg/accounting"
"time"
"github.com/ethersphere/bee/pkg/logging"
......@@ -39,6 +40,8 @@ type Service struct {
storer storage.Storer
singleflight singleflight.Group
logger logging.Logger
accounting accounting.Interface
pricer accounting.Pricer
}
type Options struct {
......@@ -46,6 +49,8 @@ type Options struct {
ChunkPeerer topology.EachPeerer
Storer storage.Storer
Logger logging.Logger
Accounting accounting.Interface
Pricer accounting.Pricer
}
func New(o Options) *Service {
......@@ -54,6 +59,8 @@ func New(o Options) *Service {
peerSuggester: o.ChunkPeerer,
storer: o.Storer,
logger: o.Logger,
accounting: o.Accounting,
pricer: o.Pricer,
}
}
......@@ -118,6 +125,15 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
if err != nil {
return nil, peer, fmt.Errorf("get closest: %w", err)
}
// compute the price we pay for this chunk and reserve it for the rest of this function
chunkPrice := s.pricer.PeerPrice(peer, addr)
err = s.accounting.Reserve(peer, chunkPrice)
if err != nil {
return nil, peer, err
}
defer s.accounting.Release(peer, chunkPrice)
s.logger.Tracef("retrieval: requesting chunk %s from peer %s", addr, peer)
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
......@@ -144,6 +160,12 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
return nil, peer, fmt.Errorf("read delivery: %w peer %s", err, peer.String())
}
// credit the peer after successful delivery
err = s.accounting.Credit(peer, chunkPrice)
if err != nil {
return nil, peer, err
}
return d.Data, peer, nil
}
......@@ -212,6 +234,13 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
return fmt.Errorf("write delivery: %w peer %s", err, p.Address.String())
}
// compute the price we charge for this chunk and debit it from p's balance
chunkPrice := s.pricer.Price(chunk.Address())
err = s.accounting.Debit(p.Address, chunkPrice)
if err != nil {
return err
}
return nil
}
......
......@@ -13,6 +13,7 @@ import (
"testing"
"time"
accountingmock "github.com/ethersphere/bee/pkg/accounting/mock"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/p2p/streamtest"
......@@ -43,15 +44,24 @@ func TestDelivery(t *testing.T) {
t.Fatal(err)
}
serverMockAccounting := accountingmock.NewAccounting()
price := uint64(10)
pricerMock := accountingmock.NewPricer(price, price)
// create the server that will handle the request and will serve the response
server := retrieval.New(retrieval.Options{
Storer: mockStorer,
Logger: logger,
Storer: mockStorer,
Logger: logger,
Accounting: serverMockAccounting,
Pricer: pricerMock,
})
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
)
clientMockAccounting := accountingmock.NewAccounting()
// client mock storer does not store any data at this point
// but should be checked at at the end of the test for the
// presence of the reqAddr key and value to ensure delivery
......@@ -68,6 +78,8 @@ func TestDelivery(t *testing.T) {
ChunkPeerer: ps,
Storer: clientMockStorer,
Logger: logger,
Accounting: clientMockAccounting,
Pricer: pricerMock,
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
......@@ -120,6 +132,11 @@ func TestDelivery(t *testing.T) {
t.Fatalf("got too many deliveries. want 1 got %d", len(gotDeliveries))
}
clientBalance, _ := clientMockAccounting.Balance(peerID)
if clientBalance != -int64(price) {
t.Fatalf("unexpected balance on client. want %d got %d", -price, clientBalance)
}
}
type mockPeerSuggester struct {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment