Commit c71539e0 authored by Petar Radovic's avatar Petar Radovic

hive, initial api, in progress

parent 339dafdc
...@@ -5,9 +5,10 @@ ...@@ -5,9 +5,10 @@
package hive package hive
import ( import (
"sync"
"time" "time"
"golang.org/x/sync/errgroup"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
...@@ -19,43 +20,48 @@ const ( ...@@ -19,43 +20,48 @@ const (
streamVersion = "1.0.0" streamVersion = "1.0.0"
) )
// PeerSuggester suggests a peer to connect to
type PeerSuggester interface {
Subscribe() (c <-chan swarm.Address, unsubscribe func())
}
type PeerTracker interface {
AddPeer(address swarm.Address) error
Peers() (peers []p2p.Peer, err error)
}
// SaturationTracker tracks weather the saturation has changed
type SaturationTracker interface {
Subscribe() (c <-chan struct{}, unsubscribe func())
Depth() (int, error)
}
type Service struct { type Service struct {
streamer p2p.Streamer streamer p2p.Streamer
logger logging.Logger logger logging.Logger
peerSuggester PeerSuggester peerSuggester PeerSuggester
peerTracker PeerTracker
saturationTracker SaturationTracker saturationTracker SaturationTracker
quit chan struct{}
tickInterval time.Duration
done chan struct{}
wg sync.WaitGroup
} }
type Options struct { type Options struct {
Streamer p2p.Streamer Streamer p2p.Streamer
Logger logging.Logger Logger logging.Logger
PeerSuggester PeerSuggester PeerSuggester PeerSuggester
PeerTracker PeerTracker
SaturationTracker SaturationTracker SaturationTracker SaturationTracker
TickInterval time.Duration TickInterval time.Duration
} }
// PeerSuggester suggests a peer to connect to
type PeerSuggester interface {
SuggestPeer(addr swarm.Address) (peerAddress swarm.Address, err error)
}
// SaturationTracker tracks weather the saturation has changed
type SaturationTracker interface {
SaturationChanged() (saturationDepth int, changed bool)
}
func New(o Options) *Service { func New(o Options) *Service {
return &Service{ return &Service{
streamer: o.Streamer, streamer: o.Streamer,
logger: o.Logger, logger: o.Logger,
tickInterval: o.TickInterval,
peerSuggester: o.PeerSuggester, peerSuggester: o.PeerSuggester,
saturationTracker: o.SaturationTracker, saturationTracker: o.SaturationTracker,
done: make(chan struct{}), peerTracker: o.PeerTracker,
quit: make(chan struct{}),
} }
} }
...@@ -72,36 +78,79 @@ func (s *Service) Protocol() p2p.ProtocolSpec { ...@@ -72,36 +78,79 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
} }
} }
func (s *Service) Start() error { // todo:
s.wg.Add(1) // Hive is a skeleton of a hive function that should be called when the general hive service is started
go func() { // this is an attempt to run these loops outside of the peer level, to avoid having them for every peer
t := time.NewTicker(s.tickInterval) func (s *Service) Start() {
defer t.Stop() peerSuggestEvents, unsubPS := s.peerSuggester.Subscribe()
defer s.wg.Done() saturationChangeSignal, unsubSC := s.saturationTracker.Subscribe()
defer unsubPS()
for { defer unsubSC()
select {
case <-t.C: // todo: maybe refactor and listen separately on these events, this is just a skeleton for now
s.hive() for {
case <-s.done: select {
// todo: handle channel close and similar stuff, depending on the implementation of publishers
case ps := <-peerSuggestEvents:
if err := s.peerTracker.AddPeer(ps); err != nil {
// todo: handle err
return
}
case <-saturationChangeSignal:
depth, err := s.saturationTracker.Depth()
if err != nil {
// todo: handle err
return
}
if err := s.notifyAllPeers(depth); err != nil {
// todo: handle err
return return
} }
}
}()
return nil // todo: cancel, quit, etc...
case <-s.quit:
return
}
}
} }
func (s *Service) Stop() error { // Init is called when the new peer is being initialized.
close(s.done) // This should happen after overlay handshake is finished.
s.wg.Wait() func (s *Service) Init(peer p2p.Peer) error {
return nil depth, err := s.saturationTracker.Depth()
if err != nil {
// todo: handle err
return err
}
// todo: handle err
return s.notifyPeer(peer, depth)
} }
func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) error { func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) error {
// todo:
return nil return nil
} }
func (s *Service) hive() { func (s *Service) notifyAllPeers(depth int) error {
// todo: peers, err := s.peerTracker.Peers()
if err != nil {
return err
}
var eg errgroup.Group
for _, peer := range peers {
peer := peer
eg.Go(func() error {
return s.notifyPeer(peer, depth)
})
}
return eg.Wait()
}
func (s *Service) notifyPeer(peer p2p.Peer, depth int) error {
// todo: hive notify msg
return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment