Commit 7f0760a8 authored by istae's avatar istae Committed by GitHub

fix: disable forwarding node stamp verification (#2233)

parent a7789208
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/pricer" "github.com/ethersphere/bee/pkg/pricer"
"github.com/ethersphere/bee/pkg/pushsync/pb" "github.com/ethersphere/bee/pkg/pushsync/pb"
"github.com/ethersphere/bee/pkg/soc" "github.com/ethersphere/bee/pkg/soc"
...@@ -139,9 +140,14 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -139,9 +140,14 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
ps.metrics.TotalReceived.Inc() ps.metrics.TotalReceived.Inc()
chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data) chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data)
if chunk, err = ps.validStamp(chunk, ch.Stamp); err != nil { chunkAddress := chunk.Address()
return fmt.Errorf("pushsync valid stamp: %w", err) stamp := new(postage.Stamp)
// attaching the stamp is required becase pushToClosest expects a chunk with a stamp
err = stamp.UnmarshalBinary(ch.Stamp)
if err != nil {
return fmt.Errorf("pushsync stamp unmarshall: %w", err)
} }
chunk.WithStamp(stamp)
if cac.Valid(chunk) { if cac.Valid(chunk) {
if ps.unwrap != nil { if ps.unwrap != nil {
...@@ -151,16 +157,21 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -151,16 +157,21 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return swarm.ErrInvalidChunk return swarm.ErrInvalidChunk
} }
price := ps.pricer.Price(chunk.Address()) price := ps.pricer.Price(chunkAddress)
// if the peer is closer to the chunk, AND it's a full node, we were selected for replication. Return early. // if the peer is closer to the chunk, AND it's a full node, we were selected for replication. Return early.
if p.FullNode { if p.FullNode {
bytes := chunk.Address().Bytes() bytes := chunkAddress.Bytes()
if dcmp, _ := swarm.DistanceCmp(bytes, p.Address.Bytes(), ps.address.Bytes()); dcmp == 1 { if dcmp, _ := swarm.DistanceCmp(bytes, p.Address.Bytes(), ps.address.Bytes()); dcmp == 1 {
if ps.topologyDriver.IsWithinDepth(chunk.Address()) { if ps.topologyDriver.IsWithinDepth(chunkAddress) {
ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor) ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
defer canceld() defer canceld()
chunk, err = ps.validStamp(chunk, ch.Stamp)
if err != nil {
return fmt.Errorf("pushsync valid stamp: %w", err)
}
_, err = ps.storer.Put(ctxd, storage.ModePutSync, chunk) _, err = ps.storer.Put(ctxd, storage.ModePutSync, chunk)
if err != nil { if err != nil {
return fmt.Errorf("chunk store: %w", err) return fmt.Errorf("chunk store: %w", err)
...@@ -191,7 +202,13 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -191,7 +202,13 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
// forwarding replication // forwarding replication
storedChunk := false storedChunk := false
if ps.topologyDriver.IsWithinDepth(chunk.Address()) { if ps.topologyDriver.IsWithinDepth(chunkAddress) {
chunk, err = ps.validStamp(chunk, ch.Stamp)
if err != nil {
return fmt.Errorf("pushsync valid stamp: %w", err)
}
_, err = ps.storer.Put(ctx, storage.ModePutSync, chunk) _, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
if err != nil { if err != nil {
ps.logger.Warningf("pushsync: within depth peer's attempt to store chunk failed: %v", err) ps.logger.Warningf("pushsync: within depth peer's attempt to store chunk failed: %v", err)
...@@ -200,13 +217,19 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -200,13 +217,19 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
} }
} }
span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunk.Address().String()}) span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()})
defer span.Finish() defer span.Finish()
receipt, err := ps.pushToClosest(ctx, chunk, false, p.Address) receipt, err := ps.pushToClosest(ctx, chunk, false, p.Address)
if err != nil { if err != nil {
if errors.Is(err, topology.ErrWantSelf) { if errors.Is(err, topology.ErrWantSelf) {
if !storedChunk { if !storedChunk {
chunk, err = ps.validStamp(chunk, ch.Stamp)
if err != nil {
return fmt.Errorf("pushsync valid stamp: %w", err)
}
_, err = ps.storer.Put(ctx, storage.ModePutSync, chunk) _, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
if err != nil { if err != nil {
return fmt.Errorf("chunk store: %w", err) return fmt.Errorf("chunk store: %w", err)
...@@ -225,7 +248,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -225,7 +248,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
} }
defer debit.Cleanup() defer debit.Cleanup()
receipt := pb.Receipt{Address: chunk.Address().Bytes(), Signature: signature, BlockHash: ps.blockHash} receipt := pb.Receipt{Address: chunkAddress.Bytes(), Signature: signature, BlockHash: ps.blockHash}
if err := w.WriteMsgWithContext(ctx, &receipt); err != nil { if err := w.WriteMsgWithContext(ctx, &receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err) return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
} }
......
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/p2p/streamtest" "github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/postage"
pricermock "github.com/ethersphere/bee/pkg/pricer/mock" pricermock "github.com/ethersphere/bee/pkg/pricer/mock"
"github.com/ethersphere/bee/pkg/pushsync" "github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/pushsync/pb" "github.com/ethersphere/bee/pkg/pushsync/pb"
...@@ -807,8 +806,9 @@ func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices p ...@@ -807,8 +806,9 @@ func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices p
if unwrap == nil { if unwrap == nil {
unwrap = func(swarm.Chunk) {} unwrap = func(swarm.Chunk) {}
} }
validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) { validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
return ch.WithStamp(postage.NewStamp(nil, nil, nil, nil)), nil return ch, nil
} }
return pushsync.New(addr, blockHash.Bytes(), recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, 0), storer, mtag return pushsync.New(addr, blockHash.Bytes(), recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, 0), storer, mtag
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment