Commit 4741dee0 authored by OptimismBot, committed by GitHub

Merge pull request #6069 from ethereum-optimism/aj/application-scores

op-node: Add application specific peer scores for p2p alt-sync
parents a205f8e3 d1a01a0d
package p2p

import (
    "time"

    "github.com/ethereum-optimism/optimism/op-node/rollup"
)

// ApplicationScoreParams defines how application-specific peer behaviour
// (req-resp alt-sync responses) is weighted, capped and decayed into a score.
type ApplicationScoreParams struct {
    ValidResponseCap    float64
    ValidResponseWeight float64
    ValidResponseDecay  float64

    ErrorResponseCap    float64
    ErrorResponseWeight float64
    ErrorResponseDecay  float64

    RejectedPayloadCap    float64
    RejectedPayloadWeight float64
    RejectedPayloadDecay  float64

    DecayToZero   float64
    DecayInterval time.Duration
}

func LightApplicationScoreParams(cfg *rollup.Config) ApplicationScoreParams {
    slot := time.Duration(cfg.BlockTime) * time.Second
    if slot == 0 {
        slot = 2 * time.Second
    }
    // We treat an "epoch" as 6 blocks: at ~2 seconds per block, that is 12 seconds.
    epoch := 6 * slot
    tenEpochs := 10 * epoch
    return ApplicationScoreParams{
        // Max positive score from valid responses: 10 * 0.5 = 5
        ValidResponseCap:    10,
        ValidResponseWeight: 0.5,
        ValidResponseDecay:  ScoreDecay(tenEpochs, slot),
        // It takes 20 error responses to reach the default ban threshold of -100,
        // but we track at most 10. These errors include not supporting p2p sync,
        // so we don't (yet) want to ban a peer based on this measure alone.
        ErrorResponseCap:    10,
        ErrorResponseWeight: -5,
        ErrorResponseDecay:  ScoreDecay(tenEpochs, slot),
        // It takes 5 rejected payloads to reach the default ban threshold of -100.
        RejectedPayloadCap:    20,
        RejectedPayloadWeight: -20,
        RejectedPayloadDecay:  ScoreDecay(tenEpochs, slot),
        DecayToZero:           DecayToZero,
        DecayInterval:         slot,
    }
}
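For context, ScoreDecay is an existing helper (its signature appears in the peer_params.go hunk further below) that turns a target decay duration into a per-slot decay factor. A minimal, self-contained sketch of that calculation, assuming the usual libp2p-style convention that a capped score should decay to DecayToZero (assumed to be 0.01 here) over the given duration:

package main

import (
    "fmt"
    "math"
    "time"
)

// decayToZero stands in for the package's DecayToZero constant; 0.01 is an assumption.
const decayToZero = 0.01

// scoreDecay is an illustrative re-implementation: it returns the per-slot multiplier m
// such that m^(duration/slot) == decayToZero, so a score shrinks to (near) zero over duration.
func scoreDecay(duration, slot time.Duration) float64 {
    numSlots := float64(duration / slot)
    return math.Pow(decayToZero, 1/numSlots)
}

func main() {
    slot := 2 * time.Second
    tenEpochs := 10 * 6 * slot // ten "epochs" of 6 blocks, as used by LightApplicationScoreParams
    fmt.Printf("per-slot decay factor: %.4f\n", scoreDecay(tenEpochs, slot)) // ~0.9262
}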
package p2p

import (
    "context"
    "sync"

    "github.com/ethereum-optimism/optimism/op-node/p2p/store"
    "github.com/ethereum-optimism/optimism/op-service/clock"
    "github.com/ethereum/go-ethereum/log"
    "github.com/libp2p/go-libp2p/core/peer"
)

// ScoreBook persists and retrieves per-peer score data.
type ScoreBook interface {
    GetPeerScores(id peer.ID) (store.PeerScores, error)
    SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error)
}

// ApplicationScorer records application-specific events (req-resp sync responses)
// and converts them into a per-peer score component.
type ApplicationScorer interface {
    ApplicationScore(id peer.ID) float64
    onValidResponse(id peer.ID)
    onResponseError(id peer.ID)
    onRejectedPayload(id peer.ID)
    start()
    stop()
}

// peerApplicationScorer tracks req-resp sync outcomes per peer in the scorebook
// and periodically decays them while the scorer is running.
type peerApplicationScorer struct {
    ctx            context.Context
    cancelFunc     context.CancelFunc
    log            log.Logger
    clock          clock.Clock
    params         *ApplicationScoreParams
    scorebook      ScoreBook
    connectedPeers func() []peer.ID

    done sync.WaitGroup
}

var _ ApplicationScorer = (*peerApplicationScorer)(nil)

func newPeerApplicationScorer(ctx context.Context, logger log.Logger, clock clock.Clock, params *ApplicationScoreParams, scorebook ScoreBook, connectedPeers func() []peer.ID) *peerApplicationScorer {
    ctx, cancelFunc := context.WithCancel(ctx)
    return &peerApplicationScorer{
        ctx:            ctx,
        cancelFunc:     cancelFunc,
        log:            logger,
        clock:          clock,
        params:         params,
        scorebook:      scorebook,
        connectedPeers: connectedPeers,
    }
}

// ApplicationScore returns the application-specific score for a peer: the weighted sum
// of its (capped, decaying) valid-response, error-response and rejected-payload counts.
func (s *peerApplicationScorer) ApplicationScore(id peer.ID) float64 {
    scores, err := s.scorebook.GetPeerScores(id)
    if err != nil {
        s.log.Error("Failed to load peer scores", "peer", id, "err", err)
        return 0
    }
    score := scores.ReqResp.ValidResponses * s.params.ValidResponseWeight
    score += scores.ReqResp.ErrorResponses * s.params.ErrorResponseWeight
    score += scores.ReqResp.RejectedPayloads * s.params.RejectedPayloadWeight
    return score
}

func (s *peerApplicationScorer) onValidResponse(id peer.ID) {
    _, err := s.scorebook.SetScore(id, store.IncrementValidResponses{Cap: s.params.ValidResponseCap})
    if err != nil {
        s.log.Error("Unable to update peer score", "peer", id, "err", err)
        return
    }
}

func (s *peerApplicationScorer) onResponseError(id peer.ID) {
    _, err := s.scorebook.SetScore(id, store.IncrementErrorResponses{Cap: s.params.ErrorResponseCap})
    if err != nil {
        s.log.Error("Unable to update peer score", "peer", id, "err", err)
        return
    }
}

func (s *peerApplicationScorer) onRejectedPayload(id peer.ID) {
    _, err := s.scorebook.SetScore(id, store.IncrementRejectedPayloads{Cap: s.params.RejectedPayloadCap})
    if err != nil {
        s.log.Error("Unable to update peer score", "peer", id, "err", err)
        return
    }
}

func (s *peerApplicationScorer) decayScores(id peer.ID) {
    _, err := s.scorebook.SetScore(id, &store.DecayApplicationScores{
        ValidResponseDecay:   s.params.ValidResponseDecay,
        ErrorResponseDecay:   s.params.ErrorResponseDecay,
        RejectedPayloadDecay: s.params.RejectedPayloadDecay,
        DecayToZero:          s.params.DecayToZero,
    })
    if err != nil {
        s.log.Error("Unable to decay peer score", "peer", id, "err", err)
        return
    }
}

func (s *peerApplicationScorer) decayConnectedPeerScores() {
    for _, id := range s.connectedPeers() {
        s.decayScores(id)
    }
}

// start launches a background loop that decays the tracked scores of all connected peers
// once per DecayInterval, until stop is called or the context is cancelled.
func (s *peerApplicationScorer) start() {
    s.done.Add(1)
    go func() {
        defer s.done.Done()
        ticker := s.clock.NewTicker(s.params.DecayInterval)
        defer ticker.Stop()
        for {
            select {
            case <-s.ctx.Done():
                return
            case <-ticker.Ch():
                s.decayConnectedPeerScores()
            }
        }
    }()
}

func (s *peerApplicationScorer) stop() {
    s.cancelFunc()
    s.done.Wait()
}

// NoopApplicationScorer is used when peer scoring is disabled: it records nothing
// and always reports a zero application score.
type NoopApplicationScorer struct{}

func (n *NoopApplicationScorer) ApplicationScore(_ peer.ID) float64 {
    return 0
}

func (n *NoopApplicationScorer) onValidResponse(_ peer.ID) {
}

func (n *NoopApplicationScorer) onResponseError(_ peer.ID) {
}

func (n *NoopApplicationScorer) onRejectedPayload(_ peer.ID) {
}

func (n *NoopApplicationScorer) start() {
}

func (n *NoopApplicationScorer) stop() {
}

var _ ApplicationScorer = (*NoopApplicationScorer)(nil)
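To make the weighting concrete, here is a small self-contained sketch (hypothetical counts; weights taken from LightApplicationScoreParams) of the same weighted sum that ApplicationScore computes from the scorebook counters:

package main

import "fmt"

func main() {
    // Weights from LightApplicationScoreParams.
    validWeight, errorWeight, rejectedWeight := 0.5, -5.0, -20.0

    // Hypothetical tracked counts for a single peer (the store caps and decays these).
    valid, errorResponses, rejected := 4.0, 2.0, 1.0

    score := valid*validWeight + errorResponses*errorWeight + rejected*rejectedWeight
    fmt.Printf("application score: %.1f\n", score) // 4*0.5 + 2*(-5) + 1*(-20) = -28.0
}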
package p2p

import (
    "context"
    "errors"
    "testing"
    "time"

    "github.com/ethereum-optimism/optimism/op-node/p2p/store"
    "github.com/ethereum-optimism/optimism/op-node/testlog"
    "github.com/ethereum-optimism/optimism/op-service/clock"
    "github.com/ethereum/go-ethereum/log"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/stretchr/testify/require"
)

type stubScoreBookUpdate struct {
    id   peer.ID
    diff store.ScoreDiff
}

type stubScoreBook struct {
    err     error
    scores  map[peer.ID]store.PeerScores
    updates chan stubScoreBookUpdate
}

func (s *stubScoreBook) GetPeerScores(id peer.ID) (store.PeerScores, error) {
    if s.err != nil {
        return store.PeerScores{}, s.err
    }
    scores, ok := s.scores[id]
    if !ok {
        return store.PeerScores{}, nil
    }
    return scores, nil
}

func (s *stubScoreBook) SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error) {
    s.updates <- stubScoreBookUpdate{id, diff}
    return s.GetPeerScores(id)
}

type appScoreTestData struct {
    ctx       context.Context
    logger    log.Logger
    clock     *clock.DeterministicClock
    peers     []peer.ID
    scorebook *stubScoreBook
}

func (a *appScoreTestData) WaitForNextScoreBookUpdate(t *testing.T) stubScoreBookUpdate {
    ctx, cancelFunc := context.WithTimeout(a.ctx, 30*time.Second)
    defer cancelFunc()
    select {
    case update := <-a.scorebook.updates:
        return update
    case <-ctx.Done():
        t.Fatal("Did not receive expected scorebook update")
        return stubScoreBookUpdate{}
    }
}

func setupPeerApplicationScorerTest(t *testing.T, params *ApplicationScoreParams) (*appScoreTestData, *peerApplicationScorer) {
    data := &appScoreTestData{
        ctx:    context.Background(),
        logger: testlog.Logger(t, log.LvlInfo),
        clock:  clock.NewDeterministicClock(time.UnixMilli(1000)),
        peers:  []peer.ID{},
        scorebook: &stubScoreBook{
            scores:  make(map[peer.ID]store.PeerScores),
            updates: make(chan stubScoreBookUpdate, 10),
        },
    }
    appScorer := newPeerApplicationScorer(data.ctx, data.logger, data.clock, params, data.scorebook, func() []peer.ID {
        return data.peers
    })
    return data, appScorer
}

func TestIncrementValidResponses(t *testing.T) {
    data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{
        ValidResponseCap: 10,
    })

    appScorer.onValidResponse("aaa")
    require.Len(t, data.scorebook.updates, 1)
    update := <-data.scorebook.updates
    require.Equal(t, stubScoreBookUpdate{peer.ID("aaa"), store.IncrementValidResponses{Cap: 10}}, update)
}

func TestIncrementErrorResponses(t *testing.T) {
    data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{
        ErrorResponseCap: 10,
    })

    appScorer.onResponseError("aaa")
    require.Len(t, data.scorebook.updates, 1)
    update := <-data.scorebook.updates
    require.Equal(t, stubScoreBookUpdate{peer.ID("aaa"), store.IncrementErrorResponses{Cap: 10}}, update)
}

func TestIncrementRejectedPayloads(t *testing.T) {
    data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{
        RejectedPayloadCap: 10,
    })

    appScorer.onRejectedPayload("aaa")
    require.Len(t, data.scorebook.updates, 1)
    update := <-data.scorebook.updates
    require.Equal(t, stubScoreBookUpdate{peer.ID("aaa"), store.IncrementRejectedPayloads{Cap: 10}}, update)
}

func TestApplicationScore(t *testing.T) {
    data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{
        ValidResponseWeight:   0.8,
        ErrorResponseWeight:   0.6,
        RejectedPayloadWeight: 0.4,
    })

    peerScore := store.PeerScores{
        ReqResp: store.ReqRespScores{
            ValidResponses:   1,
            ErrorResponses:   2,
            RejectedPayloads: 3,
        },
    }
    data.scorebook.scores["aaa"] = peerScore

    score := appScorer.ApplicationScore("aaa")
    require.Equal(t, 1*0.8+2*0.6+3*0.4, score)
}

func TestApplicationScoreZeroWhenScoreDoesNotLoad(t *testing.T) {
    data, appScorer := setupPeerApplicationScorerTest(t, &ApplicationScoreParams{})

    data.scorebook.err = errors.New("boom")

    score := appScorer.ApplicationScore("aaa")
    require.Zero(t, score)
}

func TestDecayScoresAfterDecayInterval(t *testing.T) {
    params := &ApplicationScoreParams{
        ValidResponseDecay:   0.8,
        ErrorResponseDecay:   0.7,
        RejectedPayloadDecay: 0.3,
        DecayToZero:          0.1,
        DecayInterval:        90 * time.Second,
    }
    data, appScorer := setupPeerApplicationScorerTest(t, params)

    data.peers = []peer.ID{"aaa", "bbb"}

    expectedDecay := &store.DecayApplicationScores{
        ValidResponseDecay:   0.8,
        ErrorResponseDecay:   0.7,
        RejectedPayloadDecay: 0.3,
        DecayToZero:          0.1,
    }

    appScorer.start()
    defer appScorer.stop()

    data.clock.WaitForNewPendingTaskWithTimeout(30 * time.Second)
    data.clock.AdvanceTime(params.DecayInterval)

    require.Equal(t, stubScoreBookUpdate{id: "aaa", diff: expectedDecay}, data.WaitForNextScoreBookUpdate(t))
    require.Equal(t, stubScoreBookUpdate{id: "bbb", diff: expectedDecay}, data.WaitForNextScoreBookUpdate(t))
}
......@@ -53,7 +53,8 @@ type SetupP2P interface {
// ScoringParams defines the various types of peer scoring parameters.
type ScoringParams struct {
PeerScoring pubsub.PeerScoreParams
PeerScoring pubsub.PeerScoreParams
ApplicationScoring ApplicationScoreParams
}
// Config sets up a p2p host and discv5 service from configuration.
......@@ -137,11 +138,11 @@ func (conf *Config) Disabled() bool {
return conf.DisableP2P
}
func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams {
func (conf *Config) PeerScoringParams() *ScoringParams {
if conf.ScoringParams == nil {
return nil
}
return &conf.ScoringParams.PeerScoring
return conf.ScoringParams
}
func (conf *Config) BanPeers() bool {
......
......@@ -51,7 +51,7 @@ var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
type GossipSetupConfigurables interface {
PeerScoringParams() *pubsub.PeerScoreParams
PeerScoringParams() *ScoringParams
// ConfigureGossip creates configuration options to apply to the GossipSub setup
ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option
}
......
......@@ -149,7 +149,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
var scoreRetention time.Duration
if peerScoreParams != nil {
// Use the same retention period as gossip scoring, if available
scoreRetention = peerScoreParams.RetainScore
scoreRetention = peerScoreParams.PeerScoring.RetainScore
} else {
// Disable score GC if peer scoring is disabled
scoreRetention = 0
......
......@@ -37,6 +37,7 @@ type NodeP2P struct {
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
peerMonitor *monitor.PeerMonitor // peer monitor to disconnect bad peers, may be nil even with p2p enabled
store store.ExtendedPeerstore // peerstore of host, with extra bindings for scoring and banning
appScorer ApplicationScorer
log log.Logger
// the below components are all optional, and may be nil. They require the host to not be nil.
dv5Local *enode.LocalNode // p2p discovery identity
......@@ -89,9 +90,21 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.gater = extra.ConnectionGater()
n.connMgr = extra.ConnectionManager()
}
eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.store = eps
scoreParams := setup.PeerScoringParams()
if scoreParams != nil {
n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers)
} else {
n.appScorer = &NoopApplicationScorer{}
}
// Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() {
n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics)
n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.AddPeer(conn.RemotePeer())
......@@ -112,12 +125,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
}
}
eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.store = eps
n.scorer = NewScorer(rollupCfg, eps, metrics, log)
n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log)
// notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
......@@ -150,6 +158,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration())
n.peerMonitor.Start()
}
n.appScorer.start()
}
return nil
}
......@@ -258,6 +267,9 @@ func (n *NodeP2P) Close() error {
}
}
}
if n.appScorer != nil {
n.appScorer.stop()
}
return result.ErrorOrNil()
}
......
......@@ -32,7 +32,7 @@ func ScoreDecay(duration time.Duration, slot time.Duration) float64 {
// See [PeerScoreParams] for detailed documentation.
//
// [PeerScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreParams
var LightPeerScoreParams = func(cfg *rollup.Config) pubsub.PeerScoreParams {
func LightPeerScoreParams(cfg *rollup.Config) pubsub.PeerScoreParams {
slot := time.Duration(cfg.BlockTime) * time.Second
if slot == 0 {
slot = 2 * time.Second
......@@ -91,7 +91,8 @@ func GetScoringParams(name string, cfg *rollup.Config) (*ScoringParams, error) {
switch name {
case "light":
return &ScoringParams{
PeerScoring: LightPeerScoreParams(cfg),
PeerScoring: LightPeerScoreParams(cfg),
ApplicationScoring: LightApplicationScoreParams(cfg),
}, nil
case "none":
return nil, nil
......
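For orientation, a hypothetical wiring sketch for the preset selection above (the helper and its caller are not part of this diff; GetScoringParams and the Config.ScoringParams field are):

func applyLightScoring(conf *Config, rollupCfg *rollup.Config) error {
    // Hypothetical helper: select the "light" preset. GetScoringParams returns nil for
    // the "none" preset, in which case Config.PeerScoringParams() returns nil and both
    // gossip peer scoring and the application scorer stay disabled.
    scoringParams, err := GetScoringParams("light", rollupCfg)
    if err != nil {
        return err
    }
    conf.ScoringParams = scoringParams
    return nil
}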
......@@ -81,6 +81,19 @@ func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams_Light() {
testSuite.Equal(peerParams.DecayInterval, slot)
testSuite.Equal(peerParams.DecayToZero, DecayToZero)
testSuite.Equal(peerParams.RetainScore, oneHundredEpochs)
appParams := scoringParams.ApplicationScoring
testSuite.Positive(appParams.ValidResponseCap)
testSuite.Positive(appParams.ValidResponseWeight)
testSuite.Positive(appParams.ValidResponseDecay)
testSuite.Positive(appParams.ErrorResponseCap)
testSuite.Negative(appParams.ErrorResponseWeight)
testSuite.Positive(appParams.ErrorResponseDecay)
testSuite.Positive(appParams.RejectedPayloadCap)
testSuite.Negative(appParams.RejectedPayloadWeight)
testSuite.Positive(appParams.RejectedPayloadDecay)
testSuite.Equal(DecayToZero, appParams.DecayToZero)
testSuite.Equal(slot, appParams.DecayInterval)
}
// TestParamsZeroBlockTime validates peer score params use default slot for 0 block time.
......@@ -91,4 +104,5 @@ func (testSuite *PeerParamsTestSuite) TestParamsZeroBlockTime() {
params, err := GetScoringParams("light", &cfg)
testSuite.NoError(err)
testSuite.Equal(params.PeerScoring.DecayInterval, slot)
testSuite.Equal(params.ApplicationScoring.DecayInterval, slot)
}
......@@ -14,6 +14,7 @@ import (
type scorer struct {
peerStore Peerstore
metricer ScoreMetrics
appScorer ApplicationScorer
log log.Logger
cfg *rollup.Config
}
......@@ -36,6 +37,7 @@ type Peerstore interface {
// Scorer is a peer scorer that scores peers based on application-specific metrics.
type Scorer interface {
SnapshotHook() pubsub.ExtendedPeerScoreInspectFn
ApplicationScore(peer.ID) float64
}
//go:generate mockery --name ScoreMetrics --output mocks/
......@@ -44,10 +46,11 @@ type ScoreMetrics interface {
}
// NewScorer returns a new peer scorer.
func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, log log.Logger) Scorer {
func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, appScorer ApplicationScorer, log log.Logger) Scorer {
return &scorer{
peerStore: peerStore,
metricer: metricer,
appScorer: appScorer,
log: log,
cfg: cfg,
}
......@@ -84,3 +87,7 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
s.metricer.SetPeerScores(allScores)
}
}
func (s *scorer) ApplicationScore(id peer.ID) float64 {
return s.appScorer.ApplicationScore(id)
}
......@@ -44,6 +44,7 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
&p2p.NoopApplicationScorer{},
testSuite.logger,
)
inspectFn := scorer.SnapshotHook()
......
......@@ -9,12 +9,15 @@ import (
func ConfigurePeerScoring(gossipConf GossipSetupConfigurables, scorer Scorer, log log.Logger) []pubsub.Option {
// If we want to completely disable scoring config here, we can use the [peerScoringParams]
// to return early without returning any [pubsub.Option].
peerScoreParams := gossipConf.PeerScoringParams()
peerScoreThresholds := NewPeerScoreThresholds()
scoreParams := gossipConf.PeerScoringParams()
opts := []pubsub.Option{}
if peerScoreParams != nil {
if scoreParams != nil {
peerScoreThresholds := NewPeerScoreThresholds()
// Create copy of params before modifying the AppSpecificScore
params := scoreParams.PeerScoring
params.AppSpecificScore = scorer.ApplicationScore
opts = []pubsub.Option{
pubsub.WithPeerScore(peerScoreParams, &peerScoreThresholds),
pubsub.WithPeerScore(&params, &peerScoreThresholds),
pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency),
}
} else {
......
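With AppSpecificScore wired to the application scorer, the application score now flows directly into each peer's overall gossipsub score, scaled by AppSpecificWeight (which the test config below sets to 1). A rough worked example of how the light parameters relate to the -100 default ban threshold mentioned in the parameter comments, ignoring topic scores:

package main

import "fmt"

func main() {
    appSpecificWeight := 1.0       // as configured in the gossip scoring test below
    rejectedPayloadWeight := -20.0 // from LightApplicationScoreParams

    // Five rejected payloads alone are enough to reach the -100 default ban threshold.
    fmt.Println(appSpecificWeight * 5 * rejectedPayloadWeight) // -100
}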
......@@ -82,6 +82,18 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h
return out
}
type discriminatingAppScorer struct {
badPeer peer.ID
NoopApplicationScorer
}
func (d *discriminatingAppScorer) ApplicationScore(id peer.ID) float64 {
if id == d.badPeer {
return -1000
}
return 0
}
func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []host.Host) []*pubsub.PubSub {
var psubs []*pubsub.PubSub
......@@ -100,17 +112,10 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
scorer := NewScorer(
&rollup.Config{L2ChainID: big.NewInt(123)},
extPeerStore, testSuite.mockMetricer, logger)
extPeerStore, testSuite.mockMetricer, &discriminatingAppScorer{badPeer: hosts[0].ID()}, logger)
opts = append(opts, ConfigurePeerScoring(&Config{
ScoringParams: &ScoringParams{
PeerScoring: pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
if p == hosts[0].ID() {
return -1000
} else {
return 0
}
},
AppSpecificWeight: 1,
DecayInterval: time.Second,
DecayToZero: 0.01,
......
......@@ -69,7 +69,7 @@ func (p *Prepared) ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option {
}
}
func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams {
func (p *Prepared) PeerScoringParams() *ScoringParams {
return nil
}
......
......@@ -111,6 +111,12 @@ type SyncClientMetrics interface {
PayloadsQuarantineSize(n int)
}
type SyncPeerScorer interface {
onValidResponse(id peer.ID)
onResponseError(id peer.ID)
onRejectedPayload(id peer.ID)
}
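SyncPeerScorer is a narrower view of the ApplicationScorer defined earlier: both peerApplicationScorer and NoopApplicationScorer already provide these three methods, so they satisfy it implicitly. Compile-time assertions to that effect (not part of the diff) would be:

var _ SyncPeerScorer = (*peerApplicationScorer)(nil)
var _ SyncPeerScorer = (*NoopApplicationScorer)(nil)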
// SyncClient implements a reverse chain sync with a minimal interface:
// signal the desired range, and receive blocks within this range back.
// Through parent-hash verification, received blocks are all ensured to be part of the canonical chain at one point,
......@@ -180,7 +186,8 @@ type SyncClient struct {
cfg *rollup.Config
metrics SyncClientMetrics
metrics SyncClientMetrics
appScorer SyncPeerScorer
newStreamFn newStreamFn
payloadByNumber protocol.ID
......@@ -227,13 +234,14 @@ type SyncClient struct {
closingPeers bool
}
func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rcv receivePayloadFn, metrics SyncClientMetrics) *SyncClient {
func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rcv receivePayloadFn, metrics SyncClientMetrics, appScorer SyncPeerScorer) *SyncClient {
ctx, cancel := context.WithCancel(context.Background())
c := &SyncClient{
log: log,
cfg: cfg,
metrics: metrics,
appScorer: appScorer,
newStreamFn: newStream,
payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID),
peers: make(map[peer.ID]context.CancelFunc),
......@@ -424,7 +432,8 @@ func (s *SyncClient) onQuarantineEvict(key common.Hash, value syncResult) {
s.metrics.PayloadsQuarantineSize(s.quarantine.Len())
if !s.trusted.Contains(key) {
s.log.Debug("evicting untrusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer)
// TODO(CLI-3732): downscore peer for having provided us a bad block that never turned out to be canonical
// Down-score peer for having provided us a bad block that never turned out to be canonical
s.appScorer.onRejectedPayload(value.peer)
} else {
s.log.Debug("evicting trusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer)
}
......@@ -525,6 +534,7 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
// mark as complete if there's an error: we are not sending any result and can complete immediately.
pr.complete.Store(true)
log.Warn("failed p2p sync request", "num", pr.num, "err", err)
s.appScorer.onResponseError(id)
// If we hit an error, then count it as many requests.
// We'd like to avoid making more requests for a while, to back off.
if err := rl.WaitN(ctx, clientErrRateCost); err != nil {
......@@ -532,11 +542,9 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
}
} else {
log.Debug("completed p2p sync request", "num", pr.num)
s.appScorer.onValidResponse(id)
}
took := time.Since(start)
// TODO(CLI-3732): update scores: depending on the speed of the result,
// increase the p2p-sync part of the peer score
// (don't allow the score to grow indefinitely only based on this factor though)
resultCode := byte(0)
if err != nil {
......
......@@ -137,7 +137,7 @@ func TestSinglePeerSync(t *testing.T) {
hostA.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
// Setup host B as the client
cl := NewSyncClient(log.New("role", "client"), cfg, hostB.NewStream, receivePayload, metrics.NoopMetrics)
cl := NewSyncClient(log.New("role", "client"), cfg, hostB.NewStream, receivePayload, metrics.NoopMetrics, &NoopApplicationScorer{})
// Setup host B (client) to sync from its peer Host A (server)
cl.AddPeer(hostA.ID())
......@@ -190,7 +190,7 @@ func TestMultiPeerSync(t *testing.T) {
payloadByNumber := MakeStreamHandler(ctx, log.New("serve", "payloads_by_number"), srv.HandleSyncRequest)
h.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
cl := NewSyncClient(log.New("role", "client"), cfg, h.NewStream, receivePayload, metrics.NoopMetrics)
cl := NewSyncClient(log.New("role", "client"), cfg, h.NewStream, receivePayload, metrics.NoopMetrics, &NoopApplicationScorer{})
return cl, received
}
......