Commit 06245265 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #5792 from ethereum-optimism/aj/expire-scores

op-node: Remove peer scores once the retain period is reached
parents 9b3cfbe3 3f808b44
...@@ -679,7 +679,7 @@ func (sys *System) newMockNetPeer() (host.Host, error) { ...@@ -679,7 +679,7 @@ func (sys *System) newMockNetPeer() (host.Host, error) {
_ = ps.AddPubKey(p, sk.GetPublic()) _ = ps.AddPubKey(p, sk.GetPublic())
ds := sync.MutexWrap(ds.NewMapDatastore()) ds := sync.MutexWrap(ds.NewMapDatastore())
eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds) eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds, 24*time.Hour)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -141,7 +141,8 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host ...@@ -141,7 +141,8 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
return nil, fmt.Errorf("failed to open peerstore: %w", err) return nil, fmt.Errorf("failed to open peerstore: %w", err)
} }
ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store) peerScoreParams := conf.PeerScoringParams()
ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, peerScoreParams.RetainScore)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open extended peerstore: %w", err) return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
} }
......
...@@ -76,7 +76,7 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h ...@@ -76,7 +76,7 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h
log := testlog.Logger(testSuite.T(), log.LvlError) log := testlog.Logger(testSuite.T(), log.LvlError)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
swarm := tswarm.GenSwarm(testSuite.T()) swarm := tswarm.GenSwarm(testSuite.T())
eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore())) eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()), 1*time.Hour)
netw := &customPeerstoreNetwork{swarm, eps} netw := &customPeerstoreNetwork{swarm, eps}
require.NoError(testSuite.T(), err) require.NoError(testSuite.T(), err)
h := bhost.NewBlankHost(netw) h := bhost.NewBlankHost(netw)
...@@ -99,7 +99,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts [] ...@@ -99,7 +99,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
dataStore := sync.MutexWrap(ds.NewMapDatastore()) dataStore := sync.MutexWrap(ds.NewMapDatastore())
peerStore, err := pstoreds.NewPeerstore(context.Background(), dataStore, pstoreds.DefaultOpts()) peerStore, err := pstoreds.NewPeerstore(context.Background(), dataStore, pstoreds.DefaultOpts())
require.NoError(testSuite.T(), err) require.NoError(testSuite.T(), err)
extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore) extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore, 1*time.Hour)
require.NoError(testSuite.T(), err) require.NoError(testSuite.T(), err)
scorer := NewScorer( scorer := NewScorer(
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -19,12 +20,12 @@ type extendedStore struct { ...@@ -19,12 +20,12 @@ type extendedStore struct {
*ipBanBook *ipBanBook
} }
func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) { func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching, scoreRetention time.Duration) (ExtendedPeerstore, error) {
cab, ok := peerstore.GetCertifiedAddrBook(ps) cab, ok := peerstore.GetCertifiedAddrBook(ps)
if !ok { if !ok {
return nil, errors.New("peerstore should also be a certified address book") return nil, errors.New("peerstore should also be a certified address book")
} }
sb, err := newScoreBook(ctx, logger, clock, store) sb, err := newScoreBook(ctx, logger, clock, store, scoreRetention)
if err != nil { if err != nil {
return nil, fmt.Errorf("create scorebook: %w", err) return nil, fmt.Errorf("create scorebook: %w", err)
} }
......
...@@ -102,6 +102,9 @@ func (d *recordsBook[K, V]) deleteRecord(key K) error { ...@@ -102,6 +102,9 @@ func (d *recordsBook[K, V]) deleteRecord(key K) error {
func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) { func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if val, ok := d.cache.Get(key); ok { if val, ok := d.cache.Get(key); ok {
if d.hasExpired(val) {
return v, UnknownRecordErr
}
return val, nil return val, nil
} }
data, err := d.store.Get(d.ctx, d.dsKey(key)) data, err := d.store.Get(d.ctx, d.dsKey(key))
...@@ -114,6 +117,9 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) { ...@@ -114,6 +117,9 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if err := v.UnmarshalBinary(data); err != nil { if err := v.UnmarshalBinary(data); err != nil {
return v, fmt.Errorf("invalid value for key %v: %w", key, err) return v, fmt.Errorf("invalid value for key %v: %w", key, err)
} }
if d.hasExpired(v) {
return v, UnknownRecordErr
}
d.cache.Add(key, v) d.cache.Add(key, v)
return v, nil return v, nil
} }
...@@ -142,9 +148,9 @@ func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error { ...@@ -142,9 +148,9 @@ func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error {
} }
// prune deletes entries from the store that are older than the configured prune expiration. // prune deletes entries from the store that are older than the configured prune expiration.
// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present // Entries that are eligible for deletion may still be present either because the prune function hasn't yet run or
// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after // because they are still preserved in the in-memory cache after having been deleted from the database.
// having been deleted from the database. // Such expired entries are filtered out in getRecord
func (d *recordsBook[K, V]) prune() error { func (d *recordsBook[K, V]) prune() error {
results, err := d.store.Query(d.ctx, query.Query{ results, err := d.store.Query(d.ctx, query.Query{
Prefix: d.dsBaseKey.String(), Prefix: d.dsBaseKey.String(),
...@@ -168,7 +174,7 @@ func (d *recordsBook[K, V]) prune() error { ...@@ -168,7 +174,7 @@ func (d *recordsBook[K, V]) prune() error {
if err := v.UnmarshalBinary(result.Value); err != nil { if err := v.UnmarshalBinary(result.Value); err != nil {
return err return err
} }
if v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now()) { if d.hasExpired(v) {
if pending > maxPruneBatchSize { if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil { if err := batch.Commit(d.ctx); err != nil {
return err return err
...@@ -191,6 +197,10 @@ func (d *recordsBook[K, V]) prune() error { ...@@ -191,6 +197,10 @@ func (d *recordsBook[K, V]) prune() error {
return nil return nil
} }
func (d *recordsBook[K, V]) hasExpired(v V) bool {
return v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now())
}
func (d *recordsBook[K, V]) Close() { func (d *recordsBook[K, V]) Close() {
d.cancelFn() d.cancelFn()
d.bgTasks.Wait() d.bgTasks.Wait()
......
...@@ -12,8 +12,7 @@ import ( ...@@ -12,8 +12,7 @@ import (
) )
const ( const (
scoreCacheSize = 100 scoreCacheSize = 100
scoreRecordExpiryPeriod = 24 * time.Hour
) )
var scoresBase = ds.NewKey("/peers/scores") var scoresBase = ds.NewKey("/peers/scores")
...@@ -56,8 +55,8 @@ func peerIDKey(id peer.ID) ds.Key { ...@@ -56,8 +55,8 @@ func peerIDKey(id peer.ID) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(id))) return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(id)))
} }
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) { func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, retain time.Duration) (*scoreBook, error) {
book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, scoreRecordExpiryPeriod, scoresBase, newScoreRecord, peerIDKey) book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, retain, scoresBase, newScoreRecord, peerIDKey)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -81,7 +81,7 @@ func TestPrune(t *testing.T) { ...@@ -81,7 +81,7 @@ func TestPrune(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
store := sync.MutexWrap(ds.NewMapDatastore()) store := sync.MutexWrap(ds.NewMapDatastore())
clock := clock.NewDeterministicClock(time.UnixMilli(1000)) clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, store) book, err := newScoreBook(ctx, logger, clock, store, 24*time.Hour)
require.NoError(t, err) require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool { hasScoreRecorded := func(id peer.ID) bool {
...@@ -135,7 +135,7 @@ func TestPruneMultipleBatches(t *testing.T) { ...@@ -135,7 +135,7 @@ func TestPruneMultipleBatches(t *testing.T) {
defer cancelFunc() defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000)) clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore())) book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), 24*time.Hour)
require.NoError(t, err) require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool { hasScoreRecorded := func(id peer.ID) bool {
...@@ -159,6 +159,31 @@ func TestPruneMultipleBatches(t *testing.T) { ...@@ -159,6 +159,31 @@ func TestPruneMultipleBatches(t *testing.T) {
} }
} }
// Check that scores that are eligible for pruning are not returned, even if they haven't yet been removed
func TestIgnoreOutdatedScores(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
retentionPeriod := 24 * time.Hour
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), retentionPeriod)
require.NoError(t, err)
require.NoError(t, book.SetScore("a", &GossipScores{Total: 123.45}))
clock.AdvanceTime(retentionPeriod + 1)
// Not available from cache
scores, err := book.GetPeerScores("a")
require.NoError(t, err)
require.Equal(t, scores, PeerScores{})
book.book.cache.Purge()
// Not available from disk
scores, err = book.GetPeerScores("a")
require.NoError(t, err)
require.Equal(t, scores, PeerScores{})
}
func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) { func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) {
result, err := store.GetPeerScores(id) result, err := store.GetPeerScores(id)
require.NoError(t, err) require.NoError(t, err)
...@@ -174,8 +199,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend ...@@ -174,8 +199,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend
ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts()) ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts())
require.NoError(t, err, "Failed to create peerstore") require.NoError(t, err, "Failed to create peerstore")
logger := testlog.Logger(t, log.LvlInfo) logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(100)) c := clock.NewDeterministicClock(time.UnixMilli(100))
eps, err := NewExtendedPeerstore(context.Background(), logger, clock, ps, store) eps, err := NewExtendedPeerstore(context.Background(), logger, c, ps, store, 24*time.Hour)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
_ = eps.Close() _ = eps.Close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment