Commit d7970054 authored by Esad Akar's avatar Esad Akar Committed by GitHub

fix: light node check in pushsync replication (#1739)

parent c57f0750
......@@ -31,6 +31,7 @@ var (
type Recorder struct {
base swarm.Address
fullNode bool
records map[string][]*Record
recordsMu sync.Mutex
protocols []p2p.ProtocolSpec
......@@ -63,6 +64,12 @@ func WithBaseAddr(a swarm.Address) Option {
})
}
func WithLightNode() Option {
return optionFunc(func(r *Recorder) {
r.fullNode = false
})
}
func WithStreamError(streamErr func(swarm.Address, string, string, string) error) Option {
return optionFunc(func(r *Recorder) {
r.streamErr = streamErr
......@@ -71,7 +78,8 @@ func WithStreamError(streamErr func(swarm.Address, string, string, string) error
func New(opts ...Option) *Recorder {
r := &Recorder{
records: make(map[string][]*Record),
records: make(map[string][]*Record),
fullNode: true,
}
r.middlewares = append(r.middlewares, noopMiddleware)
......@@ -129,7 +137,7 @@ func (r *Recorder) NewStream(ctx context.Context, addr swarm.Address, h p2p.Head
// pass a new context to handler,
// do not cancel it with the client stream context
err := handler(context.Background(), p2p.Peer{Address: r.base}, streamIn)
err := handler(context.Background(), p2p.Peer{Address: r.base, FullNode: r.fullNode}, streamIn)
if err != nil && err != io.EOF {
record.setErr(err)
}
......
......@@ -147,29 +147,37 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
price := ps.pricer.Price(chunk.Address())
// if the peer is closer to the chunk, we were selected for replication. Return early.
if dcmp, _ := swarm.DistanceCmp(chunk.Address().Bytes(), p.Address.Bytes(), ps.address.Bytes()); dcmp == 1 {
if ps.topologyDriver.IsWithinDepth(chunk.Address()) {
ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
defer canceld()
// if the peer is closer to the chunk, AND it's a full node, we were selected for replication. Return early.
if p.FullNode {
bytes := chunk.Address().Bytes()
if dcmp, _ := swarm.DistanceCmp(bytes, p.Address.Bytes(), ps.address.Bytes()); dcmp == 1 {
if ps.topologyDriver.IsWithinDepth(chunk.Address()) {
ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
defer canceld()
_, err = ps.storer.Put(ctxd, storage.ModePutSync, chunk)
if err != nil {
ps.logger.Errorf("pushsync: chunk store: %v", err)
}
_, err = ps.storer.Put(ctxd, storage.ModePutSync, chunk)
if err != nil {
ps.logger.Errorf("pushsync: chunk store: %v", err)
}
debit := ps.accounting.PrepareDebit(p.Address, price)
defer debit.Cleanup()
debit := ps.accounting.PrepareDebit(p.Address, price)
defer debit.Cleanup()
// return back receipt
signature, err := ps.signer.Sign(bytes)
if err != nil {
return fmt.Errorf("receipt signature: %w", err)
}
receipt := pb.Receipt{Address: bytes, Signature: signature}
if err := w.WriteMsgWithContext(ctxd, &receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}
// return back receipt
receipt := pb.Receipt{Address: chunk.Address().Bytes()}
if err := w.WriteMsgWithContext(ctxd, &receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
return debit.Apply()
}
return debit.Apply()
}
return ErrOutOfDepthReplication
return ErrOutOfDepthReplication
}
}
// forwarding replication
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment