Commit 62dffcc4 authored by protolambda

op-node: p2p alt-sync review fixes

parent 0d7859bb
@@ -26,6 +26,7 @@ import (
 	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/ethereum-optimism/optimism/op-node/eth"
+	"github.com/ethereum-optimism/optimism/op-node/metrics"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-node/testlog"
 	"github.com/ethereum-optimism/optimism/op-node/testutils"
@@ -125,7 +126,7 @@ func TestP2PFull(t *testing.T) {
 	runCfgB := &testutils.MockRuntimeConfig{P2PSeqAddress: common.Address{0x42}}
 	logA := testlog.Logger(t, log.LvlError).New("host", "A")
-	nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{}, nil, runCfgA, nil)
+	nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{}, nil, runCfgA, metrics.NoopMetrics)
 	require.NoError(t, err)
 	defer nodeA.Close()
@@ -148,7 +149,7 @@ func TestP2PFull(t *testing.T) {
 	logB := testlog.Logger(t, log.LvlError).New("host", "B")
-	nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, nil, runCfgB, nil)
+	nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, nil, runCfgB, metrics.NoopMetrics)
 	require.NoError(t, err)
 	defer nodeB.Close()
 	hostB := nodeB.Host()
@@ -277,7 +278,7 @@ func TestDiscovery(t *testing.T) {
 	resourcesCtx, resourcesCancel := context.WithCancel(context.Background())
 	defer resourcesCancel()
-	nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}, nil, runCfgA, nil)
+	nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}, nil, runCfgA, metrics.NoopMetrics)
 	require.NoError(t, err)
 	defer nodeA.Close()
 	hostA := nodeA.Host()
@@ -292,7 +293,7 @@ func TestDiscovery(t *testing.T) {
 	confB.DiscoveryDB = discDBC
 	// Start B
-	nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}, nil, runCfgB, nil)
+	nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}, nil, runCfgB, metrics.NoopMetrics)
 	require.NoError(t, err)
 	defer nodeB.Close()
 	hostB := nodeB.Host()
@@ -307,7 +308,7 @@ func TestDiscovery(t *testing.T) {
 	}})
 	// Start C
-	nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}, nil, runCfgC, nil)
+	nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}, nil, runCfgC, metrics.NoopMetrics)
 	require.NoError(t, err)
 	defer nodeC.Close()
 	hostC := nodeC.Host()
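The test hunks above stop passing `nil` as the metricer to `NewNodeP2P` and pass `metrics.NoopMetrics` instead, which is what lets the sync code in the next file drop its `if s.metrics != nil` / `if srv.metrics != nil` guards. A minimal sketch of that null-object pattern, assuming a hypothetical `Metricer` interface rather than the real op-node one:

```go
package main

import "fmt"

// Metricer is a hypothetical stand-in for the metrics interface the node depends on.
type Metricer interface {
	PayloadsQuarantineSize(n int)
}

// noopMetrics implements Metricer but records nothing.
type noopMetrics struct{}

func (noopMetrics) PayloadsQuarantineSize(int) {}

// NoopMetrics can be injected wherever a Metricer is required, e.g. in tests,
// so production code can call metric hooks unconditionally.
var NoopMetrics Metricer = noopMetrics{}

type syncClient struct {
	metrics Metricer
}

func (s *syncClient) onQuarantineChange(size int) {
	// No nil check needed: a no-op implementation is always present.
	s.metrics.PayloadsQuarantineSize(size)
}

func main() {
	s := &syncClient{metrics: NoopMetrics}
	s.onQuarantineChange(1)
	fmt.Println("metric hook called without nil checks")
}
```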
@@ -178,7 +178,7 @@ type SyncClient struct {
 	newStreamFn     newStreamFn
 	payloadByNumber protocol.ID
-	sync.Mutex
+	peersLock sync.Mutex
 	// syncing worker per peer
 	peers map[peer.ID]context.CancelFunc
@@ -244,8 +244,8 @@ func (s *SyncClient) Start() {
 }
 func (s *SyncClient) AddPeer(id peer.ID) {
-	s.Lock()
-	defer s.Unlock()
+	s.peersLock.Lock()
+	defer s.peersLock.Unlock()
 	if _, ok := s.peers[id]; ok {
 		s.log.Warn("cannot register peer for sync duties, peer was already registered", "peer", id)
 		return
@@ -258,8 +258,8 @@ func (s *SyncClient) AddPeer(id peer.ID) {
 }
 func (s *SyncClient) RemovePeer(id peer.ID) {
-	s.Lock()
-	defer s.Unlock()
+	s.peersLock.Lock()
+	defer s.peersLock.Unlock()
 	cancel, ok := s.peers[id]
 	if !ok {
 		s.log.Warn("cannot remove peer from sync duties, peer was not registered", "peer", id)
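The `SyncClient` struct change and the `AddPeer`/`RemovePeer` hunks above swap the embedded `sync.Mutex` for a dedicated `peersLock` field, so `Lock`/`Unlock` are no longer promoted onto `SyncClient`'s method set and the lock's scope, the `peers` map, is explicit. An illustrative sketch of the difference, with made-up type names:

```go
package main

import "sync"

// embeddedLock promotes Lock/Unlock onto the outer type, so any caller can
// grab the lock, and nothing says which fields it is meant to protect.
type embeddedLock struct {
	sync.Mutex
	peers map[string]struct{}
}

// namedLock keeps the mutex private and documents exactly what it guards.
type namedLock struct {
	peersLock sync.Mutex
	peers     map[string]struct{} // guarded by peersLock
}

func (n *namedLock) addPeer(id string) {
	n.peersLock.Lock()
	defer n.peersLock.Unlock()
	if n.peers == nil {
		n.peers = make(map[string]struct{})
	}
	n.peers[id] = struct{}{}
}

func main() {
	n := &namedLock{}
	n.addPeer("peer-a")
	_ = &embeddedLock{} // shown only for contrast
}
```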
@@ -357,6 +357,7 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
 			s.inFlight[num] = pr.complete
 		case <-ctx.Done():
 			log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err())
+			return
 		default: // peers may all be busy processing requests already
 			log.Info("no peers ready to handle block requests for more P2P requests for L2 block history", "current", num)
 			return
@@ -366,9 +367,7 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
 func (s *SyncClient) onQuarantineEvict(key common.Hash, value syncResult) {
 	delete(s.quarantineByNum, uint64(value.payload.BlockNumber))
-	if s.metrics != nil {
-		s.metrics.PayloadsQuarantineSize(s.quarantine.Len())
-	}
+	s.metrics.PayloadsQuarantineSize(s.quarantine.Len())
 	if !s.trusted.Contains(key) {
 		s.log.Debug("evicting untrusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer)
 		// TODO(CLI-3732): downscore peer for having provided us a bad block that never turned out to be canonical
@@ -380,7 +379,8 @@ func (s *SyncClient) onQuarantineEvict(key common.Hash, value syncResult) {
 func (s *SyncClient) tryPromote(h common.Hash) {
 	parentRes, ok := s.quarantine.Get(h)
 	if ok {
-		// Simply reschedule the result, to get it (and possibly its parents) out of quarantine without recursion
+		// Simply reschedule the result, to get it (and possibly its parents) out of quarantine without recursion.
+		// s.results is buffered, but skip the promotion if the channel is full as it would cause a deadlock.
 		select {
 		case s.results <- parentRes:
 		default:
@@ -426,9 +426,7 @@ func (s *SyncClient) onResult(ctx context.Context, res syncResult) {
 	// Always put it in quarantine first. If promotion fails because the receiver is too busy, this functions as cache.
 	s.quarantine.Add(res.payload.BlockHash, res)
 	s.quarantineByNum[uint64(res.payload.BlockNumber)] = res.payload.BlockHash
-	if s.metrics != nil {
-		s.metrics.PayloadsQuarantineSize(s.quarantine.Len())
-	}
+	s.metrics.PayloadsQuarantineSize(s.quarantine.Len())
 	// If we know this block is canonical, then promote it
 	if s.trusted.Contains(res.payload.BlockHash) {
 		s.promote(ctx, res)
@@ -438,10 +436,10 @@ func (s *SyncClient) onResult(ctx context.Context, res syncResult) {
 // peerLoop for syncing from a single peer
 func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
 	defer func() {
-		s.Lock()
+		s.peersLock.Lock()
 		delete(s.peers, id) // clean up
 		s.wg.Done()
-		s.Unlock()
+		s.peersLock.Unlock()
 		s.log.Debug("stopped syncing loop of peer", "id", id)
 	}()
@@ -485,17 +483,15 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
 			// increase the p2p-sync part of the peer score
 			// (don't allow the score to grow indefinitely only based on this factor though)
-			if s.metrics != nil {
-				resultCode := byte(0)
-				if err != nil {
-					if re, ok := err.(requestResultErr); ok {
-						resultCode = re.ResultCode()
-					} else {
-						resultCode = 1
-					}
+			resultCode := byte(0)
+			if err != nil {
+				if re, ok := err.(requestResultErr); ok {
+					resultCode = re.ResultCode()
+				} else {
+					resultCode = 1
 				}
-				s.metrics.ClientPayloadByNumberEvent(pr.num, resultCode, took)
 			}
+			s.metrics.ClientPayloadByNumberEvent(pr.num, resultCode, took)
 		case <-ctx.Done():
 			return
 		}
@@ -666,9 +662,7 @@ func (srv *ReqRespServer) HandleSyncRequest(ctx context.Context, log log.Logger,
 	} else {
 		log.Debug("successfully served sync response", "req", req)
 	}
-	if srv.metrics != nil {
-		srv.metrics.ServerPayloadByNumberEvent(req, 0, time.Since(start))
-	}
+	srv.metrics.ServerPayloadByNumberEvent(req, 0, time.Since(start))
 }
 var invalidRequestErr = errors.New("invalid request")
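The new comment in `tryPromote` captures the main concurrency concern of this change set: promotion re-sends a quarantined result on the buffered `s.results` channel, but only if there is room, because blocking there from within the loop that drains the channel would deadlock; if the send is skipped, the payload simply stays cached in quarantine. A self-contained sketch of that non-blocking send pattern (the names are illustrative, not the op-node types):

```go
package main

import "fmt"

// tryRequeue attempts to put an item back on a buffered channel without ever blocking.
// It reports whether the item was accepted; a full buffer just means "try again later".
func tryRequeue(results chan int, item int) bool {
	select {
	case results <- item:
		return true
	default:
		// Buffer full: a blocking send here could deadlock the goroutine
		// that is responsible for draining the channel.
		return false
	}
}

func main() {
	results := make(chan int, 2)
	fmt.Println(tryRequeue(results, 1)) // true
	fmt.Println(tryRequeue(results, 2)) // true
	fmt.Println(tryRequeue(results, 3)) // false: skipped instead of blocking
}
```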