Commit bf76540e authored by Esad Akar's avatar Esad Akar Committed by GitHub

receipt is signed by storer node and verified by origin node (#1431)

parent 88953eea
......@@ -18,6 +18,7 @@ type signerMock struct {
signTx func(transaction *types.Transaction, chainID *big.Int) (*types.Transaction, error)
signTypedData func(*eip712.TypedData) ([]byte, error)
ethereumAddress func() (common.Address, error)
signFunc func([]byte) ([]byte, error)
}
func (m *signerMock) EthereumAddress() (common.Address, error) {
......@@ -27,8 +28,8 @@ func (m *signerMock) EthereumAddress() (common.Address, error) {
return common.Address{}, nil
}
func (*signerMock) Sign(data []byte) ([]byte, error) {
return nil, nil
func (m *signerMock) Sign(data []byte) ([]byte, error) {
return m.signFunc(data)
}
func (m *signerMock) SignTx(transaction *types.Transaction, chainID *big.Int) (*types.Transaction, error) {
......@@ -60,6 +61,12 @@ type optionFunc func(*signerMock)
func (f optionFunc) apply(r *signerMock) { f(r) }
func WithSignFunc(f func(data []byte) ([]byte, error)) Option {
return optionFunc(func(s *signerMock) {
s.signFunc = f
})
}
func WithSignTxFunc(f func(transaction *types.Transaction, chainID *big.Int) (*types.Transaction, error)) Option {
return optionFunc(func(s *signerMock) {
s.signTx = f
......
......@@ -428,7 +428,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
traversalService := traversal.NewService(ns)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagService, pssService.TryUnwrap, logger, acc, pricer, tracer)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagService, pssService.TryUnwrap, logger, acc, pricer, signer, tracer)
// set the pushSyncer in the PSS
pssService.SetPushSyncer(pushSyncProtocol)
......@@ -443,7 +443,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
b.recoveryHandleCleanup = pssService.Register(recovery.Topic, chunkRepairHandler)
}
pushSyncPusher := pusher.New(storer, kad, pushSyncProtocol, tagService, logger, tracer)
pushSyncPusher := pusher.New(networkID, storer, kad, pushSyncProtocol, tagService, logger, tracer)
b.pusherCloser = pushSyncPusher
pullStorage := pullstorage.New(storer)
......
......@@ -15,6 +15,7 @@ import (
"sync"
"time"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/storage"
......@@ -23,10 +24,12 @@ import (
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
)
type Service struct {
networkID uint64
storer storage.Storer
pushSyncer pushsync.PushSyncer
logger logging.Logger
......@@ -42,8 +45,11 @@ var (
concurrentJobs = 10 // how many chunks to push simultaneously
)
func New(storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer) *Service {
var ErrInvalidAddress = errors.New("invalid address")
func New(networkID uint64, storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer) *Service {
service := &Service{
networkID: networkID,
storer: storer,
pushSyncer: pushSyncer,
tag: tagger,
......@@ -129,17 +135,18 @@ LOOP:
go func(ctx context.Context, ch swarm.Chunk) {
var (
err error
startTime = time.Now()
t *tags.Tag
setSent bool
err error
startTime = time.Now()
t *tags.Tag
setSent bool
storerPeer swarm.Address
)
defer func() {
if err == nil {
s.metrics.TotalSynced.Inc()
s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
// only print this if there was no error while sending the chunk
logger.Tracef("pusher pushed chunk %s", ch.Address().String())
logger.Tracef("pusher: pushed chunk %s to node %s", ch.Address().String(), storerPeer.String())
} else {
s.metrics.TotalErrors.Inc()
s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
......@@ -151,7 +158,7 @@ LOOP:
}()
// Later when we process receipt, get the receipt and process it
// for now ignoring the receipt and checking only for error
_, err = s.pushSyncer.PushChunkToClosest(ctx, ch)
receipt, err := s.pushSyncer.PushChunkToClosest(ctx, ch)
if err != nil {
if errors.Is(err, topology.ErrWantSelf) {
// we are the closest ones - this is fine
......@@ -163,6 +170,19 @@ LOOP:
return
}
}
publicKey, err := crypto.Recover(receipt.Signature, receipt.Address.Bytes())
if err != nil {
err = fmt.Errorf("pusher: receipt recover: %w", err)
return
}
storerPeer, err = crypto.NewOverlayAddress(*publicKey, s.networkID)
if err != nil {
err = fmt.Errorf("pusher: receipt storer address: %w", err)
return
}
if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
err = fmt.Errorf("pusher: set sync: %w", err)
return
......
......@@ -12,6 +12,7 @@ import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/crypto"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/localstore"
......@@ -73,12 +74,18 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
signature, _ := signer.Sign(chunk.Address().Bytes())
receipt := &pushsync.Receipt{
Address: swarm.NewAddress(chunk.Address().Bytes()),
Address: swarm.NewAddress(chunk.Address().Bytes()),
Signature: signature,
}
return receipt, nil
})
mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close()
......@@ -124,9 +131,14 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
signature, _ := signer.Sign(chunk.Address().Bytes())
receipt := &pushsync.Receipt{
Address: swarm.NewAddress(chunk.Address().Bytes()),
Address: swarm.NewAddress(chunk.Address().Bytes()),
Signature: signature,
}
return receipt, nil
})
......@@ -203,11 +215,17 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
// Set 10 times more than the time we wait for the test to complete so that
// the response never reaches our testcase
time.Sleep(1 * time.Second)
return nil, nil
signature, _ := signer.Sign(chunk.Address().Bytes())
receipt := &pushsync.Receipt{
Address: swarm.NewAddress(chunk.Address().Bytes()),
Signature: signature,
}
return receipt, nil
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
......@@ -251,10 +269,18 @@ func TestPusherClose(t *testing.T) {
close(goFuncAfterCloseC)
}()
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
goFuncStartedC <- struct{}{}
<-goFuncAfterCloseC
return nil, nil
signature, _ := signer.Sign(chunk.Address().Bytes())
receipt := &pushsync.Receipt{
Address: swarm.NewAddress(chunk.Address().Bytes()),
Signature: signature,
}
return receipt, nil
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
......@@ -360,7 +386,7 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
}
peerSuggester := mock.NewTopologyDriver(mockOpts...)
pusherService := pusher.New(pusherStorer, peerSuggester, pushSyncService, mtags, logger, nil)
pusherService := pusher.New(1, pusherStorer, peerSuggester, pushSyncService, mtags, logger, nil)
return mtags, pusherService, pusherStorer
}
......
......@@ -75,7 +75,8 @@ func (m *Delivery) GetData() []byte {
}
type Receipt struct {
Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
Signature []byte `protobuf:"bytes,2,opt,name=Signature,proto3" json:"Signature,omitempty"`
}
func (m *Receipt) Reset() { *m = Receipt{} }
......@@ -118,6 +119,13 @@ func (m *Receipt) GetAddress() []byte {
return nil
}
func (m *Receipt) GetSignature() []byte {
if m != nil {
return m.Signature
}
return nil
}
func init() {
proto.RegisterType((*Delivery)(nil), "pushsync.Delivery")
proto.RegisterType((*Receipt)(nil), "pushsync.Receipt")
......@@ -126,16 +134,17 @@ func init() {
func init() { proto.RegisterFile("pushsync.proto", fileDescriptor_723cf31bfc02bfd6) }
var fileDescriptor_723cf31bfc02bfd6 = []byte{
// 139 bytes of a gzipped FileDescriptorProto
// 155 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x28, 0x2d, 0xce,
0x28, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0x95, 0x2c,
0xb8, 0x38, 0x5c, 0x52, 0x73, 0x32, 0xcb, 0x52, 0x8b, 0x2a, 0x85, 0x24, 0xb8, 0xd8, 0x1d, 0x53,
0x52, 0x8a, 0x52, 0x8b, 0x8b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x78, 0x82, 0x60, 0x5c, 0x21, 0x21,
0x2e, 0x16, 0x97, 0xc4, 0x92, 0x44, 0x09, 0x26, 0xb0, 0x30, 0x98, 0xad, 0xa4, 0xcc, 0xc5, 0x1e,
0x94, 0x9a, 0x9c, 0x9a, 0x59, 0x50, 0x82, 0x5b, 0xa3, 0x93, 0xcc, 0x89, 0x47, 0x72, 0x8c, 0x17,
0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0xc3, 0x85, 0xc7, 0x72, 0x0c,
0x37, 0x1e, 0xcb, 0x31, 0x44, 0x31, 0x15, 0x24, 0x25, 0xb1, 0x81, 0x5d, 0x63, 0x0c, 0x08, 0x00,
0x00, 0xff, 0xff, 0x1b, 0x63, 0x50, 0x4a, 0x9f, 0x00, 0x00, 0x00,
0x2e, 0x16, 0x97, 0xc4, 0x92, 0x44, 0x09, 0x26, 0xb0, 0x30, 0x98, 0xad, 0xe4, 0xc8, 0xc5, 0x1e,
0x94, 0x9a, 0x9c, 0x9a, 0x59, 0x50, 0x82, 0x47, 0xa3, 0x0c, 0x17, 0x67, 0x70, 0x66, 0x7a, 0x5e,
0x62, 0x49, 0x69, 0x51, 0x2a, 0x54, 0x37, 0x42, 0xc0, 0x49, 0xe6, 0xc4, 0x23, 0x39, 0xc6, 0x0b,
0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86,
0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0x98, 0x0a, 0x92, 0x92, 0xd8, 0xc0, 0x6e, 0x35, 0x06, 0x04, 0x00,
0x00, 0xff, 0xff, 0x72, 0xaf, 0x50, 0xbc, 0xbd, 0x00, 0x00, 0x00,
}
func (m *Delivery) Marshal() (dAtA []byte, err error) {
......@@ -195,6 +204,13 @@ func (m *Receipt) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.Signature) > 0 {
i -= len(m.Signature)
copy(dAtA[i:], m.Signature)
i = encodeVarintPushsync(dAtA, i, uint64(len(m.Signature)))
i--
dAtA[i] = 0x12
}
if len(m.Address) > 0 {
i -= len(m.Address)
copy(dAtA[i:], m.Address)
......@@ -243,6 +259,10 @@ func (m *Receipt) Size() (n int) {
if l > 0 {
n += 1 + l + sovPushsync(uint64(l))
}
l = len(m.Signature)
if l > 0 {
n += 1 + l + sovPushsync(uint64(l))
}
return n
}
......@@ -436,6 +456,40 @@ func (m *Receipt) Unmarshal(dAtA []byte) error {
m.Address = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Signature", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPushsync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthPushsync
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthPushsync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Signature = append(m.Signature[:0], dAtA[iNdEx:postIndex]...)
if m.Signature == nil {
m.Signature = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPushsync(dAtA[iNdEx:])
......
......@@ -15,4 +15,5 @@ message Delivery {
message Receipt {
bytes Address = 1;
bytes Signature = 2;
}
......@@ -14,6 +14,7 @@ import (
"github.com/ethersphere/bee/pkg/accounting"
"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
......@@ -44,7 +45,8 @@ type PushSyncer interface {
}
type Receipt struct {
Address swarm.Address
Address swarm.Address
Signature []byte
}
type PushSync struct {
......@@ -58,11 +60,12 @@ type PushSync struct {
pricer pricer.Interface
metrics metrics
tracer *tracing.Tracer
signer crypto.Signer
}
var timeToLive = 5 * time.Second // request time to live
func New(streamer p2p.StreamerDisconnecter, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, unwrap func(swarm.Chunk), logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, tracer *tracing.Tracer) *PushSync {
func New(streamer p2p.StreamerDisconnecter, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, unwrap func(swarm.Chunk), logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, signer crypto.Signer, tracer *tracing.Tracer) *PushSync {
ps := &PushSync{
streamer: streamer,
storer: storer,
......@@ -74,6 +77,7 @@ func New(streamer p2p.StreamerDisconnecter, storer storage.Putter, closestPeerer
pricer: pricer,
metrics: newMetrics(),
tracer: tracer,
signer: signer,
}
return ps
}
......@@ -143,7 +147,11 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return fmt.Errorf("chunk store: %w", err)
}
receipt := pb.Receipt{Address: chunk.Address().Bytes()}
signature, err := ps.signer.Sign(ch.Address)
if err != nil {
return fmt.Errorf("receipt signature: %w", err)
}
receipt := pb.Receipt{Address: chunk.Address().Bytes(), Signature: signature}
if err := w.WriteMsgWithContext(ctx, &receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}
......@@ -170,7 +178,9 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
if err != nil {
return nil, err
}
return &Receipt{Address: swarm.NewAddress(r.Address)}, nil
return &Receipt{
Address: swarm.NewAddress(r.Address),
Signature: r.Signature}, nil
}
func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.Receipt, reterr error) {
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment