Commit 75fe1591 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Init topology driver (#96)

Init topology driver on startup
parent f97f041b
...@@ -17,7 +17,13 @@ import ( ...@@ -17,7 +17,13 @@ import (
const keyPrefix = "addressbook_entry_" const keyPrefix = "addressbook_entry_"
var _ GetPutter = (*store)(nil) var _ Interface = (*store)(nil)
type Interface interface {
GetPutter
Overlays() ([]swarm.Address, error)
Multiaddresses() ([]ma.Multiaddr, error)
}
type GetPutter interface { type GetPutter interface {
Getter Getter
...@@ -36,7 +42,7 @@ type store struct { ...@@ -36,7 +42,7 @@ type store struct {
store storage.StateStorer store storage.StateStorer
} }
func New(storer storage.StateStorer) GetPutter { func New(storer storage.StateStorer) Interface {
return &store{ return &store{
store: storer, store: storer,
} }
......
...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
book "github.com/ethersphere/bee/pkg/addressbook" ab "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/hive" "github.com/ethersphere/bee/pkg/hive"
"github.com/ethersphere/bee/pkg/hive/pb" "github.com/ethersphere/bee/pkg/hive/pb"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
...@@ -28,16 +28,11 @@ import ( ...@@ -28,16 +28,11 @@ import (
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
) )
type AddressExporter interface {
Overlays() ([]swarm.Address, error)
Multiaddresses() ([]ma.Multiaddr, error)
}
func TestBroadcastPeers(t *testing.T) { func TestBroadcastPeers(t *testing.T) {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
statestore := mock.NewStateStore() statestore := mock.NewStateStore()
addressbook := book.New(statestore) addressbook := ab.New(statestore)
// populate all expected and needed random resources for 2 full batches // populate all expected and needed random resources for 2 full batches
// tests cases that uses fewer resources can use sub-slices of this data // tests cases that uses fewer resources can use sub-slices of this data
...@@ -110,12 +105,7 @@ func TestBroadcastPeers(t *testing.T) { ...@@ -110,12 +105,7 @@ func TestBroadcastPeers(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
addressbookclean := book.New(mock.NewStateStore()) addressbookclean := ab.New(mock.NewStateStore())
exporter, ok := addressbookclean.(AddressExporter)
if !ok {
t.Fatal("could not type assert AddressExporter")
}
// create a hive server that handles the incoming stream // create a hive server that handles the incoming stream
server := hive.New(hive.Options{ server := hive.New(hive.Options{
...@@ -160,13 +150,13 @@ func TestBroadcastPeers(t *testing.T) { ...@@ -160,13 +150,13 @@ func TestBroadcastPeers(t *testing.T) {
} }
} }
expectOverlaysEventually(t, exporter, tc.wantOverlays) expectOverlaysEventually(t, addressbookclean, tc.wantOverlays)
expectMultiaddresessEventually(t, exporter, tc.wantMultiAddresses) expectMultiaddresessEventually(t, addressbookclean, tc.wantMultiAddresses)
}) })
} }
} }
func expectOverlaysEventually(t *testing.T, exporter AddressExporter, wantOverlays []swarm.Address) { func expectOverlaysEventually(t *testing.T, exporter ab.Interface, wantOverlays []swarm.Address) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
var stringOverlays []string var stringOverlays []string
var stringWantOverlays []string var stringWantOverlays []string
...@@ -199,7 +189,7 @@ func expectOverlaysEventually(t *testing.T, exporter AddressExporter, wantOverla ...@@ -199,7 +189,7 @@ func expectOverlaysEventually(t *testing.T, exporter AddressExporter, wantOverla
t.Errorf("Overlays got %v, want %v", o, wantOverlays) t.Errorf("Overlays got %v, want %v", o, wantOverlays)
} }
func expectMultiaddresessEventually(t *testing.T, exporter AddressExporter, wantMultiaddresses []ma.Multiaddr) { func expectMultiaddresessEventually(t *testing.T, exporter ab.Interface, wantMultiaddresses []ma.Multiaddr) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
var stringMultiaddresses []string var stringMultiaddresses []string
m, err := exporter.Multiaddresses() m, err := exporter.Multiaddresses()
......
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/pkg/statestore/leveldb" "github.com/ethersphere/bee/pkg/statestore/leveldb"
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock" mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/full" "github.com/ethersphere/bee/pkg/topology/full"
"github.com/ethersphere/bee/pkg/tracing" "github.com/ethersphere/bee/pkg/tracing"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
...@@ -297,6 +298,33 @@ func NewBee(o Options) (*Bee, error) { ...@@ -297,6 +298,33 @@ func NewBee(o Options) (*Bee, error) {
} }
wg.Wait() wg.Wait()
overlays, err := addressbook.Overlays()
if err != nil {
return nil, fmt.Errorf("addressbook overlays: %w", err)
}
jobsC := make(chan struct{}, 16)
for _, o := range overlays {
jobsC <- struct{}{}
wg.Add(1)
go func(overlay swarm.Address) {
defer func() {
<-jobsC
}()
defer wg.Done()
if err := topologyDriver.AddPeer(p2pCtx, overlay); err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("topology add peer fail %s: %v", overlay, err)
logger.Errorf("topology add peer %s", overlay)
return
}
}(o)
}
wg.Wait()
return b, nil return b, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment