Commit 73012b0e authored by OptimismBot, committed by GitHub

Merge pull request #5800 from ethereum-optimism/sync-test-race-fix

op-node: fix TestMultiPeerSync race condition
parents 30a5aa10 2b558e5e
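Background for the diff below: the sync client's main loop (`onRangeRequest`) schedules per-block peer requests and tracks them in an `inFlight` map, while the peer loop flips a shared `atomic.Bool` when a request finishes, success or failure. A minimal, self-contained sketch of that bookkeeping (simplified names; not the real `SyncClient`), to make the two changes below easier to follow:

package main

import (
	"fmt"
	"sync/atomic"
)

type syncClient struct {
	inFlight map[uint64]*atomic.Bool // block number -> "request finished" flag
}

// onRangeRequest mirrors the fixed scheduling logic: a number that already
// has an in-flight entry is skipped instead of being rescheduled.
func (s *syncClient) onRangeRequest(start, end uint64) {
	for num := start; num < end; num++ {
		if _, ok := s.inFlight[num]; ok {
			fmt.Println("request still in-flight, not rescheduling", num)
			continue // the line this PR adds
		}
		complete := new(atomic.Bool)
		s.inFlight[num] = complete
		// ... hand (num, complete) to a peer worker; the worker calls
		// complete.Store(true) when the request finishes, even on error.
	}
}

func main() {
	s := &syncClient{inFlight: make(map[uint64]*atomic.Bool)}
	s.onRangeRequest(10, 13)
	s.onRangeRequest(10, 13) // all three skipped: still in flight
}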
@@ -368,6 +368,7 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
 		}
 		if _, ok := s.inFlight[num]; ok {
 			log.Debug("request still in-flight, not rescheduling sync request", "num", num)
+			continue // request still in flight
 		}
 		pr := peerRequest{num: num, complete: new(atomic.Bool)}
......
@@ -3,7 +3,9 @@ package p2p

 import (
 	"context"
 	"math/big"
+	"sync"
 	"testing"
+	"time"

 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
@@ -29,7 +31,42 @@ func (fn mockPayloadFn) PayloadByNumber(_ context.Context, number uint64) (*eth.ExecutionPayload, error) {
 var _ L2Chain = mockPayloadFn(nil)

-func setupSyncTestData(length uint64) (*rollup.Config, map[uint64]*eth.ExecutionPayload, func(i uint64) eth.L2BlockRef) {
+type syncTestData struct {
+	sync.RWMutex
+	payloads map[uint64]*eth.ExecutionPayload
+}
+
+func (s *syncTestData) getPayload(i uint64) (payload *eth.ExecutionPayload, ok bool) {
+	s.RLock()
+	defer s.RUnlock()
+	payload, ok = s.payloads[i]
+	return payload, ok
+}
+
+func (s *syncTestData) deletePayload(i uint64) {
+	s.Lock()
+	defer s.Unlock()
+	delete(s.payloads, i)
+}
+
+func (s *syncTestData) addPayload(payload *eth.ExecutionPayload) {
+	s.Lock()
+	defer s.Unlock()
+	s.payloads[uint64(payload.BlockNumber)] = payload
+}
+
+func (s *syncTestData) getBlockRef(i uint64) eth.L2BlockRef {
+	s.RLock()
+	defer s.RUnlock()
+	return eth.L2BlockRef{
+		Hash:       s.payloads[i].BlockHash,
+		Number:     uint64(s.payloads[i].BlockNumber),
+		ParentHash: s.payloads[i].ParentHash,
+		Time:       uint64(s.payloads[i].Timestamp),
+	}
+}
+
+func setupSyncTestData(length uint64) (*rollup.Config, *syncTestData) {
 	// minimal rollup config to build mock blocks & verify their time.
 	cfg := &rollup.Config{
 		Genesis: rollup.Genesis{
@@ -57,15 +94,7 @@ func setupSyncTestData(length uint64) (*rollup.Config, map[uint64]*eth.ExecutionPayload, func(i uint64) eth.L2BlockRef) {
 		payloads[i] = payload
 	}

-	l2Ref := func(i uint64) eth.L2BlockRef {
-		return eth.L2BlockRef{
-			Hash:       payloads[i].BlockHash,
-			Number:     uint64(payloads[i].BlockNumber),
-			ParentHash: payloads[i].ParentHash,
-			Time:       uint64(payloads[i].Timestamp),
-		}
-	}
-	return cfg, payloads, l2Ref
+	return cfg, &syncTestData{payloads: payloads}
 }
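For reference, this is how the tests below use the new lock-guarded wrapper: reads go through `RLock`, and the delete/re-add that creates and then fills the gap go through `Lock`, so the test body and the peer goroutines can share the payload set without tripping the race detector. A short usage sketch (same names as this diff; the surrounding test harness is elided):

func ExampleSyncTestData() {
	// setupSyncTestData now returns the guarded wrapper instead of a bare map.
	_, payloads := setupSyncTestData(100)

	bl25, _ := payloads.getPayload(25) // RLock-protected read
	payloads.deletePayload(25)         // Lock-protected delete: opens the gap
	payloads.addPayload(bl25)          // Lock-protected write: closes it again
	_ = payloads.getBlockRef(25)       // builds the eth.L2BlockRef under RLock
}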
@@ -73,11 +102,11 @@ func TestSinglePeerSync(t *testing.T) {
 	log := testlog.Logger(t, log.LvlError)

-	cfg, payloads, l2Ref := setupSyncTestData(25)
+	cfg, payloads := setupSyncTestData(25)

 	// Serving payloads: just load them from the map, if they exist
 	servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) {
-		p, ok := payloads[n]
+		p, ok := payloads.getPayload(n)
 		if !ok {
 			return nil, ethereum.NotFound
 		}
@@ -116,13 +145,13 @@ func TestSinglePeerSync(t *testing.T) {
 	defer cl.Close()

 	// request to start syncing between 10 and 20
-	require.NoError(t, cl.RequestL2Range(ctx, l2Ref(10), l2Ref(20)))
+	require.NoError(t, cl.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(20)))

 	// and wait for the sync results to come in (in reverse order)
 	for i := uint64(19); i > 10; i-- {
 		p := <-received
 		require.Equal(t, uint64(p.BlockNumber), i, "expecting payloads in order")
-		exp, ok := payloads[uint64(p.BlockNumber)]
+		exp, ok := payloads.getPayload(uint64(p.BlockNumber))
 		require.True(t, ok, "expecting known payload")
 		require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
 	}
@@ -131,14 +160,14 @@ func TestSinglePeerSync(t *testing.T) {
 func TestMultiPeerSync(t *testing.T) {
 	t.Parallel() // Takes a while, but can run in parallel

-	log := testlog.Logger(t, log.LvlError)
+	log := testlog.Logger(t, log.LvlDebug)

-	cfg, payloads, l2Ref := setupSyncTestData(100)
+	cfg, payloads := setupSyncTestData(100)

 	setupPeer := func(ctx context.Context, h host.Host) (*SyncClient, chan *eth.ExecutionPayload) {
 		// Serving payloads: just load them from the map, if they exist
 		servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) {
-			p, ok := payloads[n]
+			p, ok := payloads.getPayload(n)
 			if !ok {
 				return nil, ethereum.NotFound
 			}
@@ -190,23 +219,25 @@ func TestMultiPeerSync(t *testing.T) {
 	clC.Start()
 	defer clC.Close()

-	// request to start syncing between 10 and 100
-	require.NoError(t, clA.RequestL2Range(ctx, l2Ref(10), l2Ref(90)))
+	// request to start syncing between 10 and 90
+	require.NoError(t, clA.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(90)))

 	// With such a large range to request, we are going to hit the rate-limits of B and C,
 	// but that means we'll balance the work between the peers.
-	p := <-recvA
-	exp, ok := payloads[uint64(p.BlockNumber)]
-	require.True(t, ok, "expecting known payload")
-	require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
+	for i := uint64(89); i > 10; i-- { // wait for all payloads
+		p := <-recvA
+		exp, ok := payloads.getPayload(uint64(p.BlockNumber))
+		require.True(t, ok, "expecting known payload")
+		require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
+	}

 	// now see if B can sync a range, and fill the gap with a re-request
-	bl25 := payloads[25] // temporarily remove it from the available payloads. This will create a gap
-	delete(payloads, uint64(25))
-	require.NoError(t, clB.RequestL2Range(ctx, l2Ref(20), l2Ref(30)))
+	bl25, _ := payloads.getPayload(25) // temporarily remove it from the available payloads. This will create a gap
+	payloads.deletePayload(25)
+	require.NoError(t, clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(30)))
 	for i := uint64(29); i > 25; i-- {
 		p := <-recvB
-		exp, ok := payloads[uint64(p.BlockNumber)]
+		exp, ok := payloads.getPayload(uint64(p.BlockNumber))
 		require.True(t, ok, "expecting known payload")
 		require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
 	}
@@ -215,13 +246,19 @@ func TestMultiPeerSync(t *testing.T) {
 	// client: WARN failed p2p sync request num=25 err="peer failed to serve request with code 1"
 	require.Zero(t, len(recvB), "there is a gap, should not see other payloads yet")
 	// Add back the block
-	payloads[25] = bl25
+	payloads.addPayload(bl25)
+
+	// race-condition fix: the request for 25 is expected to error, and it is marked as complete in the peer-loop.
+	// But the re-request checks the in-flight status in the main loop, so the failed request may still look
+	// in-flight there, in which case the new request would not be scheduled.
+	// Wait until the failed request is marked as done, so that the re-request actually runs.
+	for !clB.inFlight[25].Load() {
+		time.Sleep(time.Second)
+	}
 	// And request a range again, 25 is there now, and 21-24 should follow quickly (some may already have been fetched and wait in quarantine)
-	require.NoError(t, clB.RequestL2Range(ctx, l2Ref(20), l2Ref(26)))
+	require.NoError(t, clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(26)))
 	for i := uint64(25); i > 20; i-- {
 		p := <-recvB
-		exp, ok := payloads[uint64(p.BlockNumber)]
+		exp, ok := payloads.getPayload(uint64(p.BlockNumber))
 		require.True(t, ok, "expecting known payload")
 		require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
 	}
......
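Side note on the busy-wait added above: the same synchronization could be expressed with a deadline via testify's require.Eventually (already imported by these tests), so a regression fails the test instead of hanging it. A sketch, not part of this PR:

// Equivalent to the for/time.Sleep poll, but bounded.
require.Eventually(t, func() bool {
	return clB.inFlight[25].Load()
}, 30*time.Second, 100*time.Millisecond,
	"expected the failed request for block 25 to be marked complete")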