Commit d2b3746a authored by santicomp2014's avatar santicomp2014 Committed by GitHub

trojan, pss, pss_test: pss port from Swarm (#466)

* trojan,pss,pss_test: added pss base, still have to fix logging metrics and tests

* pss,pss_test,mock,trojan: added tests for pss

* mock,pss_test: added LastPullSubscription in mock store with @acud, still needs another pass to finish it

* mock: add init for bins

* pss: changed localstore to storer

* pss_test: changed recovery topic to generic pss topic

* pss_test: deadline added to TestPssMonitor

* pss_test: deadline added to TestPssMonitor

* pss_test: deadline added to TestPssMonitor

* pss_test: deadline added to TestPssMonitor

* pss_test: skip TestTrojanChunkRetrieval refactor in progress

* pss_test: removed TestTrojanChunkRetrieval until refactor complete

* pss,pss_test: pss refactored removed Monitor struct replaced with tags, also replaced storer.Put with pushsync

* pss_test: disabled checks for tag in TestPssMonitor

* pss,pss_test: fixed monitor test and removed storer from pss

* pss,pss_test: added metrics and logger to pss

* pss,pss_test: refactored NewPss with Options Struct

* pss,pss_test: changed signature of pss to include ctx and return error

* metrics,pss,pss_test,pusher_test: refactored to interfaces and cleanup of tests

* pss,pss_test: changed Deliver to TryUnwrap

* pss: refactored metrics

* pss,pss_test: refactored Interface again for testing

* pss: fixed error not being used

* pss_test: fix in test and rename TestPssMonitor to TestPssTags

* pss_test: fixed deepsource complaint

* pss_test: fixed deepsource complaint

* pss_test: remove unsed Store struct

* pusher_test: rollback changes done in pusher test

* mock: revert storer mock to master, these changes are not necesarry any more

* pss_test: remove state stored from tags, not needed in tc

* pss_test: removed tag.Total in Send, not needed

* pss_test: removed comment on Store state, since this was removed, cleanup

* pss: removedlong in TryUnwrap comment

* pss,pss_test: removed tags and deferenced handler

* fix test case

* pss_test: added more info in debug comment

* pss_test: added err check in Unwrap
Co-authored-by: default avataracud <12988138+acud@users.noreply.github.com>
parent b2b1eff8
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pss
import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type metrics struct {
TotalMessagesSentCounter prometheus.Counter
}
func newMetrics() metrics {
subsystem := "pss"
return metrics{
TotalMessagesSentCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_message_sent",
Help: "Total messages sent.",
}),
}
}
func (s *pss) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pss
import (
"context"
"errors"
"fmt"
"sync"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/trojan"
)
var (
_ Interface = (*pss)(nil)
ErrNoHandler = errors.New("no handler found")
)
type Interface interface {
Send(ctx context.Context, targets trojan.Targets, topic trojan.Topic, payload []byte) error
Register(topic trojan.Topic, hndlr Handler)
GetHandler(topic trojan.Topic) Handler
TryUnwrap(ctx context.Context, c swarm.Chunk) error
}
// pss is the top-level struct, which takes care of message sending
type pss struct {
pusher pushsync.PushSyncer
handlers map[trojan.Topic]Handler
handlersMu sync.RWMutex
metrics metrics
logger logging.Logger
}
// New inits the pss struct with the storer
func New(logger logging.Logger, pusher pushsync.PushSyncer) Interface {
return &pss{
logger: logger,
pusher: pusher,
handlers: make(map[trojan.Topic]Handler),
metrics: newMetrics(),
}
}
// Handler defines code to be executed upon reception of a trojan message
type Handler func(*trojan.Message)
// Send constructs a padded message with topic and payload,
// wraps it in a trojan chunk such that one of the targets is a prefix of the chunk address
// uses push-sync to deliver message
func (p *pss) Send(ctx context.Context, targets trojan.Targets, topic trojan.Topic, payload []byte) error {
p.metrics.TotalMessagesSentCounter.Inc()
//construct Trojan Chunk
m, err := trojan.NewMessage(topic, payload)
if err != nil {
return err
}
var tc swarm.Chunk
tc, err = m.Wrap(targets)
if err != nil {
return err
}
// push the chunk using push sync so that it reaches it destination in network
if _, err = p.pusher.PushChunkToClosest(ctx, tc); err != nil {
return err
}
return nil
}
// Register allows the definition of a Handler func for a specific topic on the pss struct
func (p *pss) Register(topic trojan.Topic, hndlr Handler) {
p.handlersMu.Lock()
defer p.handlersMu.Unlock()
p.handlers[topic] = hndlr
}
// TryUnwrap allows unwrapping a chunk as a trojan message and calling its handler func based on its topic
func (p *pss) TryUnwrap(ctx context.Context, c swarm.Chunk) error {
if !trojan.IsPotential(c) {
return nil
}
m, err := trojan.Unwrap(c) // if err occurs unwrapping, there will be no handler
if err != nil {
return err
}
h := p.GetHandler(m.Topic)
if h == nil {
return fmt.Errorf("topic %v, %w", m.Topic, ErrNoHandler)
}
h(m)
return nil
}
// GetHandler returns the Handler func registered in pss for the given topic
func (p *pss) GetHandler(topic trojan.Topic) Handler {
p.handlersMu.RLock()
defer p.handlersMu.RUnlock()
return p.handlers[topic]
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pss_test
import (
"bytes"
"context"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/pushsync"
pushsyncmock "github.com/ethersphere/bee/pkg/pushsync/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/trojan"
)
// TestSend creates a trojan chunk and sends it using push sync
func TestSend(t *testing.T) {
var err error
ctx := context.TODO()
// create a mock pushsync service to push the chunk to its destination
var receipt *pushsync.Receipt
var storedChunk swarm.Chunk
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
rcpt := &pushsync.Receipt{
Address: swarm.NewAddress(chunk.Address().Bytes()),
}
storedChunk = chunk
receipt = rcpt
return rcpt, nil
})
pss := pss.New(logging.New(ioutil.Discard, 0), pushSyncService)
target := trojan.Target([]byte{1}) // arbitrary test target
targets := trojan.Targets([]trojan.Target{target})
payload := []byte("RECOVERY CHUNK")
topic := trojan.NewTopic("RECOVERY TOPIC")
// call Send to store trojan chunk in localstore
if err = pss.Send(ctx, targets, topic, payload); err != nil {
t.Fatal(err)
}
if receipt == nil {
t.Fatal("no receipt")
}
m, err := trojan.Unwrap(storedChunk)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(m.Payload, payload) {
t.Fatalf("payload mismatch expected %v but is %v instead", m.Payload, payload)
}
if !bytes.Equal(m.Topic[:], topic[:]) {
t.Fatalf("topic mismatch expected %v but is %v instead", m.Topic, topic)
}
}
// TestRegister verifies that handler funcs are able to be registered correctly in pss
func TestRegister(t *testing.T) {
pss := pss.New(logging.New(ioutil.Discard, 0), nil)
handlerVerifier := 0 // test variable to check handler funcs are correctly retrieved
// register first handler
testHandler := func(m *trojan.Message) {
handlerVerifier = 1
}
testTopic := trojan.NewTopic("FIRST_HANDLER")
pss.Register(testTopic, testHandler)
registeredHandler := pss.GetHandler(testTopic)
registeredHandler(&trojan.Message{}) // call handler to verify the retrieved func is correct
if handlerVerifier != 1 {
t.Fatalf("unexpected handler retrieved, verifier variable should be 1 but is %d instead", handlerVerifier)
}
// register second handler
testHandler = func(m *trojan.Message) {
handlerVerifier = 2
}
testTopic = trojan.NewTopic("SECOND_HANDLER")
pss.Register(testTopic, testHandler)
registeredHandler = pss.GetHandler(testTopic)
registeredHandler(&trojan.Message{}) // call handler to verify the retrieved func is correct
if handlerVerifier != 2 {
t.Fatalf("unexpected handler retrieved, verifier variable should be 2 but is %d instead", handlerVerifier)
}
}
// TestDeliver verifies that registering a handler on pss for a given topic and then submitting a trojan chunk with said topic to it
// results in the execution of the expected handler func
func TestDeliver(t *testing.T) {
pss := pss.New(logging.New(ioutil.Discard, 0), nil)
ctx := context.TODO()
// test message
topic := trojan.NewTopic("footopic")
payload := []byte("foopayload")
msg, err := trojan.NewMessage(topic, payload)
if err != nil {
t.Fatal(err)
}
// test chunk
target := trojan.Target([]byte{1}) // arbitrary test target
targets := trojan.Targets([]trojan.Target{target})
c, err := msg.Wrap(targets)
if err != nil {
t.Fatal(err)
}
// trojan chunk has its type set through the validator called by the store, so this needs to be simulated
c.WithType(swarm.ContentAddressed)
// create and register handler
var tt trojan.Topic // test variable to check handler func was correctly called
hndlr := func(m *trojan.Message) {
tt = m.Topic // copy the message topic to the test variable
}
pss.Register(topic, hndlr)
// call pss TryUnwrap on chunk and verify test topic variable value changes
err = pss.TryUnwrap(ctx, c)
if err != nil {
t.Fatal(err)
}
if tt != msg.Topic {
t.Fatalf("unexpected result for pss Deliver func, expected test variable to have a value of %v but is %v instead", msg.Topic, tt)
}
}
func TestHandler(t *testing.T) {
pss := pss.New(logging.New(ioutil.Discard, 0), nil)
testTopic := trojan.NewTopic("TEST_TOPIC")
// verify handler is null
if pss.GetHandler(testTopic) != nil {
t.Errorf("handler should be null")
}
// register first handler
testHandler := func(m *trojan.Message) {}
// set handler for test topic
pss.Register(testTopic, testHandler)
if pss.GetHandler(testTopic) == nil {
t.Errorf("handler should be registered")
}
}
package trojan package trojan
var ( var (
Contains = contains Contains = contains
HashBytes = hashBytes HashBytes = hashBytes
PadBytes = padBytesLeft PadBytes = padBytesLeft
IsPotential = isPotential
) )
...@@ -113,7 +113,7 @@ func Unwrap(c swarm.Chunk) (*Message, error) { ...@@ -113,7 +113,7 @@ func Unwrap(c swarm.Chunk) (*Message, error) {
} }
// IsPotential returns true if the given chunk is a potential trojan // IsPotential returns true if the given chunk is a potential trojan
func isPotential(c swarm.Chunk) bool { func IsPotential(c swarm.Chunk) bool {
// chunk must be content-addressed to be trojan // chunk must be content-addressed to be trojan
if c.Type() != swarm.ContentAddressed { if c.Type() != swarm.ContentAddressed {
return false return false
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment