Commit e31c3dab authored by Adrian Sutton's avatar Adrian Sutton

op-node: Add peerApplicationScorer

Handles updates for application specific peer scores.
parent 4397661b
package p2p
import (
"context"
"sync"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
)
type ScoreBook interface {
GetPeerScores(id peer.ID) (store.PeerScores, error)
SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error)
}
type peerApplicationScorer struct {
ctx context.Context
cancelFunc context.CancelFunc
log log.Logger
clock clock.Clock
params *ApplicationScoreParams
scorebook ScoreBook
connectedPeers func() []peer.ID
done sync.WaitGroup
}
func newPeerApplicationScorer(ctx context.Context, logger log.Logger, clock clock.Clock, params *ApplicationScoreParams, scorebook ScoreBook, connectedPeers func() []peer.ID) *peerApplicationScorer {
ctx, cancelFunc := context.WithCancel(ctx)
return &peerApplicationScorer{
ctx: ctx,
cancelFunc: cancelFunc,
log: logger,
clock: clock,
params: params,
scorebook: scorebook,
connectedPeers: connectedPeers,
}
}
func (s *peerApplicationScorer) ApplicationScore(id peer.ID) float64 {
scores, err := s.scorebook.GetPeerScores(id)
if err != nil {
s.log.Error("Failed to load peer scores", "peer", id, "err", err)
return 0
}
score := scores.ReqResp.ValidResponses * s.params.ValidResponseWeight
score += scores.ReqResp.ErrorResponses * s.params.ErrorResponseWeight
score += scores.ReqResp.RejectedPayloads * s.params.RejectedPayloadWeight
return score
}
func (s *peerApplicationScorer) onValidResponse(id peer.ID) {
_, err := s.scorebook.SetScore(id, store.IncrementValidResponses{Cap: s.params.ValidResponseCap})
if err != nil {
s.log.Error("Unable to update peer score", "peer", id, "err", err)
return
}
}
func (s *peerApplicationScorer) onResponseError(id peer.ID) {
_, err := s.scorebook.SetScore(id, store.IncrementErrorResponses{Cap: s.params.ErrorResponseCap})
if err != nil {
s.log.Error("Unable to update peer score", "peer", id, "err", err)
return
}
}
func (s *peerApplicationScorer) onRejectedPayload(id peer.ID) {
_, err := s.scorebook.SetScore(id, store.IncrementRejectedPayloads{Cap: s.params.RejectedPayloadCap})
if err != nil {
s.log.Error("Unable to update peer score", "peer", id, "err", err)
return
}
}
func (s *peerApplicationScorer) decayScores(id peer.ID) {
_, err := s.scorebook.SetScore(id, &store.DecayApplicationScores{
ValidResponseDecay: s.params.ValidResponseDecay,
ErrorResponseDecay: s.params.ErrorResponseDecay,
RejectedPayloadDecay: s.params.RejectedPayloadDecay,
DecayToZero: s.params.DecayToZero,
})
if err != nil {
s.log.Error("Unable to decay peer score", "peer", id, "err", err)
return
}
}
func (s *peerApplicationScorer) decayConnectedPeerScores() {
for _, id := range s.connectedPeers() {
s.decayScores(id)
}
}
func (s *peerApplicationScorer) start() {
s.done.Add(1)
go func() {
defer s.done.Done()
ticker := s.clock.NewTicker(s.params.DecayInterval)
for {
select {
case <-s.ctx.Done():
return
case <-ticker.Ch():
s.decayConnectedPeerScores()
}
}
}()
}
func (s *peerApplicationScorer) stop() {
s.cancelFunc()
s.done.Wait()
}
package p2p
import (
"context"
"errors"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)
type stubScoreBookUpdate struct {
id peer.ID
diff store.ScoreDiff
}
type stubScoreBook struct {
err error
scores map[peer.ID]store.PeerScores
updates chan stubScoreBookUpdate
}
func (s *stubScoreBook) GetPeerScores(id peer.ID) (store.PeerScores, error) {
if s.err != nil {
return store.PeerScores{}, s.err
}
scores, ok := s.scores[id]
if !ok {
return store.PeerScores{}, nil
}
return scores, nil
}
func (s *stubScoreBook) SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error) {
s.updates <- stubScoreBookUpdate{id, diff}
return s.GetPeerScores(id)
}
type appScoreTestData struct {
ctx context.Context
logger log.Logger
clock *clock.DeterministicClock
peers []peer.ID
scorebook *stubScoreBook
}
func (a *appScoreTestData) WaitForNextScoreBookUpdate(t *testing.T) stubScoreBookUpdate {
ctx, cancelFunc := context.WithTimeout(a.ctx, 30*time.Second)
defer cancelFunc()
select {
case update := <-a.scorebook.updates:
return update
case <-ctx.Done():
t.Fatal("Did not receive expected scorebook update")
return stubScoreBookUpdate{}
}
}
func setupPeerApplicationScorerTest(t *testing.T, params *ApplicationScoreParams) (*appScoreTestData, *peerApplicationScorer) {
data := &appScoreTestData{
ctx: context.Background(),
logger: testlog.Logger(t, log.LvlInfo),
clock: clock.NewDeterministicClock(time.UnixMilli(1000)),
peers: []peer.ID{},
scorebook: &stubScoreBook{
scores: make(map[peer.ID]store.PeerScores),
updates: make(chan stubScoreBookUpdate, 10),
},
}
appScorer := newPeerApplicationScorer(data.ctx, data.logger, data.clock, params, data.scorebook, func() []peer.ID {
return data.peers
})
return data, appScorer
}
func TestIncrementValidResponses(t *testing.T) {
data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{
ValidResponseCap: 10,
})
appScorer.onValidResponse("aaa")
require.Len(t, data.scorebook.updates, 1)
update := <-data.scorebook.updates
require.Equal(t, stubScoreBookUpdate{peer.ID("aaa"), store.IncrementValidResponses{Cap: 10}}, update)
}
func TestIncrementErrorResponses(t *testing.T) {
data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{
ErrorResponseCap: 10,
})
appScorer.onResponseError("aaa")
require.Len(t, data.scorebook.updates, 1)
update := <-data.scorebook.updates
require.Equal(t, stubScoreBookUpdate{peer.ID("aaa"), store.IncrementErrorResponses{Cap: 10}}, update)
}
func TestIncrementRejectedPayloads(t *testing.T) {
data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{
RejectedPayloadCap: 10,
})
appScorer.onRejectedPayload("aaa")
require.Len(t, data.scorebook.updates, 1)
update := <-data.scorebook.updates
require.Equal(t, stubScoreBookUpdate{peer.ID("aaa"), store.IncrementRejectedPayloads{Cap: 10}}, update)
}
func TestApplicationScore(t *testing.T) {
data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{
ValidResponseWeight: 0.8,
ErrorResponseWeight: 0.6,
RejectedPayloadWeight: 0.4,
})
peerScore := store.PeerScores{
ReqResp: store.ReqRespScores{
ValidResponses: 1,
ErrorResponses: 2,
RejectedPayloads: 3,
},
}
data.scorebook.scores["aaa"] = peerScore
score := appScorer.ApplicationScore("aaa")
require.Equal(t, 1*0.8+2*0.6+3*0.4, score)
}
func TestApplicationScoreZeroWhenScoreDoesNotLoad(t *testing.T) {
data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{})
data.scorebook.err = errors.New("boom")
score := appScorer.ApplicationScore("aaa")
require.Zero(t, score)
}
func TestDecayScoresAfterDecayInterval(t *testing.T) {
params := &ApplicationScoreParams{
ValidResponseDecay: 0.8,
ErrorResponseDecay: 0.7,
RejectedPayloadDecay: 0.3,
DecayToZero: 0.1,
DecayInterval: 90 * time.Second,
}
data, appScorer := setupPeerApplicationScorerTest(t, params)
data.peers = []peer.ID{"aaa", "bbb"}
expectedDecay := &store.DecayApplicationScores{
ValidResponseDecay: 0.8,
ErrorResponseDecay: 0.7,
RejectedPayloadDecay: 0.3,
DecayToZero: 0.1,
}
appScorer.start()
defer appScorer.stop()
data.clock.WaitForNewPendingTaskWithTimeout(30 * time.Second)
data.clock.AdvanceTime(params.DecayInterval)
require.Equal(t, stubScoreBookUpdate{id: "aaa", diff: expectedDecay}, data.WaitForNextScoreBookUpdate(t))
require.Equal(t, stubScoreBookUpdate{id: "bbb", diff: expectedDecay}, data.WaitForNextScoreBookUpdate(t))
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment