Commit e0f50061 authored by acud, committed by GitHub

pushsync: package refactor (#1176)

* pushsync: reuse overlapping logic, improve instrumentation, don't bail on invalid receipt
parent 4abf0717
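
The consolidated push loop this commit lands in pushToClosest follows one pattern: walk outward through the closest peers and, on a delivery failure or an invalid receipt, record the error, skip that peer and try the next one instead of bailing out. Below is only a hedged sketch of that pattern; the nextPeer/deliver hooks and the string addresses are hypothetical stand-ins, not the bee API.

```go
package main

import (
	"errors"
	"fmt"
)

// errWantSelf stands in for topology.ErrWantSelf: the suggester returns it
// when the local node is itself the closest one to the chunk.
var errWantSelf = errors.New("want self")

// pushToClosest sketches the refactored loop: try up to maxPeers peers and,
// instead of bailing on an invalid receipt, remember the error, skip the
// peer and move on to the next closest one.
func pushToClosest(
	chunkAddr string,
	maxPeers int,
	nextPeer func(skip []string) (string, error), // hypothetical: closest-peer lookup with a skip list
	deliver func(peer string) (receiptAddr string, err error), // hypothetical: send chunk, read receipt
) (string, error) {
	var (
		skip    []string
		lastErr error
	)
	for i := 0; i < maxPeers; i++ {
		peer, err := nextPeer(skip)
		if err != nil {
			if errors.Is(err, errWantSelf) {
				// we are the closest node: issue a receipt for ourselves
				return chunkAddr, nil
			}
			return "", fmt.Errorf("closest peer: %w", err)
		}
		receiptAddr, err := deliver(peer)
		if err != nil {
			lastErr = fmt.Errorf("deliver to %s: %w", peer, err)
			skip = append(skip, peer)
			continue
		}
		if receiptAddr != chunkAddr {
			// invalid receipt: do not bail, try the next closest peer
			lastErr = fmt.Errorf("invalid receipt from %s", peer)
			skip = append(skip, peer)
			continue
		}
		return receiptAddr, nil
	}
	if lastErr != nil {
		return "", lastErr
	}
	return "", errors.New("no peer could be reached")
}

func main() {
	peers := []string{"peer-a", "peer-b"}
	nextPeer := func(skip []string) (string, error) {
		for _, p := range peers {
			skipped := false
			for _, s := range skip {
				if s == p {
					skipped = true
					break
				}
			}
			if !skipped {
				return p, nil
			}
		}
		return "", errors.New("not found")
	}
	deliver := func(peer string) (string, error) {
		if peer == "peer-a" {
			return "wrong-address", nil // simulate an invalid receipt
		}
		return "chunk-7000", nil
	}
	fmt.Println(pushToClosest("chunk-7000", 3, nextPeer, deliver))
}
```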
@@ -26,7 +26,7 @@ type Service struct {
storer storage.Storer
pushSyncer pushsync.PushSyncer
logger logging.Logger
tagg *tags.Tags
tag *tags.Tags
tracer *tracing.Tracer
metrics metrics
quit chan struct{}
@@ -42,7 +42,7 @@ func New(storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer
service := &Service{
storer: storer,
pushSyncer: pushSyncer,
tagg: tagger,
tag: tagger,
logger: logger,
tracer: tracer,
metrics: newMetrics(),
@@ -127,6 +127,8 @@ LOOP:
var (
err error
startTime = time.Now()
t *tags.Tag
setSent bool
)
defer func() {
if err == nil {
@@ -147,16 +149,36 @@ LOOP:
// for now ignoring the receipt and checking only for error
_, err = s.pushSyncer.PushChunkToClosest(ctx, ch)
if err != nil {
if !errors.Is(err, topology.ErrNotFound) {
logger.Debugf("pusher: error while sending chunk or receiving receipt: %v", err)
if errors.Is(err, topology.ErrWantSelf) {
// we are the closest ones - this is fine
// this is to make sure that the sent number does not diverge from the synced counter
// the edge case is the uploader node itself: it is connected to other nodes,
// but is the closest one to the chunk.
setSent = true
} else {
return
}
}
if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
err = fmt.Errorf("pusher: set sync: %w", err)
return
}
err = s.setChunkAsSynced(ctx, ch)
t, err = s.tag.Get(ch.TagID())
if err == nil && t != nil {
err = t.Inc(tags.StateSynced)
if err != nil {
logger.Debugf("pusher: error setting chunk as synced: %v", err)
err = fmt.Errorf("pusher: increment synced: %v", err)
return
}
if setSent {
err = t.Inc(tags.StateSent)
if err != nil {
err = fmt.Errorf("pusher: increment sent: %w", err)
return
}
}
}
}(ctx, ch)
case <-timer.C:
// the timer is set to fire initially, and again every time we hit the end of the push index
@@ -209,21 +231,6 @@ LOOP:
}
}
func (s *Service) setChunkAsSynced(ctx context.Context, ch swarm.Chunk) error {
if err := s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
return fmt.Errorf("set synced: %w", err)
}
t, err := s.tagg.Get(ch.TagID())
if err == nil && t != nil {
err = t.Inc(tags.StateSynced)
if err != nil {
return err
}
}
return nil
}
func (s *Service) Close() error {
s.logger.Info("pusher shutting down")
close(s.quit)
......
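
The pusher hunk above inlines the tag bookkeeping that previously lived in setChunkAsSynced: every chunk set as synced bumps StateSynced, and StateSent is bumped here only when the push returned topology.ErrWantSelf, i.e. the uploader itself is the closest node, so the two counters do not diverge. A minimal sketch of that rule, with a hypothetical counters struct standing in for tags.Tag:

```go
package main

import "fmt"

// tagCounters is a hypothetical stand-in for tags.Tag, tracking only the
// two states the pusher touches.
type tagCounters struct {
	sent   int
	synced int
}

// markSynced applies the pusher's rule: StateSynced is always incremented
// once the chunk is set as synced locally; StateSent is incremented here
// only when we were the closest node (ErrWantSelf), because on the normal
// path pushsync already counted the send when forwarding the chunk.
func (t *tagCounters) markSynced(wantSelf bool) {
	t.synced++
	if wantSelf {
		t.sent++
	}
}

func main() {
	t := &tagCounters{}
	t.markSynced(false) // forwarded to a peer: the send was counted in pushsync
	t.markSynced(true)  // uploader was the closest node: count the send here
	fmt.Printf("sent=%d synced=%d\n", t.sent, t.synced)
}
```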
@@ -99,7 +99,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
_ = stream.FullClose()
}
}()
var ch pb.Delivery
if err = r.ReadMsgWithContext(ctx, &ch); err != nil {
return fmt.Errorf("pushsync read delivery: %w", err)
@@ -112,115 +111,57 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
if ps.unwrap != nil {
go ps.unwrap(chunk)
}
} else {
if !soc.Valid(chunk) {
} else if !soc.Valid(chunk) {
return swarm.ErrInvalidChunk
}
}
span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunk.Address().String()})
defer span.Finish()
// Select the closest peer to forward the chunk
peer, err := ps.peerSuggester.ClosestPeer(chunk.Address())
receipt, err := ps.pushToClosest(ctx, chunk)
if err != nil {
// If i am the closest peer then store the chunk and send receipt
if errors.Is(err, topology.ErrWantSelf) {
return ps.handleDeliveryResponse(ctx, w, p, chunk)
}
return err
}
// This is a special situation in that the other peer thinks that we are the closest node
// and we think that the sending peer is the closest
if p.Address.Equal(peer) {
return ps.handleDeliveryResponse(ctx, w, p, chunk)
}
// compute the price we pay for this receipt and reserve it for the rest of this function
receiptPrice := ps.pricer.PeerPrice(peer, chunk.Address())
err = ps.accounting.Reserve(ctx, peer, receiptPrice)
// store the chunk in the local store
_, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
if err != nil {
return fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
}
defer ps.accounting.Release(peer, receiptPrice)
// Forward chunk to closest peer
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return fmt.Errorf("new stream peer %s: %w", peer.String(), err)
}
defer func() {
if err != nil {
_ = streamer.Reset()
} else {
go streamer.FullClose()
}
}()
wc, rc := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(ctx, wc, chunk); err != nil {
return fmt.Errorf("forward chunk to peer %s: %w", peer.String(), err)
return fmt.Errorf("chunk store: %w", err)
}
receipt, err := ps.receiveReceipt(ctx, rc)
if err != nil {
return fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
receipt := pb.Receipt{Address: chunk.Address().Bytes()}
if err := w.WriteMsg(&receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}
// Check if the receipt is valid
if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) {
return fmt.Errorf("invalid receipt from peer %s", peer.String())
return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address()))
}
err = ps.accounting.Credit(peer, receiptPrice)
if err != nil {
return err
return fmt.Errorf("handler: push to closest: %w", err)
}
// pass back the received receipt in the previously received stream
err = ps.sendReceipt(ctx, w, &receipt)
if err != nil {
return fmt.Errorf("send receipt to peer %s: %w", peer.String(), err)
}
return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address()))
}
func (ps *PushSync) sendChunkDelivery(ctx context.Context, w protobuf.Writer, chunk swarm.Chunk) (err error) {
ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel()
return w.WriteMsgWithContext(ctx, &pb.Delivery{
Address: chunk.Address().Bytes(),
Data: chunk.Data(),
})
}
func (ps *PushSync) sendReceipt(ctx context.Context, w protobuf.Writer, receipt *pb.Receipt) (err error) {
ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel()
if err := w.WriteMsgWithContext(ctx, receipt); err != nil {
return err
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}
return nil
}
func (ps *PushSync) receiveReceipt(ctx context.Context, r protobuf.Reader) (receipt pb.Receipt, err error) {
ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel()
if err := r.ReadMsgWithContext(ctx, &receipt); err != nil {
return receipt, err
}
return receipt, nil
return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address()))
}
// PushChunkToClosest sends the chunk to the closest peer by opening a stream. It then waits for
// a receipt from that peer and returns an error or nil depending on whether a valid
// receipt was received.
func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (r *Receipt, reterr error) {
span, logger, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-push", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
defer span.Finish()
func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
r, err := ps.pushToClosest(ctx, ch)
if err != nil {
return nil, err
}
return &Receipt{Address: swarm.NewAddress(r.Address)}, nil
}
func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.Receipt, reterr error) {
span, logger, ctx := ps.tracer.StartSpanFromContext(ctx, "push-closest", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
defer span.Finish()
var (
skipPeers []swarm.Address
lastErr error
@@ -246,31 +187,19 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (r *
defersFn()
// find next closest peer
peer, err := ps.peerSuggester.ClosestPeer(ch.Address(), skipPeers...)
if err != nil {
if errors.Is(err, topology.ErrNotFound) {
// NOTE: needed for tests
skipPeers = append(skipPeers, peer)
continue
deferFuncs = append(deferFuncs, func() {
if lastErr != nil {
ps.metrics.TotalErrors.Inc()
logger.Errorf("pushsync: %v", lastErr)
}
})
if errors.Is(err, topology.ErrWantSelf) {
// this is to make sure that the sent number does not diverge from the synced counter
t, err := ps.tagger.Get(ch.TagID())
if err == nil && t != nil {
err = t.Inc(tags.StateSent)
// find next closest peer
peer, err := ps.peerSuggester.ClosestPeer(ch.Address(), skipPeers...)
if err != nil {
return nil, err
}
}
// if you are the closest node return a receipt immediately
return &Receipt{
Address: ch.Address(),
}, nil
}
// ClosestPeer can return ErrNotFound in case we are not connected to any peers
// in which case we should return immediately.
// if ErrWantSelf is returned, it means we are the closest peer.
return nil, fmt.Errorf("closest peer: %w", err)
}
@@ -287,19 +216,20 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (r *
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
ps.metrics.TotalErrors.Inc()
lastErr = fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
logger.Debugf("pushsync-push: %v", lastErr)
continue
}
deferFuncs = append(deferFuncs, func() { go streamer.FullClose() })
w, r := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(ctx, w, ch); err != nil {
ps.metrics.TotalErrors.Inc()
ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel()
if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
Address: ch.Address().Bytes(),
Data: ch.Data(),
}); err != nil {
_ = streamer.Reset()
lastErr = fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address().String(), peer.String(), err)
logger.Debugf("pushsync-push: %v", lastErr)
continue
}
@@ -310,23 +240,25 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (r *
if err == nil && t != nil {
err = t.Inc(tags.StateSent)
if err != nil {
lastErr = fmt.Errorf("tag %d increment: %v", ch.TagID(), err)
err = lastErr
return nil, err
}
}
receipt, err := ps.receiveReceipt(ctx, r)
if err != nil {
ps.metrics.TotalErrors.Inc()
var receipt pb.Receipt
cctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel()
if err := r.ReadMsgWithContext(cctx, &receipt); err != nil {
_ = streamer.Reset()
lastErr = fmt.Errorf("chunk %s receive receipt from peer %s: %w", ch.Address().String(), peer.String(), err)
logger.Debugf("pushsync-push: %v", lastErr)
continue
}
// Check if the receipt is valid
if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
_ = streamer.Reset()
return nil, fmt.Errorf("invalid receipt. peer %s", peer.String())
// if the receipt is invalid, try to push to the next peer
lastErr = fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address().String(), peer.String())
continue
}
err = ps.accounting.Credit(peer, receiptPrice)
@@ -334,14 +266,10 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (r *
return nil, err
}
rec := &Receipt{
Address: swarm.NewAddress(receipt.Address),
}
return rec, nil
return &receipt, nil
}
logger.Tracef("pushsync-push: failed to push chunk %s: reached max peers of %v", ch.Address(), maxPeers)
logger.Tracef("pushsync: chunk %s: reached %v peers", ch.Address(), maxPeers)
if lastErr != nil {
return nil, lastErr
@@ -349,20 +277,3 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (r *
return nil, topology.ErrNotFound
}
func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Writer, p p2p.Peer, chunk swarm.Chunk) error {
// Store the chunk in the local store
_, err := ps.storer.Put(ctx, storage.ModePutSync, chunk)
if err != nil {
return fmt.Errorf("chunk store: %w", err)
}
// Send a receipt immediately once the chunk has been stored successfully
receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
err = ps.sendReceipt(ctx, w, receipt)
if err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}
return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address()))
}
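
Around the receipt the two sides settle accounting in opposite directions: the forwarding node reserves the receipt price before opening the stream, credits the downstream peer once a valid receipt arrives and releases the reservation, while the handler debits the upstream sender after storing the chunk and writing its receipt. A toy sketch of that flow, assuming a hypothetical ledger type (including its sign convention) rather than bee's accounting interface:

```go
package main

import "fmt"

// ledger is a hypothetical stand-in for the accounting interface: positive
// balances mean the peer owes us, negative means we owe the peer.
type ledger struct {
	balance  map[string]int64
	reserved map[string]uint64
}

func newLedger() *ledger {
	return &ledger{balance: map[string]int64{}, reserved: map[string]uint64{}}
}

// Reserve earmarks the receipt price before the exchange so a failed or
// slow push cannot grow the debt unchecked.
func (l *ledger) Reserve(peer string, price uint64) { l.reserved[peer] += price }

// Release returns the reservation; in the real code it runs in a defer.
func (l *ledger) Release(peer string, price uint64) { l.reserved[peer] -= price }

// Credit records what we owe the downstream peer for a valid receipt.
func (l *ledger) Credit(peer string, price uint64) { l.balance[peer] -= int64(price) }

// Debit records what the upstream sender owes us for accepting its chunk.
func (l *ledger) Debit(peer string, price uint64) { l.balance[peer] += int64(price) }

func main() {
	l := newLedger()
	price := uint64(100)

	// forwarder side: reserve, push the chunk, credit on a valid receipt, release
	l.Reserve("downstream", price)
	l.Credit("downstream", price)
	l.Release("downstream", price)

	// handler side: store the chunk, send the receipt, then debit the sender
	l.Debit("upstream", price)

	fmt.Println(l.balance, l.reserved)
}
```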
@@ -96,8 +96,8 @@ func TestPushChunkToClosest(t *testing.T) {
// chunk data to upload
chunk := testingc.FixtureChunk("7000")
// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110 -> po 1
pivotNode := swarm.MustParseHexAddress("0000") // base is 0000
closestPeer := swarm.MustParseHexAddress("6000") // binary 0110 -> po 1
callbackC := make(chan struct{}, 1)
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
@@ -181,10 +181,10 @@ func TestPushChunkToNextClosest(t *testing.T) {
chunk := testingc.FixtureChunk("7000")
// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
pivotNode := swarm.MustParseHexAddress("0000") // base is 0000
peer1 := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
peer2 := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
peer1 := swarm.MustParseHexAddress("6000")
peer2 := swarm.MustParseHexAddress("5000")
peers := []swarm.Address{
peer1,
peer2,
@@ -225,7 +225,6 @@ func TestPushChunkToNextClosest(t *testing.T) {
// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil,
mock.WithClosestPeerErr(topology.ErrNotFound),
mock.WithPeers(peers...),
)
defer storerPivot.Close()
......
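
The shortened fixture addresses in these tests lean on proximity order: the mocked closest peer 6000 sits at PO 1 from the 0000 pivot, as the "binary 0110 -> po 1" comments note. A small standalone illustration of that calculation follows; the proximity helper is a hypothetical stand-in, not the swarm package's own function.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"math/bits"
)

// proximity returns the number of leading bits two addresses share, i.e.
// the count of leading zeros of their XOR. Hypothetical helper for illustration.
func proximity(a, b []byte) int {
	for i := range a {
		if i >= len(b) {
			break
		}
		if x := a[i] ^ b[i]; x != 0 {
			return i*8 + bits.LeadingZeros8(x)
		}
	}
	return 8 * len(a)
}

func main() {
	pivot, _ := hex.DecodeString("0000")   // pivot node, base 0000
	closest, _ := hex.DecodeString("6000") // 0x60 = 0110 0000
	// XOR of the first bytes is 0110 0000, which has one leading zero bit,
	// hence the "binary 0110 -> po 1" comment in the tests.
	fmt.Println("po:", proximity(pivot, closest)) // po: 1
}
```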
@@ -87,14 +87,18 @@ func (d *mock) Peers() []swarm.Address {
func (d *mock) ClosestPeer(_ swarm.Address, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) {
if len(skipPeers) == 0 {
if d.closestPeerErr != nil {
return d.closestPeer, d.closestPeerErr
}
if !d.closestPeer.Equal(swarm.ZeroAddress) {
return d.closestPeer, nil
}
}
d.mtx.Lock()
defer d.mtx.Unlock()
skipPeer := false
for _, p := range d.peers {
for _, a := range skipPeers {
if a.Equal(p) {
@@ -113,7 +117,6 @@ func (d *mock) ClosestPeer(_ swarm.Address, skipPeers ...swarm.Address) (peerAdd
if peerAddr.IsZero() {
return peerAddr, topology.ErrNotFound
}
return peerAddr, nil
}
......