Commit 589b1fd2 authored by Sam Stokes, committed by GitHub

op-node: prevent spamming of reqs for blocks triggered by `checkForGapInUnsafeQueue` (#10063)

* Add rangeReqId so we can cancel all associated block requests via a single flag

* Use crypto/rand instead of math/rand for randomReqId

* Use atomic counter instead of random num for rangeReqId

* Remove redundant peerRequest.complete field. Use inFlight instead

* Add mutex to activeRangeRequests map

* Do not penalize peer for block not found error

* Fix inFlight request clean up logic

* Add test checks for cancelled range request

* Add mutex protection to inFlight map

* Use constants for ResultCode

* Simplify inFlight.get method logic

* Use same struct for activeRangeRequests and inFlight
parent 0d221da6
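The gist of the change, pieced together from the diff below: every per-block request spawned for a range carries a shared rangeReqId, the id is tracked in a mutex-protected activeRangeRequests map, and a "block not found" response deletes the id so the remaining queued requests for that range are dropped instead of re-sent. A minimal, self-contained Go sketch of that pattern (types and flow simplified for illustration; this is not the op-node code itself):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// requestIdMap mirrors the mutex-protected flag map introduced by this commit (simplified copy).
type requestIdMap struct {
	requests map[uint64]bool
	mu       sync.Mutex
}

func newRequestIdMap() *requestIdMap {
	return &requestIdMap{requests: make(map[uint64]bool)}
}

func (r *requestIdMap) set(key uint64, value bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.requests[key] = value
}

func (r *requestIdMap) get(key uint64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.requests[key]
}

func (r *requestIdMap) delete(key uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.requests, key)
}

// peerRequest is a stand-in for the real struct: one block number plus the id of the range it belongs to.
type peerRequest struct {
	num        uint64
	rangeReqId uint64
}

func main() {
	var rangeReqIdCounter uint64
	activeRangeRequests := newRequestIdMap()

	// RequestL2Range side: allocate one id per range (atomic counter) and mark it
	// active before any per-block requests are queued.
	id := atomic.AddUint64(&rangeReqIdCounter, 1)
	activeRangeRequests.set(id, true)

	// onRangeRequest side: every block in the range is tagged with the same id.
	queued := []peerRequest{{num: 29, rangeReqId: id}, {num: 28, rangeReqId: id}, {num: 27, rangeReqId: id}}

	// peerLoop side: a "block not found" response cancels the whole range with a single delete,
	// so requests still sitting in the queue are dropped instead of being sent to peers.
	activeRangeRequests.delete(id)

	for _, pr := range queued {
		if !activeRangeRequests.get(pr.rangeReqId) {
			fmt.Println("dropping cancelled p2p sync request for block", pr.num)
			continue
		}
		fmt.Println("would request block", pr.num)
	}
}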
@@ -176,7 +176,8 @@ func (n *NodeP2P) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef)
     if !n.AltSyncEnabled() {
         return fmt.Errorf("cannot request range %s - %s, req-resp sync is not enabled", start, end)
     }
-    return n.syncCl.RequestL2Range(ctx, start, end)
+    _, err := n.syncCl.RequestL2Range(ctx, start, end)
+    return err
 }
 
 func (n *NodeP2P) Host() host.Host {
......
@@ -61,6 +61,13 @@ const (
     clientErrRateCost = peerServerBlocksBurst
 )
 
+const (
+    ResultCodeSuccess     byte = 0
+    ResultCodeNotFoundErr byte = 1
+    ResultCodeInvalidErr  byte = 2
+    ResultCodeUnknownErr  byte = 3
+)
+
 func PayloadByNumberProtocolID(l2ChainID *big.Int) protocol.ID {
     return protocol.ID(fmt.Sprintf("/opstack/req/payload_by_number/%d/0", l2ChainID))
 }
@@ -87,6 +94,7 @@ type receivePayloadFn func(ctx context.Context, from peer.ID, payload *eth.Execu
 type rangeRequest struct {
     start uint64
     end   eth.L2BlockRef
+    id    uint64
 }
 
 type syncResult struct {
@@ -95,17 +103,44 @@ type syncResult struct {
 }
 
 type peerRequest struct {
     num        uint64
-    complete   *atomic.Bool
+    rangeReqId uint64
 }
 
 type inFlightCheck struct {
     num    uint64
     result chan bool
 }
 
+type requestIdMap struct {
+    requests map[uint64]bool
+    mu       sync.Mutex
+}
+
+func newRequestIdMap() *requestIdMap {
+    return &requestIdMap{
+        requests: make(map[uint64]bool),
+    }
+}
+
+func (r *requestIdMap) set(key uint64, value bool) {
+    r.mu.Lock()
+    r.requests[key] = value
+    r.mu.Unlock()
+}
+
+func (r *requestIdMap) get(key uint64) bool {
+    r.mu.Lock()
+    defer r.mu.Unlock()
+    return r.requests[key]
+}
+
+func (r *requestIdMap) delete(key uint64) {
+    r.mu.Lock()
+    delete(r.requests, key)
+    r.mu.Unlock()
+}
+
 type SyncClientMetrics interface {
     ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
     PayloadsQuarantineSize(n int)
@@ -209,12 +244,14 @@ type SyncClient struct {
     quarantineByNum map[uint64]common.Hash
 
     // inFlight requests are not repeated
-    inFlight map[uint64]*atomic.Bool
+    inFlight *requestIdMap
 
-    requests chan rangeRequest
-    peerRequests chan peerRequest
     inFlightChecks chan inFlightCheck
 
+    rangeRequests chan rangeRequest
+    activeRangeRequests *requestIdMap
+    rangeReqId uint64
+    peerRequests chan peerRequest
+
     results chan syncResult
 
     receivePayload receivePayloadFn
@@ -238,24 +275,26 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc
     ctx, cancel := context.WithCancel(context.Background())
     c := &SyncClient{
         log: log,
         cfg: cfg,
         metrics: metrics,
         appScorer: appScorer,
         newStreamFn: newStream,
         payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID),
         peers: make(map[peer.ID]context.CancelFunc),
         quarantineByNum: make(map[uint64]common.Hash),
-        inFlight: make(map[uint64]*atomic.Bool),
-        requests: make(chan rangeRequest), // blocking
+        rangeRequests: make(chan rangeRequest), // blocking
+        activeRangeRequests: newRequestIdMap(),
         peerRequests: make(chan peerRequest, 128),
         results: make(chan syncResult, 128),
-        inFlightChecks: make(chan inFlightCheck, 128),
-        globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
-        resCtx: ctx,
-        resCancel: cancel,
-        receivePayload: rcv,
+        inFlight: newRequestIdMap(),
+        inFlightChecks: make(chan inFlightCheck, 128),
+        globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
+        resCtx: ctx,
+        resCancel: cancel,
+        receivePayload: rcv,
     }
     // never errors with positive LRU cache size
     // TODO(CLI-3733): if we had an LRU based on on total payloads size, instead of payload count,
     // we can safely buffer more data in the happy case.
@@ -313,17 +352,23 @@ func (s *SyncClient) Close() error {
     return nil
 }
 
-func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
+func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) (uint64, error) {
     if end == (eth.L2BlockRef{}) {
         s.log.Debug("P2P sync client received range signal, but cannot sync open-ended chain: need sync target to verify blocks through parent-hashes", "start", start)
-        return nil
+        return 0, nil
     }
+
+    // Create shared rangeReqId so associated peerRequests can all be cancelled by setting a single flag
+    rangeReqId := atomic.AddUint64(&s.rangeReqId, 1)
+    // need to flag request as active before adding request to s.rangeRequests to avoid race
+    s.activeRangeRequests.set(rangeReqId, true)
+
     // synchronize requests with the main loop for state access
     select {
-    case s.requests <- rangeRequest{start: start.Number, end: end}:
-        return nil
+    case s.rangeRequests <- rangeRequest{start: start.Number, end: end, id: rangeReqId}:
+        return rangeReqId, nil
     case <-ctx.Done():
-        return fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err())
+        s.activeRangeRequests.delete(rangeReqId)
+        return rangeReqId, fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err())
     }
 }
@@ -336,7 +381,7 @@ func (s *SyncClient) mainLoop() {
     defer s.wg.Done()
     for {
         select {
-        case req := <-s.requests:
+        case req := <-s.rangeRequests:
             ctx, cancel := context.WithTimeout(s.resCtx, maxRequestScheduling)
             s.onRangeRequest(ctx, req)
             cancel()
@@ -346,12 +391,7 @@
             cancel()
         case check := <-s.inFlightChecks:
             s.log.Info("Checking in flight", "num", check.num)
-            complete, ok := s.inFlight[check.num]
-            if !ok {
-                check.result <- false
-            } else {
-                check.result <- !complete.Load()
-            }
+            check.result <- s.inFlight.get(check.num)
         case <-s.resCtx.Done():
             s.log.Info("stopped P2P req-resp L2 block sync client")
             return
@@ -377,19 +417,13 @@ func (s *SyncClient) isInFlight(ctx context.Context, num uint64) (bool, error) {
 // onRangeRequest is exclusively called by the main loop, and has thus direct access to the request bookkeeping state.
 // This function transforms requested block ranges into work for each peer.
 func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
+    log := s.log.New("target", req.start, "end", req.end)
+    log.Info("processing L2 range request", "rangeReqId", req.id)
+
     // add req head to trusted set of blocks
     s.trusted.Add(req.end.Hash, struct{}{})
     s.trusted.Add(req.end.ParentHash, struct{}{})
-    log := s.log.New("target", req.start, "end", req.end)
-
-    // clean up the completed in-flight requests
-    for k, v := range s.inFlight {
-        if v.Load() {
-            delete(s.inFlight, k)
-        }
-    }
 
     // Now try to fetch lower numbers than current end, to traverse back towards the updated start.
     for i := uint64(0); ; i++ {
         num := req.end.Number - 1 - i
@@ -406,17 +440,17 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
             continue
         }
-        if _, ok := s.inFlight[num]; ok {
+        if s.inFlight.get(num) {
             log.Debug("request still in-flight, not rescheduling sync request", "num", num)
             continue // request still in flight
         }
-        pr := peerRequest{num: num, complete: new(atomic.Bool)}
-        log.Debug("Scheduling P2P block request", "num", num)
+        pr := peerRequest{num: num, rangeReqId: req.id}
+        log.Debug("Scheduling P2P block request", "num", num, "rangeReqId", req.id)
         // schedule number
         select {
         case s.peerRequests <- pr:
-            s.inFlight[num] = pr.complete
+            s.inFlight.set(num, true)
         case <-ctx.Done():
             log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err())
             return
@@ -487,7 +521,7 @@ func (s *SyncClient) onResult(ctx context.Context, res syncResult) {
     payload := res.payload.ExecutionPayload
     s.log.Debug("processing p2p sync result", "payload", payload.ID(), "peer", res.peer)
     // Clean up the in-flight request, we have a result now.
-    delete(s.inFlight, uint64(payload.BlockNumber))
+    s.inFlight.delete(uint64(payload.BlockNumber))
     // Always put it in quarantine first. If promotion fails because the receiver is too busy, this functions as cache.
     s.quarantine.Add(payload.BlockHash, res)
     s.quarantineByNum[uint64(payload.BlockNumber)] = payload.BlockHash
@@ -528,17 +562,39 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
         // once the peer is available, wait for a sync request.
         select {
         case pr := <-s.peerRequests:
+            if !s.activeRangeRequests.get(pr.rangeReqId) {
+                log.Debug("dropping cancelled p2p sync request", "num", pr.num)
+                s.inFlight.delete(pr.num)
+                continue
+            }
+
             // We already established the peer is available w.r.t. rate-limiting,
             // and this is the only loop over this peer, so we can request now.
             start := time.Now()
+
+            resultCode := ResultCodeSuccess
             err := s.doRequest(ctx, id, pr.num)
             if err != nil {
-                // mark as complete if there's an error: we are not sending any result and can complete immediately.
-                pr.complete.Store(true)
+                s.inFlight.delete(pr.num)
                 log.Warn("failed p2p sync request", "num", pr.num, "err", err)
-                s.appScorer.onResponseError(id)
+
+                resultCode = ResultCodeNotFoundErr
+                sendResponseError := true
+
+                if re, ok := err.(requestResultErr); ok {
+                    resultCode = re.ResultCode()
+                    if resultCode == ResultCodeNotFoundErr {
+                        log.Warn("cancelling p2p sync range request", "rangeReqId", pr.rangeReqId)
+                        s.activeRangeRequests.delete(pr.rangeReqId)
+                        sendResponseError = false // don't penalize peer for this error
+                    }
+                }
+
+                if sendResponseError {
+                    s.appScorer.onResponseError(id)
+                }
+
                 // If we hit an error, then count it as many requests.
-                // We'd like to avoid making more requests for a while, to back off.
+                // We'd like to avoid making more requests for a while, so back off.
                 if err := rl.WaitN(ctx, clientErrRateCost); err != nil {
                     return
                 }
@@ -546,16 +602,8 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
                 log.Debug("completed p2p sync request", "num", pr.num)
                 s.appScorer.onValidResponse(id)
             }
-            took := time.Since(start)
 
-            resultCode := byte(0)
-            if err != nil {
-                if re, ok := err.(requestResultErr); ok {
-                    resultCode = re.ResultCode()
-                } else {
-                    resultCode = 1
-                }
-            }
+            took := time.Since(start)
             s.metrics.ClientPayloadByNumberEvent(pr.num, resultCode, took)
         case <-ctx.Done():
             return
@@ -740,22 +788,22 @@ func (srv *ReqRespServer) HandleSyncRequest(ctx context.Context, log log.Logger,
     req, err := srv.handleSyncRequest(ctx, stream)
     cancel()
 
-    resultCode := byte(0)
+    resultCode := ResultCodeSuccess
     if err != nil {
         log.Warn("failed to serve p2p sync request", "req", req, "err", err)
         if errors.Is(err, ethereum.NotFound) {
-            resultCode = 1
+            resultCode = ResultCodeNotFoundErr
         } else if errors.Is(err, invalidRequestErr) {
-            resultCode = 2
+            resultCode = ResultCodeInvalidErr
         } else {
-            resultCode = 3
+            resultCode = ResultCodeUnknownErr
         }
         // try to write error code, so the other peer can understand the reason for failure.
         _, _ = stream.Write([]byte{resultCode})
     } else {
         log.Debug("successfully served sync response", "req", req)
     }
-    srv.metrics.ServerPayloadByNumberEvent(req, 0, time.Since(start))
+    srv.metrics.ServerPayloadByNumberEvent(req, resultCode, time.Since(start))
 }
 
 var invalidRequestErr = errors.New("invalid request")
......
@@ -169,7 +169,8 @@ func TestSinglePeerSync(t *testing.T) {
     defer cl.Close()
 
     // request to start syncing between 10 and 20
-    require.NoError(t, cl.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(20)))
+    _, err = cl.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(20))
+    require.NoError(t, err)
 
     // and wait for the sync results to come in (in reverse order)
     for i := uint64(19); i > 10; i-- {
@@ -255,7 +256,8 @@ func TestMultiPeerSync(t *testing.T) {
     defer clC.Close()
 
     // request to start syncing between 10 and 90
-    require.NoError(t, clA.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(90)))
+    _, err = clA.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(90))
+    require.NoError(t, err)
 
     // With such large range to request we are going to hit the rate-limits of B and C,
     // but that means we'll balance the work between the peers.
@@ -270,13 +272,18 @@ func TestMultiPeerSync(t *testing.T) {
     // now see if B can sync a range, and fill the gap with a re-request
     bl25, _ := payloads.getPayload(25) // temporarily remove it from the available payloads. This will create a gap
     payloads.deletePayload(25)
-    require.NoError(t, clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(30)))
+    rangeReqId, err := clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(30))
+    require.NoError(t, err)
+    require.True(t, clB.activeRangeRequests.get(rangeReqId), "expecting range request to be active")
     for i := uint64(29); i > 25; i-- {
         p := <-recvB
         exp, ok := payloads.getPayload(uint64(p.ExecutionPayload.BlockNumber))
         require.True(t, ok, "expecting known payload")
         require.Equal(t, exp.ExecutionPayload.BlockHash, p.ExecutionPayload.BlockHash, "expecting the correct payload")
     }
+
+    // Wait for the request for block 25 to be made
     ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
     defer cancelFunc()
@@ -291,12 +298,12 @@ func TestMultiPeerSync(t *testing.T) {
             t.Fatal("Did not request block 25 in a reasonable time")
         }
     }
     // the request for 25 should fail. See:
     // server: WARN peer requested unknown block by number num=25
     // client: WARN failed p2p sync request num=25 err="peer failed to serve request with code 1"
     require.Zero(t, len(recvB), "there is a gap, should not see other payloads yet")
-    // Add back the block
-    payloads.addPayload(bl25)
-    // race-condition fix: the request for 25 is expected to error, but is marked as complete in the peer-loop.
-    // But the re-request checks the status in the main loop, and it may thus look like it's still in-flight,
-    // and thus not run the new request.
@@ -306,13 +313,18 @@ func TestMultiPeerSync(t *testing.T) {
     for {
         isInFlight, err := clB.isInFlight(ctx, 25)
         require.NoError(t, err)
+        time.Sleep(time.Second)
         if !isInFlight {
             break
         }
-        time.Sleep(time.Second)
     }
+    require.False(t, clB.activeRangeRequests.get(rangeReqId), "expecting range request to be cancelled")
+
+    // Add back the block
+    payloads.addPayload(bl25)
+
     // And request a range again, 25 is there now, and 21-24 should follow quickly (some may already have been fetched and wait in quarantine)
-    require.NoError(t, clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(26)))
+    _, err = clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(26))
+    require.NoError(t, err)
     for i := uint64(25); i > 20; i-- {
         p := <-recvB
         exp, ok := payloads.getPayload(uint64(p.ExecutionPayload.BlockNumber))
......