Commit 984dca0a authored by acud, committed by GitHub

refactor: improve shutdown sequence (#1926)

parent 67fa76eb
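In short, the node now halts the topology driver and the p2p service before tearing anything else down, closes independent components concurrently in waves, and only then closes the p2p server and the remaining services. Below is a minimal sketch of that ordering, with hypothetical names and simplified types; it is not the actual node.go code.

package main

import (
    "fmt"
    "io"
    "sync"
)

type halter interface{ Halt() }

type fakeComponent struct{ name string }

func (f fakeComponent) Halt()        { fmt.Println("halted:", f.name) }
func (f fakeComponent) Close() error { fmt.Println("closed:", f.name); return nil }

func shutdown(topology, p2p interface {
    halter
    io.Closer
}, others ...io.Closer) {
    // 1. stop new connections from being made or accepted
    topology.Halt()
    p2p.Halt()

    // 2. close independent components concurrently
    var wg sync.WaitGroup
    for _, c := range others {
        wg.Add(1)
        go func(c io.Closer) {
            defer wg.Done()
            if err := c.Close(); err != nil {
                fmt.Println("close error:", err)
            }
        }(c)
    }
    wg.Wait()

    // 3. only then close the p2p server and the topology driver
    _ = p2p.Close()
    _ = topology.Close()
}

func main() {
    shutdown(
        fakeComponent{"kademlia"}, fakeComponent{"libp2p"},
        fakeComponent{"pusher"}, fakeComponent{"puller"}, fakeComponent{"pss"},
    )
}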
......@@ -157,7 +157,7 @@ func (s *server) Close() error {
select {
case <-done:
case <-time.After(5 * time.Second):
case <-time.After(1 * time.Second):
return errors.New("api shutting down with open websockets")
}
......
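The API server hunk above tightens the wait for open websocket connections from five seconds to one. The underlying pattern is a bounded graceful close: wait on a completion channel, but give up after a deadline so a stuck connection cannot stall the whole shutdown. A small sketch of that pattern, using a hypothetical wrapper type rather than the real api server:

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

type server struct{ wg sync.WaitGroup }

func (s *server) Close() error {
    done := make(chan struct{})
    go func() {
        s.wg.Wait() // wait for in-flight handlers
        close(done)
    }()

    select {
    case <-done:
        return nil
    case <-time.After(1 * time.Second):
        // do not block shutdown indefinitely on stuck connections
        return errors.New("api shutting down with open websockets")
    }
}

func main() {
    s := &server{}
    s.wg.Add(1)
    go func() { time.Sleep(2 * time.Second); s.wg.Done() }()
    fmt.Println(s.Close()) // times out after one second
}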
......@@ -21,6 +21,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
......@@ -66,6 +67,7 @@ import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/kademlia"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing"
......@@ -79,6 +81,7 @@ import (
type Bee struct {
p2pService io.Closer
p2pHalter p2p.Halter
p2pCancel context.CancelFunc
apiCloser io.Closer
apiServer *http.Server
......@@ -90,6 +93,7 @@ type Bee struct {
stateStoreCloser io.Closer
localstoreCloser io.Closer
topologyCloser io.Closer
topologyHalter topology.Halter
pusherCloser io.Closer
pullerCloser io.Closer
pullSyncCloser io.Closer
......@@ -313,6 +317,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
return nil, fmt.Errorf("p2p service: %w", err)
}
b.p2pService = p2ps
b.p2pHalter = p2ps
// localstore depends on batchstore
var path string
......@@ -439,6 +444,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
kad := kademlia.New(swarmAddress, addressbook, hive, p2ps, metricsDB, logger, kademlia.Options{Bootnodes: bootnodes, StandaloneMode: o.Standalone, BootnodeMode: o.BootnodeMode})
b.topologyCloser = kad
b.topologyHalter = kad
hive.SetAddPeersHandler(kad.AddPeers)
p2ps.SetPickyNotifier(kad)
batchStore.SetRadiusSetter(kad)
......@@ -699,6 +705,14 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
func (b *Bee) Shutdown(ctx context.Context) error {
var mErr error
// halt kademlia while shutting down other
// components.
b.topologyHalter.Halt()
// halt p2p layer from accepting new connections
// while shutting down other components
b.p2pHalter.Halt()
// tryClose is a convenience closure which reduces
// the repetitive io.Closer shutdown boilerplate.
tryClose := func(c io.Closer, errMsg string) {
......@@ -737,15 +751,46 @@ func (b *Bee) Shutdown(ctx context.Context) error {
if b.recoveryHandleCleanup != nil {
b.recoveryHandleCleanup()
}
var wg sync.WaitGroup
wg.Add(4)
go func() {
defer wg.Done()
tryClose(b.pssCloser, "pss")
}()
go func() {
defer wg.Done()
tryClose(b.pusherCloser, "pusher")
}()
go func() {
defer wg.Done()
tryClose(b.pullerCloser, "puller")
tryClose(b.pullSyncCloser, "pull sync")
tryClose(b.pssCloser, "pss")
}()
b.p2pCancel()
go func() {
defer wg.Done()
tryClose(b.pullSyncCloser, "pull sync")
}()
wg.Wait()
tryClose(b.p2pService, "p2p server")
wg.Add(3)
go func() {
defer wg.Done()
tryClose(b.transactionMonitorCloser, "transaction monitor")
}()
go func() {
defer wg.Done()
tryClose(b.listenerCloser, "listener")
}()
go func() {
defer wg.Done()
tryClose(b.postageServiceCloser, "postage service")
}()
wg.Wait()
if c := b.ethClientCloser; c != nil {
c()
......@@ -753,11 +798,9 @@ func (b *Bee) Shutdown(ctx context.Context) error {
tryClose(b.tracerCloser, "tracer")
tryClose(b.tagsCloser, "tag persistence")
tryClose(b.listenerCloser, "listener")
tryClose(b.postageServiceCloser, "postage service")
tryClose(b.topologyCloser, "topology driver")
tryClose(b.stateStoreCloser, "statestore")
tryClose(b.localstoreCloser, "localstore")
tryClose(b.topologyCloser, "topology driver")
tryClose(b.errorLogWriter, "error log writer")
tryClose(b.resolverCloser, "resolver service")
......
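Within Shutdown, closers that do not depend on each other are now closed from separate goroutines in two waves, with a wg.Wait() between them, and the p2p server is only closed once the syncing components are gone. When a tryClose-style helper is invoked from several goroutines, the accumulated error needs to be guarded. The sketch below shows one way to do that with a mutex-protected error slice; it is a simplified, hypothetical helper, not the real tryClose/mErr code.

package main

import (
    "fmt"
    "io"
    "sync"
)

type noisyCloser string

func (n noisyCloser) Close() error { return fmt.Errorf("%s: close failed", n) }

func closeAll(closers map[string]io.Closer) []error {
    var (
        mu   sync.Mutex
        errs []error
        wg   sync.WaitGroup
    )
    tryClose := func(c io.Closer, name string) {
        if c == nil {
            return
        }
        if err := c.Close(); err != nil {
            mu.Lock()
            errs = append(errs, fmt.Errorf("%s: %w", name, err))
            mu.Unlock()
        }
    }
    for name, c := range closers {
        wg.Add(1)
        go func(name string, c io.Closer) {
            defer wg.Done()
            tryClose(c, name)
        }(name, c)
    }
    wg.Wait() // this wave finishes before the next stage starts
    return errs
}

func main() {
    errs := closeAll(map[string]io.Closer{
        "pusher": noisyCloser("pusher"),
        "puller": noisyCloser("puller"),
        "pss":    noisyCloser("pss"),
    })
    for _, err := range errs {
        fmt.Println(err)
    }
}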
......@@ -69,6 +69,7 @@ type Service struct {
logger logging.Logger
tracer *tracing.Tracer
ready chan struct{}
halt chan struct{}
lightNodes lightnodes
lightNodeLimit int
protocolsmu sync.RWMutex
......@@ -239,6 +240,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
tracer: tracer,
connectionBreaker: breaker.NewBreaker(breaker.Options{}), // use default options
ready: make(chan struct{}),
halt: make(chan struct{}),
lightNodes: lightNodes,
}
......@@ -270,9 +272,14 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
func (s *Service) handleIncoming(stream network.Stream) {
select {
case <-s.ready:
case <-s.halt:
go func() { _ = stream.Reset() }()
return
case <-s.ctx.Done():
go func() { _ = stream.Reset() }()
return
}
peerID := stream.Conn().RemotePeer()
handshakeStream := NewStream(stream)
i, err := s.handshakeService.Handle(s.ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), peerID)
......@@ -695,9 +702,8 @@ func (s *Service) disconnected(address swarm.Address) {
peer := p2p.Peer{Address: address}
peerID, found := s.peers.peerID(address)
if !found {
s.logger.Debugf("libp2p disconnected: cannot find peerID for overlay: %s", address.String())
} else {
if found {
// peerID might not always be found on shutdown
full, found := s.peers.fullnode(peerID)
if found {
peer.FullNode = full
......@@ -806,3 +812,7 @@ func (s *Service) GetWelcomeMessage() string {
func (s *Service) Ready() {
close(s.ready)
}
func (s *Service) Halt() {
close(s.halt)
}
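The libp2p service gains a halt channel next to the existing ready channel: incoming streams block until the node is ready, and once Halt is called (or the service context is cancelled) new streams are reset instead of being handshaked. A compact sketch of that gate, with hypothetical types and a plain callback standing in for stream.Reset():

package main

import (
    "context"
    "fmt"
)

type service struct {
    ctx   context.Context
    ready chan struct{}
    halt  chan struct{}
}

func (s *service) handleIncoming(reset func()) {
    select {
    case <-s.ready:
        // proceed with the handshake
    case <-s.halt:
        // shutting down: refuse new streams
        reset()
        return
    case <-s.ctx.Done():
        reset()
        return
    }
    fmt.Println("handling stream")
}

func (s *service) Ready() { close(s.ready) }
func (s *service) Halt()  { close(s.halt) }

func main() {
    s := &service{
        ctx:   context.Background(),
        ready: make(chan struct{}),
        halt:  make(chan struct{}),
    }
    s.Halt()
    s.handleIncoming(func() { fmt.Println("stream reset") })
}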
......@@ -166,6 +166,8 @@ func (s *Service) GetWelcomeMessage() string {
return s.welcomeMessage
}
func (s *Service) Halt() {}
func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration) error {
if s.blocklistFunc == nil {
return errors.New("function blocklist not configured")
......
......@@ -26,6 +26,7 @@ type Service interface {
BlocklistedPeers() ([]Peer, error)
Addresses() ([]ma.Multiaddr, error)
SetPickyNotifier(PickyNotifier)
Halter
}
type Disconnecter interface {
......@@ -35,6 +36,11 @@ type Disconnecter interface {
Blocklist(overlay swarm.Address, duration time.Duration) error
}
type Halter interface {
// Halt new incoming connections while shutting down
Halt()
}
// PickyNotifier can decide whether a peer should be picked
type PickyNotifier interface {
Pick(Peer) bool
......
......@@ -252,6 +252,11 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
go func() {
err := listenf()
if err != nil {
if errors.Is(err, context.Canceled) {
// context cancelled is returned on shutdown,
// therefore we do nothing here
return
}
l.logger.Errorf("failed syncing event listener, shutting down node err: %v", err)
if l.shutdowner != nil {
err = l.shutdowner.Shutdown(context.Background())
......
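The listener change treats context.Canceled as the expected outcome of shutting the node down, rather than as a sync failure that should trigger another Shutdown call. A small sketch of that check in a background worker, with a hypothetical listen function:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

func run(ctx context.Context, listenf func(context.Context) error) {
    go func() {
        if err := listenf(ctx); err != nil {
            if errors.Is(err, context.Canceled) {
                // context cancelled is returned on shutdown; do nothing
                return
            }
            fmt.Println("failed syncing event listener:", err)
            // here the real code asks the shutdowner to stop the node
        }
    }()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    run(ctx, func(ctx context.Context) error {
        <-ctx.Done()
        return ctx.Err() // context.Canceled after cancel()
    })
    cancel()
    time.Sleep(100 * time.Millisecond) // let the goroutine observe cancellation
}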
......@@ -422,14 +422,6 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
func (p *Puller) Close() error {
p.logger.Info("puller shutting down")
close(p.quit)
p.syncPeersMtx.Lock()
defer p.syncPeersMtx.Unlock()
for i := uint8(0); i < p.bins; i++ {
binPeers := p.syncPeers[i]
for _, peer := range binPeers {
peer.gone()
}
}
cc := make(chan struct{})
go func() {
defer close(cc)
......
......@@ -514,7 +514,7 @@ func (s *Syncer) Close() error {
select {
case <-cc:
case <-time.After(10 * time.Second):
case <-time.After(5 * time.Second):
s.logger.Warning("pull syncer shutting down with running goroutines")
}
return nil
......
......@@ -91,6 +91,7 @@ type Kad struct {
bootnode bool // indicates whether the node is working in bootnode mode
collector *metrics.Collector
quit chan struct{} // quit channel
halt chan struct{} // halt channel
done chan struct{} // signal that `manage` has quit
wg sync.WaitGroup
waitNext *waitnext.WaitNext
......@@ -135,6 +136,7 @@ func New(
bootnode: o.BootnodeMode,
collector: metrics.NewCollector(metricsDB),
quit: make(chan struct{}),
halt: make(chan struct{}),
done: make(chan struct{}),
wg: sync.WaitGroup{},
}
......@@ -429,6 +431,7 @@ func (k *Kad) notifyManageLoop() {
func (k *Kad) manage() {
defer k.wg.Done()
defer close(k.done)
defer k.logger.Debugf("kademlia manage loop exited")
ctx, cancel := context.WithCancel(context.Background())
go func() {
......@@ -455,6 +458,9 @@ func (k *Kad) manage() {
start := time.Now()
select {
case <-k.halt:
// halt stops dial-outs while shutting down
return
case <-k.quit:
return
default:
......@@ -476,6 +482,11 @@ func (k *Kad) manage() {
)
if k.connectedPeers.Length() == 0 {
select {
case <-k.halt:
continue
default:
}
k.logger.Debug("kademlia: no connected peers, trying bootnodes")
k.connectBootnodes(ctx)
}
......@@ -538,7 +549,7 @@ func (k *Kad) connectBootnodes(ctx context.Context) {
connected++
// connect to max 3 bootnodes
return connected >= 3, nil
}); err != nil {
}); err != nil && !errors.Is(err, context.Canceled) {
k.logger.Debugf("discover fail %s: %v", addr, err)
k.logger.Warningf("discover to bootnode %s", addr)
return
......@@ -717,10 +728,8 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e
// about lightnodes to others.
continue
}
k.wg.Add(1)
go func(connectedPeer swarm.Address) {
defer k.wg.Done()
if err := k.discovery.BroadcastPeers(context.Background(), connectedPeer, peer); err != nil {
if err := k.discovery.BroadcastPeers(ctx, connectedPeer, peer); err != nil {
k.logger.Debugf("could not gossip peer %s to peer %s: %v", peer, connectedPeer, err)
}
}(connectedPeer)
......@@ -1170,6 +1179,13 @@ func (k *Kad) String() string {
return string(b)
}
// Halt stops outgoing connections from happening.
// This is needed during shutdown so that no further
// topology changes take place while the node closes.
func (k *Kad) Halt() {
close(k.halt)
}
// Close shuts down kademlia.
func (k *Kad) Close() error {
k.logger.Info("kademlia shutting down")
......@@ -1193,6 +1209,7 @@ func (k *Kad) Close() error {
k.logger.Warning("kademlia manage loop did not shut down properly")
}
k.logger.Info("kademlia persisting peer metrics")
if err := k.collector.Finalize(time.Now()); err != nil {
k.logger.Debugf("kademlia: unable to finalize open sessions: %v", err)
}
......
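In kademlia, the manage loop now consults a dedicated halt channel before attempting dial-outs or falling back to bootnodes, and connectBootnodes no longer logs context.Canceled as a discovery failure. A reduced sketch of a halt-aware manage loop follows; the connect step and trigger are hypothetical stand-ins for the real address-book iteration.

package main

import (
    "fmt"
    "time"
)

type kad struct {
    quit, halt chan struct{}
}

func (k *kad) manage(connect func() error) {
    for {
        select {
        case <-k.quit:
            return
        case <-time.After(50 * time.Millisecond): // stand-in for the trigger channel
        }

        select {
        case <-k.halt:
            // halted: no new dial-outs while shutting down
            return
        case <-k.quit:
            return
        default:
        }

        if err := connect(); err != nil {
            fmt.Println("connect:", err)
        }
    }
}

func (k *kad) Halt()  { close(k.halt) }
func (k *kad) Close() { close(k.quit) }

func main() {
    k := &kad{quit: make(chan struct{}), halt: make(chan struct{})}
    go k.manage(func() error { fmt.Println("dialing out"); return nil })
    time.Sleep(120 * time.Millisecond)
    k.Halt()
    time.Sleep(60 * time.Millisecond)
    k.Close()
}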
......@@ -182,9 +182,8 @@ func (m *Mock) ResetPeers() {
m.eachPeerRev = nil
}
func (m *Mock) Close() error {
panic("not implemented") // TODO: Implement
}
func (d *Mock) Halt() {}
func (m *Mock) Close() error { return nil }
func (m *Mock) Snapshot() *topology.KadParams {
panic("not implemented") // TODO: Implement
......
......@@ -193,9 +193,8 @@ func (d *mock) Snapshot() *topology.KadParams {
return new(topology.KadParams)
}
func (d *mock) Close() error {
return nil
}
func (d *mock) Halt() {}
func (d *mock) Close() error { return nil }
type Option interface {
apply(*mock)
......
......@@ -28,6 +28,7 @@ type Driver interface {
NeighborhoodDepth() uint8
SubscribePeersChange() (c <-chan struct{}, unsubscribe func())
io.Closer
Halter
Snapshot() *KadParams
}
......@@ -130,3 +131,9 @@ type KadParams struct {
Bins KadBins `json:"bins"` // individual bin info
LightNodes BinInfo `json:"lightNodes"` // light nodes bin info
}
type Halter interface {
// Halt stops the topology from initiating new connections
// while allowing it to keep running.
Halt()
}
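Both the p2p and topology Halter implementations in this commit signal halting by closing a channel, so Halt must be called at most once per instance (closing an already closed channel panics). If an implementation could be halted from more than one place, guarding the close with sync.Once keeps it idempotent; the sketch below shows that variation as an assumption on my part, not what the kademlia code does.

package main

import (
    "fmt"
    "sync"
)

type driver struct {
    halt     chan struct{}
    haltOnce sync.Once
}

func (d *driver) Halt() {
    d.haltOnce.Do(func() { close(d.halt) })
}

func main() {
    d := &driver{halt: make(chan struct{})}
    d.Halt()
    d.Halt() // safe: the second call is a no-op
    select {
    case <-d.halt:
        fmt.Println("halted")
    }
}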