Commit 1742618a authored by metacertain's avatar metacertain Committed by GitHub

feat: Negotiate cheques (#1924)

Co-authored-by: default avatarAnatol Lupacescu <anatollupacescu@gmail.com>
Co-authored-by: default avatarRalph Pichler <pichler.ralph@gmail.com>
parent 050102b9
......@@ -64,6 +64,7 @@ const (
optionNameSwapDeploymentGasPrice = "swap-deployment-gas-price"
optionNameFullNode = "full-node"
optionNamePostageContractAddress = "postage-stamp-address"
optionNamePriceOracleAddress = "price-oracle-address"
optionNameBlockTime = "block-time"
optionWarmUpTime = "warmup-time"
)
......@@ -219,9 +220,9 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameVerbosity, "info", "log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace")
cmd.Flags().String(optionWelcomeMessage, "", "send a welcome message string during handshakes")
cmd.Flags().Bool(optionNameGlobalPinningEnabled, false, "enable global pinning")
cmd.Flags().String(optionNamePaymentThreshold, "10000000000000", "threshold in BZZ where you expect to get paid from your peers")
cmd.Flags().String(optionNamePaymentTolerance, "10000000000000", "excess debt above payment threshold in BZZ where you disconnect from your peer")
cmd.Flags().String(optionNamePaymentEarly, "1000000000000", "amount in BZZ below the peers payment threshold when we initiate settlement")
cmd.Flags().String(optionNamePaymentThreshold, "100000000", "threshold in BZZ where you expect to get paid from your peers")
cmd.Flags().String(optionNamePaymentTolerance, "100000000", "excess debt above payment threshold in BZZ where you disconnect from your peer")
cmd.Flags().String(optionNamePaymentEarly, "10000000", "amount in BZZ below the peers payment threshold when we initiate settlement")
cmd.Flags().StringSlice(optionNameResolverEndpoints, []string{}, "ENS compatible API endpoint for a TLD and with contract address, can be repeated, format [tld:][contract-addr@]url")
cmd.Flags().Bool(optionNameGatewayMode, false, "disable a set of sensitive features in the api")
cmd.Flags().Bool(optionNameBootnodeMode, false, "cause the node to always accept incoming connections")
......@@ -235,6 +236,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().Bool(optionNameSwapEnable, true, "enable swap")
cmd.Flags().Bool(optionNameFullNode, false, "cause the node to start in full mode")
cmd.Flags().String(optionNamePostageContractAddress, "", "postage stamp contract address")
cmd.Flags().String(optionNamePriceOracleAddress, "", "price oracle contract address")
cmd.Flags().String(optionNameTransactionHash, "", "proof-of-identity transaction hash")
cmd.Flags().Uint64(optionNameBlockTime, 15, "chain block time")
cmd.Flags().String(optionNameSwapDeploymentGasPrice, "", "gas price in wei to use for deployment and funding")
......
......@@ -152,6 +152,7 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
FullNodeMode: fullNode,
Transaction: c.config.GetString(optionNameTransactionHash),
PostageContractAddress: c.config.GetString(optionNamePostageContractAddress),
PriceOracleAddress: c.config.GetString(optionNamePriceOracleAddress),
BlockTime: c.config.GetUint64(optionNameBlockTime),
DeployGasPrice: c.config.GetString(optionNameSwapDeploymentGasPrice),
WarmupTime: c.config.GetDuration(optionWarmUpTime),
......
......@@ -9,6 +9,7 @@ require (
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/ethereum/go-ethereum v1.10.3
github.com/ethersphere/go-storage-incentives-abi v0.3.0
github.com/ethersphere/go-price-oracle-abi v0.1.0
github.com/ethersphere/go-sw3-abi v0.4.0
github.com/ethersphere/langos v1.0.0
github.com/gogo/protobuf v1.3.1
......
......@@ -178,6 +178,8 @@ github.com/ethereum/go-ethereum v1.10.3 h1:SEYOYARvbWnoDl1hOSks3ZJQpRiiRJe8ubaQG
github.com/ethereum/go-ethereum v1.10.3/go.mod h1:99onQmSd1GRGOziyGldI41YQb7EESX3Q4H41IfJgIQQ=
github.com/ethersphere/go-storage-incentives-abi v0.3.0 h1:Y1OyNMI1JjqOmVJlgzR70PPe2Czuh4BglCV/nD3UHIA=
github.com/ethersphere/go-storage-incentives-abi v0.3.0/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc=
github.com/ethersphere/go-price-oracle-abi v0.1.0 h1:yg/hK8nETNvk+GEBASlbakMFv/CVp7HXiycrHw1pRV8=
github.com/ethersphere/go-price-oracle-abi v0.1.0/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk=
github.com/ethersphere/go-sw3-abi v0.4.0 h1:T3ANY+ktWrPAwe2U0tZi+DILpkHzto5ym/XwV/Bbz8g=
github.com/ethersphere/go-sw3-abi v0.4.0/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU=
github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc=
......
......@@ -29,7 +29,8 @@ var (
balancesOriginatedPrefix = "accounting_originatedbalance_"
// fraction of the refresh rate that is the minimum for monetary settlement
// this value is chosen so that tiny payments are prevented while still allowing small payments in environments with lower payment thresholds
minimumPaymentDivisor = int64(5)
minimumPaymentDivisor = int64(5)
failedSettlementInterval = int64(10) // seconds
)
// Interface is the Accounting interface.
......@@ -83,12 +84,13 @@ type RefreshFunc func(context.Context, swarm.Address, *big.Int, *big.Int) (*big.
// accountingPeer holds all in-memory accounting information for one peer.
type accountingPeer struct {
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance *big.Int // amount currently reserved for active peer interaction
shadowReservedBalance *big.Int // amount potentially to be debited for active peer interaction
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance *big.Int // amount currently reserved for active peer interaction
shadowReservedBalance *big.Int // amount potentially to be debited for active peer interaction
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer
lastSettlementFailureTimestamp int64 // time of last unsuccessful attempt to issue a cheque
}
// Accounting is the main implementation of the accounting interface.
......@@ -117,6 +119,7 @@ type Accounting struct {
minimumPayment *big.Int
pricing pricing.Interface
metrics metrics
wg sync.WaitGroup
timeNow func() time.Time
}
......@@ -205,7 +208,9 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
// If our expected debt reduced by what could have been credited on the other side already is less than earlyPayment away from our payment threshold
// and we are actually in debt, trigger settlement.
// we pay early to avoid needlessly blocking request later when concurrent requests occur and we are already close to the payment threshold.
if increasedExpectedDebtReduced.Cmp(threshold) >= 0 && currentBalance.Cmp(big.NewInt(0)) < 0 {
err = a.settle(peer, accountingPeer)
if err != nil {
return fmt.Errorf("failed to settle with peer %v: %v", peer, err)
......@@ -326,7 +331,6 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
}
paymentAmount := new(big.Int).Neg(compensatedBalance)
// Don't do anything if there is no actual debt or no time passed since last refreshment attempt
// This might be the case if the peer owes us and the total reserve for a peer exceeds the payment threshold.
if paymentAmount.Cmp(big.NewInt(0)) > 0 && timeElapsed > 0 {
......@@ -358,22 +362,28 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
}
if a.payFunction != nil && !balance.paymentOngoing {
// if there is no monetary settlement happening, check if there is something to settle
// compute debt excluding debt created by incoming payments
originatedBalance, err := a.OriginatedBalance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance to settle: %w", err)
difference := now - balance.lastSettlementFailureTimestamp
if difference > failedSettlementInterval {
// if there is no monetary settlement happening, check if there is something to settle
// compute debt excluding debt created by incoming payments
originatedBalance, err := a.OriginatedBalance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance to settle: %w", err)
}
}
}
paymentAmount := new(big.Int).Neg(originatedBalance)
// if the remaining debt is still larger than some minimum amount, trigger monetary settlement
if paymentAmount.Cmp(a.minimumPayment) >= 0 {
balance.paymentOngoing = true
// add settled amount to shadow reserve before sending it
balance.shadowReservedBalance.Add(balance.shadowReservedBalance, paymentAmount)
go a.payFunction(context.Background(), peer, paymentAmount)
paymentAmount := new(big.Int).Neg(originatedBalance)
// if the remaining debt is still larger than some minimum amount, trigger monetary settlement
if paymentAmount.Cmp(a.minimumPayment) >= 0 {
balance.paymentOngoing = true
// add settled amount to shadow reserve before sending it
balance.shadowReservedBalance.Add(balance.shadowReservedBalance, paymentAmount)
a.wg.Add(1)
go a.payFunction(context.Background(), peer, paymentAmount)
}
}
}
......@@ -662,6 +672,7 @@ func (a *Accounting) shadowBalance(peer swarm.Address) (shadowBalance *big.Int,
// NotifyPaymentSent is triggered by async monetary settlement to update our balance and remove it's price from the shadow reserve
func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, receivedError error) {
defer a.wg.Done()
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
......@@ -672,6 +683,7 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece
accountingPeer.shadowReservedBalance.Sub(accountingPeer.shadowReservedBalance, amount)
if receivedError != nil {
accountingPeer.lastSettlementFailureTimestamp = a.timeNow().Unix()
a.logger.Warningf("accounting: payment failure %v", receivedError)
return
}
......@@ -998,3 +1010,9 @@ func (a *Accounting) SetRefreshFunc(f RefreshFunc) {
func (a *Accounting) SetPayFunc(f PayFunc) {
a.payFunction = f
}
// Close hangs up running websockets on shutdown.
func (a *Accounting) Close() error {
a.wg.Wait()
return nil
}
......@@ -42,8 +42,8 @@ type booking struct {
expectedBalance int64
originatedBalance int64
originatedCredit bool
amount int64
notifyPaymentSent bool
overpay uint64
}
// TestAccountingAddBalance does several accounting actions and verifies the balance after each steep
......@@ -119,6 +119,12 @@ func TestAccountingAddOriginatedBalance(t *testing.T) {
t.Fatal(err)
}
f := func(ctx context.Context, peer swarm.Address, amount *big.Int, shadowBalance *big.Int) (*big.Int, int64, error) {
return big.NewInt(0), 0, nil
}
acc.SetRefreshFunc(f)
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
......@@ -126,46 +132,65 @@ func TestAccountingAddOriginatedBalance(t *testing.T) {
bookings := []booking{
// originated credit
{peer: peer1Addr, price: -200, expectedBalance: -200, originatedBalance: -200, originatedCredit: true},
{peer: peer1Addr, price: -2000, expectedBalance: -2000, originatedBalance: -2000, originatedCredit: true},
// forwarder credit
{peer: peer1Addr, price: -200, expectedBalance: -400, originatedBalance: -200, originatedCredit: false},
{peer: peer1Addr, price: -2000, expectedBalance: -4000, originatedBalance: -2000, originatedCredit: false},
// inconsequential debit not moving balance closer to 0 than originbalance is to 0
{peer: peer1Addr, price: 100, expectedBalance: -300, originatedBalance: -200},
{peer: peer1Addr, price: 1000, expectedBalance: -3000, originatedBalance: -2000},
// consequential debit moving balance closer to 0 than originbalance, therefore also moving originated balance along
{peer: peer1Addr, price: 200, expectedBalance: -100, originatedBalance: -100},
// notifypaymentsent that moves originated balance into positive domain
{peer: peer1Addr, amount: 200, expectedBalance: 100, originatedBalance: 100, notifyPaymentSent: true},
{peer: peer1Addr, price: 2000, expectedBalance: -1000, originatedBalance: -1000},
// forwarder credit happening to increase debt
{peer: peer1Addr, price: -7000, expectedBalance: -8000, originatedBalance: -1000, originatedCredit: false},
// expect notifypaymentsent triggered by reserve that moves originated balance into positive domain because of earlier debit triggering overpay
{peer: peer1Addr, price: -1000, expectedBalance: 1000, originatedBalance: 1000, overpay: 9000, notifyPaymentSent: true},
// inconsequential debit because originated balance is in the positive domain
{peer: peer1Addr, price: 100, expectedBalance: 200, originatedBalance: 100},
{peer: peer1Addr, price: 1000, expectedBalance: 2000, originatedBalance: 1000},
// originated credit moving the originated balance back into the negative domain, should be limited to the expectedbalance
{peer: peer1Addr, price: -300, expectedBalance: -100, originatedBalance: -100, originatedCredit: true},
{peer: peer1Addr, price: -3000, expectedBalance: -1000, originatedBalance: -1000, originatedCredit: true},
}
paychan := make(chan struct{})
for i, booking := range bookings {
if booking.notifyPaymentSent {
acc.NotifyPaymentSent(booking.peer, big.NewInt(booking.amount), nil)
} else {
pay := func(ctx context.Context, peer swarm.Address, amount *big.Int) {
if booking.overpay != 0 {
debitAction := acc.PrepareDebit(peer, booking.overpay)
_ = debitAction.Apply()
}
if booking.price < 0 {
err = acc.Reserve(context.Background(), booking.peer, uint64(-booking.price))
if err != nil {
t.Fatal(err)
}
err = acc.Credit(booking.peer, uint64(-booking.price), booking.originatedCredit)
if err != nil {
t.Fatal(err)
}
acc.Release(booking.peer, uint64(-booking.price))
} else {
debitAction := acc.PrepareDebit(booking.peer, uint64(booking.price))
err = debitAction.Apply()
if err != nil {
t.Fatal(err)
acc.NotifyPaymentSent(peer, amount, nil)
paychan <- struct{}{}
}
acc.SetPayFunc(pay)
if booking.price < 0 {
err = acc.Reserve(context.Background(), booking.peer, uint64(-booking.price))
if err != nil {
t.Fatal(err)
}
if booking.notifyPaymentSent {
select {
case <-paychan:
case <-time.After(1 * time.Second):
t.Fatal("expected payment sent")
}
debitAction.Cleanup()
}
err = acc.Credit(booking.peer, uint64(-booking.price), booking.originatedCredit)
if err != nil {
t.Fatal(err)
}
acc.Release(booking.peer, uint64(-booking.price))
} else {
debitAction := acc.PrepareDebit(booking.peer, uint64(booking.price))
err = debitAction.Apply()
if err != nil {
t.Fatal(err)
}
debitAction.Cleanup()
}
balance, err := acc.Balance(booking.peer)
......@@ -185,7 +210,6 @@ func TestAccountingAddOriginatedBalance(t *testing.T) {
if originatedBalance.Int64() != booking.originatedBalance {
t.Fatalf("originated balance for peer %v not as expected after booking %d. got %d, wanted %d", booking.peer.String(), i, originatedBalance, booking.originatedBalance)
}
}
}
......@@ -925,13 +949,6 @@ func (p *pricingMock) AnnouncePaymentThreshold(ctx context.Context, peer swarm.A
return nil
}
func (p *pricingMock) AnnouncePaymentThresholdAndPriceTable(ctx context.Context, peer swarm.Address, paymentThreshold *big.Int) error {
p.called = true
p.peer = peer
p.paymentThreshold = paymentThreshold
return nil
}
func TestAccountingConnected(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
......@@ -1082,3 +1099,113 @@ func TestAccountingPeerDebt(t *testing.T) {
}
}
func TestAccountingCallPaymentFailureRetries(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc, err := accounting.NewAccounting(testPaymentThreshold, testPaymentTolerance, testPaymentEarly, logger, store, nil, big.NewInt(1))
if err != nil {
t.Fatal(err)
}
refreshchan := make(chan paymentCall, 1)
paychan := make(chan paymentCall, 1)
ts := int64(100)
acc.SetTime(ts)
acc.SetRefreshFunc(func(ctx context.Context, peer swarm.Address, amount *big.Int, shadowBalance *big.Int) (*big.Int, int64, error) {
refreshchan <- paymentCall{peer: peer, amount: big.NewInt(1)}
return big.NewInt(1), ts, nil
})
acc.SetPayFunc(func(ctx context.Context, peer swarm.Address, amount *big.Int) {
paychan <- paymentCall{peer: peer, amount: amount}
})
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
requestPrice := testPaymentThreshold.Uint64() - 100
// Credit until near payment threshold
err = acc.Credit(peer1Addr, requestPrice, true)
if err != nil {
t.Fatal(err)
}
err = acc.Reserve(context.Background(), peer1Addr, 2)
if err != nil {
t.Fatal(err)
}
select {
case <-refreshchan:
case <-time.After(1 * time.Second):
t.Fatalf("expected refreshment")
}
var sentAmount *big.Int
select {
case call := <-paychan:
sentAmount = call.amount
case <-time.After(1 * time.Second):
t.Fatal("payment expected to be sent")
}
acc.Release(peer1Addr, 2)
acc.NotifyPaymentSent(peer1Addr, sentAmount, errors.New("error"))
// try another n requests 1 per second
for i := 0; i < 10; i++ {
ts++
acc.SetTime(ts)
err = acc.Reserve(context.Background(), peer1Addr, 2)
if err != nil {
t.Fatal(err)
}
select {
case <-refreshchan:
case <-time.After(1 * time.Second):
t.Fatal("expected refreshment")
}
if acc.IsPaymentOngoing(peer1Addr) {
t.Fatal("unexpected ongoing payment")
}
acc.Release(peer1Addr, 2)
}
ts++
acc.SetTime(ts)
// try another request
err = acc.Reserve(context.Background(), peer1Addr, 1)
if err != nil {
t.Fatal(err)
}
select {
case <-refreshchan:
case <-time.After(1 * time.Second):
t.Fatalf("expected refreshment")
}
select {
case <-paychan:
case <-time.After(500 * time.Millisecond):
t.Fatal("payment expected to be sent")
}
acc.Release(peer1Addr, 1)
}
......@@ -20,6 +20,7 @@ import (
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/settlement/swap"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/pkg/settlement/swap/priceoracle"
"github.com/ethersphere/bee/pkg/settlement/swap/swapprotocol"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/transaction"
......@@ -214,8 +215,25 @@ func InitSwap(
chequeStore chequebook.ChequeStore,
cashoutService chequebook.CashoutService,
accounting settlement.Accounting,
) (*swap.Service, error) {
swapProtocol := swapprotocol.New(p2ps, logger, overlayEthAddress)
priceOracleAddress string,
chainID int64,
transactionService transaction.Service,
) (*swap.Service, priceoracle.Service, error) {
var currentPriceOracleAddress common.Address
if priceOracleAddress == "" {
var found bool
currentPriceOracleAddress, found = priceoracle.DiscoverPriceOracleAddress(chainID)
if !found {
return nil, nil, errors.New("no known price oracle address for this network")
}
} else {
currentPriceOracleAddress = common.HexToAddress(priceOracleAddress)
}
priceOracle := priceoracle.New(logger, currentPriceOracleAddress, transactionService, 300)
priceOracle.Start()
swapProtocol := swapprotocol.New(p2ps, logger, overlayEthAddress, priceOracle)
swapAddressBook := swap.NewAddressbook(stateStore)
swapService := swap.New(
......@@ -235,8 +253,8 @@ func InitSwap(
err := p2ps.AddProtocol(swapProtocol.Protocol())
if err != nil {
return nil, err
return nil, nil, err
}
return swapService, nil
return swapService, priceOracle, nil
}
......@@ -62,6 +62,7 @@ import (
"github.com/ethersphere/bee/pkg/settlement/pseudosettle"
"github.com/ethersphere/bee/pkg/settlement/swap"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/pkg/settlement/swap/priceoracle"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/steward"
"github.com/ethersphere/bee/pkg/storage"
......@@ -96,6 +97,7 @@ type Bee struct {
topologyHalter topology.Halter
pusherCloser io.Closer
pullerCloser io.Closer
accountingCloser io.Closer
pullSyncCloser io.Closer
pssCloser io.Closer
ethClientCloser func()
......@@ -103,6 +105,7 @@ type Bee struct {
recoveryHandleCleanup func()
listenerCloser io.Closer
postageServiceCloser io.Closer
priceOracleCloser io.Closer
shutdownInProgress bool
shutdownMutex sync.Mutex
}
......@@ -150,8 +153,8 @@ type Options struct {
}
const (
refreshRate = int64(1000000000000)
basePrice = 1000000000
refreshRate = int64(6000000)
basePrice = 10000
)
func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (b *Bee, err error) {
......@@ -518,6 +521,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
if err != nil {
return nil, fmt.Errorf("accounting: %w", err)
}
b.accountingCloser = acc
pseudosettleService := pseudosettle.New(p2ps, logger, stateStore, acc, big.NewInt(refreshRate), p2ps)
if err = p2ps.AddProtocol(pseudosettleService.Protocol()); err != nil {
......@@ -527,7 +531,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
acc.SetRefreshFunc(pseudosettleService.Pay)
if o.SwapEnable {
swapService, err = InitSwap(
var priceOracle priceoracle.Service
swapService, priceOracle, err = InitSwap(
p2ps,
logger,
stateStore,
......@@ -537,10 +542,14 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
chequeStore,
cashoutService,
acc,
o.PriceOracleAddress,
chainID,
transactionService,
)
if err != nil {
return nil, err
}
b.priceOracleCloser = priceOracle
acc.SetPayFunc(swapService.Pay)
}
......@@ -771,7 +780,7 @@ func (b *Bee) Shutdown(ctx context.Context) error {
b.recoveryHandleCleanup()
}
var wg sync.WaitGroup
wg.Add(4)
wg.Add(5)
go func() {
defer wg.Done()
tryClose(b.pssCloser, "pss")
......@@ -784,6 +793,10 @@ func (b *Bee) Shutdown(ctx context.Context) error {
defer wg.Done()
tryClose(b.pullerCloser, "puller")
}()
go func() {
defer wg.Done()
tryClose(b.accountingCloser, "accounting")
}()
b.p2pCancel()
go func() {
......@@ -794,6 +807,7 @@ func (b *Bee) Shutdown(ctx context.Context) error {
wg.Wait()
tryClose(b.p2pService, "p2p server")
tryClose(b.priceOracleCloser, "price oracle service")
wg.Add(3)
go func() {
......
......@@ -136,6 +136,7 @@ func (r *Recorder) NewStream(ctx context.Context, addr swarm.Address, h p2p.Head
defer close(record.done)
// pass a new context to handler,
streamIn.responseHeaders = streamOut.headers
// do not cancel it with the client stream context
err := handler(context.Background(), p2p.Peer{Address: r.base, FullNode: r.fullNode}, streamIn)
if err != nil && err != io.EOF {
......
......@@ -36,11 +36,6 @@ type Interface interface {
AnnouncePaymentThreshold(ctx context.Context, peer swarm.Address, paymentThreshold *big.Int) error
}
// PriceTableObserver is used for being notified of price table updates
type PriceTableObserver interface {
NotifyPriceTable(peer swarm.Address, priceTable []uint64) error
}
// PaymentThresholdObserver is used for being notified of payment threshold updates
type PaymentThresholdObserver interface {
NotifyPaymentThreshold(peer swarm.Address, paymentThreshold *big.Int) error
......
......@@ -17,6 +17,8 @@ var (
peerChequebookPrefix = "swap_peer_chequebook_"
beneficiaryPeerPrefix = "swap_beneficiary_peer_"
peerBeneficiaryPrefix = "swap_peer_beneficiary_"
deductedForPeerPrefix = "swap_deducted_for_peer_"
deductedByPeerPrefix = "swap_deducted_by_peer_"
)
// Addressbook maps peers to beneficaries, chequebooks and in reverse.
......@@ -33,6 +35,14 @@ type Addressbook interface {
PutBeneficiary(peer swarm.Address, beneficiary common.Address) error
// PutChequebook stores the chequebook for the given peer.
PutChequebook(peer swarm.Address, chequebook common.Address) error
// AddDeductionFor peer stores the flag indicating the peer have already issued a cheque that has been deducted
AddDeductionFor(peer swarm.Address) error
// AddDeductionFor peer stores the flag indicating the peer have already received a cheque that has been deducted
AddDeductionBy(peer swarm.Address) error
// GetDeductionFor returns whether a peer have already issued a cheque that has been deducted
GetDeductionFor(peer swarm.Address) (bool, error)
// GetDeductionBy returns whether a peer have already received a cheque that has been deducted
GetDeductionBy(peer swarm.Address) (bool, error)
}
type addressbook struct {
......@@ -112,6 +122,38 @@ func (a *addressbook) PutChequebook(peer swarm.Address, chequebook common.Addres
return a.store.Put(chequebookPeerKey(chequebook), peer)
}
func (a *addressbook) AddDeductionFor(peer swarm.Address) error {
return a.store.Put(peerDeductedForKey(peer), struct{}{})
}
func (a *addressbook) AddDeductionBy(peer swarm.Address) error {
return a.store.Put(peerDeductedByKey(peer), struct{}{})
}
func (a *addressbook) GetDeductionFor(peer swarm.Address) (bool, error) {
var nothing struct{}
err := a.store.Get(peerDeductedForKey(peer), &nothing)
if err != nil {
if err != storage.ErrNotFound {
return false, err
}
return false, nil
}
return true, nil
}
func (a *addressbook) GetDeductionBy(peer swarm.Address) (bool, error) {
var nothing struct{}
err := a.store.Get(peerDeductedByKey(peer), &nothing)
if err != nil {
if err != storage.ErrNotFound {
return false, err
}
return false, nil
}
return true, nil
}
// peerKey computes the key where to store the chequebook from a peer.
func peerKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", peerPrefix, peer)
......@@ -131,3 +173,11 @@ func peerBeneficiaryKey(peer swarm.Address) string {
func beneficiaryPeerKey(peer common.Address) string {
return fmt.Sprintf("%s%s", beneficiaryPeerPrefix, peer)
}
func peerDeductedByKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", deductedByPeerPrefix, peer.String())
}
func peerDeductedForKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", deductedForPeerPrefix, peer.String())
}
......@@ -18,6 +18,11 @@ import (
"github.com/ethersphere/bee/pkg/transaction"
)
const (
// prefix for the persistence key
lastReceivedChequePrefix = "swap_chequebook_last_received_cheque_"
)
var (
// ErrNoCheque is the error returned if there is no prior cheque for a chequebook or beneficiary.
ErrNoCheque = errors.New("no cheque")
......@@ -28,14 +33,15 @@ var (
// ErrWrongBeneficiary is the error returned if the cheque has the wrong beneficiary.
ErrWrongBeneficiary = errors.New("wrong beneficiary")
// ErrBouncingCheque is the error returned if the chequebook is demonstrably illiquid.
ErrBouncingCheque = errors.New("bouncing cheque")
lastReceivedChequePrefix = "swap_chequebook_last_received_cheque_"
ErrBouncingCheque = errors.New("bouncing cheque")
// ErrChequeValueTooLow is the error returned if the after deduction value of a cheque did not cover 1 accounting credit
ErrChequeValueTooLow = errors.New("cheque value lower than acceptable")
)
// ChequeStore handles the verification and storage of received cheques
type ChequeStore interface {
// ReceiveCheque verifies and stores a cheque. It returns the total amount earned.
ReceiveCheque(ctx context.Context, cheque *SignedCheque) (*big.Int, error)
ReceiveCheque(ctx context.Context, cheque *SignedCheque, exchangeRate *big.Int, deduction *big.Int) (*big.Int, error)
// LastCheque returns the last cheque we received from a specific chequebook.
LastCheque(chequebook common.Address) (*SignedCheque, error)
// LastCheques returns the last received cheques from every known chequebook.
......@@ -92,7 +98,7 @@ func (s *chequeStore) LastCheque(chequebook common.Address) (*SignedCheque, erro
}
// ReceiveCheque verifies and stores a cheque. It returns the totam amount earned.
func (s *chequeStore) ReceiveCheque(ctx context.Context, cheque *SignedCheque) (*big.Int, error) {
func (s *chequeStore) ReceiveCheque(ctx context.Context, cheque *SignedCheque, exchangeRate *big.Int, deduction *big.Int) (*big.Int, error) {
// verify we are the beneficiary
if cheque.Beneficiary != s.beneficiary {
return nil, ErrWrongBeneficiary
......@@ -130,6 +136,12 @@ func (s *chequeStore) ReceiveCheque(ctx context.Context, cheque *SignedCheque) (
return nil, ErrChequeNotIncreasing
}
deducedAmount := new(big.Int).Sub(amount, deduction)
if deducedAmount.Cmp(exchangeRate) < 0 {
return nil, ErrChequeValueTooLow
}
// blockchain calls below
contract := newChequebookContract(cheque.Chequebook, s.transactionService)
......
......@@ -20,11 +20,13 @@ func TestReceiveCheque(t *testing.T) {
store := storemock.NewStateStore()
beneficiary := common.HexToAddress("0xffff")
issuer := common.HexToAddress("0xbeee")
cumulativePayout := big.NewInt(10)
cumulativePayout2 := big.NewInt(20)
cumulativePayout := big.NewInt(101)
cumulativePayout2 := big.NewInt(201)
chequebookAddress := common.HexToAddress("0xeeee")
sig := make([]byte, 65)
chainID := int64(1)
exchangeRate := big.NewInt(10)
deduction := big.NewInt(1)
cheque := &chequebook.SignedCheque{
Cheque: chequebook.Cheque{
......@@ -71,7 +73,7 @@ func TestReceiveCheque(t *testing.T) {
return issuer, nil
})
received, err := chequestore.ReceiveCheque(context.Background(), cheque)
received, err := chequestore.ReceiveCheque(context.Background(), cheque, exchangeRate, deduction)
if err != nil {
t.Fatal(err)
}
......@@ -103,7 +105,7 @@ func TestReceiveCheque(t *testing.T) {
}
verifiedWithFactory = false
received, err = chequestore.ReceiveCheque(context.Background(), cheque)
received, err = chequestore.ReceiveCheque(context.Background(), cheque, exchangeRate, deduction)
if err != nil {
t.Fatal(err)
}
......@@ -145,7 +147,7 @@ func TestReceiveChequeInvalidBeneficiary(t *testing.T) {
nil,
)
_, err := chequestore.ReceiveCheque(context.Background(), cheque)
_, err := chequestore.ReceiveCheque(context.Background(), cheque, cumulativePayout, big.NewInt(0))
if err == nil {
t.Fatal("accepted cheque with wrong beneficiary")
}
......@@ -191,7 +193,7 @@ func TestReceiveChequeInvalidAmount(t *testing.T) {
Chequebook: chequebookAddress,
},
Signature: sig,
})
}, cumulativePayout, big.NewInt(0))
if err != nil {
t.Fatal(err)
}
......@@ -203,7 +205,7 @@ func TestReceiveChequeInvalidAmount(t *testing.T) {
Chequebook: chequebookAddress,
},
Signature: sig,
})
}, cumulativePayout, big.NewInt(0))
if err == nil {
t.Fatal("accepted lower amount cheque")
}
......@@ -247,7 +249,7 @@ func TestReceiveChequeInvalidChequebook(t *testing.T) {
Chequebook: chequebookAddress,
},
Signature: sig,
})
}, cumulativePayout, big.NewInt(0))
if !errors.Is(err, chequebook.ErrNotDeployedByFactory) {
t.Fatalf("wrong error. wanted %v, got %v", chequebook.ErrNotDeployedByFactory, err)
}
......@@ -287,7 +289,7 @@ func TestReceiveChequeInvalidSignature(t *testing.T) {
Chequebook: chequebookAddress,
},
Signature: sig,
})
}, cumulativePayout, big.NewInt(0))
if !errors.Is(err, chequebook.ErrChequeInvalid) {
t.Fatalf("wrong error. wanted %v, got %v", chequebook.ErrChequeInvalid, err)
}
......@@ -329,7 +331,7 @@ func TestReceiveChequeInsufficientBalance(t *testing.T) {
Chequebook: chequebookAddress,
},
Signature: sig,
})
}, cumulativePayout, big.NewInt(0))
if !errors.Is(err, chequebook.ErrBouncingCheque) {
t.Fatalf("wrong error. wanted %v, got %v", chequebook.ErrBouncingCheque, err)
}
......@@ -371,8 +373,127 @@ func TestReceiveChequeSufficientBalancePaidOut(t *testing.T) {
Chequebook: chequebookAddress,
},
Signature: sig,
})
}, cumulativePayout, big.NewInt(0))
if err != nil {
t.Fatal(err)
}
}
func TestReceiveChequeNotEnoughValue(t *testing.T) {
store := storemock.NewStateStore()
beneficiary := common.HexToAddress("0xffff")
issuer := common.HexToAddress("0xbeee")
cumulativePayout := big.NewInt(100)
chequebookAddress := common.HexToAddress("0xeeee")
sig := make([]byte, 65)
chainID := int64(1)
exchangeRate := big.NewInt(101)
deduction := big.NewInt(0)
cheque := &chequebook.SignedCheque{
Cheque: chequebook.Cheque{
Beneficiary: beneficiary,
CumulativePayout: cumulativePayout,
Chequebook: chequebookAddress,
},
Signature: sig,
}
factory := &factoryMock{
verifyChequebook: func(ctx context.Context, address common.Address) error {
if address != chequebookAddress {
t.Fatal("verifying wrong chequebook")
}
return nil
},
}
chequestore := chequebook.NewChequeStore(
store,
factory,
chainID,
beneficiary,
transactionmock.New(
transactionmock.WithABICallSequence(
transactionmock.ABICall(&chequebookABI, chequebookAddress, issuer.Hash().Bytes(), "issuer"),
transactionmock.ABICall(&chequebookABI, chequebookAddress, cumulativePayout.FillBytes(make([]byte, 32)), "balance"),
transactionmock.ABICall(&chequebookABI, chequebookAddress, big.NewInt(0).FillBytes(make([]byte, 32)), "paidOut", beneficiary),
),
),
func(c *chequebook.SignedCheque, cid int64) (common.Address, error) {
if cid != chainID {
t.Fatalf("recovery with wrong chain id. wanted %d, got %d", chainID, cid)
}
if !cheque.Equal(c) {
t.Fatalf("recovery with wrong cheque. wanted %v, got %v", cheque, c)
}
return issuer, nil
})
_, err := chequestore.ReceiveCheque(context.Background(), cheque, exchangeRate, deduction)
if !errors.Is(err, chequebook.ErrChequeValueTooLow) {
t.Fatalf("got wrong error. wanted %v, got %v", chequebook.ErrChequeValueTooLow, err)
}
}
func TestReceiveChequeNotEnoughValueAfterDeduction(t *testing.T) {
store := storemock.NewStateStore()
beneficiary := common.HexToAddress("0xffff")
issuer := common.HexToAddress("0xbeee")
cumulativePayout := big.NewInt(100)
chequebookAddress := common.HexToAddress("0xeeee")
sig := make([]byte, 65)
chainID := int64(1)
// cheque needs to cover initial deduction (if applicable) plus one times the exchange rate
// in order to amount to at least 1 accounting credit and be accepted
// in this test cheque amount is just not enough to cover that therefore we expect
exchangeRate := big.NewInt(100)
deduction := big.NewInt(1)
cheque := &chequebook.SignedCheque{
Cheque: chequebook.Cheque{
Beneficiary: beneficiary,
CumulativePayout: cumulativePayout,
Chequebook: chequebookAddress,
},
Signature: sig,
}
factory := &factoryMock{
verifyChequebook: func(ctx context.Context, address common.Address) error {
if address != chequebookAddress {
t.Fatal("verifying wrong chequebook")
}
return nil
},
}
chequestore := chequebook.NewChequeStore(
store,
factory,
chainID,
beneficiary,
transactionmock.New(
transactionmock.WithABICallSequence(
transactionmock.ABICall(&chequebookABI, chequebookAddress, issuer.Hash().Bytes(), "issuer"),
transactionmock.ABICall(&chequebookABI, chequebookAddress, cumulativePayout.FillBytes(make([]byte, 32)), "balance"),
transactionmock.ABICall(&chequebookABI, chequebookAddress, big.NewInt(0).FillBytes(make([]byte, 32)), "paidOut", beneficiary),
),
),
func(c *chequebook.SignedCheque, cid int64) (common.Address, error) {
if cid != chainID {
t.Fatalf("recovery with wrong chain id. wanted %d, got %d", chainID, cid)
}
if !cheque.Equal(c) {
t.Fatalf("recovery with wrong cheque. wanted %v, got %v", cheque, c)
}
return issuer, nil
})
_, err := chequestore.ReceiveCheque(context.Background(), cheque, exchangeRate, deduction)
if !errors.Is(err, chequebook.ErrChequeValueTooLow) {
t.Fatalf("got wrong error. wanted %v, got %v", chequebook.ErrChequeValueTooLow, err)
}
}
......@@ -21,6 +21,8 @@ type Service struct {
chequebookIssueFunc func(ctx context.Context, beneficiary common.Address, amount *big.Int, sendChequeFunc chequebook.SendChequeFunc) (*big.Int, error)
chequebookWithdrawFunc func(ctx context.Context, amount *big.Int) (hash common.Hash, err error)
chequebookDepositFunc func(ctx context.Context, amount *big.Int) (hash common.Hash, err error)
lastChequeFunc func(common.Address) (*chequebook.SignedCheque, error)
lastChequesFunc func() (map[common.Address]*chequebook.SignedCheque, error)
}
// WithChequebook*Functions set the mock chequebook functions
......@@ -60,6 +62,18 @@ func WithChequebookWithdrawFunc(f func(ctx context.Context, amount *big.Int) (ha
})
}
func WithLastChequeFunc(f func(beneficiary common.Address) (*chequebook.SignedCheque, error)) Option {
return optionFunc(func(s *Service) {
s.lastChequeFunc = f
})
}
func WithLastChequesFunc(f func() (map[common.Address]*chequebook.SignedCheque, error)) Option {
return optionFunc(func(s *Service) {
s.lastChequesFunc = f
})
}
// NewChequebook creates the mock chequebook implementation
func NewChequebook(opts ...Option) chequebook.Service {
mock := new(Service)
......@@ -113,10 +127,16 @@ func (s *Service) Issue(ctx context.Context, beneficiary common.Address, amount
}
func (s *Service) LastCheque(beneficiary common.Address) (*chequebook.SignedCheque, error) {
if s.lastChequeFunc != nil {
return s.lastChequeFunc(beneficiary)
}
return nil, errors.New("Error")
}
func (s *Service) LastCheques() (map[common.Address]*chequebook.SignedCheque, error) {
if s.lastChequesFunc != nil {
return s.lastChequesFunc()
}
return nil, errors.New("Error")
}
......
......@@ -14,12 +14,12 @@ import (
// Service is the mock chequeStore service.
type Service struct {
receiveCheque func(ctx context.Context, cheque *chequebook.SignedCheque) (*big.Int, error)
receiveCheque func(ctx context.Context, cheque *chequebook.SignedCheque, exchangeRate *big.Int, deduction *big.Int) (*big.Int, error)
lastCheque func(chequebook common.Address) (*chequebook.SignedCheque, error)
lastCheques func() (map[common.Address]*chequebook.SignedCheque, error)
}
func WithRetrieveChequeFunc(f func(ctx context.Context, cheque *chequebook.SignedCheque) (*big.Int, error)) Option {
func WithReceiveChequeFunc(f func(ctx context.Context, cheque *chequebook.SignedCheque, exchangeRate *big.Int, deduction *big.Int) (*big.Int, error)) Option {
return optionFunc(func(s *Service) {
s.receiveCheque = f
})
......@@ -46,8 +46,8 @@ func NewChequeStore(opts ...Option) chequebook.ChequeStore {
return mock
}
func (s *Service) ReceiveCheque(ctx context.Context, cheque *chequebook.SignedCheque) (*big.Int, error) {
return s.receiveCheque(ctx, cheque)
func (s *Service) ReceiveCheque(ctx context.Context, cheque *chequebook.SignedCheque, exchangeRate *big.Int, deduction *big.Int) (*big.Int, error) {
return s.receiveCheque(ctx, cheque, exchangeRate, deduction)
}
func (s *Service) LastCheque(chequebook common.Address) (*chequebook.SignedCheque, error) {
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swap
const (
ExchangeRateFieldName = exchangeRateFieldName
DeductionFieldName = deductionFieldName
)
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swap
import (
"errors"
"math/big"
"github.com/ethersphere/bee/pkg/p2p"
)
const (
exchangeRateFieldName = "exchange"
deductionFieldName = "deduction"
)
var (
// ErrFieldLength denotes p2p.Header having malformed field length in bytes
ErrFieldLength = errors.New("field length error")
// ErrNoExchangeHeader denotes p2p.Header lacking specified field
ErrNoExchangeHeader = errors.New("no exchange header")
// ErrNoDeductionHeader denotes p2p.Header lacking specified field
ErrNoDeductionHeader = errors.New("no deduction header")
)
func MakeSettlementHeaders(exchangeRate, deduction *big.Int) p2p.Headers {
return p2p.Headers{
exchangeRateFieldName: exchangeRate.Bytes(),
deductionFieldName: deduction.Bytes(),
}
}
func ParseSettlementResponseHeaders(receivedHeaders p2p.Headers) (exchange *big.Int, deduction *big.Int, err error) {
exchangeRate, err := ParseExchangeHeader(receivedHeaders)
if err != nil {
return nil, nil, err
}
deduction, err = ParseDeductionHeader(receivedHeaders)
if err != nil {
return exchangeRate, nil, err
}
return exchangeRate, deduction, nil
}
func ParseExchangeHeader(receivedHeaders p2p.Headers) (*big.Int, error) {
if receivedHeaders[exchangeRateFieldName] == nil {
return nil, ErrNoExchangeHeader
}
exchange := new(big.Int).SetBytes(receivedHeaders[exchangeRateFieldName])
return exchange, nil
}
func ParseDeductionHeader(receivedHeaders p2p.Headers) (*big.Int, error) {
if receivedHeaders[deductionFieldName] == nil {
return nil, ErrNoDeductionHeader
}
deduced := new(big.Int).SetBytes(receivedHeaders[deductionFieldName])
return deduced, nil
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swap_test
import (
"math/big"
"reflect"
"testing"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/settlement/swap/headers"
)
func TestParseSettlementResponseHeaders(t *testing.T) {
headers := p2p.Headers{
swap.ExchangeRateFieldName: []byte{10},
swap.DeductionFieldName: []byte{20},
}
exchange, deduction, err := swap.ParseSettlementResponseHeaders(headers)
if err != nil {
t.Fatal(err)
}
if exchange.Cmp(big.NewInt(10)) != 0 {
t.Fatalf("Exchange rate mismatch, got %v, want %v", exchange, 10)
}
if deduction.Cmp(big.NewInt(20)) != 0 {
t.Fatalf("Deduction mismatch, got %v, want %v", deduction, 20)
}
}
func TestMakeSettlementHeaders(t *testing.T) {
makeHeaders := swap.MakeSettlementHeaders(big.NewInt(906000), big.NewInt(5348))
expectedHeaders := p2p.Headers{
swap.ExchangeRateFieldName: []byte{13, 211, 16},
swap.DeductionFieldName: []byte{20, 228},
}
if !reflect.DeepEqual(makeHeaders, expectedHeaders) {
t.Fatalf("Made headers not as expected, got %+v, want %+v", makeHeaders, expectedHeaders)
}
}
func TestParseExchangeHeader(t *testing.T) {
toReadHeaders := p2p.Headers{
swap.ExchangeRateFieldName: []byte{13, 211, 16},
}
parsedExchange, err := swap.ParseExchangeHeader(toReadHeaders)
if err != nil {
t.Fatal(err)
}
if parsedExchange.Cmp(big.NewInt(906000)) != 0 {
t.Fatalf("Allowance mismatch, got %v, want %v", parsedExchange, big.NewInt(906000))
}
}
func TestParseDeductionHeader(t *testing.T) {
toReadHeaders := p2p.Headers{
swap.DeductionFieldName: []byte{20, 228},
}
parsedDeduction, err := swap.ParseDeductionHeader(toReadHeaders)
if err != nil {
t.Fatal(err)
}
if parsedDeduction.Cmp(big.NewInt(5348)) != 0 {
t.Fatalf("Allowance mismatch, got %v, want %v", parsedDeduction, big.NewInt(5348))
}
}
......@@ -12,6 +12,7 @@ import (
"github.com/ethersphere/bee/pkg/settlement/swap"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/pkg/settlement/swap/swapprotocol"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -25,7 +26,10 @@ type Service struct {
settlementsSentFunc func() (map[string]*big.Int, error)
settlementsRecvFunc func() (map[string]*big.Int, error)
receiveChequeFunc func(context.Context, swarm.Address, *chequebook.SignedCheque) error
deductionByPeers map[string]struct{}
deductionForPeers map[string]struct{}
receiveChequeFunc func(context.Context, swarm.Address, *chequebook.SignedCheque, *big.Int, *big.Int) error
payFunc func(context.Context, swarm.Address, *big.Int)
handshakeFunc func(swarm.Address, common.Address) error
lastSentChequeFunc func(swarm.Address) (*chequebook.SignedCheque, error)
......@@ -64,7 +68,7 @@ func WithSettlementsRecvFunc(f func() (map[string]*big.Int, error)) Option {
})
}
func WithReceiveChequeFunc(f func(context.Context, swarm.Address, *chequebook.SignedCheque) error) Option {
func WithReceiveChequeFunc(f func(context.Context, swarm.Address, *chequebook.SignedCheque, *big.Int, *big.Int) error) Option {
return optionFunc(func(s *Service) {
s.receiveChequeFunc = f
})
......@@ -129,12 +133,17 @@ func New(opts ...Option) swap.Interface {
return mock
}
// ReceiveCheque is the mock ReceiveCheque function of swap.
func (s *Service) ReceiveCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) (err error) {
if s.receiveChequeFunc != nil {
return s.receiveChequeFunc(ctx, peer, cheque)
func NewSwap(opts ...Option) swapprotocol.Swap {
mock := new(Service)
mock.settlementsSent = make(map[string]*big.Int)
mock.settlementsRecv = make(map[string]*big.Int)
mock.deductionByPeers = make(map[string]struct{})
mock.deductionForPeers = make(map[string]struct{})
for _, o := range opts {
o.apply(mock)
}
return nil
return mock
}
// Pay is the mock Pay function of swap.
......@@ -238,6 +247,38 @@ func (s *Service) CashoutStatus(ctx context.Context, peer swarm.Address) (*chequ
return nil, nil
}
func (s *Service) ReceiveCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque, exchangeRate *big.Int, deduction *big.Int) (err error) {
defer func() {
if err == nil {
s.deductionForPeers[peer.String()] = struct{}{}
}
}()
if s.receiveChequeFunc != nil {
return s.receiveChequeFunc(ctx, peer, cheque, exchangeRate, deduction)
}
return nil
}
func (s *Service) GetDeductionForPeer(peer swarm.Address) (bool, error) {
if _, ok := s.deductionForPeers[peer.String()]; ok {
return true, nil
}
return false, nil
}
func (s *Service) GetDeductionByPeer(peer swarm.Address) (bool, error) {
if _, ok := s.deductionByPeers[peer.String()]; ok {
return true, nil
}
return false, nil
}
func (s *Service) AddDeductionByPeer(peer swarm.Address) error {
s.deductionByPeers[peer.String()] = struct{}{}
return nil
}
// Option is the option passed to the mock settlement service
type Option interface {
apply(*Service)
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"context"
"math/big"
"github.com/ethereum/go-ethereum/common"
)
type Service struct {
rate *big.Int
deduct *big.Int
}
func New(rate *big.Int, deduct *big.Int) Service {
return Service{
rate: rate,
deduct: deduct,
}
}
func (s Service) Start() {
}
func (s Service) GetPrice(ctx context.Context) (*big.Int, *big.Int, error) {
return s.rate, s.deduct, nil
}
func (s Service) CurrentRates() (exchangeRate *big.Int, deduction *big.Int, err error) {
return s.rate, s.deduct, nil
}
func (s Service) Close() error {
return nil
}
func DiscoverPriceOracleAddress(chainID int64) (priceOracleAddress common.Address, found bool) {
return common.Address{}, false
}
func (s Service) SetValues(rate *big.Int, deduct *big.Int) {
s.rate = rate
s.deduct = deduct
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package priceoracle
import (
"context"
"errors"
"io"
"math/big"
"time"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/transaction"
"github.com/ethersphere/go-price-oracle-abi/priceoracleabi"
)
var (
errDecodeABI = errors.New("could not decode abi data")
goerliContractAddress = common.HexToAddress("0x0c9de531dcb38b758fe8a2c163444a5e54ee0db2")
)
type service struct {
logger logging.Logger
priceOracleAddress common.Address
transactionService transaction.Service
exchangeRate *big.Int
deduction *big.Int
timeDivisor int64
quitC chan struct{}
}
type Service interface {
io.Closer
// CurrentRates returns the current value of exchange rate and deduction
// according to the latest information from oracle
CurrentRates() (exchangeRate *big.Int, deduction *big.Int, err error)
// GetPrice retrieves latest available information from oracle
GetPrice(ctx context.Context) (*big.Int, *big.Int, error)
Start()
}
var (
priceOracleABI = transaction.ParseABIUnchecked(priceoracleabi.PriceOracleABIv0_1_0)
)
func New(logger logging.Logger, priceOracleAddress common.Address, transactionService transaction.Service, timeDivisor int64) Service {
return &service{
logger: logger,
priceOracleAddress: priceOracleAddress,
transactionService: transactionService,
exchangeRate: big.NewInt(0),
deduction: nil,
quitC: make(chan struct{}),
timeDivisor: timeDivisor,
}
}
func (s *service) Start() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
<-s.quitC
}()
go func() {
defer cancel()
for {
exchangeRate, deduction, err := s.GetPrice(ctx)
if err != nil {
s.logger.Errorf("could not get price: %v", err)
} else {
s.logger.Tracef("updated exchange rate to %d and deduction to %d", exchangeRate, deduction)
s.exchangeRate = exchangeRate
s.deduction = deduction
}
ts := time.Now().Unix()
// We poll the oracle in every timestamp divisible by constant 300 (timeDivisor)
// in order to get latest version approximately at the same time on all nodes
// and to minimize polling frequency
// If the node gets newer information than what was applicable at last polling point at startup
// this minimizes the negative scenario to less than 5 minutes
// during which cheques can not be sent / accepted because of the asymmetric information
timeUntilNextPoll := time.Duration(s.timeDivisor-ts%s.timeDivisor) * time.Second
select {
case <-s.quitC:
return
case <-time.After(timeUntilNextPoll):
}
}
}()
}
func (s *service) GetPrice(ctx context.Context) (*big.Int, *big.Int, error) {
callData, err := priceOracleABI.Pack("getPrice")
if err != nil {
return nil, nil, err
}
result, err := s.transactionService.Call(ctx, &transaction.TxRequest{
To: &s.priceOracleAddress,
Data: callData,
})
if err != nil {
return nil, nil, err
}
results, err := priceOracleABI.Unpack("getPrice", result)
if err != nil {
return nil, nil, err
}
if len(results) != 2 {
return nil, nil, errDecodeABI
}
exchangeRate, ok := abi.ConvertType(results[0], new(big.Int)).(*big.Int)
if !ok || exchangeRate == nil {
return nil, nil, errDecodeABI
}
deduction, ok := abi.ConvertType(results[1], new(big.Int)).(*big.Int)
if !ok || deduction == nil {
return nil, nil, errDecodeABI
}
return exchangeRate, deduction, nil
}
func (s *service) CurrentRates() (exchangeRate *big.Int, deduction *big.Int, err error) {
if s.exchangeRate.Cmp(big.NewInt(0)) == 0 {
return nil, nil, errors.New("exchange rate not yet available")
}
if s.deduction == nil {
return nil, nil, errors.New("deduction amount not yet available")
}
return s.exchangeRate, s.deduction, nil
}
func (s *service) Close() error {
close(s.quitC)
return nil
}
// DiscoverPriceOracleAddress returns the canonical price oracle for this chainID
func DiscoverPriceOracleAddress(chainID int64) (priceOracleAddress common.Address, found bool) {
if chainID == 5 {
// goerli
return goerliContractAddress, true
}
return common.Address{}, false
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package priceoracle_test
import (
"context"
"io/ioutil"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/settlement/swap/priceoracle"
"github.com/ethersphere/bee/pkg/transaction"
transactionmock "github.com/ethersphere/bee/pkg/transaction/mock"
"github.com/ethersphere/go-price-oracle-abi/priceoracleabi"
)
var (
priceOracleABI = transaction.ParseABIUnchecked(priceoracleabi.PriceOracleABIv0_1_0)
)
func TestExchangeGetPrice(t *testing.T) {
priceOracleAddress := common.HexToAddress("0xabcd")
expectedPrice := big.NewInt(100)
expectedDeduce := big.NewInt(200)
result := make([]byte, 64)
expectedPrice.FillBytes(result[0:32])
expectedDeduce.FillBytes(result[32:64])
ex := priceoracle.New(
logging.New(ioutil.Discard, 0),
priceOracleAddress,
transactionmock.New(
transactionmock.WithABICall(
&priceOracleABI,
priceOracleAddress,
result,
"getPrice",
),
),
1,
)
price, deduce, err := ex.GetPrice(context.Background())
if err != nil {
t.Fatal(err)
}
if expectedPrice.Cmp(price) != 0 {
t.Fatalf("got wrong price. wanted %d, got %d", expectedPrice, price)
}
if expectedDeduce.Cmp(deduce) != 0 {
t.Fatalf("got wrong deduce. wanted %d, got %d", expectedDeduce, deduce)
}
}
......@@ -28,6 +28,8 @@ var (
ErrWrongBeneficiary = errors.New("wrong beneficiary")
// ErrUnknownBeneficary is the error if a peer has never announced a beneficiary.
ErrUnknownBeneficary = errors.New("unknown beneficiary for peer")
// ErrChequeValueTooLow is the error a peer issued a cheque not covering 1 accounting credit
ErrChequeValueTooLow = errors.New("cheque value too low")
)
type Interface interface {
......@@ -79,7 +81,7 @@ func New(proto swapprotocol.Interface, logger logging.Logger, store storage.Stat
}
// ReceiveCheque is called by the swap protocol if a cheque is received.
func (s *Service) ReceiveCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) (err error) {
func (s *Service) ReceiveCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque, exchangeRate *big.Int, deduction *big.Int) (err error) {
// check this is the same chequebook for this peer as previously
expectedChequebook, known, err := s.addressbook.Chequebook(peer)
if err != nil {
......@@ -89,12 +91,22 @@ func (s *Service) ReceiveCheque(ctx context.Context, peer swarm.Address, cheque
return ErrWrongChequebook
}
amount, err := s.chequeStore.ReceiveCheque(ctx, cheque)
receivedAmount, err := s.chequeStore.ReceiveCheque(ctx, cheque, exchangeRate, deduction)
if err != nil {
s.metrics.ChequesRejected.Inc()
return fmt.Errorf("rejecting cheque: %w", err)
}
if deduction.Cmp(big.NewInt(0)) > 0 {
err = s.addressbook.AddDeductionFor(peer)
if err != nil {
return err
}
}
decreasedAmount := new(big.Int).Sub(receivedAmount, deduction)
amount := new(big.Int).Div(decreasedAmount, exchangeRate)
if !known {
err = s.addressbook.PutChequebook(peer, cheque.Chequebook)
if err != nil {
......@@ -102,7 +114,7 @@ func (s *Service) ReceiveCheque(ctx context.Context, peer swarm.Address, cheque
}
}
tot, _ := big.NewFloat(0).SetInt(amount).Float64()
tot, _ := big.NewFloat(0).SetInt(receivedAmount).Float64()
s.metrics.TotalReceived.Add(tot)
s.metrics.ChequesReceived.Inc()
......@@ -130,12 +142,13 @@ func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount *big.Int)
err = ErrUnknownBeneficary
return
}
balance, err := s.chequebook.Issue(ctx, beneficiary, amount, func(signedCheque *chequebook.SignedCheque) error {
return s.proto.EmitCheque(ctx, peer, signedCheque)
})
balance, err := s.proto.EmitCheque(ctx, peer, beneficiary, amount, s.chequebook.Issue)
if err != nil {
return
}
bal, _ := big.NewFloat(0).SetInt(balance).Float64()
s.metrics.AvailableBalance.Set(bal)
s.accounting.NotifyPaymentSent(peer, amount, nil)
......@@ -347,3 +360,15 @@ func (s *Service) CashoutStatus(ctx context.Context, peer swarm.Address) (*chequ
}
return s.cashout.CashoutStatus(ctx, chequebookAddress)
}
func (s *Service) GetDeductionForPeer(peer swarm.Address) (bool, error) {
return s.addressbook.GetDeductionFor(peer)
}
func (s *Service) GetDeductionByPeer(peer swarm.Address) (bool, error) {
return s.addressbook.GetDeductionBy(peer)
}
func (s *Service) AddDeductionByPeer(peer swarm.Address) error {
return s.addressbook.AddDeductionBy(peer)
}
......@@ -20,19 +20,20 @@ import (
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
mockchequebook "github.com/ethersphere/bee/pkg/settlement/swap/chequebook/mock"
mockchequestore "github.com/ethersphere/bee/pkg/settlement/swap/chequestore/mock"
"github.com/ethersphere/bee/pkg/settlement/swap/swapprotocol"
mockstore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
type swapProtocolMock struct {
emitCheque func(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) error
emitCheque func(context.Context, swarm.Address, common.Address, *big.Int, swapprotocol.IssueFunc) (*big.Int, error)
}
func (m *swapProtocolMock) EmitCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) error {
func (m *swapProtocolMock) EmitCheque(ctx context.Context, peer swarm.Address, beneficiary common.Address, value *big.Int, issueFunc swapprotocol.IssueFunc) (*big.Int, error) {
if m.emitCheque != nil {
return m.emitCheque(ctx, peer, cheque)
return m.emitCheque(ctx, peer, beneficiary, value, issueFunc)
}
return nil
return nil, errors.New("not implemented")
}
type testObserver struct {
......@@ -89,6 +90,10 @@ type addressbookMock struct {
chequebookPeer func(chequebook common.Address) (peer swarm.Address, known bool, err error)
putBeneficiary func(peer swarm.Address, beneficiary common.Address) error
putChequebook func(peer swarm.Address, chequebook common.Address) error
addDeductionFor func(peer swarm.Address) error
addDeductionBy func(peer swarm.Address) error
getDeductionFor func(peer swarm.Address) (bool, error)
getDeductionBy func(peer swarm.Address) (bool, error)
}
func (m *addressbookMock) Beneficiary(peer swarm.Address) (beneficiary common.Address, known bool, err error) {
......@@ -109,6 +114,18 @@ func (m *addressbookMock) PutBeneficiary(peer swarm.Address, beneficiary common.
func (m *addressbookMock) PutChequebook(peer swarm.Address, chequebook common.Address) error {
return m.putChequebook(peer, chequebook)
}
func (m *addressbookMock) AddDeductionFor(peer swarm.Address) error {
return m.addDeductionFor(peer)
}
func (m *addressbookMock) AddDeductionBy(peer swarm.Address) error {
return m.addDeductionBy(peer)
}
func (m *addressbookMock) GetDeductionFor(peer swarm.Address) (bool, error) {
return m.getDeductionFor(peer)
}
func (m *addressbookMock) GetDeductionBy(peer swarm.Address) (bool, error) {
return m.getDeductionBy(peer)
}
type cashoutMock struct {
cashCheque func(ctx context.Context, chequebook common.Address, recipient common.Address) (common.Hash, error)
......@@ -127,6 +144,8 @@ func TestReceiveCheque(t *testing.T) {
store := mockstore.NewStateStore()
chequebookService := mockchequebook.NewChequebook()
amount := big.NewInt(50)
exchangeRate := big.NewInt(10)
deduction := big.NewInt(10)
chequebookAddress := common.HexToAddress("0xcd")
peer := swarm.MustParseHexAddress("abcd")
......@@ -140,13 +159,22 @@ func TestReceiveCheque(t *testing.T) {
}
chequeStore := mockchequestore.NewChequeStore(
mockchequestore.WithRetrieveChequeFunc(func(ctx context.Context, c *chequebook.SignedCheque) (*big.Int, error) {
mockchequestore.WithReceiveChequeFunc(func(ctx context.Context, c *chequebook.SignedCheque, e *big.Int, d *big.Int) (*big.Int, error) {
if !cheque.Equal(c) {
t.Fatalf("passed wrong cheque to store. wanted %v, got %v", cheque, c)
}
if exchangeRate.Cmp(e) != 0 {
t.Fatalf("passed wrong exchange rate to store. wanted %v, got %v", exchangeRate, e)
}
if deduction.Cmp(e) != 0 {
t.Fatalf("passed wrong deduction to store. wanted %v, got %v", deduction, d)
}
return amount, nil
}),
)
peerDeductionFor := false
networkID := uint64(1)
addressbook := &addressbookMock{
chequebook: func(p swarm.Address) (common.Address, bool, error) {
......@@ -164,6 +192,13 @@ func TestReceiveCheque(t *testing.T) {
}
return nil
},
addDeductionFor: func(p swarm.Address) error {
peerDeductionFor = true
if !peer.Equal(p) {
t.Fatal("storing deduction for wrong peer")
}
return nil
},
}
observer := newTestObserver()
......@@ -181,15 +216,17 @@ func TestReceiveCheque(t *testing.T) {
observer,
)
err := swap.ReceiveCheque(context.Background(), peer, cheque)
err := swap.ReceiveCheque(context.Background(), peer, cheque, exchangeRate, deduction)
if err != nil {
t.Fatal(err)
}
expectedAmount := big.NewInt(4)
select {
case call := <-observer.receivedCalled:
if call.amount.Cmp(amount) != 0 {
t.Fatalf("observer called with wrong amount. got %d, want %d", call.amount, amount)
if call.amount.Cmp(expectedAmount) != 0 {
t.Fatalf("observer called with wrong amount. got %d, want %d", call.amount, expectedAmount)
}
if !call.peer.Equal(peer) {
......@@ -200,6 +237,10 @@ func TestReceiveCheque(t *testing.T) {
t.Fatal("expected observer to be called")
}
if !peerDeductionFor {
t.Fatal("add deduction for peer not called")
}
}
func TestReceiveChequeReject(t *testing.T) {
......@@ -207,6 +248,8 @@ func TestReceiveChequeReject(t *testing.T) {
store := mockstore.NewStateStore()
chequebookService := mockchequebook.NewChequebook()
chequebookAddress := common.HexToAddress("0xcd")
exchangeRate := big.NewInt(10)
deduction := big.NewInt(10)
peer := swarm.MustParseHexAddress("abcd")
cheque := &chequebook.SignedCheque{
......@@ -221,7 +264,7 @@ func TestReceiveChequeReject(t *testing.T) {
var errReject = errors.New("reject")
chequeStore := mockchequestore.NewChequeStore(
mockchequestore.WithRetrieveChequeFunc(func(ctx context.Context, c *chequebook.SignedCheque) (*big.Int, error) {
mockchequestore.WithReceiveChequeFunc(func(ctx context.Context, c *chequebook.SignedCheque, e *big.Int, d *big.Int) (*big.Int, error) {
return nil, errReject
}),
)
......@@ -247,7 +290,7 @@ func TestReceiveChequeReject(t *testing.T) {
observer,
)
err := swap.ReceiveCheque(context.Background(), peer, cheque)
err := swap.ReceiveCheque(context.Background(), peer, cheque, exchangeRate, deduction)
if err == nil {
t.Fatal("accepted invalid cheque")
}
......@@ -268,6 +311,8 @@ func TestReceiveChequeWrongChequebook(t *testing.T) {
store := mockstore.NewStateStore()
chequebookService := mockchequebook.NewChequebook()
chequebookAddress := common.HexToAddress("0xcd")
exchangeRate := big.NewInt(10)
deduction := big.NewInt(10)
peer := swarm.MustParseHexAddress("abcd")
cheque := &chequebook.SignedCheque{
......@@ -301,7 +346,7 @@ func TestReceiveChequeWrongChequebook(t *testing.T) {
observer,
)
err := swapService.ReceiveCheque(context.Background(), peer, cheque)
err := swapService.ReceiveCheque(context.Background(), peer, cheque, exchangeRate, deduction)
if err == nil {
t.Fatal("accepted invalid cheque")
}
......@@ -323,22 +368,7 @@ func TestPay(t *testing.T) {
amount := big.NewInt(50)
beneficiary := common.HexToAddress("0xcd")
var cheque chequebook.SignedCheque
peer := swarm.MustParseHexAddress("abcd")
var chequebookCalled bool
chequebookService := mockchequebook.NewChequebook(
mockchequebook.WithChequebookIssueFunc(func(ctx context.Context, b common.Address, a *big.Int, sendChequeFunc chequebook.SendChequeFunc) (*big.Int, error) {
if b != beneficiary {
t.Fatalf("issuing cheque for wrong beneficiary. wanted %v, got %v", beneficiary, b)
}
if a.Cmp(amount) != 0 {
t.Fatalf("issuing cheque with wrong amount. wanted %d, got %d", amount, a)
}
chequebookCalled = true
return big.NewInt(0), sendChequeFunc(&cheque)
}),
)
networkID := uint64(1)
addressbook := &addressbookMock{
......@@ -355,20 +385,23 @@ func TestPay(t *testing.T) {
var emitCalled bool
swap := swap.New(
&swapProtocolMock{
emitCheque: func(ctx context.Context, p swarm.Address, c *chequebook.SignedCheque) error {
emitCheque: func(ctx context.Context, p swarm.Address, b common.Address, a *big.Int, issueFunc swapprotocol.IssueFunc) (*big.Int, error) {
if !peer.Equal(p) {
t.Fatal("sending to wrong peer")
}
if !cheque.Equal(c) {
t.Fatal("sending wrong cheque")
if b != beneficiary {
t.Fatal("issuing for wrong beneficiary")
}
if amount.Cmp(a) != 0 {
t.Fatal("issuing with wrong amount")
}
emitCalled = true
return nil
return amount, nil
},
},
logger,
store,
chequebookService,
mockchequebook.NewChequebook(),
mockchequestore.NewChequeStore(),
addressbook,
networkID,
......@@ -379,10 +412,6 @@ func TestPay(t *testing.T) {
swap.Pay(context.Background(), peer, amount)
if !chequebookCalled {
t.Fatal("chequebook was not called")
}
if !emitCalled {
t.Fatal("swap protocol was not called")
}
......@@ -397,12 +426,6 @@ func TestPayIssueError(t *testing.T) {
peer := swarm.MustParseHexAddress("abcd")
errReject := errors.New("reject")
chequebookService := mockchequebook.NewChequebook(
mockchequebook.WithChequebookIssueFunc(func(ctx context.Context, b common.Address, a *big.Int, sendChequeFunc chequebook.SendChequeFunc) (*big.Int, error) {
return big.NewInt(0), errReject
}),
)
networkID := uint64(1)
addressbook := &addressbookMock{
beneficiary: func(p swarm.Address) (common.Address, bool, error) {
......@@ -414,10 +437,14 @@ func TestPayIssueError(t *testing.T) {
}
swap := swap.New(
&swapProtocolMock{},
&swapProtocolMock{
emitCheque: func(c context.Context, a1 swarm.Address, a2 common.Address, i *big.Int, issueFunc swapprotocol.IssueFunc) (*big.Int, error) {
return nil, errReject
},
},
logger,
store,
chequebookService,
mockchequebook.NewChequebook(),
mockchequestore.NewChequeStore(),
addressbook,
networkID,
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swapprotocol
import (
"context"
"github.com/ethersphere/bee/pkg/p2p"
)
func (s *Service) Init(ctx context.Context, p p2p.Peer) error {
return s.init(ctx, p)
}
func (s *Service) Handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
return s.handler(ctx, p, stream)
}
......@@ -9,6 +9,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
......@@ -16,6 +17,8 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
swap "github.com/ethersphere/bee/pkg/settlement/swap/headers"
"github.com/ethersphere/bee/pkg/settlement/swap/priceoracle"
"github.com/ethersphere/bee/pkg/settlement/swap/swapprotocol/pb"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -27,18 +30,33 @@ const (
initStreamName = "init" // stream for handshake
)
var (
ErrNegotiateRate = errors.New("exchange rates mismatch")
ErrNegotiateDeduction = errors.New("deduction values mismatch")
ErrHaveDeduction = errors.New("received deduction not zero")
)
type SendChequeFunc chequebook.SendChequeFunc
type IssueFunc func(ctx context.Context, beneficiary common.Address, amount *big.Int, sendChequeFunc chequebook.SendChequeFunc) (*big.Int, error)
// (context.Context, common.Address, *big.Int, chequebook.SendChequeFunc) (*big.Int, error)
// Interface is the main interface to send messages over swap protocol.
type Interface interface {
// EmitCheque sends a signed cheque to a peer.
EmitCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) error
EmitCheque(ctx context.Context, peer swarm.Address, beneficiary common.Address, amount *big.Int, issue IssueFunc) (balance *big.Int, err error)
}
// Swap is the interface the settlement layer should implement to receive cheques.
type Swap interface {
// ReceiveCheque is called by the swap protocol if a cheque is received.
ReceiveCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) error
ReceiveCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque, exchangeRate *big.Int, deduction *big.Int) error
// Handshake is called by the swap protocol when a handshake is received.
Handshake(peer swarm.Address, beneficiary common.Address) error
GetDeductionForPeer(peer swarm.Address) (bool, error)
GetDeductionByPeer(peer swarm.Address) (bool, error)
AddDeductionByPeer(peer swarm.Address) error
}
// Service is the main implementation of the swap protocol.
......@@ -46,15 +64,17 @@ type Service struct {
streamer p2p.Streamer
logger logging.Logger
swap Swap
priceOracle priceoracle.Service
beneficiary common.Address
}
// New creates a new swap protocol Service.
func New(streamer p2p.Streamer, logger logging.Logger, beneficiary common.Address) *Service {
func New(streamer p2p.Streamer, logger logging.Logger, beneficiary common.Address, priceOracle priceoracle.Service) *Service {
return &Service{
streamer: streamer,
logger: logger,
beneficiary: beneficiary,
priceOracle: priceOracle,
}
}
......@@ -71,6 +91,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
{
Name: streamName,
Handler: s.handler,
Headler: s.headler,
},
{
Name: initStreamName,
......@@ -159,28 +180,59 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
_ = stream.FullClose()
}
}()
var req pb.EmitCheque
if err := r.ReadMsgWithContext(ctx, &req); err != nil {
return fmt.Errorf("read request from peer %v: %w", p.Address, err)
}
responseHeaders := stream.ResponseHeaders()
exchangeRate, deduction, err := swap.ParseSettlementResponseHeaders(responseHeaders)
if err != nil {
if !errors.Is(err, swap.ErrNoDeductionHeader) {
return err
}
deduction = big.NewInt(0)
}
var signedCheque *chequebook.SignedCheque
err = json.Unmarshal(req.Cheque, &signedCheque)
if err != nil {
return err
}
return s.swap.ReceiveCheque(ctx, p.Address, signedCheque)
// signature validation
return s.swap.ReceiveCheque(ctx, p.Address, signedCheque, exchangeRate, deduction)
}
// EmitCheque sends a signed cheque to a peer.
func (s *Service) EmitCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) error {
func (s *Service) headler(receivedHeaders p2p.Headers, peerAddress swarm.Address) (returnHeaders p2p.Headers) {
exchangeRate, deduction, err := s.priceOracle.CurrentRates()
if err != nil {
return p2p.Headers{}
}
checkPeer, err := s.swap.GetDeductionForPeer(peerAddress)
if err != nil {
return p2p.Headers{}
}
if checkPeer {
deduction = big.NewInt(0)
}
returnHeaders = swap.MakeSettlementHeaders(exchangeRate, deduction)
return
}
// InitiateCheque attempts to send a cheque to a peer.
func (s *Service) EmitCheque(ctx context.Context, peer swarm.Address, beneficiary common.Address, amount *big.Int, issue IssueFunc) (balance *big.Int, err error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return err
return nil, err
}
defer func() {
if err != nil {
......@@ -190,16 +242,76 @@ func (s *Service) EmitCheque(ctx context.Context, peer swarm.Address, cheque *ch
}
}()
// for simplicity we use json marshaller. can be replaced by a binary encoding in the future.
encodedCheque, err := json.Marshal(cheque)
// reading exchangeRated headers
returnedHeaders := stream.Headers()
exchangeRate, deduction, err := swap.ParseSettlementResponseHeaders(returnedHeaders)
if err != nil {
return err
if !errors.Is(err, swap.ErrNoDeductionHeader) {
return nil, err
}
deduction = big.NewInt(0)
}
// comparing received headers to known truth
// get whether peer have deducted in the past
checkPeer, err := s.swap.GetDeductionByPeer(peer)
if err != nil {
return nil, err
}
s.logger.Tracef("sending cheque message to peer %v (%v)", peer, cheque)
// if peer is not entitled for deduction but sent non zero deduction value, return with error
if checkPeer && deduction.Cmp(big.NewInt(0)) != 0 {
return nil, ErrHaveDeduction
}
// get current global exchangeRate rate and deduction
checkExchangeRate, checkDeduction, err := s.priceOracle.CurrentRates()
if err != nil {
return nil, err
}
// exchangeRate rates should match
if exchangeRate.Cmp(checkExchangeRate) != 0 {
return nil, ErrNegotiateRate
}
// deduction values should match or be zero
if deduction.Cmp(checkDeduction) != 0 && deduction.Cmp(big.NewInt(0)) != 0 {
return nil, ErrNegotiateDeduction
}
paymentAmount := new(big.Int).Mul(amount, exchangeRate)
sentAmount := new(big.Int).Add(paymentAmount, deduction)
// issue cheque call with provided callback for sending cheque to finish transaction
balance, err = issue(ctx, beneficiary, sentAmount, func(cheque *chequebook.SignedCheque) error {
// for simplicity we use json marshaller. can be replaced by a binary encoding in the future.
encodedCheque, err := json.Marshal(cheque)
if err != nil {
return err
}
// sending cheque
s.logger.Tracef("sending cheque message to peer %v (%v)", peer, cheque)
w := protobuf.NewWriter(stream)
return w.WriteMsgWithContext(ctx, &pb.EmitCheque{
Cheque: encodedCheque,
})
w := protobuf.NewWriter(stream)
return w.WriteMsgWithContext(ctx, &pb.EmitCheque{
Cheque: encodedCheque,
})
if err != nil {
return nil, err
}
if deduction.Cmp(big.NewInt(0)) != 0 {
err = s.swap.AddDeductionByPeer(peer)
if err != nil {
return nil, err
}
}
return balance, nil
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment