Commit f801af89 authored by Petar Radovic, committed by GitHub

Rebuild topology when back-off happens (#146)

* use breaker.Backoff to handle bad network in topology driver
parent fc13e143
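The change threads back-off information from the connection breaker up to the topology driver: when the breaker is closed, the libp2p service wraps breaker.ErrClosed in a ConnectionBackoffError that carries the breaker's ClosedUntil timestamp, and the topology driver reschedules a rebuild from the addressbook once that time passes. A minimal sketch of the pattern, reusing names from the diff below (illustration only, not the driver's exact code):

	// How a caller observes the new error type; the topology driver
	// performs the rescheduling itself via backoff(e.TryAfter()).
	if err := topologyDriver.AddPeer(p2pCtx, overlay); err != nil {
		var e *p2p.ConnectionBackoffError
		if errors.As(err, &e) {
			logger.Debugf("peer %s backed off until %s", overlay, e.TryAfter())
		}
	}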
......@@ -13,6 +13,7 @@ import (
"net/http"
"path/filepath"
"sync"
"sync/atomic"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
......@@ -51,6 +52,7 @@ type Bee struct {
tracerCloser io.Closer
stateStoreCloser io.Closer
localstoreCloser io.Closer
topologyCloser io.Closer
}
type Options struct {
......@@ -168,6 +170,7 @@ func NewBee(o Options) (*Bee, error) {
}
topologyDriver := full.New(hive, addressbook, p2ps, logger, address)
b.topologyCloser = topologyDriver
hive.SetPeerAddedHandler(topologyDriver.AddPeer)
p2ps.SetPeerAddedHandler(topologyDriver.AddPeer)
addrs, err := p2ps.Addresses()
......@@ -276,51 +279,13 @@ func NewBee(o Options) (*Bee, error) {
b.debugAPIServer = debugAPIServer
}
// Connect bootnodes
var wg sync.WaitGroup
for _, a := range o.Bootnodes {
wg.Add(1)
go func(aa string) {
defer wg.Done()
addr, err := ma.NewMultiaddr(aa)
if err != nil {
logger.Debugf("multiaddress fail %s: %v", aa, err)
logger.Errorf("connect to bootnode %s", aa)
return
}
overlay, err := p2ps.Connect(p2pCtx, addr)
if err != nil {
logger.Debugf("connect fail %s: %v", aa, err)
logger.Errorf("connect to bootnode %s", aa)
return
}
err = addressbook.Put(overlay, addr)
if err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("addressboook error persisting %s %s: %v", aa, overlay, err)
logger.Errorf("persisting node %s", aa)
return
}
if err := topologyDriver.AddPeer(p2pCtx, overlay); err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("topology add peer fail %s %s: %v", aa, overlay, err)
logger.Errorf("connect to bootnode %s", aa)
return
}
}(a)
}
wg.Wait()
overlays, err := addressbook.Overlays()
if err != nil {
return nil, fmt.Errorf("addressbook overlays: %w", err)
}
var count int32
var wg sync.WaitGroup
jobsC := make(chan struct{}, 16)
for _, o := range overlays {
jobsC <- struct{}{}
......@@ -337,11 +302,53 @@ func NewBee(o Options) (*Bee, error) {
logger.Errorf("topology add peer %s", overlay)
return
}
atomic.AddInt32(&count, 1)
}(o)
}
wg.Wait()
// Connect bootnodes if no nodes from the addressbook were successfully added to the topology
if count == 0 {
for _, a := range o.Bootnodes {
wg.Add(1)
go func(a string) {
defer wg.Done()
addr, err := ma.NewMultiaddr(a)
if err != nil {
logger.Debugf("multiaddress fail %s: %v", a, err)
logger.Errorf("connect to bootnode %s", a)
return
}
overlay, err := p2ps.Connect(p2pCtx, addr)
if err != nil {
logger.Debugf("connect fail %s: %v", a, err)
logger.Errorf("connect to bootnode %s", a)
return
}
err = addressbook.Put(overlay, addr)
if err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("addressboook error persisting %s %s: %v", a, overlay, err)
logger.Errorf("persisting node %s", a)
return
}
if err := topologyDriver.AddPeer(p2pCtx, overlay); err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("topology add peer fail %s %s: %v", a, overlay, err)
logger.Errorf("connect to bootnode %s", a)
return
}
}(a)
}
wg.Wait()
}
return b, nil
}
......@@ -384,5 +391,9 @@ func (b *Bee) Shutdown(ctx context.Context) error {
return fmt.Errorf("localstore: %w", err)
}
if err := b.topologyCloser.Close(); err != nil {
return fmt.Errorf("topology driver: %w", err)
}
return b.errorLogWriter.Close()
}
......@@ -7,14 +7,41 @@ package p2p
import (
"errors"
"fmt"
"time"
)
// ErrPeerNotFound should be returned by p2p service methods when the requested
// peer is not found.
var ErrPeerNotFound = errors.New("peer not found")
var (
// ErrPeerNotFound should be returned by p2p service methods when the requested
// peer is not found.
ErrPeerNotFound = errors.New("peer not found")
// ErrAlreadyConnected is returned if Connect was called for an already connected node.
ErrAlreadyConnected = errors.New("already connected")
)
// ConnectionBackoffError indicates that connection calls will not be executed until the `tryAfter` timestamp.
// The reason is provided in the wrapped error.
type ConnectionBackoffError struct {
tryAfter time.Time
err error
}
// NewConnectionBackoffError creates a new `ConnectionBackoffError` with the provided underlying error and `tryAfter` timestamp.
func NewConnectionBackoffError(err error, tryAfter time.Time) error {
return &ConnectionBackoffError{err: err, tryAfter: tryAfter}
}
// TryAfter returns the tryAfter timestamp.
func (e *ConnectionBackoffError) TryAfter() time.Time {
return e.tryAfter
}
// Unwrap returns the underlying error.
func (e *ConnectionBackoffError) Unwrap() error { return e.err }
// ErrAlreadyConnected is returned if Connect was called for an already connected node
var ErrAlreadyConnected = errors.New("already connected")
// Error implements the standard Go error interface.
func (e *ConnectionBackoffError) Error() string {
return e.err.Error()
}
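Callers can detect the back-off with errors.As and consult TryAfter before retrying. A hedged usage sketch: the connectWhenOpen helper is illustrative and not part of this diff, and it assumes the Service interface's Connect signature shown in the libp2p hunk further down.

	// connectWhenOpen retries a single Connect after waiting out a
	// ConnectionBackoffError. Illustrative only; imports elided:
	// context, errors, time, swarm, and ma (go-multiaddr).
	func connectWhenOpen(ctx context.Context, s Service, addr ma.Multiaddr) (swarm.Address, error) {
		overlay, err := s.Connect(ctx, addr)
		var e *ConnectionBackoffError
		if !errors.As(err, &e) {
			return overlay, err // success, or a failure unrelated to back-off
		}
		select {
		case <-time.After(time.Until(e.TryAfter())): // fires immediately if tryAfter already passed
			return s.Connect(ctx, addr)
		case <-ctx.Done():
			return swarm.Address{}, ctx.Err()
		}
	}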
// DisconnectError is an error that is specifically handled inside p2p. If returned by specific protocol
// handler it causes peer disconnect.
......
......@@ -21,7 +21,7 @@ const (
var (
_ Interface = (*breaker)(nil)
// timeNow is used to deterministically mock time.Now() in tests
// timeNow is used to deterministically mock time.Now() in tests.
timeNow = time.Now
// ErrClosed is the special error type that indicates that breaker is closed and that is not executing functions at the moment.
......@@ -33,6 +33,9 @@ type Interface interface {
// f() call is not locked so it can still be executed concurrently.
// Returns `ErrClosed` if the limit is reached or f() result otherwise.
Execute(f func() error) error
// ClosedUntil returns the timestamp when the breaker will become open again.
ClosedUntil() time.Time
}
type breaker struct {
......@@ -88,6 +91,17 @@ func (b *breaker) Execute(f func() error) error {
return b.afterf(f())
}
func (b *breaker) ClosedUntil() time.Time {
b.mtx.Lock()
defer b.mtx.Unlock()
if b.consFailedCalls >= b.limit {
return b.closedTimestamp.Add(b.backoff)
}
return timeNow()
}
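Together with Execute, this lets a caller translate a closed breaker into a concrete retry time: before the consecutive-failure count reaches Limit, ClosedUntil simply reports the current time; once the limit is reached, it reports the closing timestamp plus the current backoff. A small sketch, assuming the breaker.NewBreaker and Options fields exercised by TestClosedUntil below:

	b := breaker.NewBreaker(breaker.Options{Limit: 1, StartBackoff: time.Minute})
	_ = b.Execute(func() error { return errors.New("dial failed") }) // first failure closes the breaker
	retryAt := b.ClosedUntil() // roughly the failure time plus one minute
	_ = retryAt                // e.g. feed into p2p.NewConnectionBackoffError(err, retryAt)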
func (b *breaker) beforef() error {
b.mtx.Lock()
defer b.mtx.Unlock()
......
......@@ -100,6 +100,35 @@ func TestExecute(t *testing.T) {
}
}
func TestClosedUntil(t *testing.T) {
timestamp := time.Now()
startBackoff := 1 * time.Minute
testError := errors.New("test error")
timeMock := timeMock{times: []time.Time{timestamp, timestamp, timestamp}}
breaker.SetTimeNow(timeMock.next)
b := breaker.NewBreaker(breaker.Options{
Limit: 1,
StartBackoff: startBackoff,
})
notClosed := b.ClosedUntil()
if notClosed != timestamp {
t.Fatalf("expected: %s, got: %s", timestamp, notClosed)
}
if err := b.Execute(func() error {
return testError
}); err != testError {
t.Fatalf("expected nil got %s", err)
}
closed := b.ClosedUntil()
if closed != timestamp.Add(startBackoff) {
t.Fatalf("expected: %s, got: %s", timestamp.Add(startBackoff), notClosed)
}
}
type timeMock struct {
times []time.Time
curr int
......
......@@ -35,7 +35,9 @@ import (
"github.com/multiformats/go-multistream"
)
var _ p2p.Service = (*Service)(nil)
var (
_ p2p.Service = (*Service)(nil)
)
type Service struct {
ctx context.Context
......@@ -169,7 +171,6 @@ func New(ctx context.Context, o Options) (*Service, error) {
}
// Construct protocols.
id := protocol.ID(p2p.NewSwarmStreamName(handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName))
matcher, err := s.protocolSemverMatcher(id)
if err != nil {
......@@ -278,7 +279,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
s.metrics.HandledStreamCount.Inc()
if err := ss.Handler(ctx, p2p.Peer{Address: overlay}, stream); err != nil {
var e *p2p.DisconnectError
if errors.Is(err, e) {
if errors.As(err, &e) {
// todo: test connection close and refactor
_ = s.Disconnect(overlay)
}
......@@ -318,6 +319,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
}
if err := s.conectionBreaker.Execute(func() error { return s.host.Connect(ctx, *info) }); err != nil {
if errors.Is(err, breaker.ErrClosed) {
return swarm.Address{}, p2p.NewConnectionBackoffError(err, s.conectionBreaker.ClosedUntil())
}
return swarm.Address{}, err
}
......
......@@ -31,17 +31,18 @@ var _ topology.Driver = (*driver)(nil)
// - Every peer which is added to the driver gets broadcasted to every other peer regardless of its address.
// - A random peer is picked when asking for a peer to retrieve an arbitrary chunk (Peerer interface).
type driver struct {
base swarm.Address // the base address for this node
base swarm.Address // the base address for this node
discovery discovery.Driver
addressBook addressbook.GetPutter
addressBook addressbook.Interface
p2pService p2p.Service
receivedPeers map[string]struct{} // track already received peers. Note: implement cleanup or expiration if needed to stop infinite growth
mtx sync.Mutex // guards received peers
backoffActive bool
logger logging.Logger
mtx sync.Mutex
quit chan struct{}
}
func New(disc discovery.Driver, addressBook addressbook.GetPutter, p2pService p2p.Service, logger logging.Logger, baseAddress swarm.Address) topology.Driver {
func New(disc discovery.Driver, addressBook addressbook.Interface, p2pService p2p.Service, logger logging.Logger, baseAddress swarm.Address) topology.Driver {
return &driver{
base: baseAddress,
discovery: disc,
......@@ -49,12 +50,13 @@ func New(disc discovery.Driver, addressBook addressbook.GetPutter, p2pService p2
p2pService: p2pService,
receivedPeers: make(map[string]struct{}),
logger: logger,
quit: make(chan struct{}),
}
}
// AddPeer adds a new peer to the topology driver.
// The peer would be subsequently broadcasted to all connected peers.
// All conneceted peers are also broadcasted to the new peer.
// All connected peers are also broadcasted to the new peer.
func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
d.mtx.Lock()
if _, ok := d.receivedPeers[addr.ByteString()]; ok {
......@@ -77,6 +79,14 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
if !isConnected(addr, connectedPeers) {
peerAddr, err := d.p2pService.Connect(ctx, ma)
if err != nil {
d.mtx.Lock()
delete(d.receivedPeers, addr.ByteString())
d.mtx.Unlock()
var e *p2p.ConnectionBackoffError
if errors.As(err, &e) {
d.backoff(e.TryAfter())
return err
}
return err
}
......@@ -107,11 +117,7 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
return nil
}
if err := d.discovery.BroadcastPeers(context.Background(), addr, connectedAddrs...); err != nil {
return err
}
return nil
return d.discovery.BroadcastPeers(context.Background(), addr, connectedAddrs...)
}
// ClosestPeer returns the closest connected peer we have in relation to a
......@@ -149,6 +155,60 @@ func (d *driver) ClosestPeer(addr swarm.Address) (swarm.Address, error) {
return closest, nil
}
func (d *driver) Close() error {
close(d.quit)
return nil
}
func (d *driver) backoff(tryAfter time.Time) {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.backoffActive {
return
}
d.backoffActive = true
done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
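// Watchdog: cancel the rebuild context as soon as the retry pass finishes
// or the driver is shut down, so in-flight AddPeer calls do not outlive Close.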
go func() {
defer cancel()
select {
case <-done:
case <-d.quit:
}
}()
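// Once the back-off window elapses, try to re-add every overlay known to
// the addressbook; a further back-off error reschedules and ends this pass.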
go func() {
defer func() { close(done) }()
select {
case <-time.After(time.Until(tryAfter)):
d.mtx.Lock()
d.backoffActive = false
d.mtx.Unlock()
addresses, _ := d.addressBook.Overlays()
for _, addr := range addresses {
select {
case <-d.quit:
return
default:
if err := d.AddPeer(ctx, addr); err != nil {
var e *p2p.ConnectionBackoffError
if errors.As(err, &e) {
d.backoff(e.TryAfter())
return
}
}
}
}
case <-d.quit:
return
}
}()
}
func isConnected(addr swarm.Address, connectedPeers []p2p.Peer) bool {
for _, p := range connectedPeers {
if p.Address.Equal(addr) {
......
......@@ -52,6 +52,7 @@ func TestAddPeer(t *testing.T) {
}))
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
defer fullDriver.Close()
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
......@@ -82,6 +83,7 @@ func TestAddPeer(t *testing.T) {
}))
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
defer fullDriver.Close()
err := fullDriver.AddPeer(context.Background(), overlay)
if !errors.Is(err, topology.ErrNotFound) {
t.Fatalf("full conn driver returned err %v", err)
......@@ -106,6 +108,7 @@ func TestAddPeer(t *testing.T) {
}))
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
defer fullDriver.Close()
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal("error creating multiaddr")
......@@ -153,6 +156,7 @@ func TestAddPeer(t *testing.T) {
}))
fullDriver := full.New(discovery, ab, p2ps, logger, overlay)
defer fullDriver.Close()
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
......@@ -213,6 +217,7 @@ func TestClosestPeer(t *testing.T) {
}))
fullDriver := full.New(discovery, ab, p2ps, logger, baseOverlay)
defer fullDriver.Close()
for _, tc := range []struct {
chunkAddress swarm.Address // chunk address to test
......
......@@ -65,6 +65,10 @@ func (d *mock) ClosestPeer(addr swarm.Address) (peerAddr swarm.Address, err erro
return d.closestPeer, d.closestPeerErr
}
func (d *mock) Close() error {
return nil
}
type Option interface {
apply(*mock)
}
......
......@@ -7,6 +7,7 @@ package topology
import (
"context"
"errors"
"io"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -17,6 +18,7 @@ var ErrWantSelf = errors.New("node wants self")
type Driver interface {
PeerAdder
ClosestPeerer
io.Closer
}
type PeerAdder interface {
......