Commit 5ada5bcd authored by metacertain's avatar metacertain Committed by GitHub

Accounting debug api (#461)

Co-authored-by: default avatarJanos Guljas <janos@resenje.org>
parent 2226acb4
......@@ -7,6 +7,7 @@ package accounting
import (
"errors"
"fmt"
"strings"
"sync"
"github.com/ethersphere/bee/pkg/logging"
......@@ -15,7 +16,10 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
var _ Interface = (*Accounting)(nil)
var (
_ Interface = (*Accounting)(nil)
balancesPrefix string = "balance_"
)
// Interface is the main interface for Accounting
type Interface interface {
......@@ -31,6 +35,8 @@ type Interface interface {
Debit(peer swarm.Address, price uint64) error
// Balance returns the current balance for the given peer
Balance(peer swarm.Address) (int64, error)
// Balances returns balances for all known peers
Balances() (map[string]int64, error)
}
// PeerBalance holds all relevant accounting information for one peer
......@@ -129,7 +135,7 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
a.logger.Tracef("crediting peer %v with price %d, new balance is %d", peer, price, nextBalance)
err = a.store.Put(balanceKey(peer), nextBalance)
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return err
}
......@@ -159,7 +165,7 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
a.logger.Tracef("debiting peer %v with price %d, new balance is %d", peer, price, nextBalance)
err = a.store.Put(balanceKey(peer), nextBalance)
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return err
}
......@@ -188,8 +194,8 @@ func (a *Accounting) Balance(peer swarm.Address) (int64, error) {
}
// get the balance storage key for the given peer
func balanceKey(peer swarm.Address) string {
return fmt.Sprintf("balance_%s", peer.String())
func peerBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesPrefix, peer.String())
}
// getPeerBalance gets the PeerBalance for a given peer
......@@ -203,7 +209,7 @@ func (a *Accounting) getPeerBalance(peer swarm.Address) (*PeerBalance, error) {
if !ok {
// balance not yet in memory, load from state store
var balance int64
err := a.store.Get(balanceKey(peer), &balance)
err := a.store.Get(peerBalanceKey(peer), &balance)
if err == nil {
peerBalance = &PeerBalance{
balance: balance,
......@@ -226,6 +232,63 @@ func (a *Accounting) getPeerBalance(peer swarm.Address) (*PeerBalance, error) {
return peerBalance, nil
}
// Balances gets balances for all peers, first from memory, than completing from store
func (a *Accounting) Balances() (map[string]int64, error) {
peersBalances := make(map[string]int64)
// get peer balances from store first as it may be outdated
// compared to the in memory map
if err := a.balancesFromStore(peersBalances); err != nil {
return nil, err
}
a.balancesMu.Lock()
for peer, balance := range a.balances {
peersBalances[peer] = balance.balance
}
a.balancesMu.Unlock()
return peersBalances, nil
}
// Get balances from store for keys (peers) that do not already exist in argument map.
// Used to get all balances not loaded in memory at the time the Balances() function is called.
func (a *Accounting) balancesFromStore(s map[string]int64) error {
return a.store.Iterate(balancesPrefix, func(key, val []byte) (stop bool, err error) {
addr, err := balanceKeyPeer(key)
if err != nil {
return false, fmt.Errorf("parse address from key: %s: %v", string(key), err)
}
if _, ok := s[addr.String()]; !ok {
var storevalue int64
err = a.store.Get(peerBalanceKey(addr), &storevalue)
if err != nil {
return false, fmt.Errorf("get peer %s balance: %v", addr.String(), err)
}
s[addr.String()] = storevalue
}
return false, nil
})
}
// get the embedded peer from the balance storage key
func balanceKeyPeer(key []byte) (swarm.Address, error) {
k := string(key)
split := strings.SplitAfter(k, balancesPrefix)
if len(split) != 2 {
return swarm.ZeroAddress, errors.New("no peer in key")
}
addr, err := swarm.ParseHexAddress(split[1])
if err != nil {
return swarm.ZeroAddress, err
}
return addr, nil
}
func (pb *PeerBalance) freeBalance() int64 {
return pb.balance - int64(pb.reserved)
}
......@@ -5,45 +5,136 @@
package mock
import (
"github.com/ethersphere/bee/pkg/swarm"
"sync"
"github.com/ethersphere/bee/pkg/accounting"
"github.com/ethersphere/bee/pkg/swarm"
)
type MockAccounting struct {
// Service is the mock Accounting service.
type Service struct {
lock sync.Mutex
balances map[string]int64
reserveFunc func(peer swarm.Address, price uint64) error
releaseFunc func(peer swarm.Address, price uint64)
creditFunc func(peer swarm.Address, price uint64) error
debitFunc func(peer swarm.Address, price uint64) error
balanceFunc func(swarm.Address) (int64, error)
balancesFunc func() (map[string]int64, error)
}
func (ma *MockAccounting) Reserve(peer swarm.Address, price uint64) error {
return nil
// WithReserveFunc sets the mock Reserve function
func WithReserveFunc(f func(peer swarm.Address, price uint64) error) Option {
return optionFunc(func(s *Service) {
s.reserveFunc = f
})
}
// WithReleaseFunc sets the mock Release function
func WithReleaseFunc(f func(peer swarm.Address, price uint64)) Option {
return optionFunc(func(s *Service) {
s.releaseFunc = f
})
}
// WithCreditFunc sets the mock Credit function
func WithCreditFunc(f func(peer swarm.Address, price uint64) error) Option {
return optionFunc(func(s *Service) {
s.creditFunc = f
})
}
// WithDebitFunc sets the mock Debit function
func WithDebitFunc(f func(peer swarm.Address, price uint64) error) Option {
return optionFunc(func(s *Service) {
s.debitFunc = f
})
}
// WithBalanceFunc sets the mock Balance function
func WithBalanceFunc(f func(swarm.Address) (int64, error)) Option {
return optionFunc(func(s *Service) {
s.balanceFunc = f
})
}
// WithBalancesFunc sets the mock Balances function
func WithBalancesFunc(f func() (map[string]int64, error)) Option {
return optionFunc(func(s *Service) {
s.balancesFunc = f
})
}
func (ma *MockAccounting) Release(peer swarm.Address, price uint64) {
// NewAccounting creates the mock accounting implementation
func NewAccounting(opts ...Option) accounting.Interface {
mock := new(Service)
mock.balances = make(map[string]int64)
for _, o := range opts {
o.apply(mock)
}
return mock
}
// Reserve is the mock function wrapper that calls the set implementation
func (s *Service) Reserve(peer swarm.Address, price uint64) error {
if s.reserveFunc != nil {
return s.reserveFunc(peer, price)
}
return nil
}
// Release is the mock function wrapper that calls the set implementation
func (s *Service) Release(peer swarm.Address, price uint64) {
if s.releaseFunc != nil {
s.releaseFunc(peer, price)
}
}
func (ma *MockAccounting) Credit(peer swarm.Address, price uint64) error {
ma.lock.Lock()
defer ma.lock.Unlock()
ma.balances[peer.String()] -= int64(price)
// Credit is the mock function wrapper that calls the set implementation
func (s *Service) Credit(peer swarm.Address, price uint64) error {
if s.creditFunc != nil {
return s.creditFunc(peer, price)
}
s.lock.Lock()
defer s.lock.Unlock()
s.balances[peer.String()] -= int64(price)
return nil
}
func (ma *MockAccounting) Debit(peer swarm.Address, price uint64) error {
ma.lock.Lock()
defer ma.lock.Unlock()
ma.balances[peer.String()] += int64(price)
// Debit is the mock function wrapper that calls the set implementation
func (s *Service) Debit(peer swarm.Address, price uint64) error {
if s.debitFunc != nil {
return s.debitFunc(peer, price)
}
s.lock.Lock()
defer s.lock.Unlock()
s.balances[peer.String()] += int64(price)
return nil
}
func (ma *MockAccounting) Balance(peer swarm.Address) (int64, error) {
ma.lock.Lock()
defer ma.lock.Unlock()
return ma.balances[peer.String()], nil
// Balance is the mock function wrapper that calls the set implementation
func (s *Service) Balance(peer swarm.Address) (int64, error) {
if s.balanceFunc != nil {
return s.balanceFunc(peer)
}
s.lock.Lock()
defer s.lock.Unlock()
return s.balances[peer.String()], nil
}
func NewAccounting() *MockAccounting {
return &MockAccounting{
balances: make(map[string]int64),
// Balances is the mock function wrapper that calls the set implementation
func (s *Service) Balances() (map[string]int64, error) {
if s.balancesFunc != nil {
return s.balancesFunc()
}
return s.balances, nil
}
// Option is the option passed to the mock accounting service
type Option interface {
apply(*Service)
}
type optionFunc func(*Service)
func (f optionFunc) apply(r *Service) { f(r) }
......@@ -10,12 +10,12 @@ import (
var _ swarm.Validator = (*Validator)(nil)
// ContentAddressValidator validates that the address of a given chunk
// Validator validates that the address of a given chunk
// is the content address of its contents.
type Validator struct {
}
// NewContentAddressValidator constructs a new ContentAddressValidator
// NewValidator constructs a new Validator
func NewValidator() swarm.Validator {
return &Validator{}
}
......
......@@ -11,9 +11,9 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
// TestContentAddressValidator checks that the validator evaluates correctly
// TestValidator checks that the validator evaluates correctly
// on valid and invalid input
func TestContentAddressValidator(t *testing.T) {
func TestValidator(t *testing.T) {
// instantiate validator
validator := content.NewValidator()
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi
import (
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
)
var (
errCantBalances = "Cannot get balances"
errCantBalance = "Cannot get balance"
errInvaliAddress = "Invalid address"
)
type balanceResponse struct {
Peer string `json:"peer"`
Balance int64 `json:"balance"`
}
type balancesResponse struct {
Balances []balanceResponse `json:"balances"`
}
func (s *server) balancesHandler(w http.ResponseWriter, r *http.Request) {
balances, err := s.Accounting.Balances()
if err != nil {
jsonhttp.InternalServerError(w, errCantBalances)
s.Logger.Debugf("debug api: balances: %v", err)
s.Logger.Error("debug api: can not get balances")
return
}
balResponses := make([]balanceResponse, len(balances))
i := 0
for k := range balances {
balResponses[i] = balanceResponse{
Peer: k,
Balance: balances[k],
}
i++
}
jsonhttp.OK(w, balancesResponse{Balances: balResponses})
}
func (s *server) peerBalanceHandler(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["peer"]
peer, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("debug api: balances peer: invalid peer address %s: %v", addr, err)
s.Logger.Error("debug api: balances peer: invalid peer address %s", addr)
jsonhttp.NotFound(w, errInvaliAddress)
return
}
balance, err := s.Accounting.Balance(peer)
if err != nil {
s.Logger.Debugf("debug api: balances peer: get peer %s balance: %v", peer.String(), err)
s.Logger.Errorf("debug api: balances peer: can't get peer %s balance", peer.String())
jsonhttp.InternalServerError(w, errCantBalance)
return
}
jsonhttp.OK(w, balanceResponse{
Peer: peer.String(),
Balance: balance,
})
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi_test
import (
"errors"
"net/http"
"reflect"
"testing"
"github.com/ethersphere/bee/pkg/accounting/mock"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/swarm"
)
func TestBalances(t *testing.T) {
balancesFunc := func() (ret map[string]int64, err error) {
ret = make(map[string]int64)
ret["DEAD"] = 1000000000000000000
ret["BEEF"] = -100000000000000000
ret["PARTY"] = 0
return ret, err
}
testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalancesFunc(balancesFunc)},
})
expected := &debugapi.BalancesResponse{
[]debugapi.BalanceResponse{
{
Peer: "DEAD",
Balance: 1000000000000000000,
},
{
Peer: "BEEF",
Balance: -100000000000000000,
},
{
Peer: "PARTY",
Balance: 0,
},
},
}
// We expect a list of items unordered by peer:
var got *debugapi.BalancesResponse
jsonhttptest.ResponseUnmarshal(t, testServer.Client, http.MethodGet, "/balances", nil, http.StatusOK, &got)
if !equalBalances(got, expected) {
t.Errorf("got balances: %v, expected: %v", got, expected)
}
}
func TestBalancesError(t *testing.T) {
wantErr := errors.New("ASDF")
balancesFunc := func() (ret map[string]int64, err error) {
return nil, wantErr
}
testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalancesFunc(balancesFunc)},
})
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/balances", nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Message: debugapi.ErrCantBalances,
Code: http.StatusInternalServerError,
})
}
func TestBalancesPeers(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
balanceFunc := func(swarm.Address) (int64, error) {
return 1000000000000000000, nil
}
testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalanceFunc(balanceFunc)},
})
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/balances/"+peer, nil, http.StatusOK, debugapi.BalanceResponse{
Peer: peer,
Balance: 1000000000000000000,
})
}
func TestBalancesPeersError(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
wantErr := errors.New("Error")
balanceFunc := func(swarm.Address) (int64, error) {
return 0, wantErr
}
testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalanceFunc(balanceFunc)},
})
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/balances/"+peer, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Message: debugapi.ErrCantBalance,
Code: http.StatusInternalServerError,
})
}
func TestBalancesInvalidAddress(t *testing.T) {
peer := "bad peer address"
testServer := newTestServer(t, testServerOptions{})
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/balances/"+peer, nil, http.StatusNotFound, jsonhttp.StatusResponse{
Message: debugapi.ErrInvaliAddress,
Code: http.StatusNotFound,
})
}
func equalBalances(a, b *debugapi.BalancesResponse) bool {
var state bool
for akeys := range a.Balances {
state = false
for bkeys := range b.Balances {
if reflect.DeepEqual(a.Balances[akeys], b.Balances[bkeys]) {
state = true
}
}
if !state {
return false
}
}
for bkeys := range b.Balances {
state = false
for akeys := range a.Balances {
if reflect.DeepEqual(a.Balances[akeys], b.Balances[bkeys]) {
state = true
}
}
if !state {
return false
}
}
return true
}
......@@ -7,6 +7,7 @@ package debugapi
import (
"net/http"
"github.com/ethersphere/bee/pkg/accounting"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/pingpong"
......@@ -39,6 +40,7 @@ type Options struct {
Logger logging.Logger
Tracer *tracing.Tracer
Tags *tags.Tags
Accounting accounting.Interface
}
func New(o Options) Service {
......
......@@ -11,35 +11,38 @@ import (
"net/url"
"testing"
accountingmock "github.com/ethersphere/bee/pkg/accounting/mock"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/logging"
mockp2p "github.com/ethersphere/bee/pkg/p2p/mock"
p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology/mock"
topologymock "github.com/ethersphere/bee/pkg/topology/mock"
"github.com/multiformats/go-multiaddr"
"resenje.org/web"
)
type testServerOptions struct {
Overlay swarm.Address
P2P *mockp2p.Service
P2P *p2pmock.Service
Pingpong pingpong.Interface
Storer storage.Storer
TopologyOpts []mock.Option
TopologyOpts []topologymock.Option
Tags *tags.Tags
AccountingOpts []accountingmock.Option
}
type testServer struct {
Client *http.Client
P2PMock *mockp2p.Service
P2PMock *p2pmock.Service
}
func newTestServer(t *testing.T, o testServerOptions) *testServer {
topologyDriver := mock.NewTopologyDriver(o.TopologyOpts...)
topologyDriver := topologymock.NewTopologyDriver(o.TopologyOpts...)
acc := accountingmock.NewAccounting(o.AccountingOpts...)
s := debugapi.New(debugapi.Options{
Overlay: o.Overlay,
......@@ -49,6 +52,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
Logger: logging.New(ioutil.Discard, 0),
Storer: o.Storer,
TopologyDriver: topologyDriver,
Accounting: acc,
})
ts := httptest.NewServer(s)
t.Cleanup(ts.Close)
......
......@@ -15,4 +15,12 @@ type (
TagResponse = tagResponse
WelcomeMessageRequest = welcomeMessageRequest
WelcomeMessageResponse = welcomeMessageResponse
BalancesResponse = balancesResponse
BalanceResponse = balanceResponse
)
var (
ErrCantBalance = errCantBalance
ErrCantBalances = errCantBalances
ErrInvaliAddress = errInvaliAddress
)
......@@ -93,6 +93,12 @@ func (s *server) setupRouting() {
web.FinalHandlerFunc(s.setWelcomeMessageHandler),
),
})
router.Handle("/balances", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.balancesHandler),
})
router.Handle("/balances/{peer}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.peerBalanceHandler),
})
baseRouter.Handle("/", web.ChainHandlers(
logging.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, "debug api access"),
......
......@@ -242,7 +242,7 @@ func NewBee(o Options) (*Bee, error) {
DisconnectThreshold: o.DisconnectThreshold,
})
chunkvalidators := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator())
chunkvalidator := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator())
retrieve := retrieval.New(retrieval.Options{
Streamer: p2ps,
......@@ -250,7 +250,7 @@ func NewBee(o Options) (*Bee, error) {
Logger: logger,
Accounting: acc,
Pricer: accounting.NewFixedPricer(address, 10),
Validator: chunkvalidators,
Validator: chunkvalidator,
})
tagg := tags.NewTags()
......@@ -258,7 +258,7 @@ func NewBee(o Options) (*Bee, error) {
return nil, fmt.Errorf("retrieval service: %w", err)
}
ns := netstore.New(storer, retrieve, logger, chunkvalidators)
ns := netstore.New(storer, retrieve, logger, chunkvalidator)
retrieve.SetStorer(ns)
......@@ -348,6 +348,7 @@ func NewBee(o Options) (*Bee, error) {
TopologyDriver: topologyDriver,
Storer: storer,
Tags: tagg,
Accounting: acc,
})
// register metrics from components
debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
......
......@@ -14,7 +14,7 @@ var _ swarm.Validator = (*Validator)(nil)
type Validator struct {
}
// NewSocValidator creates a new SocValidator.
// NewValidator creates a new Validator.
func NewValidator() swarm.Validator {
return &Validator{}
}
......
......@@ -14,10 +14,10 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
// TestSocValidator verifies that the validator can detect both
// TestValidator verifies that the validator can detect both
// valid soc chunks, as well as chunks with invalid data and invalid
// address.
func TestSocValidator(t *testing.T) {
func TestValidator(t *testing.T) {
id := make([]byte, soc.IdSize)
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment