Commit c446b09f authored by Adrian Sutton's avatar Adrian Sutton

op-node: Add thread-safe method to check if p2p sync has a request for a block in-flight

parent 4c66cb35
......@@ -100,6 +100,12 @@ type peerRequest struct {
complete *atomic.Bool
}
type inFlightCheck struct {
num uint64
result chan bool
}
type SyncClientMetrics interface {
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int)
......@@ -198,8 +204,9 @@ type SyncClient struct {
// inFlight requests are not repeated
inFlight map[uint64]*atomic.Bool
requests chan rangeRequest
peerRequests chan peerRequest
requests chan rangeRequest
peerRequests chan peerRequest
inFlightChecks chan inFlightCheck
results chan syncResult
......@@ -235,6 +242,7 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc
requests: make(chan rangeRequest), // blocking
peerRequests: make(chan peerRequest, 128),
results: make(chan syncResult, 128),
inFlightChecks: make(chan inFlightCheck, 128),
globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
resCtx: ctx,
resCancel: cancel,
......@@ -328,6 +336,14 @@ func (s *SyncClient) mainLoop() {
ctx, cancel := context.WithTimeout(s.resCtx, maxResultProcessing)
s.onResult(ctx, res)
cancel()
case check := <-s.inFlightChecks:
s.log.Info("Checking in flight", "num", check.num)
complete, ok := s.inFlight[check.num]
if !ok {
check.result <- false
} else {
check.result <- !complete.Load()
}
case <-s.resCtx.Done():
s.log.Info("stopped P2P req-resp L2 block sync client")
return
......@@ -335,6 +351,21 @@ func (s *SyncClient) mainLoop() {
}
}
func (s *SyncClient) isInFlight(ctx context.Context, num uint64) (bool, error) {
check := inFlightCheck{num: num, result: make(chan bool, 1)}
select {
case s.inFlightChecks <- check:
case <-ctx.Done():
return false, errors.New("context cancelled when publishing in flight check")
}
select {
case res := <-check.result:
return res, nil
case <-ctx.Done():
return false, errors.New("context cancelled while waiting for in flight check response")
}
}
// onRangeRequest is exclusively called by the main loop, and has thus direct access to the request bookkeeping state.
// This function transforms requested block ranges into work for each peer.
func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
......
......@@ -271,12 +271,11 @@ func TestMultiPeerSync(t *testing.T) {
// Wait till the failed request is recognized as marked as done, so the re-request actually runs.
ctx, cancelFunc = context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()
for clB.inFlight[25] != nil && !clB.inFlight[25].Load() {
select {
case <-ctx.Done():
t.Fatal("Did not complete request for block 25 in timely manner")
default:
// Carry on, timeout not yet reached
for {
isInFlight, err := clB.isInFlight(ctx, 25)
require.NoError(t, err)
if !isInFlight {
break
}
time.Sleep(time.Second)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment