Commit eac94c70 authored by acud, committed by GitHub

fix(pushsync): originator replication (#2115)

parent 40dac40d
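This commit lifts the in-neighborhood replication fan-out out of the protocol handler and into pushToClosest (together with the warm-up check), so that a node that is itself the closest peer to a chunk it originates also replicates it (the "originator replication" of the title), not only chunks that arrive over the wire. pushToClosest now receives the address of the forwarding peer so the fan-out can skip it. The two call sites after the change, as they appear in the hunks below:

    // protocol handler: p is the peer that forwarded the chunk to us
    receipt, err := ps.pushToClosest(ctx, chunk, false, p.Address)

    // PushChunkToClosest: the chunk originates locally, there is no forwarder to skip
    r, err := ps.pushToClosest(ctx, ch, true, swarm.ZeroAddress)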
@@ -107,7 +107,7 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSync, storer)
if err == nil {
......
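The test above polls the local store until the chunk is marked as synced; this change only lengthens the per-attempt sleep from 10ms to 50ms, giving the push and the receipt exchange more time per retry. A minimal, self-contained sketch of the same polling pattern (package and helper name are illustrative, not part of the test):

    package example

    import "time"

    // waitUntil retries check at a fixed interval until it succeeds or the
    // attempts are exhausted, returning the last error seen.
    func waitUntil(attempts int, interval time.Duration, check func() error) error {
        var err error
        for i := 0; i < attempts; i++ {
            // give some time for the chunk to be pushed and the receipt to be received
            time.Sleep(interval)
            if err = check(); err == nil {
                return nil
            }
        }
        return err
    }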
@@ -200,15 +200,9 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunk.Address().String()})
defer span.Finish()
receipt, err := ps.pushToClosest(ctx, chunk, false)
receipt, err := ps.pushToClosest(ctx, chunk, false, p.Address)
if err != nil {
if errors.Is(err, topology.ErrWantSelf) {
if time.Now().Before(ps.warmupPeriod) {
err = ErrWarmup
return
}
if !storedChunk {
_, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
if err != nil {
@@ -216,95 +210,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
}
count := 0
// Push the chunk to some peers in the neighborhood in parallel for replication.
// Any errors here should NOT impact the rest of the handler.
err = ps.topologyDriver.EachNeighbor(func(peer swarm.Address, po uint8) (bool, bool, error) {
// skip forwarding peer
if peer.Equal(p.Address) {
return false, false, nil
}
if count == nPeersToPushsync {
return true, false, nil
}
count++
go func(peer swarm.Address) {
var err error
defer func() {
if err != nil {
ps.logger.Tracef("pushsync replication: %v", err)
ps.metrics.TotalReplicatedError.Inc()
} else {
ps.metrics.TotalReplicated.Inc()
}
}()
// price for neighborhood replication
receiptPrice := ps.pricer.PeerPrice(peer, chunk.Address())
ctx, cancel := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
defer cancel()
err = ps.accounting.Reserve(ctx, peer, receiptPrice)
if err != nil {
err = fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
return
}
defer ps.accounting.Release(peer, receiptPrice)
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
err = fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
return
}
defer func() {
if err != nil {
ps.metrics.TotalErrors.Inc()
_ = streamer.Reset()
} else {
_ = streamer.FullClose()
}
}()
w, r := protobuf.NewWriterAndReader(streamer)
stamp, err := chunk.Stamp().MarshalBinary()
if err != nil {
return
}
err = w.WriteMsgWithContext(ctx, &pb.Delivery{
Address: chunk.Address().Bytes(),
Data: chunk.Data(),
Stamp: stamp,
})
if err != nil {
return
}
var receipt pb.Receipt
if err = r.ReadMsgWithContext(ctx, &receipt); err != nil {
return
}
if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) {
// if the receipt is invalid, give up
return
}
err = ps.accounting.Credit(peer, receiptPrice, false)
}(peer)
return false, false, nil
})
if err != nil {
ps.logger.Tracef("pushsync replication closest peer: %w", err)
}
signature, err := ps.signer.Sign(ch.Address)
if err != nil {
return fmt.Errorf("receipt signature: %w", err)
@@ -346,7 +251,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
// a receipt from that peer and returns error or nil based on the receiving and
// the validity of the receipt.
func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
r, err := ps.pushToClosest(ctx, ch, true)
r, err := ps.pushToClosest(ctx, ch, true, swarm.ZeroAddress)
if err != nil {
return nil, err
}
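PushChunkToClosest is the exported entry point for locally originated pushes (used by the pusher, for example); per the comment above, it returns a receipt only when one was received from the closest peer and validated. A minimal usage sketch, assuming this repository's import paths; the wrapper name is illustrative:

    package example

    import (
        "context"

        "github.com/ethersphere/bee/pkg/pushsync"
        "github.com/ethersphere/bee/pkg/swarm"
    )

    // pushAndConfirm pushes ch into the network and returns the receipt signed
    // by the storer peer, or the push error if no valid receipt came back.
    func pushAndConfirm(ctx context.Context, ps *pushsync.PushSync, ch swarm.Chunk) (*pushsync.Receipt, error) {
        receipt, err := ps.PushChunkToClosest(ctx, ch)
        if err != nil {
            // no valid receipt: the caller keeps the chunk and retries later
            return nil, err
        }
        return receipt, nil
    }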
@@ -361,7 +266,7 @@ type pushResult struct {
attempted bool
}
func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllowed bool) (*pb.Receipt, error) {
func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllowed bool, origin swarm.Address) (*pb.Receipt, error) {
span, logger, ctx := ps.tracer.StartSpanFromContext(ctx, "push-closest", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
defer span.Finish()
defer ps.skipList.PruneExpired()
@@ -385,6 +290,29 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
// ClosestPeer can return ErrNotFound in case we are not connected to any peers
// in which case we should return immediately.
// if ErrWantSelf is returned, it means we are the closest peer.
if errors.Is(err, topology.ErrWantSelf) {
if time.Now().Before(ps.warmupPeriod) {
return nil, ErrWarmup
}
count := 0
// Push the chunk to some peers in the neighborhood in parallel for replication.
// Any errors here should NOT impact the rest of the handler.
_ = ps.topologyDriver.EachNeighbor(func(peer swarm.Address, po uint8) (bool, bool, error) {
// skip forwarding peer
if peer.Equal(origin) {
return false, false, nil
}
if count == nPeersToPushsync {
return true, false, nil
}
count++
go ps.pushToNeighbour(peer, ch)
return false, false, nil
})
return nil, err
}
return nil, fmt.Errorf("closest peer: %w", err)
}
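When ClosestPeer returns ErrWantSelf this node is the storer, so after the warm-up gate the chunk is replicated to at most nPeersToPushsync neighbours, skipping the origin peer it was forwarded from, and each replication runs in its own goroutine whose errors never affect the caller. The EachNeighbor callback is assumed here to follow the topology iterator contract in which the first return value stops the walk. A minimal, self-contained sketch of that bounded fan-out (the simplified iterator and all names are illustrative, not the bee API):

    package example

    import "fmt"

    // eachNeighbor walks a static peer list and stops when the callback's first
    // return value is true, mirroring the assumed (stop, jumpToNext, err) contract.
    func eachNeighbor(peers []string, f func(peer string, po uint8) (bool, bool, error)) error {
        for _, p := range peers {
            stop, _, err := f(p, 0)
            if err != nil {
                return err
            }
            if stop {
                break
            }
        }
        return nil
    }

    func main() {
        const nPeersToPushsync = 2 // illustrative fan-out limit
        origin := "peer-b"         // the peer the chunk was forwarded from

        var selected []string
        _ = eachNeighbor([]string{"peer-a", "peer-b", "peer-c", "peer-d"}, func(peer string, _ uint8) (bool, bool, error) {
            if peer == origin {
                return false, false, nil // skip the forwarding peer
            }
            if len(selected) == nPeersToPushsync {
                return true, false, nil // limit reached: stop the walk
            }
            // the real code spawns go ps.pushToNeighbour(peer, ch) here
            selected = append(selected, peer)
            return false, false, nil
        })
        fmt.Println("would replicate to:", selected) // [peer-a peer-c]
    }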
@@ -504,6 +432,73 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C
return &receipt, true, nil
}
// pushToNeighbour handles in-neighborhood replication for a single peer.
func (ps *PushSync) pushToNeighbour(peer swarm.Address, ch swarm.Chunk) {
var err error
defer func() {
if err != nil {
ps.logger.Tracef("pushsync replication: %v", err)
ps.metrics.TotalReplicatedError.Inc()
} else {
ps.metrics.TotalReplicated.Inc()
}
}()
// price for neighborhood replication
receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
ctx, cancel := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
defer cancel()
err = ps.accounting.Reserve(ctx, peer, receiptPrice)
if err != nil {
err = fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
return
}
defer ps.accounting.Release(peer, receiptPrice)
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
err = fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
return
}
defer func() {
if err != nil {
ps.metrics.TotalErrors.Inc()
_ = streamer.Reset()
} else {
_ = streamer.FullClose()
}
}()
w, r := protobuf.NewWriterAndReader(streamer)
stamp, err := ch.Stamp().MarshalBinary()
if err != nil {
return
}
err = w.WriteMsgWithContext(ctx, &pb.Delivery{
Address: ch.Address().Bytes(),
Data: ch.Data(),
Stamp: stamp,
})
if err != nil {
return
}
var receipt pb.Receipt
if err = r.ReadMsgWithContext(ctx, &receipt); err != nil {
return
}
if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
// if the receipt is invalid, give up
return
}
err = ps.accounting.Credit(peer, receiptPrice, false)
}
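pushToNeighbour is best effort and fire-and-forget: it runs under its own timeout (timeToWaitForPushsyncToNeighbor) rather than the handler's context, reserves the peer's price before sending and releases it afterwards, credits the peer only once a receipt for the right chunk address comes back, and resets the stream on failure while fully closing it on success. All of the deferred blocks inspect the single named err variable, so the assignments before each early return decide which path they take. A minimal, self-contained sketch of that shape (all names are illustrative):

    package example

    import (
        "context"
        "errors"
        "fmt"
        "log"
        "time"
    )

    // replicateOnce mirrors the shape of pushToNeighbour: one named err drives
    // the deferred success/failure bookkeeping, and the whole attempt is bounded
    // by its own timeout instead of the caller's context.
    func replicateOnce(timeout time.Duration, attempt func(ctx context.Context) error) {
        var err error
        defer func() {
            if err != nil {
                log.Printf("replication: %v", err) // Tracef + error counter in the real code
            } else {
                log.Print("replication ok") // success counter in the real code
            }
        }()

        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        defer cancel()

        if err = attempt(ctx); err != nil {
            err = fmt.Errorf("push to neighbour: %w", err)
            return
        }
    }

    func main() {
        replicateOnce(100*time.Millisecond, func(ctx context.Context) error {
            select {
            case <-time.After(time.Second): // simulated slow peer
                return nil
            case <-ctx.Done():
                return errors.New("timed out waiting for receipt")
            }
        })
    }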
type peerSkipList struct {
sync.Mutex
chunks map[string]struct{}
......