Commit 9371a9bd authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Connection Breaker (#121)

A breaker that backs off the connect calls when there is a flood of consecutive failed attempts.
parent 886deff1
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package breaker
import (
"errors"
"sync"
"time"
)
const (
// Defaults used by NewBreaker for Options fields that are left unset.
limit = 100 // number of consecutive failures after which the breaker closes
failInterval = 30 * time.Minute // window within which consecutive failures are counted
maxBackoff = time.Hour // upper bound for the backoff duration
backoff = 2 * time.Minute // initial backoff duration
)
var (
// compile-time check that *breaker satisfies Interface
_ Interface = (*breaker)(nil)
// timeNow is used to deterministically mock time.Now() in tests
timeNow = time.Now
// ErrClosed is returned by Execute when the breaker is closed and is therefore not executing functions at the moment.
ErrClosed = errors.New("breaker closed")
)
// Interface is the breaker API: it wraps calls to a function that may fail
// and stops invoking it after too many consecutive failures.
type Interface interface {
// Execute runs f() if the limit number of consecutive failed calls is not reached within fail interval.
// The f() call is not locked, so it can still be executed concurrently.
// Returns `ErrClosed` if the limit is reached or the f() result otherwise.
Execute(f func() error) error
}
// breaker is the Interface implementation returned by NewBreaker. It counts
// consecutive failed calls and, once limit is reached, rejects further calls
// until the current backoff has elapsed.
type breaker struct {
limit int // breaker will not execute any more tasks after limit number of consecutive failures happen
consFailedCalls int // current number of consecutive fails
firstFailedTimestamp time.Time // time of the first failure in the current run of failures; zero when there is none
closedTimestamp time.Time // time at which the failure count last reached limit
backoff time.Duration // current backoff duration; doubled (capped at maxBackoff) each time the breaker reopens
maxBackoff time.Duration // upper bound applied when backoff is doubled
failInterval time.Duration // consecutive failures are counted if they happen within this interval
mtx sync.Mutex // held by beforef and afterf while they read and update the fields above
}
// Options configures a breaker created with NewBreaker.
// A field left at its zero value selects the package default.
type Options struct {
Limit int // number of consecutive failures after which the breaker closes
FailInterval time.Duration // window within which consecutive failures are counted
StartBackoff time.Duration // initial backoff duration
MaxBackoff time.Duration // upper bound for the backoff duration
}
// NewBreaker returns a breaker configured from o.
//
// Every Options field that is zero or negative is replaced by its package
// default. Negative values are rejected because they silently break the
// breaker: a negative Limit makes the "limit reached" check true before any
// call has failed, so Execute would return ErrClosed forever; a negative
// FailInterval makes the failure window expire immediately, so the breaker
// would never close; a negative backoff is never greater than the elapsed
// time, so the breaker would reopen at once.
func NewBreaker(o Options) Interface {
	// The local is named b rather than breaker to avoid shadowing the type.
	b := &breaker{
		limit:        o.Limit,
		backoff:      o.StartBackoff,
		maxBackoff:   o.MaxBackoff,
		failInterval: o.FailInterval,
	}

	if b.limit <= 0 {
		b.limit = limit
	}

	if b.failInterval <= 0 {
		b.failInterval = failInterval
	}

	if b.maxBackoff <= 0 {
		b.maxBackoff = maxBackoff
	}

	if b.backoff <= 0 {
		b.backoff = backoff
	}

	return b
}
// Execute calls f unless the breaker is currently closed.
// When the pre-call check rejects the call its error is returned and f is not
// invoked; otherwise the result of f is recorded and returned unchanged.
// The breaker's mutex is not held while f runs.
func (b *breaker) Execute(f func() error) error {
	err := b.beforef()
	if err == nil {
		err = b.afterf(f())
	}
	return err
}
// beforef is the pre-call check. It returns ErrClosed while the breaker is
// closed and nil when the call may proceed.
//
// The clock is read through timeNow (not time.Since) so tests can mock it, and
// it is only read when the corresponding timestamp is set.
func (b *breaker) beforef() error {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	if b.consFailedCalls >= b.limit {
		// Limit reached: stay closed until the backoff has elapsed.
		if b.closedTimestamp.IsZero() {
			return ErrClosed
		}
		if elapsed := timeNow().Sub(b.closedTimestamp); elapsed < b.backoff {
			return ErrClosed
		}

		// Backoff is over: reopen and make the next backoff twice as long,
		// capped at maxBackoff.
		b.resetFailed()
		b.backoff *= 2
		if b.backoff > b.maxBackoff {
			b.backoff = b.maxBackoff
		}
	}

	// Forget failures whose window has already expired.
	if first := b.firstFailedTimestamp; !first.IsZero() && timeNow().Sub(first) >= b.failInterval {
		b.resetFailed()
	}

	return nil
}
// afterf records the outcome of a call and returns err unchanged.
// A nil err clears the failure counter. A non-nil err increments it, stamps
// the start of the failure run on the first failure, and stamps the closing
// time when the counter reaches limit.
func (b *breaker) afterf(err error) error {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	if err == nil {
		b.resetFailed()
		return nil
	}

	if b.consFailedCalls == 0 {
		b.firstFailedTimestamp = timeNow()
	}

	b.consFailedCalls++
	if b.consFailedCalls == b.limit {
		b.closedTimestamp = timeNow()
	}

	return err
}
// resetFailed clears the consecutive-failure counter and the timestamp of the
// first failure. It does not lock; beforef and afterf call it with mtx held.
func (b *breaker) resetFailed() {
	b.consFailedCalls, b.firstFailedTimestamp = 0, time.Time{}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package breaker_test
import (
"errors"
"testing"
"time"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/breaker"
)
// TestExecute drives a breaker through a table of scripted scenarios.
//
// For every iteration i the breaker executes a function returning ferrors[i]
// and the result is compared with expectedErrs[i]. An ferrors entry equal to
// shouldNotBeCalledErr marks an iteration in which the breaker must reject the
// call without invoking the function. When times is non-nil the package clock
// is replaced by a mock that returns those values one per call, so the number
// of entries has to match the number of clock reads the scenario performs.
func TestExecute(t *testing.T) {
	testErr := errors.New("test error")
	shouldNotBeCalledErr := errors.New("should not be called")
	failInterval := 10 * time.Minute
	startBackoff := 1 * time.Minute
	initTime := time.Now()

	testCases := map[string]struct {
		limit        int
		ferrors      []error
		iterations   int
		times        []time.Time
		expectedErrs []error
	}{
		"f() returns nil": {
			limit:        5,
			iterations:   1,
			ferrors:      []error{nil},
			times:        nil,
			expectedErrs: []error{nil},
		},
		"f() returns error": {
			limit:        5,
			ferrors:      []error{testErr},
			iterations:   1,
			times:        nil,
			expectedErrs: []error{testErr},
		},
		"Break error": {
			limit:        1,
			ferrors:      []error{testErr, shouldNotBeCalledErr},
			iterations:   3,
			times:        nil,
			expectedErrs: []error{testErr, breaker.ErrClosed, breaker.ErrClosed},
		},
		"Break error - mix iterations": {
			limit:        3,
			ferrors:      []error{testErr, nil, testErr, testErr, testErr, shouldNotBeCalledErr},
			iterations:   6,
			times:        nil,
			expectedErrs: []error{testErr, nil, testErr, testErr, testErr, breaker.ErrClosed},
		},
		"Expiration - return f() error": {
			limit:        3,
			ferrors:      []error{testErr, testErr, testErr, testErr, testErr},
			iterations:   5,
			times:        []time.Time{initTime, initTime, initTime.Add(2 * failInterval), initTime, initTime, initTime, initTime},
			expectedErrs: []error{testErr, testErr, testErr, testErr, testErr},
		},
		"Backoff - close, reopen, close, don't open": {
			limit:        1,
			ferrors:      []error{testErr, shouldNotBeCalledErr, testErr, shouldNotBeCalledErr, testErr, shouldNotBeCalledErr, shouldNotBeCalledErr},
			iterations:   7,
			times:        []time.Time{initTime, initTime, initTime, initTime.Add(startBackoff + time.Second), initTime, initTime, initTime, initTime.Add(2*startBackoff + time.Second), initTime, initTime, initTime, initTime.Add(startBackoff + time.Second)},
			expectedErrs: []error{testErr, breaker.ErrClosed, testErr, breaker.ErrClosed, testErr, breaker.ErrClosed, breaker.ErrClosed},
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			// The clock is package-level state: put the real one back when
			// the subtest ends so a mock never leaks into later tests.
			defer breaker.SetTimeNow(time.Now)

			b := breaker.NewBreaker(breaker.Options{
				Limit:        tc.limit,
				StartBackoff: startBackoff,
				FailInterval: failInterval,
			})

			if tc.times != nil {
				mock := timeMock{times: tc.times}
				breaker.SetTimeNow(mock.next)
			} else {
				breaker.SetTimeNow(time.Now)
			}

			for i := 0; i < tc.iterations; i++ {
				err := b.Execute(func() error {
					if tc.ferrors[i] == shouldNotBeCalledErr {
						t.Fatal(tc.ferrors[i])
					}

					return tc.ferrors[i]
				})
				// %v is used because either error may be nil, which %s
				// would render as "%!s(<nil>)".
				if !errors.Is(err, tc.expectedErrs[i]) {
					t.Fatalf("expected err: %v, got: %v, iteration %v", tc.expectedErrs[i], err, i)
				}
			}
		})
	}
}
// timeMock is a scripted clock that hands out a fixed sequence of times.
type timeMock struct {
	times []time.Time // values to return, in order
	curr  int         // index of the next value to return
}

// next returns the scripted time at the current position and advances the
// position by one. It panics with an index-out-of-range error when called
// more often than there are scripted values.
func (t *timeMock) next() time.Time {
	now := t.times[t.curr]
	t.curr++
	return now
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package breaker
import "time"
// SetTimeNow replaces the function the breaker uses to read the current time,
// so tests can control the clock deterministically.
// Passing nil restores time.Now; storing a nil function would otherwise cause
// a nil-call panic the next time the breaker reads the clock.
func SetTimeNow(f func() time.Time) {
	if f == nil {
		f = time.Now
	}
	timeNow = f
}
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/breaker"
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing" "github.com/ethersphere/bee/pkg/tracing"
...@@ -46,6 +47,7 @@ type Service struct { ...@@ -46,6 +47,7 @@ type Service struct {
addrssbook addressbook.Putter addrssbook addressbook.Putter
peers *peerRegistry peers *peerRegistry
peerHandler func(context.Context, swarm.Address) error peerHandler func(context.Context, swarm.Address) error
conectionBreaker breaker.Interface
logger logging.Logger logger logging.Logger
tracer *tracing.Tracer tracer *tracing.Tracer
} }
...@@ -163,6 +165,7 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -163,6 +165,7 @@ func New(ctx context.Context, o Options) (*Service, error) {
addrssbook: o.Addressbook, addrssbook: o.Addressbook,
logger: o.Logger, logger: o.Logger,
tracer: o.Tracer, tracer: o.Tracer,
conectionBreaker: breaker.NewBreaker(breaker.Options{}), // todo: fill non-default options
} }
// Construct protocols. // Construct protocols.
...@@ -314,7 +317,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm ...@@ -314,7 +317,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
return swarm.Address{}, p2p.ErrAlreadyConnected return swarm.Address{}, p2p.ErrAlreadyConnected
} }
if err := s.host.Connect(ctx, *info); err != nil { if err := s.conectionBreaker.Execute(func() error { return s.host.Connect(ctx, *info) }); err != nil {
return swarm.Address{}, err return swarm.Address{}, err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment