Commit 0b7f1c63 authored by Zahoor Mohamed's avatar Zahoor Mohamed Committed by GitHub

Assume that tags cannot be present for all chunks since bzz api didnt implement it (#260)

parent c1bac941
...@@ -86,9 +86,11 @@ func (s *Service) chunksWorker() { ...@@ -86,9 +86,11 @@ func (s *Service) chunksWorker() {
t, err := s.tag.GetByAddress(ch.Address()) t, err := s.tag.GetByAddress(ch.Address())
if err != nil { if err != nil {
s.logger.Debugf("pusher: get tag by address %s: %v", ch.Address(), err) s.logger.Debugf("pusher: get tag by address %s: %v", ch.Address(), err)
continue //continue // // until bzz api implements tags dont continue here
} else {
// update the tags only if we get it
t.Inc(tags.StateSent)
} }
t.Inc(tags.StateSent)
// Later when we process receipt, get the receipt and process it // Later when we process receipt, get the receipt and process it
// for now ignoring the receipt and checking only for error // for now ignoring the receipt and checking only for error
...@@ -137,9 +139,12 @@ func (s *Service) setChunkAsSynced(ctx context.Context, addr swarm.Address) { ...@@ -137,9 +139,12 @@ func (s *Service) setChunkAsSynced(ctx context.Context, addr swarm.Address) {
ta, err := s.tag.GetByAddress(addr) ta, err := s.tag.GetByAddress(addr)
if err != nil { if err != nil {
s.logger.Debugf("pusher: get tag by address %s: %v", addr, err) s.logger.Debugf("pusher: get tag by address %s: %v", addr, err)
return // return // until bzz api implements tags dont retunrn here
} else {
// update the tags only if we get it
ta.Inc(tags.StateSynced)
} }
ta.Inc(tags.StateSynced)
} }
} }
......
...@@ -7,12 +7,13 @@ package pusher_test ...@@ -7,12 +7,13 @@ package pusher_test
import ( import (
"context" "context"
"errors" "errors"
"github.com/ethersphere/bee/pkg/tags"
"io/ioutil" "io/ioutil"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/localstore" "github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pusher" "github.com/ethersphere/bee/pkg/pusher"
...@@ -89,6 +90,50 @@ func TestSendChunkToPushSync(t *testing.T) { ...@@ -89,6 +90,50 @@ func TestSendChunkToPushSync(t *testing.T) {
p.Close() p.Close()
} }
// TestSendChunkToPushSyncWithoutTag is similar to TestSendChunkToPushSync, excep that the tags are not
// present to simulate bzz api withotu splitter condition
func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
chunk := createChunk()
// create a trigger and a closestpeer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
receipt := &pushsync.Receipt{
Address: swarm.NewAddress(chunk.Address().Bytes()),
}
return receipt, nil
})
mtag := tags.NewTags()
_, err := mtag.Create("name", 1, false)
if err != nil {
t.Fatal(err)
}
p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer))
defer storer.Close()
_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
// Check is the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer)
if err == nil {
break
}
}
if err != nil {
t.Fatal(err)
}
p.Close()
}
// TestSendChunkAndReceiveInvalidReceipt sends a chunk to pushsync to be sent ot its closest peer and // TestSendChunkAndReceiveInvalidReceipt sends a chunk to pushsync to be sent ot its closest peer and
// get a invalid receipt (not with the address of the chunk sent). The test makes sure that this error // get a invalid receipt (not with the address of the chunk sent). The test makes sure that this error
// is received and the ModeSetSyncPush is not set for the chunk. // is received and the ModeSetSyncPush is not set for the chunk.
...@@ -158,6 +203,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) { ...@@ -158,6 +203,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
tag.Address = chunk.Address() tag.Address = chunk.Address()
p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer)) p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer))
defer storer.Close() defer storer.Close()
defer p.Close()
_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk) _, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil { if err != nil {
...@@ -177,7 +223,6 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) { ...@@ -177,7 +223,6 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
if err == nil { if err == nil {
t.Fatalf("chunk not syned error expected") t.Fatalf("chunk not syned error expected")
} }
p.Close()
} }
func createChunk() swarm.Chunk { func createChunk() swarm.Chunk {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment