Commit 13c3e1a6 authored by Adrian Sutton

op-node: Periodically prune old peer score entries.

parent df11f76b
...@@ -5,6 +5,8 @@ import ( ...@@ -5,6 +5,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/peerstore"
) )
...@@ -15,15 +17,16 @@ type extendedStore struct { ...@@ -15,15 +17,16 @@ type extendedStore struct {
*scoreBook *scoreBook
} }
func NewExtendedPeerstore(ctx context.Context, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) { func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
cab, ok := peerstore.GetCertifiedAddrBook(ps) cab, ok := peerstore.GetCertifiedAddrBook(ps)
if !ok { if !ok {
return nil, errors.New("peerstore should also be a certified address book") return nil, errors.New("peerstore should also be a certified address book")
} }
sb, err := newScoreBook(ctx, store) sb, err := newScoreBook(ctx, logger, clock, store)
if err != nil { if err != nil {
return nil, fmt.Errorf("create scorebook: %w", err) return nil, fmt.Errorf("create scorebook: %w", err)
} }
sb.startGC()
return &extendedStore{ return &extendedStore{
Peerstore: ps, Peerstore: ps,
CertifiedAddrBook: cab, CertifiedAddrBook: cab,
...@@ -31,4 +34,9 @@ func NewExtendedPeerstore(ctx context.Context, ps peerstore.Peerstore, store ds. ...@@ -31,4 +34,9 @@ func NewExtendedPeerstore(ctx context.Context, ps peerstore.Peerstore, store ds.
}, nil }, nil
} }
// Close shuts down the score book (which stops and waits for its background
// GC task) and then closes the wrapped peerstore, returning the peerstore's
// close error.
func (s *extendedStore) Close() error {
	// The score book must be stopped first so no GC run is still using the
	// datastore while the peerstore is being closed.
	s.scoreBook.Close()

	err := s.Peerstore.Close()
	return err
}
var _ ExtendedPeerstore = (*extendedStore)(nil) var _ ExtendedPeerstore = (*extendedStore)(nil)
package store
import (
"context"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
)
const (
// gcPeriod is the interval between successive invocations of the GC action
// scheduled by startGc.
gcPeriod = 2 * time.Hour
)
// gcAction is a single garbage-collection pass. startGc logs a returned error
// as a warning and keeps scheduling subsequent passes.
type gcAction func() error
// startGc launches a background goroutine that invokes action once every
// gcPeriod, driven by a ticker obtained from clock. The action is not run
// immediately; the first run happens after one full period.
//
// The goroutine is registered with bgTasks before it starts and marks itself
// done when it exits, so callers can cancel ctx and then bgTasks.Wait() for a
// clean shutdown. An error from action is logged as a warning on logger and
// does not stop the loop.
func startGc(ctx context.Context, logger log.Logger, clock clock.Clock, bgTasks *sync.WaitGroup, action gcAction) {
	loop := func() {
		defer bgTasks.Done()

		ticker := clock.NewTicker(gcPeriod)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.Ch():
				err := action()
				if err == nil {
					continue
				}
				logger.Warn("GC failed", "err", err)
			}
		}
	}

	bgTasks.Add(1)
	go loop()
}
package store
import (
"context"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// TestScheduleGcPeriodically checks that startGc does not run the action
// immediately, and then runs it exactly once each time the deterministic clock
// is advanced by gcPeriod.
func TestScheduleGcPeriodically(t *testing.T) {
	var bgTasks sync.WaitGroup
	ctx, stop := context.WithCancel(context.Background())
	defer func() {
		stop()
		// Wait for the gc background process to complete after cancelling the context
		bgTasks.Wait()
	}()

	logger := testlog.Logger(t, log.LvlInfo)
	fakeClock := clock.NewDeterministicClock(time.UnixMilli(5000))

	// Buffered so the GC goroutine never blocks on reporting a run.
	runs := make(chan struct{}, 10)
	action := func() error {
		runs <- struct{}{}
		return nil
	}

	// expectSingleRun waits for one GC run to be reported and asserts that no
	// additional runs are queued behind it.
	expectSingleRun := func(failMsg string) {
		deadline, release := context.WithTimeout(ctx, 10*time.Second)
		defer release()
		select {
		case <-runs:
			require.Len(t, runs, 0, "should only run once after gc period")
		case <-deadline.Done():
			t.Fatal(failMsg)
		}
	}

	startGc(ctx, logger, fakeClock, &bgTasks, action)

	// Wait until the GC goroutine has registered its ticker with the clock,
	// otherwise advancing time could race with ticker creation.
	timeout, tCancel := context.WithTimeout(ctx, 10*time.Second)
	defer tCancel()
	require.True(t, fakeClock.WaitForNewPendingTask(timeout), "did not schedule pending GC")

	require.Len(t, runs, 0, "should not run immediately")

	fakeClock.AdvanceTime(gcPeriod)
	expectSingleRun("should run gc after first time period")

	fakeClock.AdvanceTime(gcPeriod)
	expectSingleRun("should run gc again after second time period")
}
...@@ -5,71 +5,101 @@ import ( ...@@ -5,71 +5,101 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-base32" "github.com/multiformats/go-base32"
) )
type scoreBook struct { const (
ctx context.Context scoreDataV0 = "0"
store ds.Batching scoreCacheSize = 100
cache *lru.Cache[peer.ID, PeerScores] expiryPeriod = 24 * time.Hour
sync.RWMutex maxPruneBatchSize = 20
} )
var scoresBase = ds.NewKey("/peers/scores") var scoresBase = ds.NewKey("/peers/scores")
const ( type scoreRecord struct {
scoreDataV0 = "0" PeerScores
scoreCacheSize = 100 lastUpdate time.Time
) }
// scoreBook stores per-peer score records in a datastore, fronted by an
// in-memory LRU cache, and periodically prunes expired records in the
// background (see startGC / prune).
type scoreBook struct {
// ctx is used for all datastore operations and for the background GC task.
// It is cancelled by Close via cancelFn.
ctx context.Context
cancelFn context.CancelFunc
// clock supplies timestamps for lastUpdate and the expiry check, and the
// ticker that drives GC.
clock clock.Clock
log log.Logger
// bgTasks tracks the background GC goroutine so Close can wait for it to exit.
bgTasks sync.WaitGroup
store ds.Batching
cache *lru.Cache[peer.ID, scoreRecord]
// The embedded lock is read-locked by GetPeerScores and write-locked by SetScore.
sync.RWMutex
}
func newScoreBook(ctx context.Context, store ds.Batching) (*scoreBook, error) { func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
cache, err := lru.New[peer.ID, PeerScores](scoreCacheSize) cache, err := lru.New[peer.ID, scoreRecord](scoreCacheSize)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating cache: %w", err) return nil, fmt.Errorf("creating cache: %w", err)
} }
return &scoreBook{
ctx: ctx, ctx, cancelFn := context.WithCancel(ctx)
store: store, book := scoreBook{
cache: cache, ctx: ctx,
}, nil cancelFn: cancelFn,
clock: clock,
log: logger,
store: store,
cache: cache,
}
return &book, nil
}
func (d *scoreBook) startGC() {
startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
} }
func (d *scoreBook) GetPeerScores(id peer.ID) (PeerScores, error) { func (d *scoreBook) GetPeerScores(id peer.ID) (PeerScores, error) {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
return d.getPeerScoresNoLock(id) record, err := d.getRecord(id)
if err != nil {
return PeerScores{}, nil
}
return record.PeerScores, err
} }
func (d *scoreBook) getPeerScoresNoLock(id peer.ID) (PeerScores, error) { func (d *scoreBook) getRecord(id peer.ID) (scoreRecord, error) {
scores, ok := d.cache.Get(id) if scores, ok := d.cache.Get(id); ok {
if ok {
return scores, nil return scores, nil
} }
data, err := d.store.Get(d.ctx, scoreKey(id)) data, err := d.store.Get(d.ctx, scoreKey(id))
if errors.Is(err, ds.ErrNotFound) { if errors.Is(err, ds.ErrNotFound) {
return PeerScores{}, nil return scoreRecord{}, nil
} else if err != nil { } else if err != nil {
return PeerScores{}, fmt.Errorf("load scores for peer %v: %w", id, err) return scoreRecord{}, fmt.Errorf("load scores for peer %v: %w", id, err)
} }
scores, err = deserializeScoresV0(data) record, err := deserializeScoresV0(data)
if err != nil { if err != nil {
return PeerScores{}, fmt.Errorf("invalid score data for peer %v: %w", id, err) return scoreRecord{}, fmt.Errorf("invalid score data for peer %v: %w", id, err)
} }
d.cache.Add(id, scores) d.cache.Add(id, record)
return scores, nil return record, nil
} }
func (d *scoreBook) SetScore(id peer.ID, scoreType ScoreType, score float64) error { func (d *scoreBook) SetScore(id peer.ID, scoreType ScoreType, score float64) error {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
scores, err := d.getPeerScoresNoLock(id) scores, err := d.getRecord(id)
if err != nil { if err != nil {
return err return err
} }
scores.lastUpdate = d.clock.Now()
scores.Gossip = score
switch scoreType { switch scoreType {
case TypeGossip: case TypeGossip:
scores.Gossip = score scores.Gossip = score
...@@ -88,6 +118,61 @@ func (d *scoreBook) SetScore(id peer.ID, scoreType ScoreType, score float64) err ...@@ -88,6 +118,61 @@ func (d *scoreBook) SetScore(id peer.ID, scoreType ScoreType, score float64) err
return nil return nil
} }
// prune deletes entries from the store that are older than expiryPeriod.
// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
// having been deleted from the database.
//
// Deletions are grouped into batches of at most maxPruneBatchSize entries; each full batch is committed before the
// next one is started, and the final (possibly partial) batch is committed once iteration finishes.
//
// prune stops and returns an error on the first failure: a query or batch error, an error reported by the result
// iterator, a record that cannot be deserialized, or cancellation of the book's context. Deletes queued in a batch
// that has not yet been committed at that point are not applied.
func (d *scoreBook) prune() error {
	results, err := d.store.Query(d.ctx, query.Query{
		Prefix: scoresBase.String(),
	})
	if err != nil {
		return err
	}
	// Always release the query, including on the early-return paths below,
	// so the datastore is not left with a half-consumed result stream.
	defer func() {
		if closeErr := results.Close(); closeErr != nil {
			d.log.Warn("Failed to close score prune query", "err", closeErr)
		}
	}()

	pending := 0
	batch, err := d.store.Batch(d.ctx)
	if err != nil {
		return err
	}
	for result := range results.Next() {
		// Bail out if the context is done
		select {
		case <-d.ctx.Done():
			return d.ctx.Err()
		default:
		}
		// The iterator reports failures in-band; surface them instead of
		// trying to decode an empty value.
		if result.Error != nil {
			return result.Error
		}
		record, err := deserializeScoresV0(result.Value)
		if err != nil {
			return err
		}
		if !record.lastUpdate.Add(expiryPeriod).Before(d.clock.Now()) {
			continue
		}
		// Flush once the current batch is full so no batch ever exceeds
		// maxPruneBatchSize deletes.
		if pending >= maxPruneBatchSize {
			if err := batch.Commit(d.ctx); err != nil {
				return err
			}
			batch, err = d.store.Batch(d.ctx)
			if err != nil {
				return err
			}
			pending = 0
		}
		pending++
		if err := batch.Delete(d.ctx, ds.NewKey(result.Key)); err != nil {
			return err
		}
	}
	return batch.Commit(d.ctx)
}
// Close cancels the book's context, which stops the background GC task, and
// then blocks until every task tracked in bgTasks has exited.
func (d *scoreBook) Close() {
	// Signal shutdown first; waiting before cancelling would block forever
	// while the GC loop is still running.
	stop := d.cancelFn
	stop()

	d.bgTasks.Wait()
}
func scoreKey(id peer.ID) ds.Key { func scoreKey(id peer.ID) ds.Key {
return scoresBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(id))).ChildString(scoreDataV0) return scoresBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(id))).ChildString(scoreDataV0)
} }
...@@ -2,8 +2,13 @@ package store ...@@ -2,8 +2,13 @@ package store
import ( import (
"context" "context"
"strconv"
"testing" "testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
...@@ -71,6 +76,95 @@ func TestUnknownScoreType(t *testing.T) { ...@@ -71,6 +76,95 @@ func TestUnknownScoreType(t *testing.T) {
require.ErrorContains(t, err, "unknown score type") require.ErrorContains(t, err, "unknown score type")
} }
// TestCloseCompletes checks that closing an extended peerstore returns without
// error, i.e. the background GC task is stopped and does not block Close.
func TestCloseCompletes(t *testing.T) {
	eps := createMemoryStore(t)
	err := eps.Close()
	require.NoError(t, err)
}
// TestPrune checks that prune removes only records whose last update is older
// than expiryPeriod, and that updating a score again extends its expiry.
func TestPrune(t *testing.T) {
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()
	logger := testlog.Logger(t, log.LvlInfo)
	store := sync.MutexWrap(ds.NewMapDatastore())
	clock := clock.NewDeterministicClock(time.UnixMilli(1000))
	book, err := newScoreBook(ctx, logger, clock, store)
	require.NoError(t, err)

	// hasScoreRecorded reports whether any non-zero score is stored for id.
	hasScoreRecorded := func(id peer.ID) bool {
		scores, err := book.GetPeerScores(id)
		require.NoError(t, err)
		return scores != PeerScores{}
	}

	firstStore := clock.Now()

	// Set some scores all 30 minutes apart so they have different expiry times
	require.NoError(t, book.SetScore("aaaa", TypeGossip, 123.45))
	clock.AdvanceTime(30 * time.Minute)
	require.NoError(t, book.SetScore("bbbb", TypeGossip, 123.45))
	clock.AdvanceTime(30 * time.Minute)
	require.NoError(t, book.SetScore("cccc", TypeGossip, 123.45))
	clock.AdvanceTime(30 * time.Minute)
	require.NoError(t, book.SetScore("dddd", TypeGossip, 123.45))
	clock.AdvanceTime(30 * time.Minute)

	// Update bbbb again which should extend its expiry
	require.NoError(t, book.SetScore("bbbb", TypeGossip, 123.45))

	require.True(t, hasScoreRecorded("aaaa"))
	require.True(t, hasScoreRecorded("bbbb"))
	require.True(t, hasScoreRecorded("cccc"))
	require.True(t, hasScoreRecorded("dddd"))

	elapsedTime := clock.Now().Sub(firstStore)
	timeToFirstExpiry := expiryPeriod - elapsedTime
	// Advance time until the score for aaaa should be pruned.
	clock.AdvanceTime(timeToFirstExpiry + 1)
	require.NoError(t, book.prune())
	// Clear the cache so reads have to come from the database
	book.cache.Purge()
	require.False(t, hasScoreRecorded("aaaa"), "should have pruned aaaa record")

	// Advance time so cccc and dddd expire. The original bbbb timestamp would
	// have expired by now too, but its later update keeps the record alive.
	clock.AdvanceTime(90 * time.Minute)
	require.NoError(t, book.prune())
	// Clear the cache so reads have to come from the database
	book.cache.Purge()

	require.False(t, hasScoreRecorded("cccc"), "should have pruned cccc record")
	require.False(t, hasScoreRecorded("dddd"), "should have pruned dddd record")

	require.True(t, hasScoreRecorded("bbbb"), "should not prune bbbb record")
}
// TestPruneMultipleBatches checks that prune removes every expired record when
// there are more of them than fit in a single delete batch.
func TestPruneMultipleBatches(t *testing.T) {
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()

	logger := testlog.Logger(t, log.LvlInfo)
	fakeClock := clock.NewDeterministicClock(time.UnixMilli(1000))
	backing := sync.MutexWrap(ds.NewMapDatastore())
	book, err := newScoreBook(ctx, logger, fakeClock, backing)
	require.NoError(t, err)

	// scorePresent reports whether any non-zero score is stored for id.
	scorePresent := func(id peer.ID) bool {
		got, err := book.GetPeerScores(id)
		require.NoError(t, err)
		return got != PeerScores{}
	}

	// Set scores for more peers than the max batch size
	peerCount := maxPruneBatchSize*3 + 5
	for n := 0; n < peerCount; n++ {
		id := peer.ID(strconv.Itoa(n))
		require.NoError(t, book.SetScore(id, TypeGossip, 123.45))
	}

	// Move just past the expiry period so every record is eligible.
	fakeClock.AdvanceTime(expiryPeriod + 1)
	require.NoError(t, book.prune())
	// Clear the cache so reads have to come from the database
	book.cache.Purge()

	for n := 0; n < peerCount; n++ {
		id := peer.ID(strconv.Itoa(n))
		require.Falsef(t, scorePresent(id), "Should prune record peer %v", n)
	}
}
func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) { func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) {
result, err := store.GetPeerScores(id) result, err := store.GetPeerScores(id)
require.NoError(t, err) require.NoError(t, err)
...@@ -85,7 +179,12 @@ func createMemoryStore(t *testing.T) ExtendedPeerstore { ...@@ -85,7 +179,12 @@ func createMemoryStore(t *testing.T) ExtendedPeerstore {
func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) ExtendedPeerstore { func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) ExtendedPeerstore {
ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts()) ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts())
require.NoError(t, err, "Failed to create peerstore") require.NoError(t, err, "Failed to create peerstore")
eps, err := NewExtendedPeerstore(context.Background(), ps, store) logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(100))
eps, err := NewExtendedPeerstore(context.Background(), logger, clock, ps, store)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() {
_ = eps.Close()
})
return eps return eps
} }
...@@ -3,23 +3,34 @@ package store ...@@ -3,23 +3,34 @@ package store
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"time"
) )
func serializeScoresV0(scores PeerScores) ([]byte, error) { func serializeScoresV0(scores scoreRecord) ([]byte, error) {
var b bytes.Buffer var b bytes.Buffer
err := binary.Write(&b, binary.BigEndian, scores.Gossip) err := binary.Write(&b, binary.BigEndian, scores.lastUpdate.UnixMilli())
if err != nil {
return nil, err
}
err = binary.Write(&b, binary.BigEndian, scores.Gossip)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return b.Bytes(), nil return b.Bytes(), nil
} }
func deserializeScoresV0(data []byte) (PeerScores, error) { func deserializeScoresV0(data []byte) (scoreRecord, error) {
var scores PeerScores var scores scoreRecord
r := bytes.NewReader(data) r := bytes.NewReader(data)
err := binary.Read(r, binary.BigEndian, &scores.Gossip) var lastUpdate int64
err := binary.Read(r, binary.BigEndian, &lastUpdate)
if err != nil {
return scoreRecord{}, err
}
scores.lastUpdate = time.UnixMilli(lastUpdate)
err = binary.Read(r, binary.BigEndian, &scores.Gossip)
if err != nil { if err != nil {
return PeerScores{}, err return scoreRecord{}, err
} }
return scores, nil return scores, nil
} }
package store package store
import ( import (
"strconv"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestRoundtripScoresV0(t *testing.T) { func TestRoundtripScoresV0(t *testing.T) {
scores := PeerScores{ scores := scoreRecord{
Gossip: 1234.52382, PeerScores: PeerScores{Gossip: 1234.52382},
lastUpdate: time.UnixMilli(1923841),
} }
data, err := serializeScoresV0(scores) data, err := serializeScoresV0(scores)
require.NoError(t, err) require.NoError(t, err)
...@@ -24,19 +27,20 @@ func TestRoundtripScoresV0(t *testing.T) { ...@@ -24,19 +27,20 @@ func TestRoundtripScoresV0(t *testing.T) {
// A new entry should be added to this test each time any fields are changed to ensure it can always be deserialized // A new entry should be added to this test each time any fields are changed to ensure it can always be deserialized
func TestParseHistoricSerializationsV0(t *testing.T) { func TestParseHistoricSerializationsV0(t *testing.T) {
tests := []struct { tests := []struct {
name string
data []byte data []byte
expected PeerScores expected scoreRecord
}{ }{
{ {
name: "GossipOnly", data: common.Hex2Bytes("00000000001D5B0140934A18644523F6"),
data: common.Hex2Bytes("40934A18644523F6"), expected: scoreRecord{
expected: PeerScores{Gossip: 1234.52382}, PeerScores: PeerScores{Gossip: 1234.52382},
lastUpdate: time.UnixMilli(1923841),
},
}, },
} }
for _, test := range tests { for idx, test := range tests {
test := test test := test
t.Run(test.name, func(t *testing.T) { t.Run(strconv.Itoa(idx), func(t *testing.T) {
result, err := deserializeScoresV0(test.data) result, err := deserializeScoresV0(test.data)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, test.expected, result) require.Equal(t, test.expected, result)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment