Commit 657c723f authored by istae, committed by GitHub

feat: pusher skipping syncing for chunks with expired stamps (#2392)

parent 0f012ed9
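
This change threads a postage stamp validation hook through the pusher: before a chunk is handed to pushsync, its stamp is validated, and a chunk whose stamp no longer validates (for example because its batch has expired) is marked as synced locally instead of being pushed to the network. Judging from the mock in the test diff below, the hook has roughly this shape (a sketch inferred from this diff, not the package's documented definition):

// ValidStampFn validates a marshalled postage stamp against a chunk.
// It returns the chunk on success, or an error when the stamp's batch
// is unknown or expired. Shape inferred from defaultMockValidStamp below.
type ValidStampFn func(chunk swarm.Chunk, stampBytes []byte) (swarm.Chunk, error)
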
@@ -643,7 +643,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
b.recoveryHandleCleanup = pssService.Register(recovery.Topic, chunkRepairHandler)
}
pusherService := pusher.New(networkID, storer, kad, pushSyncProtocol, tagService, logger, tracer, warmupTime)
pusherService := pusher.New(networkID, storer, kad, pushSyncProtocol, validStamp, tagService, logger, tracer, warmupTime)
b.pusherCloser = pusherService
pullStorage := pullstorage.New(storer)
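
The diff elides how node.go obtains validStamp before the call above. A minimal sketch of what such a closure could look like, assuming a hypothetical batchExists predicate in place of the real batch-store lookup (and assuming postage.Stamp's UnmarshalBinary and BatchID behave as their names suggest):

package main

import (
	"errors"

	"github.com/ethersphere/bee/pkg/postage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// newValidStamp builds a validation closure over batchExists, a
// hypothetical predicate standing in for bee's batch store.
func newValidStamp(batchExists func(batchID []byte) bool) postage.ValidStampFn {
	return func(ch swarm.Chunk, stampBytes []byte) (swarm.Chunk, error) {
		// Decode the raw stamp bytes back into a stamp.
		stamp := new(postage.Stamp)
		if err := stamp.UnmarshalBinary(stampBytes); err != nil {
			return nil, err
		}
		// Reject chunks whose batch is unknown or has expired.
		if !batchExists(stamp.BatchID()) {
			return nil, errors.New("batch not found or expired")
		}
		return ch, nil
	}
}
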
@@ -19,6 +19,7 @@ import (
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
@@ -34,6 +35,7 @@ type Service struct {
networkID uint64
storer storage.Storer
pushSyncer pushsync.PushSyncer
validStamp postage.ValidStampFn
depther topology.NeighborhoodDepther
logger logging.Logger
tag *tags.Tags
@@ -54,11 +56,12 @@ var (
ErrShallowReceipt = errors.New("shallow receipt")
)
func New(networkID uint64, storer storage.Storer, depther topology.NeighborhoodDepther, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service {
func New(networkID uint64, storer storage.Storer, depther topology.NeighborhoodDepther, pushSyncer pushsync.PushSyncer, validStamp postage.ValidStampFn, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service {
service := &Service{
networkID: networkID,
storer: storer,
pushSyncer: pushSyncer,
validStamp: validStamp,
depther: depther,
tag: tagger,
logger: logger,
@@ -121,6 +124,27 @@ LOOP:
break
}
// If the stamp is invalid, the chunk is not synced with the network
// since other nodes would reject the chunk, so the chunk is marked as
// synced which makes it available to the node but not to the network
stampBytes, err := ch.Stamp().MarshalBinary()
if err != nil {
s.logger.Errorf("pusher: stamp marshal: %w", err)
if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
s.logger.Errorf("pusher: set sync: %w", err)
}
continue
}
_, err = s.validStamp(ch, stampBytes)
if err != nil {
s.logger.Warningf("pusher: stamp with batch ID %x is no longer valid, skipping syncing for chunk %s: %v", ch.Stamp().BatchID(), ch.Address().String(), err)
if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
s.logger.Errorf("pusher: set sync: %w", err)
}
continue
}
if span == nil {
mtx.Lock()
span, logger, ctx = s.tracer.StartSpanFromContext(cctx, "pusher-sync-batch", s.logger)
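
Distilled, the new branch above makes one decision per chunk: marshal the stamp, run it through validStamp, and on any failure mark the chunk as synced locally rather than pushing it. A standalone sketch of that decision (shouldSkipSync is our name; in the commit the logic is inline in the pusher loop):

package main

import (
	"github.com/ethersphere/bee/pkg/postage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// shouldSkipSync reports whether network syncing should be skipped for
// ch because its stamp cannot be marshalled or no longer validates.
// Illustrative only; the pusher keeps this logic inline.
func shouldSkipSync(ch swarm.Chunk, validStamp postage.ValidStampFn) bool {
	stampBytes, err := ch.Stamp().MarshalBinary()
	if err != nil {
		return true // stamp cannot be marshalled: keep the chunk local
	}
	_, err = validStamp(ch, stampBytes)
	return err != nil
}

In both failure branches the chunk is still set to ModeSetSync, so an upload completes from the client's point of view even though the chunk never leaves the node.
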
@@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/postage"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/localstore"
@@ -32,6 +33,9 @@ import (
// no of times to retry to see if we have received response from pushsync
var noOfRetries = 20
var block = common.HexToHash("0x1").Bytes()
var defaultMockValidStamp = func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
return ch, nil
}
// Wrap the actual storer to intercept the modeSet that the pusher will call when a valid receipt is received
type Store struct {
@@ -91,7 +95,7 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
return receipt, nil
})
mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()
@@ -107,7 +111,7 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
t.Fatal(err)
}
// Check is the chunk is set as synced in the DB.
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(50 * time.Millisecond)
@@ -148,7 +152,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
return receipt, nil
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()
@@ -157,7 +161,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
t.Fatal(err)
}
// Check is the chunk is set as synced in the DB.
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(50 * time.Millisecond)
@@ -186,7 +190,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
return nil, errors.New("invalid receipt")
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer))
defer storer.Close()
defer p.Close()
@@ -195,7 +199,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
t.Fatal(err)
}
// Check is the chunk is set as synced in the DB.
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond)
@@ -234,7 +238,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
return receipt, nil
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()
@@ -243,7 +247,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
t.Fatal(err)
}
// Check is the chunk is set as synced in the DB.
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond)
@@ -290,7 +294,7 @@ func TestPusherClose(t *testing.T) {
return receipt, nil
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
chunk := testingc.GenerateTestRandomChunk()
@@ -397,7 +401,7 @@ func TestPusherRetryShallow(t *testing.T) {
// create the pivot peer pusher with depth 31, this makes
// sure that virtually any receipt generated by the random
// key will be considered too shallow
_, ps, storer := createPusher(t, pivotPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(31))
_, ps, storer := createPusher(t, pivotPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(31))
defer ps.Close()
// generate a chunk at PO 1 with closestPeer, meaning that we get a
@@ -424,7 +428,56 @@ func TestPusherRetryShallow(t *testing.T) {
t.Fatalf("timed out waiting for retries. got %d want %d", c, *pusher.RetryCount)
}
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
// TestChunkWithInvalidStampSkipped tests that chunks with invalid stamps are skipped by the pusher
func TestChunkWithInvalidStampSkipped(t *testing.T) {
// create a trigger peer and a closest peer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
signature, _ := signer.Sign(chunk.Address().Bytes())
receipt := &pushsync.Receipt{
Address: swarm.NewAddress(chunk.Address().Bytes()),
Signature: signature,
BlockHash: block,
}
return receipt, nil
})
validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
return nil, errors.New("valid stamp error")
}
_, p, storer := createPusher(t, triggerPeer, pushSyncService, validStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()
chunk := testingc.GenerateTestRandomChunk()
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(50 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSync, storer)
if err == nil {
break
}
}
if err != nil {
t.Fatal(err)
}
}
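
The test polls checkIfModeSet, whose body is elided from this diff. Given that the Store wrapper above is described as intercepting the modeSet the pusher calls, a plausible sketch, as it might appear in the test file, is the following (the modeSet map and its mutex are assumed field names, not necessarily the file's actual ones):

// checkIfModeSet reports whether the wrapped storer has recorded the
// given mode for addr. The modeSet map and modeSetMu mutex are
// assumptions made for illustration; the real Store wrapper may differ.
func checkIfModeSet(addr swarm.Address, mode storage.ModeSet, storer *Store) error {
	storer.modeSetMu.Lock()
	defer storer.modeSetMu.Unlock()
	if got, ok := storer.modeSet[addr.String()]; !ok || got != mode {
		return errors.New("mode not set for chunk")
	}
	return nil
}
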
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, validStamp postage.ValidStampFn, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
storer, err := localstore.New("", addr.Bytes(), nil, nil, logger)
@@ -442,7 +495,7 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
}
peerSuggester := mock.NewTopologyDriver(mockOpts...)
pusherService := pusher.New(1, pusherStorer, peerSuggester, pushSyncService, mtags, logger, nil, 0)
pusherService := pusher.New(1, pusherStorer, peerSuggester, pushSyncService, validStamp, mtags, logger, nil, 0)
return mtags, pusherService, pusherStorer
}