Commit f6ae8938 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into aj/clock

parents 0a6b8bdd 7dea0cf0
...@@ -24,6 +24,7 @@ require ( ...@@ -24,6 +24,7 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.9.0 github.com/libp2p/go-libp2p-pubsub v0.9.0
github.com/libp2p/go-libp2p-testing v0.12.0 github.com/libp2p/go-libp2p-testing v0.12.0
github.com/mattn/go-isatty v0.0.17 github.com/mattn/go-isatty v0.0.17
github.com/multiformats/go-base32 v0.1.0
github.com/multiformats/go-multiaddr v0.8.0 github.com/multiformats/go-multiaddr v0.8.0
github.com/multiformats/go-multiaddr-dns v0.3.1 github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
...@@ -128,7 +129,6 @@ require ( ...@@ -128,7 +129,6 @@ require (
github.com/moby/term v0.0.0-20221105221325-4eb28fa6025c // indirect github.com/moby/term v0.0.0-20221105221325-4eb28fa6025c // indirect
github.com/morikuni/aec v1.0.0 // indirect github.com/morikuni/aec v1.0.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect github.com/multiformats/go-multibase v0.1.1 // indirect
......
package store
import (
"context"
"errors"
"fmt"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peerstore"
)
type extendedStore struct {
peerstore.Peerstore
peerstore.CertifiedAddrBook
*scoreBook
}
func NewExtendedPeerstore(ctx context.Context, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
cab, ok := peerstore.GetCertifiedAddrBook(ps)
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
sb, err := newScoreBook(ctx, store)
if err != nil {
return nil, fmt.Errorf("create scorebook: %w", err)
}
return &extendedStore{
Peerstore: ps,
CertifiedAddrBook: cab,
scoreBook: sb,
}, nil
}
var _ ExtendedPeerstore = (*extendedStore)(nil)
package store
import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)
type PeerScores struct {
Gossip float64
}
type ScoreType int
const (
TypeGossip ScoreType = iota
)
// ScoreDatastore defines a type-safe API for getting and setting libp2p peer score information
type ScoreDatastore interface {
// GetPeerScores returns the current scores for the specified peer
GetPeerScores(id peer.ID) (PeerScores, error)
// SetScore stores the latest score for the specified peer and score type
SetScore(id peer.ID, scoreType ScoreType, score float64) error
}
// ExtendedPeerstore defines a type-safe API to work with additional peer metadata based on a libp2p peerstore.Peerstore
type ExtendedPeerstore interface {
peerstore.Peerstore
ScoreDatastore
peerstore.CertifiedAddrBook
}
package store
import (
"context"
"errors"
"fmt"
"sync"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-base32"
)
type scoreBook struct {
ctx context.Context
store ds.Batching
cache *lru.Cache[peer.ID, PeerScores]
sync.RWMutex
}
var scoresBase = ds.NewKey("/peers/scores")
const (
scoreDataV0 = "0"
scoreCacheSize = 100
)
func newScoreBook(ctx context.Context, store ds.Batching) (*scoreBook, error) {
cache, err := lru.New[peer.ID, PeerScores](scoreCacheSize)
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
}
return &scoreBook{
ctx: ctx,
store: store,
cache: cache,
}, nil
}
func (d *scoreBook) GetPeerScores(id peer.ID) (PeerScores, error) {
d.RLock()
defer d.RUnlock()
return d.getPeerScoresNoLock(id)
}
func (d *scoreBook) getPeerScoresNoLock(id peer.ID) (PeerScores, error) {
scores, ok := d.cache.Get(id)
if ok {
return scores, nil
}
data, err := d.store.Get(d.ctx, scoreKey(id))
if errors.Is(err, ds.ErrNotFound) {
return PeerScores{}, nil
} else if err != nil {
return PeerScores{}, fmt.Errorf("load scores for peer %v: %w", id, err)
}
scores, err = deserializeScoresV0(data)
if err != nil {
return PeerScores{}, fmt.Errorf("invalid score data for peer %v: %w", id, err)
}
d.cache.Add(id, scores)
return scores, nil
}
func (d *scoreBook) SetScore(id peer.ID, scoreType ScoreType, score float64) error {
d.Lock()
defer d.Unlock()
scores, err := d.getPeerScoresNoLock(id)
if err != nil {
return err
}
switch scoreType {
case TypeGossip:
scores.Gossip = score
default:
return fmt.Errorf("unknown score type: %v", scoreType)
}
data, err := serializeScoresV0(scores)
if err != nil {
return fmt.Errorf("encode scores for peer %v: %w", id, err)
}
err = d.store.Put(d.ctx, scoreKey(id), data)
if err != nil {
return fmt.Errorf("storing updated scores for peer %v: %w", id, err)
}
d.cache.Add(id, scores)
return nil
}
func scoreKey(id peer.ID) ds.Key {
return scoresBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(id))).ChildString(scoreDataV0)
}
package store
import (
"context"
"testing"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
"github.com/stretchr/testify/require"
)
func TestGetEmptyScoreComponents(t *testing.T) {
id := peer.ID("aaaa")
store := createMemoryStore(t)
assertPeerScores(t, store, id, PeerScores{})
}
func TestRoundTripGossipScore(t *testing.T) {
id := peer.ID("aaaa")
store := createMemoryStore(t)
score := 123.45
err := store.SetScore(id, TypeGossip, score)
require.NoError(t, err)
assertPeerScores(t, store, id, PeerScores{Gossip: score})
}
func TestUpdateGossipScore(t *testing.T) {
id := peer.ID("aaaa")
store := createMemoryStore(t)
score := 123.45
require.NoError(t, store.SetScore(id, TypeGossip, 444.223))
require.NoError(t, store.SetScore(id, TypeGossip, score))
assertPeerScores(t, store, id, PeerScores{Gossip: score})
}
func TestStoreScoresForMultiplePeers(t *testing.T) {
id1 := peer.ID("aaaa")
id2 := peer.ID("bbbb")
store := createMemoryStore(t)
score1 := 123.45
score2 := 453.22
require.NoError(t, store.SetScore(id1, TypeGossip, score1))
require.NoError(t, store.SetScore(id2, TypeGossip, score2))
assertPeerScores(t, store, id1, PeerScores{Gossip: score1})
assertPeerScores(t, store, id2, PeerScores{Gossip: score2})
}
func TestPersistData(t *testing.T) {
id := peer.ID("aaaa")
score := 123.45
backingStore := sync.MutexWrap(ds.NewMapDatastore())
store := createPeerstoreWithBacking(t, backingStore)
require.NoError(t, store.SetScore(id, TypeGossip, score))
// Close and recreate a new store from the same backing
require.NoError(t, store.Close())
store = createPeerstoreWithBacking(t, backingStore)
assertPeerScores(t, store, id, PeerScores{Gossip: score})
}
func TestUnknownScoreType(t *testing.T) {
store := createMemoryStore(t)
err := store.SetScore("aaaa", 92832, 244.24)
require.ErrorContains(t, err, "unknown score type")
}
func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) {
result, err := store.GetPeerScores(id)
require.NoError(t, err)
require.Equal(t, result, expected)
}
func createMemoryStore(t *testing.T) ExtendedPeerstore {
store := sync.MutexWrap(ds.NewMapDatastore())
return createPeerstoreWithBacking(t, store)
}
func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) ExtendedPeerstore {
ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts())
require.NoError(t, err, "Failed to create peerstore")
eps, err := NewExtendedPeerstore(context.Background(), ps, store)
require.NoError(t, err)
return eps
}
package store
import (
"bytes"
"encoding/binary"
)
func serializeScoresV0(scores PeerScores) ([]byte, error) {
var b bytes.Buffer
err := binary.Write(&b, binary.BigEndian, scores.Gossip)
if err != nil {
return nil, err
}
return b.Bytes(), nil
}
func deserializeScoresV0(data []byte) (PeerScores, error) {
var scores PeerScores
r := bytes.NewReader(data)
err := binary.Read(r, binary.BigEndian, &scores.Gossip)
if err != nil {
return PeerScores{}, err
}
return scores, nil
}
package store
import (
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func TestRoundtripScoresV0(t *testing.T) {
scores := PeerScores{
Gossip: 1234.52382,
}
data, err := serializeScoresV0(scores)
require.NoError(t, err)
result, err := deserializeScoresV0(data)
require.NoError(t, err)
require.Equal(t, scores, result)
}
// TestParseHistoricSerializations checks that existing data can still be deserialized
// Adding new fields should not require bumping the version, only removing fields
// A new entry should be added to this test each time any fields are changed to ensure it can always be deserialized
func TestParseHistoricSerializationsV0(t *testing.T) {
tests := []struct {
name string
data []byte
expected PeerScores
}{
{
name: "GossipOnly",
data: common.Hex2Bytes("40934A18644523F6"),
expected: PeerScores{Gossip: 1234.52382},
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
result, err := deserializeScoresV0(test.data)
require.NoError(t, err)
require.Equal(t, test.expected, result)
})
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment