Commit da6cd9b0 authored by Nemanja Zbiljić's avatar Nemanja Zbiljić Committed by GitHub

Increase timeout for pending push operations (#844)

parent b65f50ef
...@@ -193,7 +193,7 @@ LOOP: ...@@ -193,7 +193,7 @@ LOOP:
select { select {
case <-closeC: case <-closeC:
case <-time.After(2 * time.Second): case <-time.After(5 * time.Second):
s.logger.Warning("pusher shutting down with pending operations") s.logger.Warning("pusher shutting down with pending operations")
} }
} }
...@@ -214,12 +214,13 @@ func (s *Service) setChunkAsSynced(ctx context.Context, ch swarm.Chunk) error { ...@@ -214,12 +214,13 @@ func (s *Service) setChunkAsSynced(ctx context.Context, ch swarm.Chunk) error {
} }
func (s *Service) Close() error { func (s *Service) Close() error {
s.logger.Info("pusher shutting down")
close(s.quit) close(s.quit)
// Wait for chunks worker to finish // Wait for chunks worker to finish
select { select {
case <-s.chunksWorkerQuitC: case <-s.chunksWorkerQuitC:
case <-time.After(3 * time.Second): case <-time.After(6 * time.Second):
} }
return nil return nil
} }
...@@ -31,20 +31,40 @@ var noOfRetries = 20 ...@@ -31,20 +31,40 @@ var noOfRetries = 20
// Wrap the actual storer to intercept the modeSet that the pusher will call when a valid receipt is received // Wrap the actual storer to intercept the modeSet that the pusher will call when a valid receipt is received
type Store struct { type Store struct {
storage.Storer storage.Storer
modeSet map[string]storage.ModeSet internalStorer storage.Storer
modeSetMu *sync.Mutex modeSet map[string]storage.ModeSet
modeSetMu *sync.Mutex
closed bool
setBeforeCloseCount int
setAfterCloseCount int
} }
// Override the Set function to capture the ModeSetSyncPush // Override the Set function to capture the ModeSetSyncPush
func (s Store) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) error { func (s *Store) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) error {
s.modeSetMu.Lock() s.modeSetMu.Lock()
defer s.modeSetMu.Unlock() defer s.modeSetMu.Unlock()
if s.closed {
s.setAfterCloseCount++
} else {
s.setBeforeCloseCount++
}
for _, addr := range addrs { for _, addr := range addrs {
s.modeSet[addr.String()] = mode s.modeSet[addr.String()] = mode
} }
return nil return nil
} }
func (s *Store) Close() error {
s.modeSetMu.Lock()
defer s.modeSetMu.Unlock()
s.closed = true
return s.internalStorer.Close()
}
// TestSendChunkToPushSync sends a chunk to pushsync to be sent ot its closest peer and get a receipt. // TestSendChunkToPushSync sends a chunk to pushsync to be sent ot its closest peer and get a receipt.
// once the receipt is got this check to see if the localstore is updated to see if the chunk is set // once the receipt is got this check to see if the localstore is updated to see if the chunk is set
// as ModeSetSyncPush status. // as ModeSetSyncPush status.
...@@ -214,6 +234,107 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) { ...@@ -214,6 +234,107 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
} }
} }
func TestPusherClose(t *testing.T) {
// create a trigger and a closestpeer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
var (
goFuncStartedC = make(chan struct{})
pusherClosedC = make(chan struct{})
goFuncAfterCloseC = make(chan struct{})
)
defer func() {
close(goFuncStartedC)
close(pusherClosedC)
close(goFuncAfterCloseC)
}()
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
goFuncStartedC <- struct{}{}
<-goFuncAfterCloseC
return nil, nil
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
chunk := createChunk()
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
storer.modeSetMu.Lock()
if storer.closed == true {
t.Fatal("store should not be closed")
}
if storer.setBeforeCloseCount != 0 {
t.Fatalf("store 'Set' called %d times before close, expected 0", storer.setBeforeCloseCount)
}
if storer.setAfterCloseCount != 0 {
t.Fatalf("store 'Set' called %d times after close, expected 0", storer.setAfterCloseCount)
}
storer.modeSetMu.Unlock()
select {
case <-goFuncStartedC:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting to start worker job")
}
// close in the background
go func() {
p.Close()
storer.Close()
pusherClosedC <- struct{}{}
}()
select {
case <-pusherClosedC:
case <-time.After(2 * time.Second):
// internal 5 second timeout that waits for all pending push operations to terminate
}
storer.modeSetMu.Lock()
if storer.setBeforeCloseCount != 0 {
t.Fatalf("store 'Set' called %d times before close, expected 0", storer.setBeforeCloseCount)
}
if storer.setAfterCloseCount != 0 {
t.Fatalf("store 'Set' called %d times after close, expected 0", storer.setAfterCloseCount)
}
storer.modeSetMu.Unlock()
select {
case goFuncAfterCloseC <- struct{}{}:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for chunk")
}
// we need this to allow some goroutines to complete
time.Sleep(100 * time.Millisecond)
storer.modeSetMu.Lock()
if storer.closed != true {
t.Fatal("store should be closed")
}
if storer.setBeforeCloseCount != 1 {
t.Fatalf("store 'Set' called %d times before close, expected 1", storer.setBeforeCloseCount)
}
if storer.setAfterCloseCount != 0 {
t.Fatalf("store 'Set' called %d times after close, expected 0", storer.setAfterCloseCount)
}
storer.modeSetMu.Unlock()
// should be closed by now
select {
case <-pusherClosedC:
case <-time.After(100 * time.Millisecond):
t.Fatal("timed out waiting to close pusher")
}
}
func createChunk() swarm.Chunk { func createChunk() swarm.Chunk {
// chunk data to upload // chunk data to upload
chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000") chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
...@@ -232,9 +353,10 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus ...@@ -232,9 +353,10 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
mockStatestore := statestore.NewStateStore() mockStatestore := statestore.NewStateStore()
mtags := tags.NewTags(mockStatestore, logger) mtags := tags.NewTags(mockStatestore, logger)
pusherStorer := &Store{ pusherStorer := &Store{
Storer: storer, Storer: storer,
modeSet: make(map[string]storage.ModeSet), internalStorer: storer,
modeSetMu: &sync.Mutex{}, modeSet: make(map[string]storage.ModeSet),
modeSetMu: &sync.Mutex{},
} }
peerSuggester := mock.NewTopologyDriver(mockOpts...) peerSuggester := mock.NewTopologyDriver(mockOpts...)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment