Commit 17520e44 authored by protolambda

op-node: generalize score book and implement ban expiry books with ban util funcs

parent 95978376
...@@ -4,7 +4,12 @@ import (
"context"
"errors"
"fmt"
"net"
"strconv"
"time"
"github.com/libp2p/go-libp2p/core/peer"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
...@@ -29,6 +34,8 @@ type NodeP2P struct {
gater gating.BlockingConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
scorer Scorer // writes score-updates to the peerstore and keeps metrics of score changes
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
store store.ExtendedPeerstore // peerstore of host, with extra bindings for scoring and banning
log log.Logger
// the below components are all optional, and may be nil. They require the host to not be nil.
dv5Local *enode.LocalNode // p2p discovery identity
dv5Udp *discover.UDPv5 // p2p discovery service
...@@ -61,6 +68,8 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
bwc := p2pmetrics.NewBandwidthCounter()
n.log = log
var err error
// nil if disabled.
n.host, err = setup.Host(log, bwc, metrics)
...@@ -105,6 +114,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.store = eps
n.scorer = NewScorer(rollupCfg, eps, metrics, setup.PeerBandScorer(), log)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, conn network.Conn) {
...@@ -184,6 +194,42 @@ func (n *NodeP2P) ConnectionManager() connmgr.ConnManager {
return n.connMgr
}
func (n *NodeP2P) BanPeer(id peer.ID, expiration time.Time) error {
if err := n.gater.BlockPeer(id); err != nil {
return fmt.Errorf("failed to block peer: %w", err)
}
if err := n.store.SetPeerBanExpiration(id, expiration); err != nil {
return fmt.Errorf("failed to set peer ban expiry: %w", err)
}
if err := n.host.Network().ClosePeer(id); err != nil {
return fmt.Errorf("failed to close peer connection: %w", err)
}
return nil
}
func (n *NodeP2P) BanIP(ip net.IP, expiration time.Time) error {
if err := n.gater.BlockAddr(ip); err != nil {
return fmt.Errorf("failed to block IP: %w", err)
}
if err := n.store.SetIPBanExpiration(ip, expiration); err != nil {
return fmt.Errorf("failed to set IP ban expiry: %w", err)
}
// kick all peers that match this IP
for _, conn := range n.host.Network().Conns() {
addr := conn.RemoteMultiaddr()
remoteIP, err := manet.ToIP(addr)
if err != nil {
continue
}
if remoteIP.Equal(ip) {
if err := conn.Close(); err != nil {
n.log.Error("failed to close connection to peer with banned IP", "peer", conn.RemotePeer(), "ip", ip)
}
}
}
return nil
}
func (n *NodeP2P) Close() error {
var result *multierror.Error
if n.dv5Udp != nil {
......
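For context, a minimal usage sketch of the new ban helpers (not part of this commit): a caller inside the same p2p package could wrap BanPeer and BanIP like the following. The two-hour window and the wrapper names banMisbehavingPeer/banRemoteIP are assumptions for illustration only, not op-node API.

package p2p

import (
	"net"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
)

// banMisbehavingPeer is an illustrative wrapper: ban a peer ID for a fixed two-hour window.
func banMisbehavingPeer(n *NodeP2P, id peer.ID) error {
	return n.BanPeer(id, time.Now().Add(2*time.Hour))
}

// banRemoteIP is an illustrative wrapper: ban an IP for the same window, which also
// disconnects any currently connected peers on that address.
func banRemoteIP(n *NodeP2P, ip net.IP) error {
	return n.BanIP(ip, time.Now().Add(2*time.Hour))
}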
...@@ -15,6 +15,8 @@ type extendedStore struct {
peerstore.Peerstore
peerstore.CertifiedAddrBook
*scoreBook
*peerBanBook
*ipBanBook
}
func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
...@@ -27,10 +29,22 @@ func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Cl
return nil, fmt.Errorf("create scorebook: %w", err)
}
sb.startGC()
pb, err := newPeerBanBook(ctx, logger, clock, store)
if err != nil {
return nil, fmt.Errorf("create peer ban book: %w", err)
}
pb.startGC()
ib, err := newIPBanBook(ctx, logger, clock, store)
if err != nil {
return nil, fmt.Errorf("create IP ban book: %w", err)
}
ib.startGC()
return &extendedStore{
Peerstore: ps,
CertifiedAddrBook: cab,
scoreBook: sb,
peerBanBook: pb,
ipBanBook: ib,
}, nil
}
......
package store
import (
"errors"
"net"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)
...@@ -44,9 +48,29 @@ type ScoreDiff interface {
Apply(score *scoreRecord)
}
var UnknownBanErr = errors.New("unknown ban")
type PeerBanStore interface {
// SetPeerBanExpiration creates a peer ban with the given expiration time.
// If expiry == time.Time{} then the ban is deleted.
SetPeerBanExpiration(id peer.ID, expiry time.Time) error
// GetPeerBanExpiration gets the peer ban expiration time, or returns UnknownBanErr if no ban exists.
GetPeerBanExpiration(id peer.ID) (time.Time, error)
}
type IPBanStore interface {
// SetIPBanExpiration creates an IP ban with the given expiration time.
// If expiry == time.Time{} then the ban is deleted.
SetIPBanExpiration(ip net.IP, expiry time.Time) error
// GetIPBanExpiration gets the IP ban expiration time, or returns UnknownBanErr if no ban exists.
GetIPBanExpiration(ip net.IP) (time.Time, error)
}
// ExtendedPeerstore defines a type-safe API to work with additional peer metadata based on a libp2p peerstore.Peerstore
type ExtendedPeerstore interface {
peerstore.Peerstore
ScoreDatastore
peerstore.CertifiedAddrBook
PeerBanStore
IPBanStore
}
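As an illustration of how a caller might consume the new ban interfaces (this helper is not part of the commit; isPeerBanned is a hypothetical name): an unknown ban means "not banned", and an expiration at or before the current time means any previous ban has lapsed.

package store

import (
	"errors"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
)

// isPeerBanned is an illustrative helper built on PeerBanStore.
func isPeerBanned(s PeerBanStore, id peer.ID, now time.Time) (bool, error) {
	expiry, err := s.GetPeerBanExpiration(id)
	if errors.Is(err, UnknownBanErr) {
		// No ban record exists for this peer.
		return false, nil
	}
	if err != nil {
		return false, err
	}
	// The peer is banned only while the stored expiration lies in the future.
	return expiry.After(now), nil
}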
package store
import (
"context"
"encoding/json"
"net"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
)
const (
ipBanCacheSize = 100
ipBanRecordExpiration = time.Hour * 24 * 7
)
var ipBanExpirationsBase = ds.NewKey("/ips/ban_expiration")
type ipBanRecord struct {
Expiry int64 `json:"expiry"` // unix timestamp in seconds
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
}
func (s *ipBanRecord) SetLastUpdated(t time.Time) {
s.LastUpdate = t.Unix()
}
func (s *ipBanRecord) LastUpdated() time.Time {
return time.Unix(s.LastUpdate, 0)
}
func (s *ipBanRecord) MarshalBinary() (data []byte, err error) {
return json.Marshal(s)
}
func (s *ipBanRecord) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, s)
}
type ipBanUpdate time.Time
func (p ipBanUpdate) Apply(rec *ipBanRecord) {
rec.Expiry = time.Time(p).Unix()
}
type ipBanBook struct {
book *recordsBook[string, *ipBanRecord]
}
func newIPBanRecord() *ipBanRecord {
return new(ipBanRecord)
}
func ipKey(ip string) ds.Key {
return ds.NewKey(ip)
}
func newIPBanBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*ipBanBook, error) {
book, err := newRecordsBook[string, *ipBanRecord](ctx, logger, clock, store, ipBanCacheSize, ipBanRecordExpiration, ipBanExpirationsBase, newIPBanRecord, ipKey)
if err != nil {
return nil, err
}
return &ipBanBook{book: book}, nil
}
func (d *ipBanBook) startGC() {
d.book.startGC()
}
func (d *ipBanBook) GetIPBanExpiration(ip net.IP) (time.Time, error) {
rec, err := d.book.getRecord(ip.To16().String())
if err == UnknownRecordErr {
return time.Time{}, UnknownBanErr
}
if err != nil {
return time.Time{}, err
}
return time.Unix(rec.Expiry, 0), nil
}
func (d *ipBanBook) SetIPBanExpiration(ip net.IP, expirationTime time.Time) error {
if expirationTime == (time.Time{}) {
return d.book.deleteRecord(ip.To16().String())
}
return d.book.SetRecord(ip.To16().String(), ipBanUpdate(expirationTime))
}
func (d *ipBanBook) Close() {
d.book.Close()
}
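A short sketch of the ipBanBook semantics (not part of the commit; exampleIPBan and the documentation-range address are assumptions): IPs are keyed by their 16-byte form, so an IPv4 address and its IPv4-in-IPv6 mapping share one record, and the zero time clears a ban.

package store

import (
	"net"
	"time"
)

// exampleIPBan is an illustrative sequence of setting and then clearing an IP ban.
func exampleIPBan(b *ipBanBook) error {
	ip := net.ParseIP("203.0.113.7") // arbitrary documentation-range address
	if err := b.SetIPBanExpiration(ip, time.Now().Add(24*time.Hour)); err != nil {
		return err
	}
	// Passing the zero time deletes the ban record again.
	return b.SetIPBanExpiration(ip, time.Time{})
}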
package store
import (
"context"
"encoding/json"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
peerBanCacheSize = 100
peerBanRecordExpiration = time.Hour * 24 * 7
)
var peerBanExpirationsBase = ds.NewKey("/peers/ban_expiration")
type peerBanRecord struct {
Expiry int64 `json:"expiry"` // unix timestamp in seconds
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
}
func (s *peerBanRecord) SetLastUpdated(t time.Time) {
s.LastUpdate = t.Unix()
}
func (s *peerBanRecord) LastUpdated() time.Time {
return time.Unix(s.LastUpdate, 0)
}
func (s *peerBanRecord) MarshalBinary() (data []byte, err error) {
return json.Marshal(s)
}
func (s *peerBanRecord) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, s)
}
type peerBanUpdate time.Time
func (p peerBanUpdate) Apply(rec *peerBanRecord) {
rec.Expiry = time.Time(p).Unix()
}
type peerBanBook struct {
book *recordsBook[peer.ID, *peerBanRecord]
}
func newPeerBanRecord() *peerBanRecord {
return new(peerBanRecord)
}
func newPeerBanBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*peerBanBook, error) {
book, err := newRecordsBook[peer.ID, *peerBanRecord](ctx, logger, clock, store, peerBanCacheSize, peerBanRecordExpiration, peerBanExpirationsBase, newPeerBanRecord, peerIDKey)
if err != nil {
return nil, err
}
return &peerBanBook{book: book}, nil
}
func (d *peerBanBook) startGC() {
d.book.startGC()
}
func (d *peerBanBook) GetPeerBanExpiration(id peer.ID) (time.Time, error) {
rec, err := d.book.getRecord(id)
if err == UnknownRecordErr {
return time.Time{}, UnknownBanErr
}
if err != nil {
return time.Time{}, err
}
return time.Unix(rec.Expiry, 0), nil
}
func (d *peerBanBook) SetPeerBanExpiration(id peer.ID, expirationTime time.Time) error {
if expirationTime == (time.Time{}) {
return d.book.deleteRecord(id)
}
return d.book.SetRecord(id, peerBanUpdate(expirationTime))
}
func (d *peerBanBook) Close() {
d.book.Close()
}
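For completeness, a sketch of constructing a peerBanBook in isolation, e.g. for a test (not part of the commit; newInMemoryPeerBanBook is a hypothetical name, and the in-memory datastore plus system clock are assumed stand-ins for the node's real backing store and clock):

package store

import (
	"context"

	"github.com/ethereum-optimism/optimism/op-service/clock"
	"github.com/ethereum/go-ethereum/log"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
)

// newInMemoryPeerBanBook wires a peerBanBook to an in-memory datastore.
func newInMemoryPeerBanBook(ctx context.Context) (*peerBanBook, error) {
	store := dssync.MutexWrap(ds.NewMapDatastore())
	return newPeerBanBook(ctx, log.New(), clock.SystemClock, store)
}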
package store
import (
"context"
"encoding"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
)
const (
maxPruneBatchSize = 20
)
type record interface {
SetLastUpdated(time.Time)
LastUpdated() time.Time
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}
type recordDiff[V record] interface {
Apply(v V)
}
var UnknownRecordErr = errors.New("unknown record")
// recordsBook is a generic K-V store to embed in the extended-peerstore.
// It prunes old entries to keep the store small.
// The recordsBook can be wrapped to provide more strongly typed access to the records.
type recordsBook[K ~string, V record] struct {
ctx context.Context
cancelFn context.CancelFunc
clock clock.Clock
log log.Logger
bgTasks sync.WaitGroup
store ds.Batching
cache *lru.Cache[K, V]
newRecord func() V
dsBaseKey ds.Key
dsEntryKey func(K) ds.Key
recordExpiry time.Duration // pruning is disabled if this is 0
sync.RWMutex
}
func newRecordsBook[K ~string, V record](ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, cacheSize int, recordExpiry time.Duration,
dsBaseKey ds.Key, newRecord func() V, dsEntryKey func(K) ds.Key) (*recordsBook[K, V], error) {
cache, err := lru.New[K, V](cacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create records cache: %w", err)
}
ctx, cancelFn := context.WithCancel(ctx)
book := &recordsBook[K, V]{
ctx: ctx,
cancelFn: cancelFn,
clock: clock,
log: logger,
store: store,
cache: cache,
newRecord: newRecord,
dsBaseKey: dsBaseKey,
dsEntryKey: dsEntryKey,
recordExpiry: recordExpiry,
}
return book, nil
}
func (d *recordsBook[K, V]) startGC() {
if d.recordExpiry == 0 {
return
}
startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
}
func (d *recordsBook[K, V]) GetRecord(key K) (V, error) {
d.RLock()
defer d.RUnlock()
rec, err := d.getRecord(key)
return rec, err
}
func (d *recordsBook[K, V]) dsKey(key K) ds.Key {
return d.dsBaseKey.Child(d.dsEntryKey(key))
}
func (d *recordsBook[K, V]) deleteRecord(key K) error {
d.cache.Remove(key)
err := d.store.Delete(d.ctx, d.dsKey(key))
if err == nil || errors.Is(err, ds.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to delete entry with key %v: %w", key, err)
}
func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if val, ok := d.cache.Get(key); ok {
return val, nil
}
data, err := d.store.Get(d.ctx, d.dsKey(key))
if errors.Is(err, ds.ErrNotFound) {
return v, UnknownRecordErr
} else if err != nil {
return v, fmt.Errorf("failed to load value of key %v: %w", key, err)
}
v = d.newRecord()
if err := v.UnmarshalBinary(data); err != nil {
return v, fmt.Errorf("invalid value for key %v: %w", key, err)
}
d.cache.Add(key, v)
return v, nil
}
func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error {
d.Lock()
defer d.Unlock()
rec, err := d.getRecord(key)
if err == UnknownRecordErr { // instantiate new record if it does not exist yet
rec = d.newRecord()
} else if err != nil {
return err
}
rec.SetLastUpdated(d.clock.Now())
diff.Apply(rec)
data, err := rec.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to encode record for key %v: %w", key, err)
}
err = d.store.Put(d.ctx, d.dsKey(key), data)
if err != nil {
return fmt.Errorf("storing updated record for key %v: %w", key, err)
}
d.cache.Add(key, rec)
return nil
}
// prune deletes entries from the store that are older than the configured prune expiration.
// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
// having been deleted from the database.
func (d *recordsBook[K, V]) prune() error {
results, err := d.store.Query(d.ctx, query.Query{
Prefix: d.dsBaseKey.String(),
})
if err != nil {
return err
}
pending := 0
batch, err := d.store.Batch(d.ctx)
if err != nil {
return err
}
for result := range results.Next() {
// Bail out if the context is done
select {
case <-d.ctx.Done():
return d.ctx.Err()
default:
}
v := d.newRecord()
if err := v.UnmarshalBinary(result.Value); err != nil {
return err
}
if v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now()) {
if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil {
return err
}
batch, err = d.store.Batch(d.ctx)
if err != nil {
return err
}
pending = 0
}
pending++
if err := batch.Delete(d.ctx, ds.NewKey(result.Key)); err != nil {
return err
}
}
}
if err := batch.Commit(d.ctx); err != nil {
return err
}
return nil
}
func (d *recordsBook[K, V]) Close() {
d.cancelFn()
d.bgTasks.Wait()
}
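To make the "can be wrapped" comment above concrete, here is a minimal sketch of reusing the generic recordsBook for a new kind of metadata (not part of this commit; noteRecord, noteUpdate, notesBase, and newNoteBook are hypothetical names, and the cache size and expiry values are arbitrary):

package store

import (
	"context"
	"encoding/json"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/clock"
	"github.com/ethereum/go-ethereum/log"
	ds "github.com/ipfs/go-datastore"
)

var notesBase = ds.NewKey("/peers/notes")

// noteRecord satisfies the record interface: it tracks a last-update time for pruning
// and serializes itself as JSON.
type noteRecord struct {
	Note       string `json:"note"`
	LastUpdate int64  `json:"lastUpdate"` // unix timestamp in seconds
}

func (r *noteRecord) SetLastUpdated(t time.Time) { r.LastUpdate = t.Unix() }

func (r *noteRecord) LastUpdated() time.Time { return time.Unix(r.LastUpdate, 0) }

func (r *noteRecord) MarshalBinary() ([]byte, error) { return json.Marshal(r) }

func (r *noteRecord) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, r) }

// noteUpdate is a recordDiff that overwrites the stored note text.
type noteUpdate string

func (n noteUpdate) Apply(r *noteRecord) { r.Note = string(n) }

// newNoteBook instantiates the generic book with string keys and noteRecord values.
func newNoteBook(ctx context.Context, logger log.Logger, cl clock.Clock, store ds.Batching) (*recordsBook[string, *noteRecord], error) {
	return newRecordsBook[string, *noteRecord](ctx, logger, cl, store, 100, 24*time.Hour, notesBase,
		func() *noteRecord { return new(noteRecord) },
		func(k string) ds.Key { return ds.NewKey(k) })
}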
...@@ -2,25 +2,18 @@ package store
import (
"context"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-base32"
)
const (
scoreCacheSize = 100
scoreRecordExpiryPeriod = 24 * time.Hour
)
var scoresBase = ds.NewKey("/peers/scores")
...@@ -30,143 +23,66 @@ type scoreRecord struct {
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
}
func (s *scoreRecord) SetLastUpdated(t time.Time) {
s.LastUpdate = t.Unix()
}
func (s *scoreRecord) LastUpdated() time.Time {
return time.Unix(s.LastUpdate, 0)
}
func (s *scoreRecord) MarshalBinary() (data []byte, err error) {
return serializeScoresV0(*s)
}
func (s *scoreRecord) UnmarshalBinary(data []byte) error {
r, err := deserializeScoresV0(data)
if err != nil {
return err
}
*s = r
return nil
}
type scoreBook struct {
book *recordsBook[peer.ID, *scoreRecord]
}
func newScoreRecord() *scoreRecord {
return new(scoreRecord)
}
func peerIDKey(id peer.ID) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(id)))
}
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, scoreRecordExpiryPeriod, scoresBase, newScoreRecord, peerIDKey)
if err != nil {
return nil, err
}
return &scoreBook{book: book}, nil
}
func (d *scoreBook) startGC() {
d.book.startGC()
}
func (d *scoreBook) GetPeerScores(id peer.ID) (PeerScores, error) {
record, err := d.book.getRecord(id)
if err == UnknownRecordErr {
return PeerScores{}, nil // return zeroed scores by default
}
if err != nil {
return PeerScores{}, err
}
return record.PeerScores, nil
}
func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) error {
return d.book.SetRecord(id, diff)
}
func (d *scoreBook) Close() {
d.book.Close()
}
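Callers keep the same entry point for score updates after this refactor. A small illustration (not part of the commit; recordGossipScore is a hypothetical name) using the GossipScores diff that the tests below also use:

package store

import (
	"github.com/libp2p/go-libp2p/core/peer"
)

// recordGossipScore shows that score updates still flow through the ScoreDiff interface;
// the scoreBook simply forwards them to the generic records book.
func recordGossipScore(sb *scoreBook, id peer.ID, total float64) error {
	return sb.SetScore(id, &GossipScores{Total: total})
}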
...@@ -110,19 +110,19 @@ func TestPrune(t *testing.T) {
require.True(t, hasScoreRecorded("dddd"))
elapsedTime := clock.Now().Sub(firstStore)
timeToFirstExpiry := book.book.recordExpiry - elapsedTime
// Advance time until the score for aaaa should be pruned.
clock.AdvanceTime(timeToFirstExpiry + 1)
require.NoError(t, book.book.prune())
// Clear the cache so reads have to come from the database
book.book.cache.Purge()
require.False(t, hasScoreRecorded("aaaa"), "should have pruned aaaa record")
// Advance time so cccc, dddd and the original bbbb entry should be pruned
clock.AdvanceTime(90 * time.Minute)
require.NoError(t, book.book.prune())
// Clear the cache so reads have to come from the database
book.book.cache.Purge()
require.False(t, hasScoreRecorded("cccc"), "should have pruned cccc record")
require.False(t, hasScoreRecorded("dddd"), "should have pruned dddd record")
...@@ -149,10 +149,10 @@ func TestPruneMultipleBatches(t *testing.T) {
for i := 0; i < peerCount; i++ {
require.NoError(t, book.SetScore(peer.ID(strconv.Itoa(i)), &GossipScores{Total: 123.45}))
}
clock.AdvanceTime(book.book.recordExpiry + 1)
require.NoError(t, book.book.prune())
// Clear the cache so reads have to come from the database
book.book.cache.Purge()
for i := 0; i < peerCount; i++ {
require.Falsef(t, hasScoreRecorded(peer.ID(strconv.Itoa(i))), "Should prune record peer %v", i)