Commit f2f02256 authored by Janos Guljas's avatar Janos Guljas

add Bee node

parent 1277299c
...@@ -6,10 +6,6 @@ package cmd ...@@ -6,10 +6,6 @@ package cmd
import ( import (
"context" "context"
"fmt"
"log"
"net"
"net/http"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
...@@ -18,11 +14,9 @@ import ( ...@@ -18,11 +14,9 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/janos/bee/pkg/api"
"github.com/janos/bee/pkg/debugapi"
"github.com/janos/bee/pkg/logging" "github.com/janos/bee/pkg/logging"
"github.com/janos/bee/pkg/node"
"github.com/janos/bee/pkg/p2p/libp2p" "github.com/janos/bee/pkg/p2p/libp2p"
"github.com/janos/bee/pkg/pingpong"
) )
func (c *command) initStartCmd() (err error) { func (c *command) initStartCmd() (err error) {
...@@ -51,96 +45,23 @@ func (c *command) initStartCmd() (err error) { ...@@ -51,96 +45,23 @@ func (c *command) initStartCmd() (err error) {
logger := logging.New(cmd.OutOrStdout()) logger := logging.New(cmd.OutOrStdout())
logger.SetLevel(logrus.TraceLevel) logger.SetLevel(logrus.TraceLevel)
errorLogWriter := logger.WriterLevel(logrus.ErrorLevel) b, err := node.NewBee(node.Options{
defer errorLogWriter.Close() APIAddr: c.config.GetString(optionNameAPIAddr),
DebugAPIAddr: c.config.GetString(optionNameDebugAPIAddr),
ctx, cancel := context.WithCancel(context.Background()) LibP2POptions: libp2p.Options{
defer cancel() Addr: c.config.GetString(optionNameP2PAddr),
DisableWS: c.config.GetBool(optionNameP2PDisableWS),
// Construct P2P service. DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC),
p2ps, err := libp2p.New(ctx, libp2p.Options{ Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
Addr: c.config.GetString(optionNameP2PAddr), NetworkID: c.config.GetInt(optionNameNetworkID),
DisableWS: c.config.GetBool(optionNameP2PDisableWS), ConnectionsLow: c.config.GetInt(optionNameConnectionsLow),
DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC), ConnectionsHigh: c.config.GetInt(optionNameConnectionsHigh),
Bootnodes: c.config.GetStringSlice(optionNameBootnodes), ConnectionsGrace: c.config.GetDuration(optionNameConnectionsGrace),
NetworkID: c.config.GetInt(optionNameNetworkID), },
ConnectionsLow: c.config.GetInt(optionNameConnectionsLow), Logger: logger,
ConnectionsHigh: c.config.GetInt(optionNameConnectionsHigh),
ConnectionsGrace: c.config.GetDuration(optionNameConnectionsGrace),
}) })
if err != nil { if err != nil {
return fmt.Errorf("p2p service: %w", err) return err
}
// Construct protocols.
pingPong := pingpong.New(pingpong.Options{
Streamer: p2ps,
Logger: logger,
})
// Add protocols to the P2P service.
if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil {
return fmt.Errorf("pingpong service: %w", err)
}
addrs, err := p2ps.Addresses()
if err != nil {
return fmt.Errorf("get server addresses: %w", err)
}
for _, addr := range addrs {
logger.Infof("address: %s", addr)
}
// API server
apiService := api.New(api.Options{
P2P: p2ps,
Pingpong: pingPong,
})
apiListener, err := net.Listen("tcp", c.config.GetString(optionNameAPIAddr))
if err != nil {
return fmt.Errorf("api listener: %w", err)
}
apiServer := &http.Server{
Handler: apiService,
ErrorLog: log.New(errorLogWriter, "", 0),
}
go func() {
logger.Infof("api address: %s", apiListener.Addr())
if err := apiServer.Serve(apiListener); err != nil && err != http.ErrServerClosed {
logger.Errorf("api server: %v", err)
}
}()
var debugAPIServer *http.Server
if addr := c.config.GetString(optionNameDebugAPIAddr); addr != "" {
// Debug API server
debugAPIService := debugapi.New(debugapi.Options{})
// register metrics from components
debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
debugAPIService.MustRegisterMetrics(apiService.Metrics()...)
debugAPIListener, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("debug api listener: %w", err)
}
debugAPIServer := &http.Server{
Handler: debugAPIService,
ErrorLog: log.New(errorLogWriter, "", 0),
}
go func() {
logger.Infof("debug api address: %s", debugAPIListener.Addr())
if err := debugAPIServer.Serve(debugAPIListener); err != nil && err != http.ErrServerClosed {
logger.Errorf("debug api server: %v", err)
}
}()
} }
// Wait for termination or interrupt signals. // Wait for termination or interrupt signals.
...@@ -156,28 +77,13 @@ func (c *command) initStartCmd() (err error) { ...@@ -156,28 +77,13 @@ func (c *command) initStartCmd() (err error) {
// Shutdown // Shutdown
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer func() {
if err := recover(); err != nil {
logger.Errorf("shutdown panic: %v", err)
}
}()
defer close(done) defer close(done)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() defer cancel()
if err := apiServer.Shutdown(ctx); err != nil { if err := b.Shutdown(ctx); err != nil {
logger.Errorf("api server shutdown: %v", err) logger.Errorf("shutdown: %v", err)
}
if debugAPIServer != nil {
if err := debugAPIServer.Shutdown(ctx); err != nil {
logger.Errorf("debug api server shutdown: %v", err)
}
}
if err := p2ps.Close(); err != nil {
logger.Errorf("p2p server shutdown: %v", err)
} }
}() }()
...@@ -198,7 +104,7 @@ func (c *command) initStartCmd() (err error) { ...@@ -198,7 +104,7 @@ func (c *command) initStartCmd() (err error) {
cmd.Flags().Bool(optionNameP2PDisableWS, false, "disable P2P WebSocket protocol") cmd.Flags().Bool(optionNameP2PDisableWS, false, "disable P2P WebSocket protocol")
cmd.Flags().Bool(optionNameP2PDisableQUIC, false, "disable P2P QUIC protocol") cmd.Flags().Bool(optionNameP2PDisableQUIC, false, "disable P2P QUIC protocol")
cmd.Flags().StringSlice(optionNameBootnodes, nil, "initial nodes to connect to") cmd.Flags().StringSlice(optionNameBootnodes, nil, "initial nodes to connect to")
cmd.Flags().String(optionNameDebugAPIAddr, ":6060", "Debug HTTP API listen address") cmd.Flags().String(optionNameDebugAPIAddr, "", "debug HTTP API listen address, e.g. 127.0.0.1:6060")
cmd.Flags().Int(optionNameNetworkID, 1, "ID of the Swarm network") cmd.Flags().Int(optionNameNetworkID, 1, "ID of the Swarm network")
cmd.Flags().Int(optionNameConnectionsLow, 200, "low watermark governing the number of connections that'll be maintained") cmd.Flags().Int(optionNameConnectionsLow, 200, "low watermark governing the number of connections that'll be maintained")
cmd.Flags().Int(optionNameConnectionsHigh, 400, "high watermark governing the number of connections that'll be maintained") cmd.Flags().Int(optionNameConnectionsHigh, 400, "high watermark governing the number of connections that'll be maintained")
......
...@@ -22,5 +22,6 @@ require ( ...@@ -22,5 +22,6 @@ require (
github.com/sirupsen/logrus v1.4.2 github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5 github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.6.1 github.com/spf13/viper v1.6.1
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
resenje.org/web v0.4.0 resenje.org/web v0.4.0
) )
...@@ -530,6 +530,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ ...@@ -530,6 +530,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package node
import (
"context"
"fmt"
"io"
"log"
"net"
"net/http"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"github.com/janos/bee/pkg/api"
"github.com/janos/bee/pkg/debugapi"
"github.com/janos/bee/pkg/p2p/libp2p"
"github.com/janos/bee/pkg/pingpong"
)
type Bee struct {
p2pServer io.Closer
p2pCancel context.CancelFunc
apiServer *http.Server
debugAPIServer *http.Server
errorLogWriter *io.PipeWriter
}
type Options struct {
APIAddr string
DebugAPIAddr string
LibP2POptions libp2p.Options
Logger *logrus.Logger
}
func NewBee(o Options) (*Bee, error) {
logger := o.Logger
p2pCtx, p2pCancel := context.WithCancel(context.Background())
b := &Bee{
p2pCancel: p2pCancel,
errorLogWriter: logger.WriterLevel(logrus.ErrorLevel),
}
// Construct P2P service.
p2ps, err := libp2p.New(p2pCtx, o.LibP2POptions)
if err != nil {
return nil, fmt.Errorf("p2p service: %w", err)
}
b.p2pServer = p2ps
// Construct protocols.
pingPong := pingpong.New(pingpong.Options{
Streamer: p2ps,
Logger: logger,
})
// Add protocols to the P2P service.
if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil {
return nil, fmt.Errorf("pingpong service: %w", err)
}
addrs, err := p2ps.Addresses()
if err != nil {
return nil, fmt.Errorf("get server addresses: %w", err)
}
for _, addr := range addrs {
logger.Infof("address: %s", addr)
}
var apiService api.Service
if o.APIAddr != "" {
// API server
apiService = api.New(api.Options{
P2P: p2ps,
Pingpong: pingPong,
})
apiListener, err := net.Listen("tcp", o.APIAddr)
if err != nil {
return nil, fmt.Errorf("api listener: %w", err)
}
apiServer := &http.Server{
Handler: apiService,
ErrorLog: log.New(b.errorLogWriter, "", 0),
}
go func() {
logger.Infof("api address: %s", apiListener.Addr())
if err := apiServer.Serve(apiListener); err != nil && err != http.ErrServerClosed {
logger.Errorf("api server: %v", err)
}
}()
b.apiServer = apiServer
}
if o.DebugAPIAddr != "" {
// Debug API server
debugAPIService := debugapi.New(debugapi.Options{})
// register metrics from components
debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
if apiService != nil {
debugAPIService.MustRegisterMetrics(apiService.Metrics()...)
}
debugAPIListener, err := net.Listen("tcp", o.DebugAPIAddr)
if err != nil {
return nil, fmt.Errorf("debug api listener: %w", err)
}
debugAPIServer := &http.Server{
Handler: debugAPIService,
ErrorLog: log.New(b.errorLogWriter, "", 0),
}
go func() {
logger.Infof("debug api address: %s", debugAPIListener.Addr())
if err := debugAPIServer.Serve(debugAPIListener); err != nil && err != http.ErrServerClosed {
logger.Errorf("debug api server: %v", err)
}
}()
b.debugAPIServer = debugAPIServer
}
return b, nil
}
func (b *Bee) Shutdown(ctx context.Context) error {
var eg errgroup.Group
if b.apiServer != nil {
eg.Go(func() error {
if err := b.apiServer.Shutdown(ctx); err != nil {
return fmt.Errorf("api server: %w", err)
}
return nil
})
}
if b.debugAPIServer != nil {
eg.Go(func() error {
if err := b.debugAPIServer.Shutdown(ctx); err != nil {
return fmt.Errorf("debug api server: %w", err)
}
return nil
})
}
if err := eg.Wait(); err != nil {
return err
}
b.p2pCancel()
if err := b.p2pServer.Close(); err != nil {
return fmt.Errorf("p2p server: %w", err)
}
return b.errorLogWriter.Close()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment