Commit d60f4ab3 authored by Ralph Pichler's avatar Ralph Pichler Committed by GitHub

settlement persistence and debug api (#601)

parent f539b9bd
...@@ -248,6 +248,22 @@ func (s *settlementMock) Pay(ctx context.Context, peer swarm.Address, amount uin ...@@ -248,6 +248,22 @@ func (s *settlementMock) Pay(ctx context.Context, peer swarm.Address, amount uin
return nil return nil
} }
func (s *settlementMock) TotalSent(peer swarm.Address) (totalSent uint64, err error) {
return 0, nil
}
func (s *settlementMock) TotalReceived(peer swarm.Address) (totalReceived uint64, err error) {
return 0, nil
}
func (s *settlementMock) SettlementsSent() (SettlementSent map[string]uint64, err error) {
return nil, nil
}
func (s *settlementMock) SettlementsReceived() (SettlementReceived map[string]uint64, err error) {
return nil, nil
}
// TestAccountingCallSettlement tests that settlement is called correctly if the payment threshold is hit // TestAccountingCallSettlement tests that settlement is called correctly if the payment threshold is hit
func TestAccountingCallSettlement(t *testing.T) { func TestAccountingCallSettlement(t *testing.T) {
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
...@@ -34,12 +35,12 @@ type server struct { ...@@ -34,12 +35,12 @@ type server struct {
Tracer *tracing.Tracer Tracer *tracing.Tracer
Tags *tags.Tags Tags *tags.Tags
Accounting accounting.Interface Accounting accounting.Interface
Settlement settlement.Interface
http.Handler http.Handler
metricsRegistry *prometheus.Registry metricsRegistry *prometheus.Registry
} }
func New(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.Driver, storer storage.Storer, logger logging.Logger, tracer *tracing.Tracer, tags *tags.Tags, accounting accounting.Interface) Service { func New(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.Driver, storer storage.Storer, logger logging.Logger, tracer *tracing.Tracer, tags *tags.Tags, accounting accounting.Interface, settlement settlement.Interface) Service {
s := &server{ s := &server{
Overlay: overlay, Overlay: overlay,
P2P: p2p, P2P: p2p,
...@@ -50,6 +51,7 @@ func New(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interfac ...@@ -50,6 +51,7 @@ func New(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interfac
Tracer: tracer, Tracer: tracer,
Tags: tags, Tags: tags,
Accounting: accounting, Accounting: accounting,
Settlement: settlement,
metricsRegistry: newMetricsRegistry(), metricsRegistry: newMetricsRegistry(),
} }
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
p2pmock "github.com/ethersphere/bee/pkg/p2p/mock" p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/resolver" "github.com/ethersphere/bee/pkg/resolver"
settlementmock "github.com/ethersphere/bee/pkg/settlement/pseudosettle/mock"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
...@@ -34,6 +35,7 @@ type testServerOptions struct { ...@@ -34,6 +35,7 @@ type testServerOptions struct {
TopologyOpts []topologymock.Option TopologyOpts []topologymock.Option
Tags *tags.Tags Tags *tags.Tags
AccountingOpts []accountingmock.Option AccountingOpts []accountingmock.Option
SettlementOpts []settlementmock.Option
} }
type testServer struct { type testServer struct {
...@@ -44,8 +46,9 @@ type testServer struct { ...@@ -44,8 +46,9 @@ type testServer struct {
func newTestServer(t *testing.T, o testServerOptions) *testServer { func newTestServer(t *testing.T, o testServerOptions) *testServer {
topologyDriver := topologymock.NewTopologyDriver(o.TopologyOpts...) topologyDriver := topologymock.NewTopologyDriver(o.TopologyOpts...)
acc := accountingmock.NewAccounting(o.AccountingOpts...) acc := accountingmock.NewAccounting(o.AccountingOpts...)
settlement := settlementmock.NewSettlement(o.SettlementOpts...)
s := debugapi.New(o.Overlay, o.P2P, o.Pingpong, topologyDriver, o.Storer, logging.New(ioutil.Discard, 0), nil, o.Tags, acc) s := debugapi.New(o.Overlay, o.P2P, o.Pingpong, topologyDriver, o.Storer, logging.New(ioutil.Discard, 0), nil, o.Tags, acc, settlement)
ts := httptest.NewServer(s) ts := httptest.NewServer(s)
t.Cleanup(ts.Close) t.Cleanup(ts.Close)
......
...@@ -14,10 +14,14 @@ type ( ...@@ -14,10 +14,14 @@ type (
WelcomeMessageResponse = welcomeMessageResponse WelcomeMessageResponse = welcomeMessageResponse
BalancesResponse = balancesResponse BalancesResponse = balancesResponse
BalanceResponse = balanceResponse BalanceResponse = balanceResponse
SettlementResponse = settlementResponse
SettlementsResponse = settlementsResponse
) )
var ( var (
ErrCantBalance = errCantBalance ErrCantBalance = errCantBalance
ErrCantBalances = errCantBalances ErrCantBalances = errCantBalances
ErrInvaliAddress = errInvaliAddress ErrCantSettlementsPeer = errCantSettlementsPeer
ErrCantSettlements = errCantSettlements
ErrInvaliAddress = errInvaliAddress
) )
...@@ -87,6 +87,13 @@ func (s *server) setupRouting() { ...@@ -87,6 +87,13 @@ func (s *server) setupRouting() {
"GET": http.HandlerFunc(s.peerBalanceHandler), "GET": http.HandlerFunc(s.peerBalanceHandler),
}) })
router.Handle("/settlements", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.settlementsHandler),
})
router.Handle("/settlements/{peer}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.peerSettlementsHandler),
})
baseRouter.Handle("/", web.ChainHandlers( baseRouter.Handle("/", web.ChainHandlers(
logging.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, "debug api access"), logging.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, "debug api access"),
handlers.CompressHandler, handlers.CompressHandler,
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi
import (
"errors"
"math/big"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/settlement/pseudosettle"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
)
var (
errCantSettlements = "can not get settlements"
errCantSettlementsPeer = "can not get settlements for peer"
)
type settlementResponse struct {
Peer string `json:"peer"`
SettlementReceived uint64 `json:"received"`
SettlementSent uint64 `json:"sent"`
}
type settlementsResponse struct {
TotalSettlementReceived *big.Int `json:"totalreceived"`
TotalSettlementSent *big.Int `json:"totalsent"`
Settlements []settlementResponse `json:"settlements"`
}
func (s *server) settlementsHandler(w http.ResponseWriter, r *http.Request) {
settlementsSent, err := s.Settlement.SettlementsSent()
if err != nil {
jsonhttp.InternalServerError(w, errCantSettlements)
s.Logger.Debugf("debug api: sent settlements: %v", err)
s.Logger.Error("debug api: can not get sent settlements")
return
}
settlementsReceived, err := s.Settlement.SettlementsReceived()
if err != nil {
jsonhttp.InternalServerError(w, errCantSettlements)
s.Logger.Debugf("debug api: received settlements: %v", err)
s.Logger.Error("debug api: can not get received settlements")
return
}
totalReceived := big.NewInt(0)
totalSent := big.NewInt(0)
settlementResponses := make(map[string]settlementResponse)
for a, b := range settlementsSent {
settlementResponses[a] = settlementResponse{
Peer: a,
SettlementSent: b,
SettlementReceived: 0,
}
totalSent.Add(big.NewInt(int64(b)), totalSent)
}
for a, b := range settlementsReceived {
if _, ok := settlementResponses[a]; ok {
t := settlementResponses[a]
t.SettlementReceived = b
settlementResponses[a] = t
} else {
settlementResponses[a] = settlementResponse{
Peer: a,
SettlementSent: 0,
SettlementReceived: b,
}
}
totalReceived.Add(big.NewInt(int64(b)), totalReceived)
}
settlementResponsesArray := make([]settlementResponse, len(settlementResponses))
i := 0
for k := range settlementResponses {
settlementResponsesArray[i] = settlementResponses[k]
i++
}
jsonhttp.OK(w, settlementsResponse{TotalSettlementReceived: totalReceived, TotalSettlementSent: totalSent, Settlements: settlementResponsesArray})
}
func (s *server) peerSettlementsHandler(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["peer"]
peer, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("debug api: settlements peer: invalid peer address %s: %v", addr, err)
s.Logger.Error("debug api: settlements peer: invalid peer address %s", addr)
jsonhttp.NotFound(w, errInvaliAddress)
return
}
peerexists := false
received, err := s.Settlement.TotalReceived(peer)
if err != nil {
if !errors.Is(err, pseudosettle.ErrPeerNoSettlements) {
s.Logger.Debugf("debug api: settlements peer: get peer %s received settlement: %v", peer.String(), err)
s.Logger.Errorf("debug api: settlements peer: can't get peer %s received settlement", peer.String())
jsonhttp.InternalServerError(w, errCantSettlementsPeer)
return
}
}
if err == nil {
peerexists = true
}
sent, err := s.Settlement.TotalSent(peer)
if err != nil {
if !errors.Is(err, pseudosettle.ErrPeerNoSettlements) {
s.Logger.Debugf("debug api: settlements peer: get peer %s sent settlement: %v", peer.String(), err)
s.Logger.Errorf("debug api: settlements peer: can't get peer %s sent settlement", peer.String())
jsonhttp.InternalServerError(w, errCantSettlementsPeer)
return
}
}
if err == nil {
peerexists = true
}
if !peerexists {
jsonhttp.NotFound(w, pseudosettle.ErrPeerNoSettlements)
return
}
jsonhttp.OK(w, settlementResponse{
Peer: peer.String(),
SettlementReceived: received,
SettlementSent: sent,
})
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi_test
import (
"errors"
"math/big"
"net/http"
"reflect"
"testing"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/settlement/pseudosettle/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
func TestSettlements(t *testing.T) {
settlementsSentFunc := func() (ret map[string]uint64, err error) {
ret = make(map[string]uint64)
ret["DEAD"] = 10000
ret["BEEF"] = 20000
ret["FFFF"] = 50000
return ret, err
}
settlementsRecvFunc := func() (ret map[string]uint64, err error) {
ret = make(map[string]uint64)
ret["BEEF"] = 10000
ret["EEEE"] = 5000
return ret, err
}
testServer := newTestServer(t, testServerOptions{
SettlementOpts: []mock.Option{mock.WithSettlementsSentFunc(settlementsSentFunc), mock.WithSettlementsRecvFunc(settlementsRecvFunc)},
})
expected := &debugapi.SettlementsResponse{
TotalSettlementReceived: big.NewInt(15000),
TotalSettlementSent: big.NewInt(80000),
Settlements: []debugapi.SettlementResponse{
{
Peer: "DEAD",
SettlementReceived: 0,
SettlementSent: 10000,
},
{
Peer: "BEEF",
SettlementReceived: 10000,
SettlementSent: 20000,
},
{
Peer: "FFFF",
SettlementReceived: 0,
SettlementSent: 50000,
},
{
Peer: "EEEE",
SettlementReceived: 5000,
SettlementSent: 0,
},
},
}
// We expect a list of items unordered by peer:
var got *debugapi.SettlementsResponse
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/settlements", http.StatusOK,
jsonhttptest.WithUnmarshalJSONResponse(&got),
)
if !equalSettlements(got, expected) {
t.Errorf("got settlements: %+v, expected: %+v", got, expected)
}
}
func TestSettlementsError(t *testing.T) {
wantErr := errors.New("New errors")
settlementsSentFunc := func() (map[string]uint64, error) {
return nil, wantErr
}
testServer := newTestServer(t, testServerOptions{
SettlementOpts: []mock.Option{mock.WithSettlementsSentFunc(settlementsSentFunc)},
})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/settlements", http.StatusInternalServerError,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: debugapi.ErrCantSettlements,
Code: http.StatusInternalServerError,
}),
)
}
func TestSettlementsPeers(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
settlementSentFunc := func(swarm.Address) (uint64, error) {
return 1000000000000000000, nil
}
testServer := newTestServer(t, testServerOptions{
SettlementOpts: []mock.Option{mock.WithSettlementSentFunc(settlementSentFunc)},
})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/settlements/"+peer, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(debugapi.SettlementResponse{
Peer: peer,
SettlementSent: 1000000000000000000,
SettlementReceived: 0,
}),
)
}
func TestSettlementsPeersError(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
wantErr := errors.New("Error")
settlementSentFunc := func(swarm.Address) (uint64, error) {
return 0, wantErr
}
testServer := newTestServer(t, testServerOptions{
SettlementOpts: []mock.Option{mock.WithSettlementSentFunc(settlementSentFunc)},
})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/settlements/"+peer, http.StatusInternalServerError,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: debugapi.ErrCantSettlementsPeer,
Code: http.StatusInternalServerError,
}),
)
}
func TestSettlementsInvalidAddress(t *testing.T) {
peer := "bad peer address"
testServer := newTestServer(t, testServerOptions{})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/settlements/"+peer, http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: debugapi.ErrInvaliAddress,
Code: http.StatusNotFound,
}),
)
}
func equalSettlements(a, b *debugapi.SettlementsResponse) bool {
var state bool
for akeys := range a.Settlements {
state = false
for bkeys := range b.Settlements {
if reflect.DeepEqual(a.Settlements[akeys], b.Settlements[bkeys]) {
state = true
break
}
}
if !state {
return false
}
}
for bkeys := range b.Settlements {
state = false
for akeys := range a.Settlements {
if reflect.DeepEqual(a.Settlements[akeys], b.Settlements[bkeys]) {
state = true
break
}
}
if !state {
return false
}
}
if a.TotalSettlementReceived.Cmp(b.TotalSettlementReceived) != 0 {
return false
}
if a.TotalSettlementSent.Cmp(b.TotalSettlementSent) != 0 {
return false
}
return true
}
...@@ -220,10 +220,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service, ...@@ -220,10 +220,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
} }
b.localstoreCloser = storer b.localstoreCloser = storer
settlement := pseudosettle.New(pseudosettle.Options{ settlement := pseudosettle.New(p2ps, logger, stateStore)
Streamer: p2ps,
Logger: logger,
})
if err = p2ps.AddProtocol(settlement.Protocol()); err != nil { if err = p2ps.AddProtocol(settlement.Protocol()); err != nil {
return nil, fmt.Errorf("pseudosettle service: %w", err) return nil, fmt.Errorf("pseudosettle service: %w", err)
...@@ -330,7 +327,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service, ...@@ -330,7 +327,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
if o.DebugAPIAddr != "" { if o.DebugAPIAddr != "" {
// Debug API server // Debug API server
debugAPIService := debugapi.New(swarmAddress, p2ps, pingPong, kad, storer, logger, tracer, tagg, acc) debugAPIService := debugapi.New(swarmAddress, p2ps, pingPong, kad, storer, logger, tracer, tagg, acc, settlement)
// register metrics from components // register metrics from components
debugAPIService.MustRegisterMetrics(p2ps.Metrics()...) debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...) debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
......
...@@ -15,6 +15,14 @@ type Interface interface { ...@@ -15,6 +15,14 @@ type Interface interface {
// Pay initiates a payment to the given peer // Pay initiates a payment to the given peer
// It should return without error it is likely that the payment worked // It should return without error it is likely that the payment worked
Pay(ctx context.Context, peer swarm.Address, amount uint64) error Pay(ctx context.Context, peer swarm.Address, amount uint64) error
// TotalSent returns the total amount sent to a peer
TotalSent(peer swarm.Address) (totalSent uint64, err error)
// TotalReceived returns the total amount received from a peer
TotalReceived(peer swarm.Address) (totalSent uint64, err error)
// SettlementsSent returns sent settlements for each individual known peer
SettlementsSent() (map[string]uint64, error)
// SettlementsReceived returns received settlements for each individual known peer
SettlementsReceived() (map[string]uint64, error)
} }
// PaymentObserver is the interface Settlement uses to notify other components of an incoming payment // PaymentObserver is the interface Settlement uses to notify other components of an incoming payment
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"context"
"sync"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/swarm"
)
// Service is the mock settlement service.
type Service struct {
lock sync.Mutex
settlementsSent map[string]uint64
settlementsRecv map[string]uint64
settlementSentFunc func(swarm.Address) (uint64, error)
settlementRecvFunc func(swarm.Address) (uint64, error)
settlementsSentFunc func() (map[string]uint64, error)
settlementsRecvFunc func() (map[string]uint64, error)
}
// WithsettlementFunc sets the mock settlement function
func WithSettlementSentFunc(f func(swarm.Address) (uint64, error)) Option {
return optionFunc(func(s *Service) {
s.settlementSentFunc = f
})
}
func WithSettlementRecvFunc(f func(swarm.Address) (uint64, error)) Option {
return optionFunc(func(s *Service) {
s.settlementRecvFunc = f
})
}
// WithsettlementsFunc sets the mock settlements function
func WithSettlementsSentFunc(f func() (map[string]uint64, error)) Option {
return optionFunc(func(s *Service) {
s.settlementsSentFunc = f
})
}
func WithSettlementsRecvFunc(f func() (map[string]uint64, error)) Option {
return optionFunc(func(s *Service) {
s.settlementsRecvFunc = f
})
}
// Newsettlement creates the mock settlement implementation
func NewSettlement(opts ...Option) settlement.Interface {
mock := new(Service)
mock.settlementsSent = make(map[string]uint64)
mock.settlementsRecv = make(map[string]uint64)
for _, o := range opts {
o.apply(mock)
}
return mock
}
func (s *Service) Pay(_ context.Context, peer swarm.Address, amount uint64) error {
s.settlementsSent[peer.String()] += amount
return nil
}
func (s *Service) TotalSent(peer swarm.Address) (totalSent uint64, err error) {
if s.settlementSentFunc != nil {
return s.settlementSentFunc(peer)
}
s.lock.Lock()
defer s.lock.Unlock()
return s.settlementsSent[peer.String()], nil
}
func (s *Service) TotalReceived(peer swarm.Address) (totalSent uint64, err error) {
if s.settlementRecvFunc != nil {
return s.settlementRecvFunc(peer)
}
s.lock.Lock()
defer s.lock.Unlock()
return s.settlementsRecv[peer.String()], nil
}
// settlements is the mock function wrapper that calls the set implementation
func (s *Service) SettlementsSent() (map[string]uint64, error) {
if s.settlementsSentFunc != nil {
return s.settlementsSentFunc()
}
s.lock.Lock()
defer s.lock.Unlock()
return s.settlementsSent, nil
}
func (s *Service) SettlementsReceived() (map[string]uint64, error) {
if s.settlementsRecvFunc != nil {
return s.settlementsRecvFunc()
}
s.lock.Lock()
defer s.lock.Unlock()
return s.settlementsRecv, nil
}
// Option is the option passed to the mock settlement service
type Option interface {
apply(*Service)
}
type optionFunc func(*Service)
func (f optionFunc) apply(r *Service) { f(r) }
...@@ -6,7 +6,9 @@ package pseudosettle ...@@ -6,7 +6,9 @@ package pseudosettle
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
...@@ -14,6 +16,7 @@ import ( ...@@ -14,6 +16,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/settlement" "github.com/ethersphere/bee/pkg/settlement"
pb "github.com/ethersphere/bee/pkg/settlement/pseudosettle/pb" pb "github.com/ethersphere/bee/pkg/settlement/pseudosettle/pb"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
) )
...@@ -23,23 +26,26 @@ const ( ...@@ -23,23 +26,26 @@ const (
streamName = "pseudosettle" streamName = "pseudosettle"
) )
var (
SettlementReceivedPrefix = "pseudosettle_total_received_"
SettlementSentPrefix = "pseudosettle_total_sent_"
ErrPeerNoSettlements = errors.New("no settlements for peer")
)
type Service struct { type Service struct {
streamer p2p.Streamer streamer p2p.Streamer
logger logging.Logger logger logging.Logger
store storage.StateStorer
observer settlement.PaymentObserver observer settlement.PaymentObserver
metrics metrics metrics metrics
} }
type Options struct { func New(streamer p2p.Streamer, logger logging.Logger, store storage.StateStorer) *Service {
Streamer p2p.Streamer
Logger logging.Logger
}
func New(o Options) *Service {
return &Service{ return &Service{
streamer: o.Streamer, streamer: streamer,
logger: o.Logger, logger: logger,
metrics: newMetrics(), metrics: newMetrics(),
store: store,
} }
} }
...@@ -56,6 +62,20 @@ func (s *Service) Protocol() p2p.ProtocolSpec { ...@@ -56,6 +62,20 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
} }
} }
func totalKey(peer swarm.Address, prefix string) string {
return fmt.Sprintf("%v%v", prefix, peer.String())
}
func totalKeyPeer(key []byte, prefix string) (peer swarm.Address, err error) {
k := string(key)
split := strings.SplitAfter(k, prefix)
if len(split) != 2 {
return swarm.ZeroAddress, errors.New("no peer in key")
}
return swarm.ParseHexAddress(split[1])
}
func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) { func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
r := protobuf.NewReader(stream) r := protobuf.NewReader(stream)
defer func() { defer func() {
...@@ -72,6 +92,20 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e ...@@ -72,6 +92,20 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
s.metrics.TotalReceivedPseudoSettlements.Add(float64(req.Amount)) s.metrics.TotalReceivedPseudoSettlements.Add(float64(req.Amount))
s.logger.Tracef("received payment message from peer %v of %d", p.Address, req.Amount) s.logger.Tracef("received payment message from peer %v of %d", p.Address, req.Amount)
totalReceived, err := s.TotalReceived(p.Address)
if err != nil {
if !errors.Is(err, ErrPeerNoSettlements) {
return err
}
totalReceived = 0
}
err = s.store.Put(totalKey(p.Address, SettlementReceivedPrefix), totalReceived+req.Amount)
if err != nil {
return err
}
return s.observer.NotifyPayment(p.Address, req.Amount) return s.observer.NotifyPayment(p.Address, req.Amount)
} }
...@@ -100,6 +134,17 @@ func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount uint64) er ...@@ -100,6 +134,17 @@ func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount uint64) er
if err != nil { if err != nil {
return err return err
} }
totalSent, err := s.TotalSent(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoSettlements) {
return err
}
totalSent = 0
}
err = s.store.Put(totalKey(peer, SettlementSentPrefix), totalSent+amount)
if err != nil {
return err
}
s.metrics.TotalSentPseudoSettlements.Add(float64(amount)) s.metrics.TotalSentPseudoSettlements.Add(float64(amount))
return nil return nil
} }
...@@ -108,3 +153,78 @@ func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount uint64) er ...@@ -108,3 +153,78 @@ func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount uint64) er
func (s *Service) SetPaymentObserver(observer settlement.PaymentObserver) { func (s *Service) SetPaymentObserver(observer settlement.PaymentObserver) {
s.observer = observer s.observer = observer
} }
// TotalSent returns the total amount sent to a peer
func (s *Service) TotalSent(peer swarm.Address) (totalSent uint64, err error) {
key := totalKey(peer, SettlementSentPrefix)
err = s.store.Get(key, &totalSent)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return 0, ErrPeerNoSettlements
}
return 0, err
}
return totalSent, nil
}
// TotalReceived returns the total amount received from a peer
func (s *Service) TotalReceived(peer swarm.Address) (totalReceived uint64, err error) {
key := totalKey(peer, SettlementReceivedPrefix)
err = s.store.Get(key, &totalReceived)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return 0, ErrPeerNoSettlements
}
return 0, err
}
return totalReceived, nil
}
// AllSettlements returns all stored settlement values for a given type of prefix (sent or received)
func (s *Service) SettlementsSent() (map[string]uint64, error) {
sent := make(map[string]uint64)
err := s.store.Iterate(SettlementSentPrefix, func(key, val []byte) (stop bool, err error) {
addr, err := totalKeyPeer(key, SettlementSentPrefix)
if err != nil {
return false, fmt.Errorf("parse address from key: %s: %w", string(key), err)
}
if _, ok := sent[addr.String()]; !ok {
var storevalue uint64
err = s.store.Get(totalKey(addr, SettlementSentPrefix), &storevalue)
if err != nil {
return false, fmt.Errorf("get peer %s settlement balance: %w", addr.String(), err)
}
sent[addr.String()] = storevalue
}
return false, nil
})
if err != nil {
return nil, err
}
return sent, nil
}
func (s *Service) SettlementsReceived() (map[string]uint64, error) {
received := make(map[string]uint64)
err := s.store.Iterate(SettlementReceivedPrefix, func(key, val []byte) (stop bool, err error) {
addr, err := totalKeyPeer(key, SettlementReceivedPrefix)
if err != nil {
return false, fmt.Errorf("parse address from key: %s: %w", string(key), err)
}
if _, ok := received[addr.String()]; !ok {
var storevalue uint64
err = s.store.Get(totalKey(addr, SettlementReceivedPrefix), &storevalue)
if err != nil {
return false, fmt.Errorf("get peer %s settlement balance: %w", addr.String(), err)
}
received[addr.String()] = storevalue
}
return false, nil
})
if err != nil {
return nil, err
}
return received, nil
}
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/streamtest" "github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/settlement/pseudosettle" "github.com/ethersphere/bee/pkg/settlement/pseudosettle"
"github.com/ethersphere/bee/pkg/settlement/pseudosettle/pb" "github.com/ethersphere/bee/pkg/settlement/pseudosettle/pb"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
) )
...@@ -34,20 +35,21 @@ func (t *testObserver) NotifyPayment(peer swarm.Address, amount uint64) error { ...@@ -34,20 +35,21 @@ func (t *testObserver) NotifyPayment(peer swarm.Address, amount uint64) error {
func TestPayment(t *testing.T) { func TestPayment(t *testing.T) {
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
storeRecipient := mock.NewStateStore()
defer storeRecipient.Close()
observer := &testObserver{} observer := &testObserver{}
recipient := pseudosettle.New(pseudosettle.Options{ recipient := pseudosettle.New(nil, logger, storeRecipient)
Logger: logger,
})
recipient.SetPaymentObserver(observer) recipient.SetPaymentObserver(observer)
recorder := streamtest.New( recorder := streamtest.New(
streamtest.WithProtocols(recipient.Protocol()), streamtest.WithProtocols(recipient.Protocol()),
) )
payer := pseudosettle.New(pseudosettle.Options{ storePayer := mock.NewStateStore()
Streamer: recorder, defer storePayer.Close()
Logger: logger,
}) payer := pseudosettle.New(recorder, logger, storePayer)
peerID := swarm.MustParseHexAddress("9ee7add7") peerID := swarm.MustParseHexAddress("9ee7add7")
amount := uint64(10000) amount := uint64(10000)
...@@ -96,4 +98,22 @@ func TestPayment(t *testing.T) { ...@@ -96,4 +98,22 @@ func TestPayment(t *testing.T) {
if !observer.peer.Equal(peerID) { if !observer.peer.Equal(peerID) {
t.Fatalf("observer called with wrong peer. got %v, want %v", observer.peer, peerID) t.Fatalf("observer called with wrong peer. got %v, want %v", observer.peer, peerID)
} }
totalSent, err := payer.TotalSent(peerID)
if err != nil {
t.Fatal(err)
}
if totalSent != sentAmount {
t.Fatalf("stored wrong totalSent. got %d, want %d", totalSent, sentAmount)
}
totalReceived, err := recipient.TotalReceived(peerID)
if err != nil {
t.Fatal(err)
}
if totalReceived != sentAmount {
t.Fatalf("stored wrong totalReceived. got %d, want %d", totalReceived, sentAmount)
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment