Commit 4b91ba20 authored by Ralph Pichler's avatar Ralph Pichler Committed by GitHub

announce payment threshold through pricing protocol (#585)

parent d45fb176
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/pricing"
"github.com/ethersphere/bee/pkg/settlement" "github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
...@@ -47,20 +48,9 @@ type Interface interface { ...@@ -47,20 +48,9 @@ type Interface interface {
// accountingPeer holds all in-memory accounting information for one peer. // accountingPeer holds all in-memory accounting information for one peer.
type accountingPeer struct { type accountingPeer struct {
// Lock to be held during any accounting action for this peer. lock sync.Mutex // lock to be held during any accounting action for this peer
lock sync.Mutex reservedBalance uint64 // amount currently reserved for active peer interaction
// Amount currently reserved for active peer interaction paymentThreshold uint64 // the threshold at which the peer expects us to pay
reservedBalance uint64
}
// Options are options provided to Accounting.
type Options struct {
PaymentThreshold uint64
PaymentTolerance uint64
EarlyPayment uint64
Logger logging.Logger
Store storage.StateStorer
Settlement settlement.Interface
} }
// Accounting is the main implementation of the accounting interface. // Accounting is the main implementation of the accounting interface.
...@@ -77,6 +67,7 @@ type Accounting struct { ...@@ -77,6 +67,7 @@ type Accounting struct {
paymentTolerance uint64 paymentTolerance uint64
earlyPayment uint64 earlyPayment uint64
settlement settlement.Interface settlement settlement.Interface
pricing pricing.Interface
metrics metrics metrics metrics
} }
...@@ -95,23 +86,32 @@ var ( ...@@ -95,23 +86,32 @@ var (
) )
// NewAccounting creates a new Accounting instance with the provided options. // NewAccounting creates a new Accounting instance with the provided options.
func NewAccounting(o Options) (*Accounting, error) { func NewAccounting(
if o.PaymentTolerance+o.PaymentThreshold > math.MaxInt64 { PaymentThreshold uint64,
PaymentTolerance uint64,
EarlyPayment uint64,
Logger logging.Logger,
Store storage.StateStorer,
Settlement settlement.Interface,
Pricing pricing.Interface,
) (*Accounting, error) {
if PaymentTolerance+PaymentThreshold > math.MaxInt64 {
return nil, fmt.Errorf("tolerance plus threshold too big: %w", ErrOverflow) return nil, fmt.Errorf("tolerance plus threshold too big: %w", ErrOverflow)
} }
if o.PaymentTolerance > o.PaymentThreshold/2 { if PaymentTolerance > PaymentThreshold/2 {
return nil, ErrInvalidPaymentTolerance return nil, ErrInvalidPaymentTolerance
} }
return &Accounting{ return &Accounting{
accountingPeers: make(map[string]*accountingPeer), accountingPeers: make(map[string]*accountingPeer),
paymentThreshold: o.PaymentThreshold, paymentThreshold: PaymentThreshold,
paymentTolerance: o.PaymentTolerance, paymentTolerance: PaymentTolerance,
earlyPayment: o.EarlyPayment, earlyPayment: EarlyPayment,
logger: o.Logger, logger: Logger,
store: o.Store, store: Store,
settlement: o.Settlement, settlement: Settlement,
pricing: Pricing,
metrics: newMetrics(), metrics: newMetrics(),
}, nil }, nil
} }
...@@ -233,7 +233,7 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error { ...@@ -233,7 +233,7 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
// If our expected debt is less than earlyPayment away from our payment threshold (which we assume is // If our expected debt is less than earlyPayment away from our payment threshold (which we assume is
// also the peers payment threshold), trigger settlement. // also the peers payment threshold), trigger settlement.
// we pay early to avoid needlessly blocking request later when concurrent requests occur and we are already close to the payment threshold // we pay early to avoid needlessly blocking request later when concurrent requests occur and we are already close to the payment threshold
threshold := a.paymentThreshold threshold := accountingPeer.paymentThreshold
if threshold > a.earlyPayment { if threshold > a.earlyPayment {
threshold -= a.earlyPayment threshold -= a.earlyPayment
} else { } else {
...@@ -370,6 +370,8 @@ func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, err ...@@ -370,6 +370,8 @@ func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, err
if !ok { if !ok {
peerData = &accountingPeer{ peerData = &accountingPeer{
reservedBalance: 0, reservedBalance: 0,
// initially assume the peer has the same threshold as us
paymentThreshold: a.paymentThreshold,
} }
a.accountingPeers[peer.String()] = peerData a.accountingPeers[peer.String()] = peerData
} }
...@@ -506,3 +508,17 @@ func addI64pU64(a int64, b uint64) (result int64, err error) { ...@@ -506,3 +508,17 @@ func addI64pU64(a int64, b uint64) (result int64, err error) {
return result, nil return result, nil
} }
// NotifyPaymentThreshold should be called to notify accounting of changes in the payment threshold
func (a *Accounting) NotifyPaymentThreshold(peer swarm.Address, paymentThreshold uint64) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
return err
}
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
accountingPeer.paymentThreshold = paymentThreshold
return nil
}
...@@ -40,11 +40,7 @@ func TestAccountingAddBalance(t *testing.T) { ...@@ -40,11 +40,7 @@ func TestAccountingAddBalance(t *testing.T) {
store := mock.NewStateStore() store := mock.NewStateStore()
defer store.Close() defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThreshold, 1000, 1000, logger, store, nil, nil)
PaymentThreshold: testPaymentThreshold,
Logger: logger,
Store: store,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -105,11 +101,7 @@ func TestAccountingAdd_persistentBalances(t *testing.T) { ...@@ -105,11 +101,7 @@ func TestAccountingAdd_persistentBalances(t *testing.T) {
store := mock.NewStateStore() store := mock.NewStateStore()
defer store.Close() defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThreshold, 1000, 1000, logger, store, nil, nil)
PaymentThreshold: testPaymentThreshold,
Logger: logger,
Store: store,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -136,10 +128,7 @@ func TestAccountingAdd_persistentBalances(t *testing.T) { ...@@ -136,10 +128,7 @@ func TestAccountingAdd_persistentBalances(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
acc, err = accounting.NewAccounting(accounting.Options{ acc, err = accounting.NewAccounting(testPaymentThreshold, 1000, 1000, logger, store, nil, nil)
Logger: logger,
Store: store,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -170,11 +159,7 @@ func TestAccountingReserve(t *testing.T) { ...@@ -170,11 +159,7 @@ func TestAccountingReserve(t *testing.T) {
store := mock.NewStateStore() store := mock.NewStateStore()
defer store.Close() defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThreshold, 1000, 1000, logger, store, nil, nil)
PaymentThreshold: testPaymentThreshold,
Logger: logger,
Store: store,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -208,12 +193,7 @@ func TestAccountingOverflowReserve(t *testing.T) { ...@@ -208,12 +193,7 @@ func TestAccountingOverflowReserve(t *testing.T) {
settlement := &settlementMock{} settlement := &settlementMock{}
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThresholdLarge, 0, 0, logger, store, settlement, nil)
PaymentThreshold: testPaymentThresholdLarge,
Logger: logger,
Store: store,
Settlement: settlement,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -263,12 +243,7 @@ func TestAccountingOverflowNotifyPayment(t *testing.T) { ...@@ -263,12 +243,7 @@ func TestAccountingOverflowNotifyPayment(t *testing.T) {
settlement := &settlementMock{} settlement := &settlementMock{}
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThresholdLarge, 0, 0, logger, store, settlement, nil)
PaymentThreshold: testPaymentThresholdLarge,
Logger: logger,
Store: store,
Settlement: settlement,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -299,12 +274,7 @@ func TestAccountingOverflowDebit(t *testing.T) { ...@@ -299,12 +274,7 @@ func TestAccountingOverflowDebit(t *testing.T) {
store := mock.NewStateStore() store := mock.NewStateStore()
defer store.Close() defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThresholdLarge, 0, 0, logger, store, nil, nil)
PaymentThreshold: testPaymentThresholdLarge,
Logger: logger,
Store: store,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -345,12 +315,7 @@ func TestAccountingOverflowCredit(t *testing.T) { ...@@ -345,12 +315,7 @@ func TestAccountingOverflowCredit(t *testing.T) {
store := mock.NewStateStore() store := mock.NewStateStore()
defer store.Close() defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThresholdLarge, 0, 0, logger, store, nil, nil)
PaymentThreshold: testPaymentThresholdLarge,
Logger: logger,
Store: store,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -396,12 +361,7 @@ func TestAccountingDisconnect(t *testing.T) { ...@@ -396,12 +361,7 @@ func TestAccountingDisconnect(t *testing.T) {
store := mock.NewStateStore() store := mock.NewStateStore()
defer store.Close() defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThreshold, 1000, 1000, logger, store, nil, nil)
PaymentThreshold: testPaymentThreshold,
PaymentTolerance: testPaymentTolerance,
Logger: logger,
Store: store,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -468,13 +428,7 @@ func TestAccountingCallSettlement(t *testing.T) { ...@@ -468,13 +428,7 @@ func TestAccountingCallSettlement(t *testing.T) {
settlement := &settlementMock{} settlement := &settlementMock{}
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThreshold, 1000, 1000, logger, store, settlement, nil)
PaymentThreshold: testPaymentThreshold,
PaymentTolerance: testPaymentTolerance,
Logger: logger,
Store: store,
Settlement: settlement,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -550,14 +504,7 @@ func TestAccountingCallSettlementEarly(t *testing.T) { ...@@ -550,14 +504,7 @@ func TestAccountingCallSettlementEarly(t *testing.T) {
settlement := &settlementMock{} settlement := &settlementMock{}
earlyPayment := uint64(1000) earlyPayment := uint64(1000)
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThreshold, testPaymentTolerance, earlyPayment, logger, store, settlement, nil)
PaymentThreshold: testPaymentThreshold,
PaymentTolerance: testPaymentTolerance,
EarlyPayment: earlyPayment,
Logger: logger,
Store: store,
Settlement: settlement,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -605,12 +552,7 @@ func TestAccountingNotifyPayment(t *testing.T) { ...@@ -605,12 +552,7 @@ func TestAccountingNotifyPayment(t *testing.T) {
store := mock.NewStateStore() store := mock.NewStateStore()
defer store.Close() defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{ acc, err := accounting.NewAccounting(testPaymentThreshold, testPaymentTolerance, 1000, logger, store, nil, nil)
PaymentThreshold: testPaymentThreshold,
PaymentTolerance: testPaymentTolerance,
Logger: logger,
Store: store,
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -648,12 +590,7 @@ func TestAccountingInvalidPaymentTolerance(t *testing.T) { ...@@ -648,12 +590,7 @@ func TestAccountingInvalidPaymentTolerance(t *testing.T) {
store := mock.NewStateStore() store := mock.NewStateStore()
defer store.Close() defer store.Close()
_, err := accounting.NewAccounting(accounting.Options{ _, err := accounting.NewAccounting(testPaymentThreshold, testPaymentThreshold/2+1, 1000, logger, store, nil, nil)
PaymentThreshold: testPaymentThreshold,
PaymentTolerance: testPaymentThreshold/2 + 1,
Logger: logger,
Store: store,
})
if err == nil { if err == nil {
t.Fatal("expected error") t.Fatal("expected error")
} }
...@@ -662,3 +599,93 @@ func TestAccountingInvalidPaymentTolerance(t *testing.T) { ...@@ -662,3 +599,93 @@ func TestAccountingInvalidPaymentTolerance(t *testing.T) {
t.Fatalf("got wrong error. got %v wanted %v", err, accounting.ErrInvalidPaymentTolerance) t.Fatalf("got wrong error. got %v wanted %v", err, accounting.ErrInvalidPaymentTolerance)
} }
} }
type pricingMock struct {
called bool
peer swarm.Address
paymentThreshold uint64
}
func (p *pricingMock) AnnouncePaymentThreshold(ctx context.Context, peer swarm.Address, paymentThreshold uint64) error {
p.called = true
p.peer = peer
p.paymentThreshold = paymentThreshold
return nil
}
func TestAccountingConnected(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
pricing := &pricingMock{}
_, err := accounting.NewAccounting(testPaymentThreshold, 1000, 1000, logger, store, nil, pricing)
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
err = pricing.AnnouncePaymentThreshold(context.Background(), peer1Addr, testPaymentThreshold)
if err != nil {
t.Fatal(err)
}
if !pricing.called {
t.Fatal("expected pricing to be called")
}
if !pricing.peer.Equal(peer1Addr) {
t.Fatalf("paid to wrong peer. got %v wanted %v", pricing.peer, peer1Addr)
}
if pricing.paymentThreshold != testPaymentThreshold {
t.Fatalf("paid wrong amount. got %d wanted %d", pricing.paymentThreshold, testPaymentThreshold)
}
}
func TestAccountingNotifyPaymentThreshold(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
pricing := &pricingMock{}
settlement := &settlementMock{}
acc, err := accounting.NewAccounting(testPaymentThreshold, 1000, 0, logger, store, settlement, pricing)
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
lowerThreshold := uint64(100)
err = acc.NotifyPaymentThreshold(peer1Addr, lowerThreshold)
if err != nil {
t.Fatal(err)
}
err = acc.Reserve(peer1Addr, lowerThreshold)
if err != nil {
t.Fatal(err)
}
err = acc.Credit(peer1Addr, lowerThreshold)
if err != nil {
t.Fatal(err)
}
if settlement.paidAmount != lowerThreshold {
t.Fatalf("settled wrong amount. wanted %d, got %d", lowerThreshold, settlement.paidAmount)
}
}
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/pkg/netstore" "github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/pricing"
"github.com/ethersphere/bee/pkg/pss" "github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/puller" "github.com/ethersphere/bee/pkg/puller"
"github.com/ethersphere/bee/pkg/pullsync" "github.com/ethersphere/bee/pkg/pullsync"
...@@ -290,19 +291,18 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -290,19 +291,18 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
settlement = pseudosettleService settlement = pseudosettleService
} }
acc, err := accounting.NewAccounting(accounting.Options{ pricing := pricing.New(p2ps, logger, o.PaymentThreshold)
Logger: logger, if err = p2ps.AddProtocol(pricing.Protocol()); err != nil {
Store: stateStore, return nil, fmt.Errorf("pricing service: %w", err)
PaymentThreshold: o.PaymentThreshold, }
PaymentTolerance: o.PaymentTolerance,
EarlyPayment: o.PaymentEarly, acc, err := accounting.NewAccounting(o.PaymentThreshold, o.PaymentTolerance, o.PaymentEarly, logger, stateStore, settlement, pricing)
Settlement: settlement,
})
if err != nil { if err != nil {
return nil, fmt.Errorf("accounting: %w", err) return nil, fmt.Errorf("accounting: %w", err)
} }
settlement.SetPaymentObserver(acc) settlement.SetPaymentObserver(acc)
pricing.SetPaymentThresholdObserver(acc)
kad := kademlia.New(swarmAddress, addressbook, hive, p2ps, logger, kademlia.Options{Bootnodes: bootnodes, Standalone: o.Standalone}) kad := kademlia.New(swarmAddress, addressbook, hive, p2ps, logger, kademlia.Options{Bootnodes: bootnodes, Standalone: o.Standalone})
b.topologyCloser = kad b.topologyCloser = kad
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. pricing.proto"
package pb
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pricing.proto
package pb
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type AnnouncePaymentThreshold struct {
PaymentThreshold uint64 `protobuf:"varint,1,opt,name=PaymentThreshold,proto3" json:"PaymentThreshold,omitempty"`
}
func (m *AnnouncePaymentThreshold) Reset() { *m = AnnouncePaymentThreshold{} }
func (m *AnnouncePaymentThreshold) String() string { return proto.CompactTextString(m) }
func (*AnnouncePaymentThreshold) ProtoMessage() {}
func (*AnnouncePaymentThreshold) Descriptor() ([]byte, []int) {
return fileDescriptor_ec4cc93d045d43d0, []int{0}
}
func (m *AnnouncePaymentThreshold) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *AnnouncePaymentThreshold) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_AnnouncePaymentThreshold.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *AnnouncePaymentThreshold) XXX_Merge(src proto.Message) {
xxx_messageInfo_AnnouncePaymentThreshold.Merge(m, src)
}
func (m *AnnouncePaymentThreshold) XXX_Size() int {
return m.Size()
}
func (m *AnnouncePaymentThreshold) XXX_DiscardUnknown() {
xxx_messageInfo_AnnouncePaymentThreshold.DiscardUnknown(m)
}
var xxx_messageInfo_AnnouncePaymentThreshold proto.InternalMessageInfo
func (m *AnnouncePaymentThreshold) GetPaymentThreshold() uint64 {
if m != nil {
return m.PaymentThreshold
}
return 0
}
func init() {
proto.RegisterType((*AnnouncePaymentThreshold)(nil), "pricing.AnnouncePaymentThreshold")
}
func init() { proto.RegisterFile("pricing.proto", fileDescriptor_ec4cc93d045d43d0) }
var fileDescriptor_ec4cc93d045d43d0 = []byte{
// 122 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2d, 0x28, 0xca, 0x4c,
0xce, 0xcc, 0x4b, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x87, 0x72, 0x95, 0xdc, 0xb8,
0x24, 0x1c, 0xf3, 0xf2, 0xf2, 0x4b, 0xf3, 0x92, 0x53, 0x03, 0x12, 0x2b, 0x73, 0x53, 0xf3, 0x4a,
0x42, 0x32, 0x8a, 0x52, 0x8b, 0x33, 0xf2, 0x73, 0x52, 0x84, 0xb4, 0xb8, 0x04, 0xd0, 0xc5, 0x24,
0x18, 0x15, 0x18, 0x35, 0x58, 0x82, 0x30, 0xc4, 0x9d, 0x64, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0,
0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8,
0xf1, 0x58, 0x8e, 0x21, 0x8a, 0xa9, 0x20, 0x29, 0x89, 0x0d, 0x6c, 0xab, 0x31, 0x20, 0x00, 0x00,
0xff, 0xff, 0x70, 0x59, 0x58, 0xcf, 0x86, 0x00, 0x00, 0x00,
}
func (m *AnnouncePaymentThreshold) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *AnnouncePaymentThreshold) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *AnnouncePaymentThreshold) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.PaymentThreshold != 0 {
i = encodeVarintPricing(dAtA, i, uint64(m.PaymentThreshold))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func encodeVarintPricing(dAtA []byte, offset int, v uint64) int {
offset -= sovPricing(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *AnnouncePaymentThreshold) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.PaymentThreshold != 0 {
n += 1 + sovPricing(uint64(m.PaymentThreshold))
}
return n
}
func sovPricing(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozPricing(x uint64) (n int) {
return sovPricing(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *AnnouncePaymentThreshold) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPricing
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: AnnouncePaymentThreshold: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: AnnouncePaymentThreshold: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field PaymentThreshold", wireType)
}
m.PaymentThreshold = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPricing
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.PaymentThreshold |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipPricing(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPricing
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPricing
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipPricing(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPricing
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPricing
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPricing
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthPricing
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupPricing
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthPricing
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthPricing = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowPricing = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupPricing = fmt.Errorf("proto: unexpected end of group")
)
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
syntax = "proto3";
package pricing;
option go_package = "pb";
message AnnouncePaymentThreshold {
uint64 PaymentThreshold = 1;
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pricing
import (
"context"
"fmt"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/pricing/pb"
"github.com/ethersphere/bee/pkg/swarm"
)
const (
protocolName = "pricing"
protocolVersion = "1.0.0"
streamName = "pricing"
)
var _ Interface = (*Service)(nil)
// Interface is the main interface of the pricing protocol
type Interface interface {
AnnouncePaymentThreshold(ctx context.Context, peer swarm.Address, paymentThreshold uint64) error
}
// PaymentThresholdObserver is used for being notified of payment threshold updates
type PaymentThresholdObserver interface {
NotifyPaymentThreshold(peer swarm.Address, paymentThreshold uint64) error
}
type Service struct {
streamer p2p.Streamer
logger logging.Logger
paymentThreshold uint64
paymentThresholdObserver PaymentThresholdObserver
}
func New(streamer p2p.Streamer, logger logging.Logger, paymentThreshold uint64) *Service {
return &Service{
streamer: streamer,
logger: logger,
paymentThreshold: paymentThreshold,
}
}
func (s *Service) Protocol() p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: protocolName,
Version: protocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: streamName,
Handler: s.handler,
},
},
ConnectIn: s.init,
ConnectOut: s.init,
}
}
func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
r := protobuf.NewReader(stream)
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
s.logger.Infof("receiving payment threshold announcement from peer %v", p.Address)
var req pb.AnnouncePaymentThreshold
if err := r.ReadMsg(&req); err != nil {
s.logger.Debugf("error receiving payment threshold announcement from peer %v", p.Address)
return fmt.Errorf("read request from peer %v: %w", p.Address, err)
}
s.logger.Infof("received payment threshold announcement from peer %v of %d", p.Address, req.PaymentThreshold)
return s.paymentThresholdObserver.NotifyPaymentThreshold(p.Address, req.PaymentThreshold)
}
func (s *Service) init(ctx context.Context, p p2p.Peer) error {
s.logger.Infof("sending payment threshold announcement to peer %v", p.Address)
err := s.AnnouncePaymentThreshold(ctx, p.Address, s.paymentThreshold)
if err != nil {
s.logger.Warningf("error sending payment threshold announcement to peer %v", p.Address)
}
return err
}
// AnnouncePaymentThreshold announces the payment threshold to per
func (s *Service) AnnouncePaymentThreshold(ctx context.Context, peer swarm.Address, paymentThreshold uint64) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return err
}
defer func() {
if err != nil {
_ = stream.Reset()
} else {
go stream.FullClose()
}
}()
s.logger.Tracef("sending payment threshold announcement to peer %v of %d", peer, paymentThreshold)
w := protobuf.NewWriter(stream)
err = w.WriteMsgWithContext(ctx, &pb.AnnouncePaymentThreshold{
PaymentThreshold: paymentThreshold,
})
return err
}
// SetPaymentThresholdObserver sets the PaymentThresholdObserver to be used when receiving a new payment threshold
func (s *Service) SetPaymentThresholdObserver(observer PaymentThresholdObserver) {
s.paymentThresholdObserver = observer
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pricing_test
import (
"bytes"
"context"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/pricing"
"github.com/ethersphere/bee/pkg/pricing/pb"
"github.com/ethersphere/bee/pkg/swarm"
)
type testObserver struct {
called bool
peer swarm.Address
paymentThreshold uint64
}
func (t *testObserver) NotifyPaymentThreshold(peer swarm.Address, paymentThreshold uint64) error {
t.called = true
t.peer = peer
t.paymentThreshold = paymentThreshold
return nil
}
func TestAnnouncePaymentThreshold(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
testThreshold := uint64(100000)
observer := &testObserver{}
recipient := pricing.New(nil, logger, testThreshold)
recipient.SetPaymentThresholdObserver(observer)
recorder := streamtest.New(
streamtest.WithProtocols(recipient.Protocol()),
)
payer := pricing.New(recorder, logger, testThreshold)
peerID := swarm.MustParseHexAddress("9ee7add7")
paymentThreshold := uint64(10000)
err := payer.AnnouncePaymentThreshold(context.Background(), peerID, paymentThreshold)
if err != nil {
t.Fatal(err)
}
records, err := recorder.Records(peerID, "pricing", "1.0.0", "pricing")
if err != nil {
t.Fatal(err)
}
if l := len(records); l != 1 {
t.Fatalf("got %v records, want %v", l, 1)
}
record := records[0]
messages, err := protobuf.ReadMessages(
bytes.NewReader(record.In()),
func() protobuf.Message { return new(pb.AnnouncePaymentThreshold) },
)
if err != nil {
t.Fatal(err)
}
if len(messages) != 1 {
t.Fatalf("got %v messages, want %v", len(messages), 1)
}
sentPaymentThreshold := messages[0].(*pb.AnnouncePaymentThreshold).PaymentThreshold
if sentPaymentThreshold != paymentThreshold {
t.Fatalf("got message with amount %v, want %v", sentPaymentThreshold, paymentThreshold)
}
if !observer.called {
t.Fatal("expected observer to be called")
}
if observer.paymentThreshold != paymentThreshold {
t.Fatalf("observer called with wrong paymentThreshold. got %d, want %d", observer.paymentThreshold, paymentThreshold)
}
if !observer.peer.Equal(peerID) {
t.Fatalf("observer called with wrong peer. got %v, want %v", observer.peer, peerID)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment