Commit a51aaf38 authored by Svetomir Smiljkovic's avatar Svetomir Smiljkovic

Merge branch 'master' of https://github.com/janos/bee

parents 131090ce 638065c8
...@@ -15,26 +15,24 @@ Docker image `janos/bee`. ...@@ -15,26 +15,24 @@ Docker image `janos/bee`.
## Usage (experimental api) ## Usage (experimental api)
Execute the commands in two terminals to start `node 1` and `node 2`: Execute the command terminals to start `node 1`:
```sh ```sh
bee start --api-addr :8501 --p2p-addr :30401 bee start --api-addr :8501 --p2p-addr :30401
``` ```
Use one of the multiaddresses as bootnode for `node 2` in order to connect them:
```sh ```sh
bee start --api-addr :8502 --p2p-addr :30402 bee start --api-addr :8502 --p2p-addr :30402 --bootnode /ip4/127.0.0.1/tcp/30401/p2p/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB
``` ```
Copy one of the multiaddresses from one running instance. Use the last part of `node 1` multiaddress to ping it using `node 2` by making an HTTP request to `localhost:{PORT2}/pingpong/{ID1}` like:
Make an HTTP request to `localhost:{PORT1}/pingpong/{MULTIADDRESS2}` like:
```sh ```sh
curl localhost:8501/pingpong/ip4/127.0.0.1/tcp/60304/p2p/Qmdao2FbfSK8ZcFxuUVmVDPUJifgRmbofNWH21WQESZm7x curl localhost:8502/pingpong/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB
``` ```
Ping pong messages should be exchanged from `node 1` (listening on `PORT1`) to `node 2` (with multiaddress `MULTIADDRESS2`).
## Structure ## Structure
- cmd/bee - a simple application integrating p2p and pingpong service - cmd/bee - a simple application integrating p2p and pingpong service
......
...@@ -3,12 +3,18 @@ package cmd ...@@ -3,12 +3,18 @@ package cmd
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"net" "net"
"net/http" "net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/janos/bee/pkg/api" "github.com/janos/bee/pkg/api"
"github.com/janos/bee/pkg/debugapi"
"github.com/janos/bee/pkg/p2p/libp2p" "github.com/janos/bee/pkg/p2p/libp2p"
"github.com/janos/bee/pkg/pingpong" "github.com/janos/bee/pkg/pingpong"
) )
...@@ -20,6 +26,8 @@ func (c *command) initStartCmd() (err error) { ...@@ -20,6 +26,8 @@ func (c *command) initStartCmd() (err error) {
optionNameP2PAddr = "p2p-addr" optionNameP2PAddr = "p2p-addr"
optionNameP2PDisableWS = "p2p-disable-ws" optionNameP2PDisableWS = "p2p-disable-ws"
optionNameP2PDisableQUIC = "p2p-disable-quic" optionNameP2PDisableQUIC = "p2p-disable-quic"
optionNameDebugAPIAddr = "debug-api-addr"
optionNameBootnodes = "bootnode"
) )
cmd := &cobra.Command{ cmd := &cobra.Command{
...@@ -40,6 +48,7 @@ func (c *command) initStartCmd() (err error) { ...@@ -40,6 +48,7 @@ func (c *command) initStartCmd() (err error) {
Addr: c.config.GetString(optionNameP2PAddr), Addr: c.config.GetString(optionNameP2PAddr),
DisableWS: c.config.GetBool(optionNameP2PDisableWS), DisableWS: c.config.GetBool(optionNameP2PDisableWS),
DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC), DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC),
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
// Routing: func(h host.Host) (r routing.PeerRouting, err error) { // Routing: func(h host.Host) (r routing.PeerRouting, err error) {
// idht, err = dht.New(ctx, h) // idht, err = dht.New(ctx, h)
// return idht, err // return idht, err
...@@ -66,26 +75,93 @@ func (c *command) initStartCmd() (err error) { ...@@ -66,26 +75,93 @@ func (c *command) initStartCmd() (err error) {
cmd.Println(addr) cmd.Println(addr)
} }
h := api.New(api.Options{ // API server
apiListener, err := net.Listen("tcp", c.config.GetString(optionNameAPIAddr))
if err != nil {
return fmt.Errorf("api listener: %w", err)
}
apiServer := &http.Server{Handler: api.New(api.Options{
P2P: p2ps, P2P: p2ps,
Pingpong: pingPong, Pingpong: pingPong,
}) })}
go func() {
cmd.Println("api address:", apiListener.Addr())
if err := apiServer.Serve(apiListener); err != nil && err != http.ErrServerClosed {
log.Println("api server:", err)
}
}()
l, err := net.Listen("tcp", c.config.GetString(optionNameAPIAddr)) // Debug API server
debugAPIListener, err := net.Listen("tcp", c.config.GetString(optionNameDebugAPIAddr))
if err != nil { if err != nil {
return fmt.Errorf("listen TCP: %w", err) return fmt.Errorf("debug api listener: %w", err)
} }
debugAPIServer := &http.Server{Handler: debugapi.New(debugapi.Options{})}
cmd.Println("http address:", l.Addr()) go func() {
cmd.Println("debug api address:", debugAPIListener.Addr())
return http.Serve(l, h) if err := debugAPIServer.Serve(debugAPIListener); err != nil && err != http.ErrServerClosed {
log.Println("debug api server:", err)
}
}()
// Wait for termination or interrupt signals.
// We want to clean up things at the end.
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, syscall.SIGINT, syscall.SIGTERM)
// Block main goroutine until it is interrupted
sig := <-interruptChannel
log.Println("received signal:", sig)
// Shutdown
done := make(chan struct{})
go func() {
defer func() {
if err := recover(); err != nil {
log.Println("shutdown panic:", err)
}
}()
defer close(done)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if err := apiServer.Shutdown(ctx); err != nil {
log.Println("api server shutdown:", err)
}
if err := debugAPIServer.Shutdown(ctx); err != nil {
log.Println("debug api server shutdown:", err)
}
if err := p2ps.Close(); err != nil {
log.Println("p2p server shutdown:", err)
}
}()
// If shutdown function is blocking too long,
// allow process termination by receiving another signal.
select {
case sig := <-interruptChannel:
log.Printf("received signal: %v\n", sig)
case <-done:
}
return nil
}, },
} }
cmd.Flags().String(optionNameAPIAddr, ":8500", "HTTP API listen address") cmd.Flags().String(optionNameAPIAddr, ":8500", "HTTP API listen address")
cmd.Flags().String(optionNameP2PAddr, ":30399", "P2P listen address") cmd.Flags().String(optionNameP2PAddr, ":30399", "P2P listen address")
cmd.Flags().Bool(optionNameP2PDisableWS, false, "Disable P2P WebSocket protocol") cmd.Flags().Bool(optionNameP2PDisableWS, false, "disable P2P WebSocket protocol")
cmd.Flags().Bool(optionNameP2PDisableQUIC, false, "Disable P2P QUIC protocol") cmd.Flags().Bool(optionNameP2PDisableQUIC, false, "disable P2P QUIC protocol")
cmd.Flags().StringSlice(optionNameBootnodes, nil, "initial nodes to connect to")
cmd.Flags().String(optionNameDebugAPIAddr, ":6060", "Debug HTTP API listen address")
if err := c.config.BindPFlags(cmd.Flags()); err != nil { if err := c.config.BindPFlags(cmd.Flags()); err != nil {
return err return err
......
...@@ -5,6 +5,7 @@ go 1.13 ...@@ -5,6 +5,7 @@ go 1.13
require ( require (
github.com/gogo/protobuf v1.3.1 github.com/gogo/protobuf v1.3.1
github.com/gorilla/handlers v1.4.2 github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.3
github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/ipfs/go-log v1.0.1 // indirect github.com/ipfs/go-log v1.0.1 // indirect
github.com/libp2p/go-libp2p v0.5.0 github.com/libp2p/go-libp2p v0.5.0
......
This diff is collapsed.
...@@ -3,34 +3,18 @@ package api ...@@ -3,34 +3,18 @@ package api
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"strings"
"github.com/multiformats/go-multiaddr" "github.com/gorilla/mux"
) )
func (s *server) pingpongHandler(w http.ResponseWriter, r *http.Request) { func (s *server) pingpongHandler(w http.ResponseWriter, r *http.Request) {
target := strings.TrimPrefix(r.URL.Path, "/pingpong") peerID := mux.Vars(r)["peer-id"]
addr, err := multiaddr.NewMultiaddr(target)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintln(w, "invalid address", target, err)
return
}
ctx := r.Context() ctx := r.Context()
peerID, err := s.P2P.Connect(ctx, addr)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintln(w, "connect error", addr, err)
return
}
rtt, err := s.Pingpong.Ping(ctx, peerID, "hey", "there", ",", "how are", "you", "?") rtt, err := s.Pingpong.Ping(ctx, peerID, "hey", "there", ",", "how are", "you", "?")
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintln(w, "ping error", addr, err) fmt.Fprintln(w, "ping error", peerID, err)
return return
} }
......
...@@ -5,17 +5,18 @@ import ( ...@@ -5,17 +5,18 @@ import (
"net/http" "net/http"
"github.com/gorilla/handlers" "github.com/gorilla/handlers"
"github.com/gorilla/mux"
"resenje.org/web" "resenje.org/web"
) )
func (s *server) setupRouting() { func (s *server) setupRouting() {
baseRouter := http.NewServeMux() baseRouter := mux.NewRouter()
baseRouter.HandleFunc("/robots.txt", func(w http.ResponseWriter, r *http.Request) { baseRouter.HandleFunc("/robots.txt", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "User-agent: *\nDisallow: /") fmt.Fprintln(w, "User-agent: *\nDisallow: /")
}) })
baseRouter.HandleFunc("/pingpong/", s.pingpongHandler) baseRouter.HandleFunc("/pingpong/{peer-id}", s.pingpongHandler)
s.Handler = web.ChainHandlers( s.Handler = web.ChainHandlers(
handlers.CompressHandler, handlers.CompressHandler,
......
package debugapi
import (
"net/http"
)
type server struct {
Options
http.Handler
}
type Options struct{}
func New(o Options) http.Handler {
s := &server{
Options: o,
}
s.setupRouting()
return s
}
package debugapi
import (
"expvar"
"net/http"
"net/http/pprof"
"github.com/gorilla/handlers"
"resenje.org/web"
)
func (s *server) setupRouting() {
internalBaseRouter := http.NewServeMux()
internalRouter := http.NewServeMux()
internalBaseRouter.Handle("/", web.ChainHandlers(
handlers.CompressHandler,
web.NoCacheHeadersHandler,
web.FinalHandler(internalRouter),
))
internalRouter.Handle("/", http.NotFoundHandler())
internalRouter.HandleFunc("/health", s.statusHandler)
internalRouter.HandleFunc("/readiness", s.statusHandler)
internalRouter.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
internalRouter.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
internalRouter.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
internalRouter.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
internalRouter.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
internalRouter.Handle("/debug/vars", expvar.Handler())
s.Handler = internalBaseRouter
}
package debugapi
import (
"fmt"
"net/http"
)
func (s *server) statusHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"status":"ok"}`)
}
...@@ -33,6 +33,7 @@ type Options struct { ...@@ -33,6 +33,7 @@ type Options struct {
Addr string Addr string
DisableWS bool DisableWS bool
DisableQUIC bool DisableQUIC bool
Bootnodes []string
// PrivKey []byte // PrivKey []byte
// Routing func(host.Host) (routing.PeerRouting, error) // Routing func(host.Host) (routing.PeerRouting, error)
} }
...@@ -128,7 +129,21 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -128,7 +129,21 @@ func New(ctx context.Context, o Options) (*Service, error) {
return nil, fmt.Errorf("autonat: %w", err) return nil, fmt.Errorf("autonat: %w", err)
} }
return &Service{host: h}, nil s := &Service{host: h}
// TODO: be more resilient on connection errors and connect in parallel
for _, a := range o.Bootnodes {
addr, err := ma.NewMultiaddr(a)
if err != nil {
return nil, fmt.Errorf("bootnode %s: %w", a, err)
}
if _, err := s.Connect(ctx, addr); err != nil {
return nil, fmt.Errorf("connect to bootnode %s: %w", a, err)
}
}
return s, nil
} }
func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
...@@ -192,3 +207,7 @@ func (s *Service) NewStream(ctx context.Context, peerID, protocolName, streamNam ...@@ -192,3 +207,7 @@ func (s *Service) NewStream(ctx context.Context, peerID, protocolName, streamNam
} }
return st, nil return st, nil
} }
func (s *Service) Close() error {
return s.host.Close()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment