Commit 0d466d40 authored by Ralph Pichler's avatar Ralph Pichler Committed by GitHub

settlement protocol and payment threshold (#465)

* introduce pseudosettle protocol and switch to payment threshold + tolerance
* reduce default threshold and tolerance by a few magnitudes
* remove peer from error messages
parent 7ab07b46
......@@ -26,26 +26,27 @@ import (
func (c *command) initStartCmd() (err error) {
const (
optionNameDataDir = "data-dir"
optionNameDBCapacity = "db-capacity"
optionNamePassword = "password"
optionNamePasswordFile = "password-file"
optionNameAPIAddr = "api-addr"
optionNameP2PAddr = "p2p-addr"
optionNameNATAddr = "nat-addr"
optionNameP2PWSEnable = "p2p-ws-enable"
optionNameP2PQUICEnable = "p2p-quic-enable"
optionNameDebugAPIEnable = "debug-api-enable"
optionNameDebugAPIAddr = "debug-api-addr"
optionNameBootnodes = "bootnode"
optionNameNetworkID = "network-id"
optionWelcomeMessage = "welcome-message"
optionCORSAllowedOrigins = "cors-allowed-origins"
optionNameTracingEnabled = "tracing-enable"
optionNameTracingEndpoint = "tracing-endpoint"
optionNameTracingServiceName = "tracing-service-name"
optionNameVerbosity = "verbosity"
optionNameDisconnectThreshold = "disconnect-threshold"
optionNameDataDir = "data-dir"
optionNameDBCapacity = "db-capacity"
optionNamePassword = "password"
optionNamePasswordFile = "password-file"
optionNameAPIAddr = "api-addr"
optionNameP2PAddr = "p2p-addr"
optionNameNATAddr = "nat-addr"
optionNameP2PWSEnable = "p2p-ws-enable"
optionNameP2PQUICEnable = "p2p-quic-enable"
optionNameDebugAPIEnable = "debug-api-enable"
optionNameDebugAPIAddr = "debug-api-addr"
optionNameBootnodes = "bootnode"
optionNameNetworkID = "network-id"
optionWelcomeMessage = "welcome-message"
optionCORSAllowedOrigins = "cors-allowed-origins"
optionNameTracingEnabled = "tracing-enable"
optionNameTracingEndpoint = "tracing-endpoint"
optionNameTracingServiceName = "tracing-service-name"
optionNameVerbosity = "verbosity"
optionNamePaymentThreshold = "payment-threshold"
optionNamePaymentTolerance = "payment-tolerance"
)
cmd := &cobra.Command{
......@@ -113,24 +114,25 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
}
b, err := node.NewBee(node.Options{
DataDir: c.config.GetString(optionNameDataDir),
DBCapacity: c.config.GetUint64(optionNameDBCapacity),
Password: password,
APIAddr: c.config.GetString(optionNameAPIAddr),
DebugAPIAddr: debugAPIAddr,
Addr: c.config.GetString(optionNameP2PAddr),
NATAddr: c.config.GetString(optionNameNATAddr),
EnableWS: c.config.GetBool(optionNameP2PWSEnable),
EnableQUIC: c.config.GetBool(optionNameP2PQUICEnable),
NetworkID: c.config.GetUint64(optionNameNetworkID),
WelcomeMessage: c.config.GetString(optionWelcomeMessage),
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
CORSAllowedOrigins: c.config.GetStringSlice(optionCORSAllowedOrigins),
TracingEnabled: c.config.GetBool(optionNameTracingEnabled),
TracingEndpoint: c.config.GetString(optionNameTracingEndpoint),
TracingServiceName: c.config.GetString(optionNameTracingServiceName),
Logger: logger,
DisconnectThreshold: c.config.GetUint64(optionNameDisconnectThreshold),
DataDir: c.config.GetString(optionNameDataDir),
DBCapacity: c.config.GetUint64(optionNameDBCapacity),
Password: password,
APIAddr: c.config.GetString(optionNameAPIAddr),
DebugAPIAddr: debugAPIAddr,
Addr: c.config.GetString(optionNameP2PAddr),
NATAddr: c.config.GetString(optionNameNATAddr),
EnableWS: c.config.GetBool(optionNameP2PWSEnable),
EnableQUIC: c.config.GetBool(optionNameP2PQUICEnable),
NetworkID: c.config.GetUint64(optionNameNetworkID),
WelcomeMessage: c.config.GetString(optionWelcomeMessage),
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
CORSAllowedOrigins: c.config.GetStringSlice(optionCORSAllowedOrigins),
TracingEnabled: c.config.GetBool(optionNameTracingEnabled),
TracingEndpoint: c.config.GetString(optionNameTracingEndpoint),
TracingServiceName: c.config.GetString(optionNameTracingServiceName),
Logger: logger,
PaymentThreshold: c.config.GetUint64(optionNamePaymentThreshold),
PaymentTolerance: c.config.GetUint64(optionNamePaymentTolerance),
})
if err != nil {
return err
......@@ -194,7 +196,8 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
cmd.Flags().String(optionNameTracingServiceName, "bee", "service name identifier for tracing")
cmd.Flags().String(optionNameVerbosity, "info", "log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace")
cmd.Flags().String(optionWelcomeMessage, "", "send a welcome message string during handshakes")
cmd.Flags().Uint64(optionNameDisconnectThreshold, 100000000000, "threshold in BZZ until which you allow peers to be indebted before disconnecting")
cmd.Flags().Uint64(optionNamePaymentThreshold, 100000, "threshold in BZZ where you expect to get paid from your peers")
cmd.Flags().Uint64(optionNamePaymentTolerance, 10000, "excess debt above payment threshold in BZZ where you disconnect from your peer")
c.root.AddCommand(cmd)
return nil
......
......@@ -5,6 +5,7 @@
package accounting
import (
"context"
"errors"
"fmt"
"strings"
......@@ -12,6 +13,7 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -48,34 +50,49 @@ type PeerBalance struct {
// Options for accounting
type Options struct {
DisconnectThreshold uint64
Logger logging.Logger
Store storage.StateStorer
PaymentThreshold uint64
PaymentTolerance uint64
Logger logging.Logger
Store storage.StateStorer
Settlement settlement.Interface
}
// Accounting is the main implementation of the accounting interface
type Accounting struct {
balancesMu sync.Mutex // mutex for accessing the balances map
balances map[string]*PeerBalance
logger logging.Logger
store storage.StateStorer
disconnectThreshold uint64 // the debt threshold at which we will disconnect from a peer
metrics metrics
balancesMu sync.Mutex // mutex for accessing the balances map
balances map[string]*PeerBalance
logger logging.Logger
store storage.StateStorer
paymentThreshold uint64 // the payment threshold in BZZ we communicate to our peers
paymentTolerance uint64 // the amount in BZZ we let peers exceed the payment threshold before disconnected
settlement settlement.Interface
metrics metrics
}
var (
// ErrOverdraft is the error returned if the expected debt in Reserve would exceed the payment thresholds
ErrOverdraft = errors.New("attempted overdraft")
// ErrDisconnectThresholdExceeded is the error returned if a peer has exceeded the disconnect threshold
ErrDisconnectThresholdExceeded = errors.New("disconnect threshold exceeded")
// ErrInvalidPaymentTolerance is the error returned if the payment tolerance is too high compared to the payment threshold
ErrInvalidPaymentTolerance = errors.New("payment tolerance must be less than half the payment threshold")
)
// NewAccounting creates a new Accounting instance with the provided options
func NewAccounting(o Options) *Accounting {
return &Accounting{
balances: make(map[string]*PeerBalance),
disconnectThreshold: o.DisconnectThreshold,
logger: o.Logger,
store: o.Store,
metrics: newMetrics(),
func NewAccounting(o Options) (*Accounting, error) {
if o.PaymentTolerance > o.PaymentThreshold/2 {
return nil, ErrInvalidPaymentTolerance
}
return &Accounting{
balances: make(map[string]*PeerBalance),
paymentThreshold: o.PaymentThreshold,
paymentTolerance: o.PaymentTolerance,
logger: o.Logger,
store: o.Store,
settlement: o.Settlement,
metrics: newMetrics(),
}, nil
}
// Reserve reserves a portion of the balance for peer
......@@ -88,16 +105,13 @@ func (a *Accounting) Reserve(peer swarm.Address, price uint64) error {
balance.lock.Lock()
defer balance.lock.Unlock()
// the previously reserved balance plus the new price is the maximum amount paid if all current operations are successful
// since we pay this we have to reduce this (positive quantity) from the balance
// the disconnectThreshold is stored as a positive value which is why it must be negated prior to comparison
if balance.freeBalance()-int64(price) < -int64(a.disconnectThreshold) {
// check if the expected debt is already over the payment threshold
if balance.expectedDebt() > a.paymentThreshold {
a.metrics.AccountingBlocksCount.Inc()
return fmt.Errorf("%w with peer %v", ErrOverdraft, peer)
return ErrOverdraft
}
balance.reserved += price
return nil
}
......@@ -135,22 +149,62 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
a.logger.Tracef("crediting peer %v with price %d, new balance is %d", peer, price, nextBalance)
// compute expected debt before update because reserve still includes the amount that is deducted from the balance
expectedDebt := balance.expectedDebt()
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return err
return fmt.Errorf("failed to persist balance: %w", err)
}
balance.balance = nextBalance
a.metrics.TotalCreditedAmount.Add(float64(price))
a.metrics.CreditEventsCount.Inc()
// TODO: try to initiate payment if payment threshold is reached
// if balance.balance < -int64(a.paymentThreshold) { }
// if our expected debt exceeds our payment threshold (which we assume is also the peers payment threshold), trigger settlement
if expectedDebt >= a.paymentThreshold {
err = a.settle(peer, balance)
if err != nil {
a.logger.Errorf("failed to settle with peer %v: %v", peer, err)
}
}
return nil
}
// settle all debt with a peer
// the lock on balance must be held when called
func (a *Accounting) settle(peer swarm.Address, balance *PeerBalance) error {
// don't do anything if there is no actual debt
// this might be the case if the peer owes us and the total reserve for a peer exceeds the payment treshhold
if balance.balance >= 0 {
return nil
}
paymentAmount := uint64(-balance.balance)
oldBalance := balance.balance
nextBalance := oldBalance + int64(paymentAmount)
// try to save the next balance first
// otherwise we might pay and then not be able to save, thus paying again after restart
err := a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return fmt.Errorf("failed to persist balance: %w", err)
}
err = a.settlement.Pay(context.Background(), peer, paymentAmount)
if err != nil {
err = fmt.Errorf("settlement for amount %d failed: %w", paymentAmount, err)
// if the payment didn't work we should restore the old balance in the state store
if storeErr := a.store.Put(peerBalanceKey(peer), nextBalance); storeErr != nil {
a.logger.Errorf("failed to restore balance after failed settlement for peer %v: %v", peer, storeErr)
}
return err
}
balance.balance = nextBalance
return nil
}
// Debit increases the amount of debt we have with the given peer (and decreases existing credit)
func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
balance, err := a.getPeerBalance(peer)
......@@ -167,7 +221,7 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return err
return fmt.Errorf("failed to persist balance: %w", err)
}
balance.balance = nextBalance
......@@ -175,10 +229,10 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
a.metrics.TotalDebitedAmount.Add(float64(price))
a.metrics.DebitEventsCount.Inc()
if nextBalance >= int64(a.disconnectThreshold) {
if nextBalance >= int64(a.paymentThreshold+a.paymentTolerance) {
// peer too much in debt
a.metrics.AccountingDisconnectsCount.Inc()
return p2p.NewDisconnectError(fmt.Errorf("disconnect threshold exceeded for peer %s", peer.String()))
return p2p.NewDisconnectError(ErrDisconnectThresholdExceeded)
}
return nil
......@@ -289,6 +343,47 @@ func balanceKeyPeer(key []byte) (swarm.Address, error) {
return addr, nil
}
func (pb *PeerBalance) freeBalance() int64 {
// expectedBalance returns the balance we expect to have with a peer if all reserved funds are actually credited
func (pb *PeerBalance) expectedBalance() int64 {
return pb.balance - int64(pb.reserved)
}
// expectedDebt returns the debt we expect to have with a peer if all reserved funds are actually credited
func (pb *PeerBalance) expectedDebt() uint64 {
expectedBalance := pb.expectedBalance()
if expectedBalance >= 0 {
return 0
}
return uint64(-expectedBalance)
}
// NotifyPayment is called by Settlement when we received payment
// Implements the PaymentObserver interface
func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
balance, err := a.getPeerBalance(peer)
if err != nil {
return err
}
balance.lock.Lock()
defer balance.lock.Unlock()
nextBalance := balance.balance - int64(amount)
// don't allow a payment to put use more into debt than the tolerance
// this is to prevent another node tricking us into settling by settling first (e.g. send a bouncing cheque to trigger an honest cheque in swap)
if nextBalance < -int64(a.paymentTolerance) {
return fmt.Errorf("refusing to accept payment which would put us too much in debt, new balance would have been %d", nextBalance)
}
a.logger.Tracef("crediting peer %v with amount %d due to payment, new balance is %d", peer, amount, nextBalance)
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return fmt.Errorf("failed to persist balance: %w", err)
}
balance.balance = nextBalance
return nil
}
......@@ -5,19 +5,22 @@
package accounting_test
import (
"context"
"errors"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/accounting"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
"io/ioutil"
"testing"
)
const (
testDisconnectThreshold = 10000
testPrice = uint64(10)
testPaymentThreshold = 10000
testPaymentTolerance = 1000
testPrice = uint64(10)
)
// booking represents an accounting action and the expected result afterwards
......@@ -34,11 +37,14 @@ func TestAccountingAddBalance(t *testing.T) {
store := mock.NewStateStore()
defer store.Close()
acc := accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThreshold,
Logger: logger,
Store: store,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
......@@ -99,11 +105,14 @@ func TestAccountingAdd_persistentBalances(t *testing.T) {
store := mock.NewStateStore()
defer store.Close()
acc := accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThreshold,
Logger: logger,
Store: store,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
......@@ -127,11 +136,13 @@ func TestAccountingAdd_persistentBalances(t *testing.T) {
t.Fatal(err)
}
acc = accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
acc, err = accounting.NewAccounting(accounting.Options{
Logger: logger,
Store: store,
})
if err != nil {
t.Fatal(err)
}
peer1Balance, err := acc.Balance(peer1Addr)
if err != nil {
......@@ -152,30 +163,34 @@ func TestAccountingAdd_persistentBalances(t *testing.T) {
}
}
// TestAccountingReserve tests that reserve returns an error if the disconnect threshold would be exceeded
// TestAccountingReserve tests that reserve returns an error if the payment threshold would be exceeded for a second time
func TestAccountingReserve(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc := accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThreshold,
Logger: logger,
Store: store,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
err = acc.Reserve(peer1Addr, testDisconnectThreshold-100)
// it should allow to cross the threshold one time
err = acc.Reserve(peer1Addr, testPaymentThreshold+1)
if err != nil {
t.Fatal(err)
}
err = acc.Reserve(peer1Addr, 101)
err = acc.Reserve(peer1Addr, 1)
if err == nil {
t.Fatal("expected error from reserve")
}
......@@ -192,18 +207,29 @@ func TestAccountingDisconnect(t *testing.T) {
store := mock.NewStateStore()
defer store.Close()
acc := accounting.NewAccounting(accounting.Options{
DisconnectThreshold: testDisconnectThreshold,
Logger: logger,
Store: store,
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThreshold,
PaymentTolerance: testPaymentTolerance,
Logger: logger,
Store: store,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
err = acc.Debit(peer1Addr, testDisconnectThreshold)
// put the peer 1 unit away from disconnect
err = acc.Debit(peer1Addr, testPaymentThreshold+testPaymentTolerance-1)
if err != nil {
t.Fatal("expected no error while still within tolerance")
}
// put the peer over thee threshold
err = acc.Debit(peer1Addr, 1)
if err == nil {
t.Fatal("expected Add to return error")
}
......@@ -213,3 +239,160 @@ func TestAccountingDisconnect(t *testing.T) {
t.Fatalf("expected DisconnectError, got %v", err)
}
}
type settlementMock struct {
paidAmount uint64
paidPeer swarm.Address
}
func (s *settlementMock) Pay(ctx context.Context, peer swarm.Address, amount uint64) error {
s.paidPeer = peer
s.paidAmount = amount
return nil
}
// TestAccountingCallSettlement tests that settlement is called correctly if the payment threshold is hit
func TestAccountingCallSettlement(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
settlement := &settlementMock{}
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThreshold,
PaymentTolerance: testPaymentTolerance,
Logger: logger,
Store: store,
Settlement: settlement,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
err = acc.Reserve(peer1Addr, testPaymentThreshold)
if err != nil {
t.Fatal(err)
}
// Credit until payment treshold
err = acc.Credit(peer1Addr, testPaymentThreshold)
if err != nil {
t.Fatal(err)
}
acc.Release(peer1Addr, testPaymentThreshold)
if !settlement.paidPeer.Equal(peer1Addr) {
t.Fatalf("paid to wrong peer. got %v wanted %v", settlement.paidPeer, peer1Addr)
}
if settlement.paidAmount != testPaymentThreshold {
t.Fatalf("paid wrong amount. got %d wanted %d", settlement.paidAmount, testPaymentThreshold)
}
balance, err := acc.Balance(peer1Addr)
if err != nil {
t.Fatal(err)
}
if balance != 0 {
t.Fatalf("expected balance to be reset. got %d", balance)
}
// Assume 100 is reserved by some other request
err = acc.Reserve(peer1Addr, 100)
if err != nil {
t.Fatal(err)
}
// Credit until the expected debt exceeeds payment threshold
expectedAmount := uint64(testPaymentThreshold - 100)
err = acc.Reserve(peer1Addr, expectedAmount)
if err != nil {
t.Fatal(err)
}
err = acc.Credit(peer1Addr, expectedAmount)
if err != nil {
t.Fatal(err)
}
if !settlement.paidPeer.Equal(peer1Addr) {
t.Fatalf("paid to wrong peer. got %v wanted %v", settlement.paidPeer, peer1Addr)
}
if settlement.paidAmount != expectedAmount {
t.Fatalf("paid wrong amount. got %d wanted %d", settlement.paidAmount, expectedAmount)
}
}
// TestAccountingNotifyPayment tests that payments adjust the balance and payment which put us into debt are rejected
func TestAccountingNotifyPayment(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThreshold,
PaymentTolerance: testPaymentTolerance,
Logger: logger,
Store: store,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
debtAmount := uint64(100)
err = acc.Debit(peer1Addr, debtAmount+testPaymentTolerance)
if err != nil {
t.Fatal(err)
}
err = acc.NotifyPayment(peer1Addr, debtAmount+testPaymentTolerance)
if err != nil {
t.Fatal(err)
}
err = acc.Debit(peer1Addr, debtAmount)
if err != nil {
t.Fatal(err)
}
err = acc.NotifyPayment(peer1Addr, debtAmount+testPaymentTolerance+1)
if err == nil {
t.Fatal("expected payment to be rejected")
}
}
func TestAccountingInvalidPaymentTolerance(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
_, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThreshold,
PaymentTolerance: testPaymentThreshold/2 + 1,
Logger: logger,
Store: store,
})
if err == nil {
t.Fatal("expected error")
}
if err != accounting.ErrInvalidPaymentTolerance {
t.Fatalf("got wrong error. got %v wanted %v", err, accounting.ErrInvalidPaymentTolerance)
}
}
......@@ -40,6 +40,7 @@ import (
"github.com/ethersphere/bee/pkg/pusher"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/settlement/pseudosettle"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/statestore/leveldb"
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
......@@ -68,24 +69,25 @@ type Bee struct {
}
type Options struct {
DataDir string
DBCapacity uint64
Password string
APIAddr string
DebugAPIAddr string
Addr string
NATAddr string
EnableWS bool
EnableQUIC bool
NetworkID uint64
WelcomeMessage string
Bootnodes []string
CORSAllowedOrigins []string
Logger logging.Logger
TracingEnabled bool
TracingEndpoint string
TracingServiceName string
DisconnectThreshold uint64
DataDir string
DBCapacity uint64
Password string
APIAddr string
DebugAPIAddr string
Addr string
NATAddr string
EnableWS bool
EnableQUIC bool
NetworkID uint64
WelcomeMessage string
Bootnodes []string
CORSAllowedOrigins []string
Logger logging.Logger
TracingEnabled bool
TracingEndpoint string
TracingServiceName string
PaymentThreshold uint64
PaymentTolerance uint64
}
func NewBee(o Options) (*Bee, error) {
......@@ -233,12 +235,28 @@ func NewBee(o Options) (*Bee, error) {
}
b.localstoreCloser = storer
acc := accounting.NewAccounting(accounting.Options{
Logger: logger,
Store: stateStore,
DisconnectThreshold: o.DisconnectThreshold,
settlement := pseudosettle.New(pseudosettle.Options{
Streamer: p2ps,
Logger: logger,
})
if err = p2ps.AddProtocol(settlement.Protocol()); err != nil {
return nil, fmt.Errorf("pseudosettle service: %w", err)
}
acc, err := accounting.NewAccounting(accounting.Options{
Logger: logger,
Store: stateStore,
PaymentThreshold: o.PaymentThreshold,
PaymentTolerance: o.PaymentTolerance,
Settlement: settlement,
})
if err != nil {
return nil, fmt.Errorf("accounting: %w", err)
}
settlement.SetPaymentObserver(acc)
chunkvalidator := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator())
retrieve := retrieval.New(retrieval.Options{
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package settlement
import (
"context"
"github.com/ethersphere/bee/pkg/swarm"
)
// Interface is the interface used by Accounting to trigger settlement
type Interface interface {
// Pay initiates a payment to the given peer
// It should return without error it is likely that the payment worked
Pay(ctx context.Context, peer swarm.Address, amount uint64) error
}
// PaymentObserver is the interface Settlement uses to notify other components of an incoming payment
type PaymentObserver interface {
// NotifyPayment is called when a payment from peer was successfully received
NotifyPayment(peer swarm.Address, amount uint64) error
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/*
Package pseudosettle implements a pretend settlement protocol where nodes send pretend payment messages consisting only of the payment amount.
Its purpose is to be able to have the full payment / disconnect treshold machinery in place without having to send cheques or real values.
*/
package pseudosettle
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. pseudosettle.proto"
package pb
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pseudosettle.proto
package pb
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type Payment struct {
Amount uint64 `protobuf:"varint,1,opt,name=Amount,proto3" json:"Amount,omitempty"`
}
func (m *Payment) Reset() { *m = Payment{} }
func (m *Payment) String() string { return proto.CompactTextString(m) }
func (*Payment) ProtoMessage() {}
func (*Payment) Descriptor() ([]byte, []int) {
return fileDescriptor_3ff21bb6c9cf5e84, []int{0}
}
func (m *Payment) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Payment) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Payment.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Payment) XXX_Merge(src proto.Message) {
xxx_messageInfo_Payment.Merge(m, src)
}
func (m *Payment) XXX_Size() int {
return m.Size()
}
func (m *Payment) XXX_DiscardUnknown() {
xxx_messageInfo_Payment.DiscardUnknown(m)
}
var xxx_messageInfo_Payment proto.InternalMessageInfo
func (m *Payment) GetAmount() uint64 {
if m != nil {
return m.Amount
}
return 0
}
func init() {
proto.RegisterType((*Payment)(nil), "pseudosettle.Payment")
}
func init() { proto.RegisterFile("pseudosettle.proto", fileDescriptor_3ff21bb6c9cf5e84) }
var fileDescriptor_3ff21bb6c9cf5e84 = []byte{
// 114 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2a, 0x28, 0x4e, 0x2d,
0x4d, 0xc9, 0x2f, 0x4e, 0x2d, 0x29, 0xc9, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2,
0x41, 0x16, 0x53, 0x52, 0xe4, 0x62, 0x0f, 0x48, 0xac, 0xcc, 0x4d, 0xcd, 0x2b, 0x11, 0x12, 0xe3,
0x62, 0x73, 0xcc, 0xcd, 0x2f, 0xcd, 0x2b, 0x91, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x09, 0x82, 0xf2,
0x9c, 0x64, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x09,
0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, 0x8e, 0x21, 0x8a, 0xa9, 0x20, 0x29,
0x89, 0x0d, 0x6c, 0xaa, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xfb, 0x97, 0x5c, 0xf8, 0x6b, 0x00,
0x00, 0x00,
}
func (m *Payment) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Payment) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Payment) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Amount != 0 {
i = encodeVarintPseudosettle(dAtA, i, uint64(m.Amount))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func encodeVarintPseudosettle(dAtA []byte, offset int, v uint64) int {
offset -= sovPseudosettle(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *Payment) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Amount != 0 {
n += 1 + sovPseudosettle(uint64(m.Amount))
}
return n
}
func sovPseudosettle(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozPseudosettle(x uint64) (n int) {
return sovPseudosettle(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Payment) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPseudosettle
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Payment: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Payment: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Amount", wireType)
}
m.Amount = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPseudosettle
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Amount |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipPseudosettle(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPseudosettle
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPseudosettle
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipPseudosettle(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPseudosettle
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPseudosettle
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPseudosettle
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthPseudosettle
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupPseudosettle
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthPseudosettle
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthPseudosettle = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowPseudosettle = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupPseudosettle = fmt.Errorf("proto: unexpected end of group")
)
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
syntax = "proto3";
package pseudosettle;
option go_package = "pb";
message Payment {
uint64 Amount = 1;
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pseudosettle
import (
"context"
"fmt"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/settlement"
pb "github.com/ethersphere/bee/pkg/settlement/pseudosettle/pb"
"github.com/ethersphere/bee/pkg/swarm"
)
const (
protocolName = "pseudosettle"
protocolVersion = "1.0.0"
streamName = "pseudosettle"
)
type Service struct {
streamer p2p.Streamer
logger logging.Logger
observer settlement.PaymentObserver
}
type Options struct {
Streamer p2p.Streamer
Logger logging.Logger
}
func New(o Options) *Service {
return &Service{
streamer: o.Streamer,
logger: o.Logger,
}
}
func (s *Service) Protocol() p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: protocolName,
Version: protocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: streamName,
Handler: s.handler,
},
},
}
}
func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
r := protobuf.NewReader(stream)
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
var req pb.Payment
if err := r.ReadMsg(&req); err != nil {
return fmt.Errorf("read request from peer %v: %w", p.Address, err)
}
s.logger.Tracef("received payment message from peer %v of %d", p.Address, req.Amount)
return s.observer.NotifyPayment(p.Address, req.Amount)
}
// Pay initiates a payment to the given peer
func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount uint64) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return err
}
defer func() {
if err != nil {
_ = stream.Reset()
} else {
go stream.FullClose()
}
}()
s.logger.Tracef("sending payment message to peer %v of %d", peer, amount)
w := protobuf.NewWriter(stream)
err = w.WriteMsgWithContext(ctx, &pb.Payment{
Amount: amount,
})
return err
}
// SetPaymentObserver sets the payment observer which will be notified of incoming payments
func (s *Service) SetPaymentObserver(observer settlement.PaymentObserver) {
s.observer = observer
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pseudosettle_test
import (
"bytes"
"context"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/settlement/pseudosettle"
"github.com/ethersphere/bee/pkg/settlement/pseudosettle/pb"
"github.com/ethersphere/bee/pkg/swarm"
)
type testObserver struct {
called bool
peer swarm.Address
amount uint64
}
func (t *testObserver) NotifyPayment(peer swarm.Address, amount uint64) error {
t.called = true
t.peer = peer
t.amount = amount
return nil
}
func TestPayment(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
observer := &testObserver{}
recipient := pseudosettle.New(pseudosettle.Options{
Logger: logger,
})
recipient.SetPaymentObserver(observer)
recorder := streamtest.New(
streamtest.WithProtocols(recipient.Protocol()),
)
payer := pseudosettle.New(pseudosettle.Options{
Streamer: recorder,
Logger: logger,
})
peerID := swarm.MustParseHexAddress("9ee7add7")
amount := uint64(10000)
err := payer.Pay(context.Background(), peerID, amount)
if err != nil {
t.Fatal(err)
}
records, err := recorder.Records(peerID, "pseudosettle", "1.0.0", "pseudosettle")
if err != nil {
t.Fatal(err)
}
if l := len(records); l != 1 {
t.Fatalf("got %v records, want %v", l, 1)
}
record := records[0]
messages, err := protobuf.ReadMessages(
bytes.NewReader(record.In()),
func() protobuf.Message { return new(pb.Payment) },
)
if err != nil {
t.Fatal(err)
}
if len(messages) != 1 {
t.Fatalf("got %v messages, want %v", len(messages), 1)
}
sentAmount := messages[0].(*pb.Payment).Amount
if sentAmount != amount {
t.Fatalf("got message with amount %v, want %v", sentAmount, amount)
}
if !observer.called {
t.Fatal("expected observer to be called")
}
if observer.amount != amount {
t.Fatalf("observer called with wrong amount. got %d, want %d", observer.amount, amount)
}
if !observer.peer.Equal(peerID) {
t.Fatalf("observer called with wrong peer. got %v, want %v", observer.peer, peerID)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment