Commit a3bfe87f authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Options structs cleanup - part2 (#583)

* kademlia options struct

* hive options struct

* debugapi options struct

* pingpong options struct

* pullsync options struct

* pushsync options

* pushsync options

* retrieval options struct
parent 7bc05a56
......@@ -25,13 +25,6 @@ type Service interface {
}
type server struct {
Options
http.Handler
metricsRegistry *prometheus.Registry
}
type Options struct {
Overlay swarm.Address
P2P p2p.DebugService
Pingpong pingpong.Interface
......@@ -41,11 +34,22 @@ type Options struct {
Tracer *tracing.Tracer
Tags *tags.Tags
Accounting accounting.Interface
http.Handler
metricsRegistry *prometheus.Registry
}
func New(o Options) Service {
func New(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.PeerAdder, storer storage.Storer, logger logging.Logger, tracer *tracing.Tracer, tags *tags.Tags, accounting accounting.Interface) Service {
s := &server{
Options: o,
Overlay: overlay,
P2P: p2p,
Pingpong: pingpong,
TopologyDriver: topologyDriver,
Storer: storer,
Logger: logger,
Tracer: tracer,
Tags: tags,
Accounting: accounting,
metricsRegistry: newMetricsRegistry(),
}
......
......@@ -44,16 +44,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
topologyDriver := topologymock.NewTopologyDriver(o.TopologyOpts...)
acc := accountingmock.NewAccounting(o.AccountingOpts...)
s := debugapi.New(debugapi.Options{
Overlay: o.Overlay,
P2P: o.P2P,
Pingpong: o.Pingpong,
Tags: o.Tags,
Logger: logging.New(ioutil.Discard, 0),
Storer: o.Storer,
TopologyDriver: topologyDriver,
Accounting: acc,
})
s := debugapi.New(o.Overlay, o.P2P, o.Pingpong, topologyDriver, o.Storer, logging.New(ioutil.Discard, 0), nil, o.Tags, acc)
ts := httptest.NewServer(s)
t.Cleanup(ts.Close)
......
......@@ -34,19 +34,12 @@ type Service struct {
logger logging.Logger
}
type Options struct {
Streamer p2p.Streamer
AddressBook addressbook.GetPutter
NetworkID uint64
Logger logging.Logger
}
func New(o Options) *Service {
func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, logger logging.Logger) *Service {
return &Service{
streamer: o.Streamer,
logger: o.Logger,
addressBook: o.AddressBook,
networkID: o.NetworkID,
streamer: streamer,
logger: logger,
addressBook: addressbook,
networkID: networkID,
}
}
......
......@@ -128,11 +128,7 @@ func TestBroadcastPeers(t *testing.T) {
addressbookclean := ab.New(mock.NewStateStore())
// create a hive server that handles the incoming stream
server := hive.New(hive.Options{
Logger: logger,
AddressBook: addressbookclean,
NetworkID: networkID,
})
server := hive.New(nil, addressbookclean, networkID, logger)
// setup the stream recorder to record stream data
recorder := streamtest.New(
......@@ -140,13 +136,7 @@ func TestBroadcastPeers(t *testing.T) {
)
// create a hive client that will do broadcast
client := hive.New(hive.Options{
Streamer: recorder,
Logger: logger,
AddressBook: addressbook,
NetworkID: networkID,
})
client := hive.New(recorder, addressbook, networkID, logger)
if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil {
t.Fatal(err)
}
......
......@@ -39,13 +39,8 @@ type binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) bool
// Options for injecting services to Kademlia.
type Options struct {
Base swarm.Address
Discovery discovery.Driver
AddressBook addressbook.Interface
P2P p2p.Service
SaturationFunc binSaturationFunc
Bootnodes []ma.Multiaddr
Logger logging.Logger
}
// Kad is the Swarm forwarding kademlia implementation.
......@@ -77,23 +72,23 @@ type retryInfo struct {
}
// New returns a new Kademlia.
func New(o Options) *Kad {
func New(base swarm.Address, addressbook addressbook.Interface, discovery discovery.Driver, p2p p2p.Service, logger logging.Logger, o Options) *Kad {
if o.SaturationFunc == nil {
o.SaturationFunc = binSaturated
}
k := &Kad{
base: o.Base,
discovery: o.Discovery,
addressBook: o.AddressBook,
p2p: o.P2P,
base: base,
discovery: discovery,
addressBook: addressbook,
p2p: p2p,
saturationFunc: o.SaturationFunc,
connectedPeers: pslice.New(int(swarm.MaxBins)),
knownPeers: pslice.New(int(swarm.MaxBins)),
bootnodes: o.Bootnodes,
manageC: make(chan struct{}, 1),
waitNext: make(map[string]retryInfo),
logger: o.Logger,
logger: logger,
quit: make(chan struct{}),
done: make(chan struct{}),
wg: sync.WaitGroup{},
......
......@@ -479,7 +479,7 @@ func TestClosestPeer(t *testing.T) {
ab := addressbook.New(mockstate.NewStateStore())
var conns int32
kad := kademlia.New(kademlia.Options{Base: base, Discovery: disc, AddressBook: ab, P2P: p2pMock(ab, &conns, nil), Logger: logger})
kad := kademlia.New(base, ab, disc, p2pMock(ab, &conns, nil), logger, kademlia.Options{})
if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
......@@ -733,7 +733,7 @@ func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin uint8, pe
p2p = p2pMock(ab, connCounter, failedConnCounter)
logger = logging.New(ioutil.Discard, 0) // logger
disc = mock.NewDiscovery()
kad = kademlia.New(kademlia.Options{Base: base, Discovery: disc, AddressBook: ab, P2P: p2p, Logger: logger, SaturationFunc: f, Bootnodes: bootnodes}) // kademlia instance
kad = kademlia.New(base, ab, disc, p2p, logger, kademlia.Options{SaturationFunc: f, Bootnodes: bootnodes}) // kademlia instance
)
pk, _ := crypto.GenerateSecp256k1Key()
......
......@@ -182,23 +182,13 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
}
// Construct protocols.
pingPong := pingpong.New(pingpong.Options{
Streamer: p2ps,
Logger: logger,
Tracer: tracer,
})
pingPong := pingpong.New(p2ps, logger, tracer)
if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil {
return nil, fmt.Errorf("pingpong service: %w", err)
}
hive := hive.New(hive.Options{
Streamer: p2ps,
AddressBook: addressbook,
NetworkID: o.NetworkID,
Logger: logger,
})
hive := hive.New(p2ps, addressbook, o.NetworkID, logger)
if err = p2ps.AddProtocol(hive.Protocol()); err != nil {
return nil, fmt.Errorf("hive service: %w", err)
}
......@@ -215,7 +205,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
bootnodes = append(bootnodes, addr)
}
kad := kademlia.New(kademlia.Options{Base: address, Discovery: hive, AddressBook: addressbook, P2P: p2ps, Bootnodes: bootnodes, Logger: logger})
kad := kademlia.New(address, addressbook, hive, p2ps, logger, kademlia.Options{Bootnodes: bootnodes})
b.topologyCloser = kad
hive.SetAddPeersHandler(kad.AddPeers)
p2ps.AddNotifier(kad)
......@@ -266,14 +256,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
chunkvalidator := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator())
retrieve := retrieval.New(retrieval.Options{
Streamer: p2ps,
ChunkPeerer: kad,
Logger: logger,
Accounting: acc,
Pricer: accounting.NewFixedPricer(address, 10),
Validator: chunkvalidator,
})
retrieve := retrieval.New(p2ps, kad, logger, acc, accounting.NewFixedPricer(address, 10), chunkvalidator)
tagg := tags.NewTags()
if err = p2ps.AddProtocol(retrieve.Protocol()); err != nil {
......@@ -293,14 +276,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
}
retrieve.SetStorer(ns)
pushSyncProtocol := pushsync.New(pushsync.Options{
Streamer: p2ps,
Storer: storer,
ClosestPeerer: kad,
DeliveryCallback: psss.TryUnwrap,
Tagger: tagg,
Logger: logger,
})
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagg, psss.TryUnwrap, logger)
// set the pushSyncer in the PSS
psss.WithPushSyncer(pushSyncProtocol)
......@@ -315,34 +291,19 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
psss.Register(recovery.RecoveryTopic, chunkRepairHandler)
}
pushSyncPusher := pusher.New(pusher.Options{
Storer: storer,
PeerSuggester: kad,
PushSyncer: pushSyncProtocol,
Tagger: tagg,
Logger: logger,
})
pushSyncPusher := pusher.New(storer, kad, pushSyncProtocol, tagg, logger)
b.pusherCloser = pushSyncPusher
pullStorage := pullstorage.New(storer)
pullSync := pullsync.New(pullsync.Options{
Streamer: p2ps,
Storage: pullStorage,
Logger: logger,
})
pullSync := pullsync.New(p2ps, pullStorage, logger)
b.pullSyncCloser = pullSync
if err = p2ps.AddProtocol(pullSync.Protocol()); err != nil {
return nil, fmt.Errorf("pullsync protocol: %w", err)
}
puller := puller.New(puller.Options{
StateStore: stateStore,
Topology: kad,
PullSync: pullSync,
Logger: logger,
})
puller := puller.New(stateStore, kad, pullSync, logger, puller.Options{})
b.pullerCloser = puller
......@@ -374,17 +335,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
if o.DebugAPIAddr != "" {
// Debug API server
debugAPIService := debugapi.New(debugapi.Options{
Overlay: address,
P2P: p2ps,
Pingpong: pingPong,
Logger: logger,
Tracer: tracer,
TopologyDriver: kad,
Storer: storer,
Tags: tagg,
Accounting: acc,
})
debugAPIService := debugapi.New(address, p2ps, pingPong, kad, storer, logger, tracer, tagg, acc)
// register metrics from components
debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
......
......@@ -35,17 +35,11 @@ type Service struct {
metrics metrics
}
type Options struct {
Streamer p2p.Streamer
Logger logging.Logger
Tracer *tracing.Tracer
}
func New(o Options) *Service {
func New(streamer p2p.Streamer, logger logging.Logger, tracer *tracing.Tracer) *Service {
return &Service{
streamer: o.Streamer,
logger: o.Logger,
tracer: o.Tracer,
streamer: streamer,
logger: logger,
tracer: tracer,
metrics: newMetrics(),
}
}
......
......@@ -27,9 +27,7 @@ func TestPing(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
// create a pingpong server that handles the incoming stream
server := pingpong.New(pingpong.Options{
Logger: logger,
})
server := pingpong.New(nil, logger, nil)
// setup the stream recorder to record stream data
recorder := streamtest.New(
......@@ -46,10 +44,7 @@ func TestPing(t *testing.T) {
)
// create a pingpong client that will do pinging
client := pingpong.New(pingpong.Options{
Streamer: recorder,
Logger: logger,
})
client := pingpong.New(recorder, logger, nil)
// ping
addr := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
......
......@@ -26,10 +26,6 @@ var (
)
type Options struct {
StateStore storage.StateStorer
Topology topology.Driver
PullSync pullsync.Interface
Logger logging.Logger
Bins uint8
ShallowBinPeers int
}
......@@ -57,7 +53,7 @@ type Puller struct {
shallowBinPeers int // how many peers per bin do we want to sync with outside of depth
}
func New(o Options) *Puller {
func New(stateStore storage.StateStorer, topology topology.Driver, pullSync pullsync.Interface, logger logging.Logger, o Options) *Puller {
var (
bins uint8 = swarm.MaxBins
shallowBinPeers int = defaultShallowBinPeers
......@@ -70,11 +66,11 @@ func New(o Options) *Puller {
}
p := &Puller{
statestore: o.StateStore,
topology: o.Topology,
syncer: o.PullSync,
statestore: stateStore,
topology: topology,
syncer: pullSync,
metrics: newMetrics(),
logger: o.Logger,
logger: logger,
cursors: make(map[string][]uint64),
syncPeers: make([]map[string]*syncPeer, bins),
......
......@@ -558,16 +558,12 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc
logger := logging.New(ioutil.Discard, 6)
o := puller.Options{
Topology: kad,
StateStore: s,
PullSync: ps,
Logger: logger,
Bins: ops.bins,
}
if ops.shallowBinPeers != nil {
o.ShallowBinPeers = *ops.shallowBinPeers
}
return puller.New(o), s, kad, ps
return puller.New(s, kad, ps, logger, o), s, kad, ps
}
type c struct {
......
......@@ -63,19 +63,12 @@ type Syncer struct {
io.Closer
}
type Options struct {
Streamer p2p.Streamer
Storage pullstorage.Storer
Logger logging.Logger
}
func New(o Options) *Syncer {
func New(streamer p2p.Streamer, storage pullstorage.Storer, logger logging.Logger) *Syncer {
return &Syncer{
streamer: o.Streamer,
storage: o.Storage,
streamer: streamer,
storage: storage,
metrics: newMetrics(),
logger: o.Logger,
logger: logger,
ruidCtx: make(map[uint32]func()),
wg: sync.WaitGroup{},
quit: make(chan struct{}),
......
......@@ -224,7 +224,7 @@ func haveChunks(t *testing.T, s *mock.PullStorage, addrs ...swarm.Address) {
func newPullSync(s p2p.Streamer, o ...mock.Option) (*pullsync.Syncer, *mock.PullStorage) {
storage := mock.NewPullStorage(o...)
logger := logging.New(ioutil.Discard, 0)
return pullsync.New(pullsync.Options{Streamer: s, Storage: storage, Logger: logger}), storage
return pullsync.New(s, storage, logger), storage
}
func waitSet(t *testing.T, db *mock.PullStorage, v int) {
......
......@@ -28,22 +28,14 @@ type Service struct {
chunksWorkerQuitC chan struct{}
}
type Options struct {
Storer storage.Storer
PeerSuggester topology.ClosestPeerer
PushSyncer pushsync.PushSyncer
Tagger *tags.Tags
Logger logging.Logger
}
var retryInterval = 10 * time.Second // time interval between retries
func New(o Options) *Service {
func New(storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger) *Service {
service := &Service{
storer: o.Storer,
pushSyncer: o.PushSyncer,
tagg: o.Tagger,
logger: o.Logger,
storer: storer,
pushSyncer: pushSyncer,
tagg: tagger,
logger: logger,
metrics: newMetrics(),
quit: make(chan struct{}),
chunksWorkerQuitC: make(chan struct{}),
......
......@@ -235,7 +235,7 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
}
peerSuggester := mock.NewTopologyDriver(mockOpts...)
pusherService := pusher.New(pusher.Options{Storer: pusherStorer, PushSyncer: pushSyncService, Tagger: mtags, PeerSuggester: peerSuggester, Logger: logger})
pusherService := pusher.New(pusherStorer, peerSuggester, pushSyncService, mtags, logger)
return mtags, pusherService, pusherStorer
}
......
......@@ -44,25 +44,16 @@ type PushSync struct {
metrics metrics
}
type Options struct {
Streamer p2p.Streamer
Storer storage.Putter
ClosestPeerer topology.ClosestPeerer
Tagger *tags.Tags
DeliveryCallback func(context.Context, swarm.Chunk) error
Logger logging.Logger
}
var timeToWaitForReceipt = 3 * time.Second // time to wait to get a receipt for a chunk
func New(o Options) *PushSync {
func New(streamer p2p.Streamer, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, deliveryCallback func(context.Context, swarm.Chunk) error, logger logging.Logger) *PushSync {
ps := &PushSync{
streamer: o.Streamer,
storer: o.Storer,
peerSuggester: o.ClosestPeerer,
tagg: o.Tagger,
deliveryCallback: o.DeliveryCallback,
logger: o.Logger,
streamer: streamer,
storer: storer,
peerSuggester: closestPeerer,
tagg: tagger,
deliveryCallback: deliveryCallback,
logger: logger,
metrics: newMetrics(),
}
return ps
......
......@@ -7,6 +7,10 @@ package pushsync_test
import (
"bytes"
"context"
"io/ioutil"
"testing"
"time"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
......@@ -17,9 +21,6 @@ import (
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/mock"
"io/ioutil"
"testing"
"time"
)
// TestSendChunkAndGetReceipt inserts a chunk as uploaded chunk in db. This triggers sending a chunk to the closest node
......@@ -209,17 +210,7 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.R
mockTopology := mock.NewTopologyDriver(mockOpts...)
mtag := tags.NewTags()
ps := pushsync.New(pushsync.Options{
Streamer: recorder,
Storer: storer,
Tagger: mtag,
DeliveryCallback: pssDeliver,
ClosestPeerer: mockTopology,
Logger: logger,
})
return ps, storer, mtag
return pushsync.New(recorder, storer, mockTopology, mtag, pssDeliver, logger), storer, mtag
}
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
......
......@@ -269,23 +269,13 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.RecoveryHook) storage.S
_, _, _ = f(peerID, 0)
return nil
}}
server := retrieval.New(retrieval.Options{
Storer: mockStorer,
Logger: logger,
Accounting: serverMockAccounting,
})
server := retrieval.New(nil, nil, logger, serverMockAccounting, nil, nil)
server.SetStorer(mockStorer)
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
)
retrieve := retrieval.New(retrieval.Options{
Streamer: recorder,
ChunkPeerer: ps,
Storer: mockStorer,
Logger: logger,
Accounting: serverMockAccounting,
Pricer: pricerMock,
})
retrieve := retrieval.New(recorder, ps, logger, serverMockAccounting, pricerMock, nil)
retrieve.SetStorer(mockStorer)
ns := netstore.New(storer, recoveryFunc, retrieve, logger, nil)
return ns
}
......
......@@ -45,25 +45,14 @@ type Service struct {
validator swarm.Validator
}
type Options struct {
Streamer p2p.Streamer
ChunkPeerer topology.EachPeerer
Storer storage.Storer
Logger logging.Logger
Accounting accounting.Interface
Pricer accounting.Pricer
Validator swarm.Validator
}
func New(o Options) *Service {
func New(streamer p2p.Streamer, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator) *Service {
return &Service{
streamer: o.Streamer,
peerSuggester: o.ChunkPeerer,
storer: o.Storer,
logger: o.Logger,
accounting: o.Accounting,
pricer: o.Pricer,
validator: o.Validator,
streamer: streamer,
peerSuggester: chunkPeerer,
logger: logger,
accounting: accounting,
pricer: pricer,
validator: validator,
}
}
......
......@@ -51,13 +51,8 @@ func TestDelivery(t *testing.T) {
pricerMock := accountingmock.NewPricer(price, price)
// create the server that will handle the request and will serve the response
server := retrieval.New(retrieval.Options{
Storer: mockStorer,
Logger: logger,
Accounting: serverMockAccounting,
Pricer: pricerMock,
Validator: mockValidator,
})
server := retrieval.New(nil, nil, logger, serverMockAccounting, pricerMock, mockValidator)
server.SetStorer(mockStorer)
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
)
......@@ -75,15 +70,8 @@ func TestDelivery(t *testing.T) {
_, _, _ = f(peerID, 0)
return nil
}}
client := retrieval.New(retrieval.Options{
Streamer: recorder,
ChunkPeerer: ps,
Storer: clientMockStorer,
Logger: logger,
Accounting: clientMockAccounting,
Pricer: pricerMock,
Validator: mockValidator,
})
client := retrieval.New(recorder, ps, logger, clientMockAccounting, pricerMock, mockValidator)
client.SetStorer(clientMockStorer)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
v, err := client.RetrieveChunk(ctx, reqAddr)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment