Commit 7ba295a8 authored by metacertain's avatar metacertain Committed by GitHub

fix: delay concluding attempt in pushsync (#2016)

require a successful write request before concluding pushsync was attempted as a forwarder
parent ebbaf404
...@@ -438,7 +438,7 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C ...@@ -438,7 +438,7 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil { if err != nil {
return nil, true, fmt.Errorf("new stream for peer %s: %w", peer, err) return nil, false, fmt.Errorf("new stream for peer %s: %w", peer, err)
} }
defer streamer.Close() defer streamer.Close()
...@@ -449,7 +449,7 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C ...@@ -449,7 +449,7 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C
Stamp: stamp, Stamp: stamp,
}); err != nil { }); err != nil {
_ = streamer.Reset() _ = streamer.Reset()
return nil, true, fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address(), peer, err) return nil, false, fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address(), peer, err)
} }
ps.metrics.TotalSent.Inc() ps.metrics.TotalSent.Inc()
......
...@@ -933,10 +933,8 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) { ...@@ -933,10 +933,8 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
) )
defer storerPeer4.Close() defer storerPeer4.Close()
var ( triggerCount := 0
fail = true var lock sync.Mutex
lock sync.Mutex
)
recorder := streamtest.New( recorder := streamtest.New(
streamtest.WithPeerProtocols( streamtest.WithPeerProtocols(
...@@ -947,15 +945,25 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) { ...@@ -947,15 +945,25 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
peer4.String(): psPeer4.Protocol(), peer4.String(): psPeer4.Protocol(),
}, },
), ),
streamtest.WithStreamError( streamtest.WithMiddlewares(
func(addr swarm.Address, _, _, _ string) error { func(h p2p.HandlerFunc) p2p.HandlerFunc {
lock.Lock() return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
defer lock.Unlock() lock.Lock()
if fail && addr.String() != peer4.String() { defer lock.Unlock()
return errors.New("peer not reachable")
}
return nil if triggerCount < 9 {
triggerCount++
stream.Close()
return errors.New("new error")
}
if err := h(ctx, peer, stream); err != nil {
return err
}
// close stream after all previous middlewares wrote to it
// so that the receiving peer can get all the post messages
return stream.Close()
}
}, },
), ),
streamtest.WithBaseAddr(pivotNode), streamtest.WithBaseAddr(pivotNode),
...@@ -1008,7 +1016,7 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) { ...@@ -1008,7 +1016,7 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
} }
// out of 4, 3 peers should return accounting error. So we should have effectively // out of 4, 3 peers should return accounting error. So we should have effectively
// sent only 1 msg // sent only 1 msg
if ta2.Get(tags.StateSent) != 1 { if ta2.Get(tags.StateSent) != 10 {
t.Fatalf("tags error") t.Fatalf("tags error")
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment