diff --git a/op-node/p2p/sync.go b/op-node/p2p/sync.go
index 75033ec809e8bfb97eecaf3940ab3556dfc8148a..ab2b8f46290cb0916d5222136cfe1ec905cfdf7b 100644
--- a/op-node/p2p/sync.go
+++ b/op-node/p2p/sync.go
@@ -100,6 +100,15 @@ type peerRequest struct {
 	complete *atomic.Bool
 }
 
+// inFlightCheck is a query, answered by the main loop, asking whether a request
+// for block num is registered in SyncClient.inFlight and not yet marked complete.
+type inFlightCheck struct {
+	num uint64
+
+	// result receives the answer; isInFlight creates it with capacity 1.
+	result chan bool
+}
+
 type SyncClientMetrics interface {
 	ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
 	PayloadsQuarantineSize(n int)
@@ -198,8 +207,9 @@ type SyncClient struct {
 	// inFlight requests are not repeated
 	inFlight map[uint64]*atomic.Bool
 
-	requests     chan rangeRequest
-	peerRequests chan peerRequest
+	requests       chan rangeRequest
+	peerRequests   chan peerRequest
+	inFlightChecks chan inFlightCheck
 
 	results chan syncResult
 
@@ -235,6 +245,7 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc
 		requests:        make(chan rangeRequest), // blocking
 		peerRequests:    make(chan peerRequest, 128),
 		results:         make(chan syncResult, 128),
+		inFlightChecks:  make(chan inFlightCheck, 128),
 		globalRL:        rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
 		resCtx:          ctx,
 		resCancel:       cancel,
@@ -328,6 +339,12 @@ func (s *SyncClient) mainLoop() {
 			ctx, cancel := context.WithTimeout(s.resCtx, maxResultProcessing)
 			s.onResult(ctx, res)
 			cancel()
+		case check := <-s.inFlightChecks:
+			// The lookup runs on the main loop so s.inFlight is not read from the caller's goroutine.
+			s.log.Info("Checking in flight", "num", check.num)
+			complete, ok := s.inFlight[check.num]
+			// In flight means: registered and not yet marked complete. && short-circuits when complete is nil.
+			check.result <- ok && !complete.Load()
 		case <-s.resCtx.Done():
 			s.log.Info("stopped P2P req-resp L2 block sync client")
 			return
@@ -335,6 +352,31 @@ func (s *SyncClient) mainLoop() {
 	}
 }
 
+// isInFlight asks the main loop whether a request for block num is currently in flight,
+// i.e. present in s.inFlight and not yet marked complete.
+// It returns an error if ctx is done, or the sync client has stopped, before an answer arrives.
+func (s *SyncClient) isInFlight(ctx context.Context, num uint64) (bool, error) {
+	// Capacity 1 lets the main loop deliver the answer without blocking, even if this caller has given up.
+	check := inFlightCheck{num: num, result: make(chan bool, 1)}
+	select {
+	case s.inFlightChecks <- check:
+	case <-ctx.Done():
+		return false, errors.New("context cancelled when publishing in flight check")
+	case <-s.resCtx.Done():
+		// The main loop returns once resCtx is done, so nothing would consume the check.
+		return false, errors.New("sync client stopped when publishing in flight check")
+	}
+	select {
+	case res := <-check.result:
+		return res, nil
+	case <-ctx.Done():
+		return false, errors.New("context cancelled while waiting for in flight check response")
+	case <-s.resCtx.Done():
+		// The check may sit unanswered in the channel buffer after the main loop has stopped.
+		return false, errors.New("sync client stopped while waiting for in flight check response")
+	}
+}
+
 // onRangeRequest is exclusively called by the main loop, and has thus direct access to the request bookkeeping state.
 // This function transforms requested block ranges into work for each peer.
 func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
diff --git a/op-node/p2p/sync_test.go b/op-node/p2p/sync_test.go
index c9c3528f192841879195f70755bb924400a66465..a11bf9168189bc117a583ef6f8572c21c3f0385c 100644
--- a/op-node/p2p/sync_test.go
+++ b/op-node/p2p/sync_test.go
@@ -271,12 +271,11 @@ func TestMultiPeerSync(t *testing.T) {
 	// Wait till the failed request is recognized as marked as done, so the re-request actually runs.
 	ctx, cancelFunc = context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancelFunc()
-	for clB.inFlight[25] != nil && !clB.inFlight[25].Load() {
-		select {
-		case <-ctx.Done():
-			t.Fatal("Did not complete request for block 25 in timely manner")
-		default:
-			// Carry on, timeout not yet reached
+	for {
+		isInFlight, err := clB.isInFlight(ctx, 25)
+		require.NoError(t, err)
+		if !isInFlight {
+			break
 		}
 		time.Sleep(time.Second)
 	}