Commit b7af2ac8 authored by Matthew Slipper's avatar Matthew Slipper

op-node: Monitor static peers in a background process

Currently, connections with static peers are not re-established in the event of disconnection. This causes us to have to restart replicas whenever we restart the sequencer. This PR adds a background process to monitor static peers for connectedness, and automatically reconnect when necessary.
parent eb68d839
...@@ -4,8 +4,11 @@ import ( ...@@ -4,8 +4,11 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"sync"
"time" "time"
"github.com/libp2p/go-libp2p/core/network"
lconf "github.com/libp2p/go-libp2p/config" lconf "github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
...@@ -34,6 +37,11 @@ type extraHost struct { ...@@ -34,6 +37,11 @@ type extraHost struct {
host.Host host.Host
gater ConnectionGater gater ConnectionGater
connMgr connmgr.ConnManager connMgr connmgr.ConnManager
log log.Logger
staticPeers []*peer.AddrInfo
quitC chan struct{}
} }
func (e *extraHost) ConnectionGater() ConnectionGater { func (e *extraHost) ConnectionGater() ConnectionGater {
...@@ -44,6 +52,73 @@ func (e *extraHost) ConnectionManager() connmgr.ConnManager { ...@@ -44,6 +52,73 @@ func (e *extraHost) ConnectionManager() connmgr.ConnManager {
return e.connMgr return e.connMgr
} }
func (e *extraHost) Close() error {
close(e.quitC)
return e.Host.Close()
}
func (e *extraHost) initStaticPeers() {
for _, addr := range e.staticPeers {
e.Peerstore().AddAddrs(addr.ID, addr.Addrs, time.Hour*24*7)
// We protect the peer, so the connection manager doesn't decide to prune it.
// We tag it with "static" so other protects/unprotects with different tags don't affect this protection.
e.connMgr.Protect(addr.ID, "static")
// Try to dial the node in the background
go func(addr *peer.AddrInfo) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
if err := e.dialStaticPeer(ctx, addr); err != nil {
e.log.Warn("error dialing static peer", "peer", addr.ID, "err", err)
}
}(addr)
}
}
func (e *extraHost) dialStaticPeer(ctx context.Context, addr *peer.AddrInfo) error {
e.log.Info("dialing static peer", "peer", addr.ID, "addrs", addr.Addrs)
if _, err := e.Network().DialPeer(ctx, addr.ID); err != nil {
return err
}
return nil
}
func (e *extraHost) monitorStaticPeers() {
tick := time.NewTicker(time.Minute)
defer tick.Stop()
for {
select {
case <-tick.C:
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
var wg sync.WaitGroup
e.log.Debug("polling static peers")
for _, addr := range e.staticPeers {
connectedness := e.Network().Connectedness(addr.ID)
e.log.Trace("static peer connectedness", "peer", addr.ID, "connectedness", connectedness)
if connectedness == network.Connected {
continue
}
wg.Add(1)
go func(addr *peer.AddrInfo) {
e.log.Warn("static peer disconnected, reconnecting", "peer", addr.ID)
if err := e.dialStaticPeer(ctx, addr); err != nil {
e.log.Warn("error reconnecting to static peer", "peer", addr.ID, "err", err)
}
wg.Done()
}(addr)
}
wg.Wait()
cancel()
case <-e.quitC:
return
}
}
}
var _ ExtraHostFeatures = (*extraHost)(nil) var _ ExtraHostFeatures = (*extraHost)(nil)
func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host, error) { func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host, error) {
...@@ -142,26 +217,28 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host, ...@@ -142,26 +217,28 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, peerAddr := range conf.StaticPeers {
staticPeers := make([]*peer.AddrInfo, len(conf.StaticPeers))
for i, peerAddr := range conf.StaticPeers {
addr, err := peer.AddrInfoFromP2pAddr(peerAddr) addr, err := peer.AddrInfoFromP2pAddr(peerAddr)
if err != nil { if err != nil {
return nil, fmt.Errorf("bad peer address: %w", err) return nil, fmt.Errorf("bad peer address: %w", err)
} }
h.Peerstore().AddAddrs(addr.ID, addr.Addrs, time.Hour*24*7) staticPeers[i] = addr
// We protect the peer, so the connection manager doesn't decide to prune it.
// We tag it with "static" so other protects/unprotects with different tags don't affect this protection.
connMngr.Protect(addr.ID, "static")
// Try to dial the node in the background
go func() {
log.Info("Dialing static peer", "peer", addr.ID, "addrs", addr.Addrs)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
if _, err := h.Network().DialPeer(ctx, addr.ID); err != nil {
log.Warn("Failed to dial static peer", "peer", addr.ID, "addrs", addr.Addrs)
}
}()
} }
out := &extraHost{Host: h, connMgr: connMngr}
out := &extraHost{
Host: h,
connMgr: connMngr,
log: log,
staticPeers: staticPeers,
quitC: make(chan struct{}),
}
out.initStaticPeers()
if len(conf.StaticPeers) > 0 {
go out.monitorStaticPeers()
}
// Only add the connection gater if it offers the full interface we're looking for. // Only add the connection gater if it offers the full interface we're looking for.
if g, ok := connGtr.(ConnectionGater); ok { if g, ok := connGtr.(ConnectionGater); ok {
out.gater = g out.gater = g
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment