Commit 43759e62 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #6002 from ethereum-optimism/aj/flaky-multipeer

op-node: Fix TestMultiPeerSync flakiness
parents 37ed84de e808e58b
...@@ -100,6 +100,12 @@ type peerRequest struct { ...@@ -100,6 +100,12 @@ type peerRequest struct {
complete *atomic.Bool complete *atomic.Bool
} }
// inFlightCheck is a query sent to the SyncClient main loop asking whether
// the request for a given block number is still in flight (requested but
// not yet marked complete). Routing the check through the main loop keeps
// the inFlight map confined to a single goroutine.
type inFlightCheck struct {
	num    uint64    // block number to look up in the inFlight map
	result chan bool // reply channel; isInFlight creates it with capacity 1 so the main loop's send never blocks
}
type SyncClientMetrics interface { type SyncClientMetrics interface {
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int) PayloadsQuarantineSize(n int)
...@@ -198,8 +204,9 @@ type SyncClient struct { ...@@ -198,8 +204,9 @@ type SyncClient struct {
// inFlight requests are not repeated // inFlight requests are not repeated
inFlight map[uint64]*atomic.Bool inFlight map[uint64]*atomic.Bool
requests chan rangeRequest requests chan rangeRequest
peerRequests chan peerRequest peerRequests chan peerRequest
inFlightChecks chan inFlightCheck
results chan syncResult results chan syncResult
...@@ -235,6 +242,7 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc ...@@ -235,6 +242,7 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc
requests: make(chan rangeRequest), // blocking requests: make(chan rangeRequest), // blocking
peerRequests: make(chan peerRequest, 128), peerRequests: make(chan peerRequest, 128),
results: make(chan syncResult, 128), results: make(chan syncResult, 128),
inFlightChecks: make(chan inFlightCheck, 128),
globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst), globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
resCtx: ctx, resCtx: ctx,
resCancel: cancel, resCancel: cancel,
...@@ -328,6 +336,14 @@ func (s *SyncClient) mainLoop() { ...@@ -328,6 +336,14 @@ func (s *SyncClient) mainLoop() {
ctx, cancel := context.WithTimeout(s.resCtx, maxResultProcessing) ctx, cancel := context.WithTimeout(s.resCtx, maxResultProcessing)
s.onResult(ctx, res) s.onResult(ctx, res)
cancel() cancel()
case check := <-s.inFlightChecks:
s.log.Info("Checking in flight", "num", check.num)
complete, ok := s.inFlight[check.num]
if !ok {
check.result <- false
} else {
check.result <- !complete.Load()
}
case <-s.resCtx.Done(): case <-s.resCtx.Done():
s.log.Info("stopped P2P req-resp L2 block sync client") s.log.Info("stopped P2P req-resp L2 block sync client")
return return
...@@ -335,6 +351,21 @@ func (s *SyncClient) mainLoop() { ...@@ -335,6 +351,21 @@ func (s *SyncClient) mainLoop() {
} }
} }
// isInFlight reports whether the request for block number num is still in
// flight. The question is relayed to the main loop via the inFlightChecks
// channel so that the inFlight bookkeeping map is only ever touched by one
// goroutine. It returns an error if ctx is cancelled before the check can
// be published or answered.
func (s *SyncClient) isInFlight(ctx context.Context, num uint64) (bool, error) {
	// Capacity 1 so the main loop's reply never blocks, even if we stop
	// waiting for it below.
	responseCh := make(chan bool, 1)
	select {
	case <-ctx.Done():
		return false, errors.New("context cancelled when publishing in flight check")
	case s.inFlightChecks <- inFlightCheck{num: num, result: responseCh}:
	}
	select {
	case <-ctx.Done():
		return false, errors.New("context cancelled while waiting for in flight check response")
	case inFlight := <-responseCh:
		return inFlight, nil
	}
}
// onRangeRequest is exclusively called by the main loop, and has thus direct access to the request bookkeeping state. // onRangeRequest is exclusively called by the main loop, and has thus direct access to the request bookkeeping state.
// This function transforms requested block ranges into work for each peer. // This function transforms requested block ranges into work for each peer.
func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) { func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
......
...@@ -164,9 +164,13 @@ func TestMultiPeerSync(t *testing.T) { ...@@ -164,9 +164,13 @@ func TestMultiPeerSync(t *testing.T) {
cfg, payloads := setupSyncTestData(100) cfg, payloads := setupSyncTestData(100)
// Buffered channel of all blocks requested from any client.
requested := make(chan uint64, 100)
setupPeer := func(ctx context.Context, h host.Host) (*SyncClient, chan *eth.ExecutionPayload) { setupPeer := func(ctx context.Context, h host.Host) (*SyncClient, chan *eth.ExecutionPayload) {
// Serving payloads: just load them from the map, if they exist // Serving payloads: just load them from the map, if they exist
servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) { servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) {
requested <- n
p, ok := payloads.getPayload(n) p, ok := payloads.getPayload(n)
if !ok { if !ok {
return nil, ethereum.NotFound return nil, ethereum.NotFound
...@@ -241,6 +245,20 @@ func TestMultiPeerSync(t *testing.T) { ...@@ -241,6 +245,20 @@ func TestMultiPeerSync(t *testing.T) {
require.True(t, ok, "expecting known payload") require.True(t, ok, "expecting known payload")
require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload") require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
} }
// Wait for the request for block 25 to be made
ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()
requestMade := false
for requestMade != true {
select {
case blockNum := <-requested:
if blockNum == 25 {
requestMade = true
}
case <-ctx.Done():
t.Fatal("Did not request block 25 in a reasonable time")
}
}
// the request for 25 should fail. See: // the request for 25 should fail. See:
// server: WARN peer requested unknown block by number num=25 // server: WARN peer requested unknown block by number num=25
// client: WARN failed p2p sync request num=25 err="peer failed to serve request with code 1" // client: WARN failed p2p sync request num=25 err="peer failed to serve request with code 1"
...@@ -251,7 +269,14 @@ func TestMultiPeerSync(t *testing.T) { ...@@ -251,7 +269,14 @@ func TestMultiPeerSync(t *testing.T) {
// But the re-request checks the status in the main loop, and it may thus look like it's still in-flight, // But the re-request checks the status in the main loop, and it may thus look like it's still in-flight,
// and thus not run the new request. // and thus not run the new request.
// Wait till the failed request is recognized as marked as done, so the re-request actually runs. // Wait till the failed request is recognized as marked as done, so the re-request actually runs.
for !clB.inFlight[25].Load() { ctx, cancelFunc = context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()
for {
isInFlight, err := clB.isInFlight(ctx, 25)
require.NoError(t, err)
if !isInFlight {
break
}
time.Sleep(time.Second) time.Sleep(time.Second)
} }
// And request a range again, 25 is there now, and 21-24 should follow quickly (some may already have been fetched and wait in quarantine) // And request a range again, 25 is there now, and 21-24 should follow quickly (some may already have been fetched and wait in quarantine)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment