Commit 824636a2 authored by acud's avatar acud Committed by GitHub

libp2p, debugapi: add API to view blocklisted peers (#1171)

parent 7aa2db90
......@@ -72,3 +72,16 @@ func (s *server) peersHandler(w http.ResponseWriter, r *http.Request) {
Peers: s.P2P.Peers(),
})
}
func (s *server) blocklistedPeersHandler(w http.ResponseWriter, r *http.Request) {
peers, err := s.P2P.BlocklistedPeers()
if err != nil {
s.Logger.Debugf("debug api: blocklisted peers: %v", err)
jsonhttp.InternalServerError(w, nil)
return
}
jsonhttp.OK(w, peersResponse{
Peers: peers,
})
}
......@@ -181,3 +181,35 @@ func TestPeer(t *testing.T) {
)
})
}
func TestBlocklistedPeers(t *testing.T) {
overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithBlocklistedPeersFunc(func() ([]p2p.Peer, error) {
return []p2p.Peer{{Address: overlay}}, nil
})),
})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/blocklist", http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(debugapi.PeersResponse{
Peers: []p2p.Peer{{Address: overlay}},
}),
)
}
func TestBlocklistedPeersErr(t *testing.T) {
overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithBlocklistedPeersFunc(func() ([]p2p.Peer, error) {
return []p2p.Peer{{Address: overlay}}, errors.New("some error")
})),
})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/blocklist", http.StatusInternalServerError,
jsonhttptest.WithExpectedJSONResponse(
jsonhttp.StatusResponse{
Code: http.StatusInternalServerError,
Message: http.StatusText(http.StatusInternalServerError),
}),
)
}
......@@ -68,6 +68,10 @@ func (s *server) setupRouting() {
router.Handle("/peers", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.peersHandler),
})
router.Handle("/blocklist", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.blocklistedPeersHandler),
})
router.Handle("/peers/{address}", jsonhttp.MethodHandler{
"DELETE": http.HandlerFunc(s.peerDisconnectHandler),
})
......
......@@ -5,8 +5,10 @@
package blocklist
import (
"strings"
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -69,6 +71,38 @@ func (b *Blocklist) Add(overlay swarm.Address, duration time.Duration) (err erro
})
}
// Peers returns all currently blocklisted peers.
func (b *Blocklist) Peers() ([]p2p.Peer, error) {
var peers []p2p.Peer
if err := b.store.Iterate(keyPrefix, func(k, v []byte) (bool, error) {
if !strings.HasPrefix(string(k), keyPrefix) {
return true, nil
}
addr, err := unmarshalKey(string(k))
if err != nil {
return true, err
}
t, d, err := b.get(string(k))
if err != nil {
return true, err
}
if timeNow().Sub(t) > d && d != 0 {
// skip to the next item
return false, nil
}
p := p2p.Peer{Address: addr}
peers = append(peers, p)
return false, nil
}); err != nil {
return nil, err
}
return peers, nil
}
func (b *Blocklist) get(key string) (timestamp time.Time, duration time.Duration, err error) {
var e entry
if err := b.store.Get(key, &e); err != nil {
......@@ -86,3 +120,8 @@ func (b *Blocklist) get(key string) (timestamp time.Time, duration time.Duration
func generateKey(overlay swarm.Address) string {
return keyPrefix + overlay.String()
}
func unmarshalKey(s string) (swarm.Address, error) {
addr := strings.TrimLeft(s, keyPrefix)
return swarm.ParseHexAddress(addr)
}
......@@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/blocklist"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -39,6 +40,7 @@ func TestExist(t *testing.T) {
}
blocklist.SetTimeNow(func() time.Time { return time.Now().Add(100 * time.Millisecond) })
defer func() { blocklist.SetTimeNow(time.Now) }()
exists, err = bl.Exists(addr1)
if err != nil {
......@@ -58,3 +60,57 @@ func TestExist(t *testing.T) {
}
}
func TestPeers(t *testing.T) {
addr1 := swarm.NewAddress([]byte{0, 1, 2, 3})
addr2 := swarm.NewAddress([]byte{4, 5, 6, 7})
bl := blocklist.NewBlocklist(mock.NewStateStore())
// add forever
if err := bl.Add(addr1, 0); err != nil {
t.Fatal(err)
}
// add for 50 miliseconds
if err := bl.Add(addr2, time.Millisecond*50); err != nil {
t.Fatal(err)
}
peers, err := bl.Peers()
if err != nil {
t.Fatal(err)
}
if !isIn(addr1, peers) {
t.Fatalf("expected addr1 to exist in peers: %v", addr1)
}
if !isIn(addr2, peers) {
t.Fatalf("expected addr2 to exist in peers: %v", addr2)
}
blocklist.SetTimeNow(func() time.Time { return time.Now().Add(100 * time.Millisecond) })
defer func() { blocklist.SetTimeNow(time.Now) }()
// now expect just one
peers, err = bl.Peers()
if err != nil {
t.Fatal(err)
}
if !isIn(addr1, peers) {
t.Fatalf("expected addr1 to exist in peers: %v", peers)
}
if isIn(addr2, peers) {
t.Fatalf("expected addr2 to not exist in peers: %v", peers)
}
}
func isIn(p swarm.Address, peers []p2p.Peer) bool {
for _, v := range peers {
if v.Address.Equal(p) {
return true
}
}
return false
}
......@@ -587,6 +587,10 @@ func (s *Service) Peers() []p2p.Peer {
return s.peers.peers()
}
func (s *Service) BlocklistedPeers() ([]p2p.Peer, error) {
return s.blocklist.Peers()
}
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, headers p2p.Headers, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
peerID, found := s.peers.peerID(overlay)
if !found {
......
......@@ -21,6 +21,7 @@ type Service struct {
connectFunc func(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
disconnectFunc func(overlay swarm.Address) error
peersFunc func() []p2p.Peer
blocklistedPeersFunc func() ([]p2p.Peer, error)
addressesFunc func() ([]ma.Multiaddr, error)
setNotifierFunc func(p2p.Notifier)
setWelcomeMessageFunc func(string) error
......@@ -64,6 +65,13 @@ func WithPeersFunc(f func() []p2p.Peer) Option {
})
}
// WithBlocklistedPeersFunc sets the mock implementation of the BlocklistedPeers function
func WithBlocklistedPeersFunc(f func() ([]p2p.Peer, error)) Option {
return optionFunc(func(s *Service) {
s.blocklistedPeersFunc = f
})
}
// WithAddressesFunc sets the mock implementation of the Adresses function
func WithAddressesFunc(f func() ([]ma.Multiaddr, error)) Option {
return optionFunc(func(s *Service) {
......@@ -135,6 +143,14 @@ func (s *Service) Peers() []p2p.Peer {
return s.peersFunc()
}
func (s *Service) BlocklistedPeers() ([]p2p.Peer, error) {
if s.blocklistedPeersFunc == nil {
return nil, nil
}
return s.blocklistedPeersFunc()
}
func (s *Service) SetWelcomeMessage(val string) error {
if s.setWelcomeMessageFunc != nil {
return s.setWelcomeMessageFunc(val)
......
......@@ -21,6 +21,7 @@ type Service interface {
Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
Disconnecter
Peers() []Peer
BlocklistedPeers() ([]Peer, error)
Addresses() ([]ma.Multiaddr, error)
SetNotifier(Notifier)
}
......
......@@ -17,7 +17,7 @@ var _ storage.StateStorer = (*store)(nil)
type store struct {
store map[string][]byte
mtx sync.Mutex
mtx sync.RWMutex
}
func NewStateStore() storage.StateStorer {
......@@ -27,8 +27,8 @@ func NewStateStore() storage.StateStorer {
}
func (s *store) Get(key string, i interface{}) (err error) {
s.mtx.Lock()
defer s.mtx.Unlock()
s.mtx.RLock()
defer s.mtx.RUnlock()
data, ok := s.store[key]
if !ok {
......@@ -68,8 +68,8 @@ func (s *store) Delete(key string) (err error) {
}
func (s *store) Iterate(prefix string, iterFunc storage.StateIterFunc) (err error) {
s.mtx.Lock()
defer s.mtx.Unlock()
s.mtx.RLock()
defer s.mtx.RUnlock()
for k, v := range s.store {
if !strings.HasPrefix(k, prefix) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment