Commit 00a7f60d authored by Janos Guljas's avatar Janos Guljas

Initial commit

parents
name: Go
on: [push, pull_request]
jobs:
build:
name: Build
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.x
- name: Set up env
run: |
echo "::set-env name=GOPATH::$(go env GOPATH)"
echo "::add-path::$(go env GOPATH)/bin"
shell: bash
- name: Checkout
uses: actions/checkout@v1
with:
fetch-depth: 1
- name: Cache Go modules
uses: actions/cache@v1
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-build-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.OS }}-build-${{ env.cache-name }}-
${{ runner.OS }}-build-
${{ runner.OS }}-
- name: Set up GolangCI-Lint
run: go get -u github.com/golangci/golangci-lint/cmd/golangci-lint
working-directory: ~
- name: GolangCI-Lint
run: golangci-lint run
- name: Vet
run: go vet -v ./...
- name: Build
env:
CGO_ENABLED: 0
run: go build -ldflags "-s -w" ./...
- name: Test
run: go test -v -race ./...
# Swarm Bee
This is an experiment to abstract libp2p as underlay networking for Ethereum Swarm.
Work in progress. This is by no means the final abstraction.
## Usage
In one terminal:
```sh
go run ./cmd/bee
```
Copy one of the multiaddresses.
In another terminal
```sh
go run ./cmd/bee -target COPIED_ADDRESS
```
Ping pong messages should be exchanged.
## Structure
- cmd/bee - a simple application integrating p2p and pingpong service
- pkg/p2p - p2p abstraction
- pkg/p2p/libp2p - p2p implementation using libp2p
- pkg/p2p/protobuf - protobuf message encoding and decoding functions
- pkg/pingpong - p2p protocol implementation example
## TODO
- Mock testing for pingpong service as the example
- Identity with private keys
- Figure out routing (whether to use libp2p Routing or to abstract hive on top of p2p package)
- Listener configurations (ipv4, ipv6, dns, tcp, ws, quic)
package main
import (
"context"
"flag"
"fmt"
"log"
"github.com/janos/bee/pkg/p2p/libp2p"
"github.com/janos/bee/pkg/pingpong"
"github.com/multiformats/go-multiaddr"
)
var target = flag.String("target", "", "")
func main() {
flag.Parse()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
//var idht *dht.IpfsDHT
s, err := libp2p.New(ctx, libp2p.Options{
// Routing: func(h host.Host) (r routing.PeerRouting, err error) {
// idht, err = dht.New(ctx, h)
// return idht, err
// },
})
if err != nil {
log.Fatal("p2p service: ", err)
}
pingPong, err := pingpong.New(s)
if err != nil {
log.Fatal("pingpong service: ", err)
}
addrs, err := s.Addresses()
if err != nil {
log.Fatal("get server addresses: ", err)
}
for _, addr := range addrs {
fmt.Println(addr)
}
if *target != "" {
for i := 1; i <= 10; i++ {
addr, err := multiaddr.NewMultiaddr(*target)
if err != nil {
log.Fatal("parse target address: ", err)
}
peerID, err := s.Connect(ctx, addr)
if err != nil {
log.Fatal("connect to target: ", err)
}
rtt, err := pingPong.Ping(ctx, peerID, "hey", "there", ",", "how are", "you", "?")
if err != nil {
log.Fatal("ping target: ", err)
}
fmt.Println("RTT", i, rtt)
}
}
select {}
}
module github.com/janos/bee
go 1.13
require (
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-log v1.0.1 // indirect
github.com/libp2p/go-conn-security v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.5.0
github.com/libp2p/go-libp2p-autonat-svc v0.1.0
github.com/libp2p/go-libp2p-connmgr v0.2.1
github.com/libp2p/go-libp2p-core v0.3.0
github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-host v0.1.0
github.com/libp2p/go-libp2p-interface-connmgr v0.1.0 // indirect
github.com/libp2p/go-libp2p-interface-pnet v0.1.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.5.0
github.com/libp2p/go-libp2p-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-net v0.1.0 // indirect
github.com/libp2p/go-libp2p-protocol v0.1.0
github.com/libp2p/go-libp2p-quic-transport v0.2.2
github.com/libp2p/go-libp2p-routing v0.1.0
github.com/libp2p/go-libp2p-secio v0.2.1
github.com/libp2p/go-libp2p-tls v0.1.2
github.com/libp2p/go-libp2p-transport v0.1.0 // indirect
github.com/libp2p/go-stream-muxer v0.1.0 // indirect
github.com/multiformats/go-multiaddr v0.2.0
github.com/whyrusleeping/go-smux-multiplex v3.0.16+incompatible // indirect
github.com/whyrusleeping/go-smux-multistream v2.0.2+incompatible // indirect
github.com/whyrusleeping/go-smux-yamux v2.0.9+incompatible // indirect
github.com/whyrusleeping/yamux v1.2.0 // indirect
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099
)
This diff is collapsed.
package libp2p
import (
"context"
"fmt"
"time"
"github.com/janos/bee/pkg/p2p"
"github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
protocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
secio "github.com/libp2p/go-libp2p-secio"
libp2ptls "github.com/libp2p/go-libp2p-tls"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
)
var _ p2p.Service = new(Service)
type Service struct {
host host.Host
}
type Options struct {
Port int
ListenIPv4 string
PrivKey []byte
Routing func(host.Host) (routing.PeerRouting, error)
}
func New(ctx context.Context, o Options) (*Service, error) {
ipV4Addr := o.ListenIPv4
if ipV4Addr == "" {
ipV4Addr = "0.0.0.0"
}
opts := []libp2p.Option{
// Use the keypair we generated
//libp2p.Identity(priv),
// Multiple listen addresses
libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/%s/tcp/%v", ipV4Addr, o.Port), // regular tcp connections
fmt.Sprintf("/ip4/%s/udp/%v/quic", ipV4Addr, o.Port), // a UDP endpoint for the QUIC transport
),
// support TLS connections
libp2p.Security(libp2ptls.ID, libp2ptls.New),
// support secio connections
libp2p.Security(secio.ID, secio.New),
// support QUIC - experimental
libp2p.Transport(libp2pquic.NewTransport),
// support any other default transports (TCP)
libp2p.DefaultTransports,
// Let's prevent our peer from having too many
// connections by attaching a connection manager.
libp2p.ConnectionManager(connmgr.NewConnManager(
100, // Lowwater
400, // HighWater,
time.Minute, // GracePeriod
)),
// Attempt to open ports using uPNP for NATed hosts.
libp2p.NATPortMap(),
// Let this host use the DHT to find other hosts
//libp2p.Routing(o.Routing),
// Let this host use relays and advertise itself on relays if
// it finds it is behind NAT. Use libp2p.Relay(options...) to
// enable active relays and more.
//libp2p.EnableAutoRelay(),
}
h, err := libp2p.New(ctx, opts...)
if err != nil {
return nil, err
}
// If you want to help other peers to figure out if they are behind
// NATs, you can launch the server-side of AutoNAT too (AutoRelay
// already runs the client)
if _, err = autonat.NewAutoNATService(ctx, h,
// Support same non default security and transport options as
// original host.
libp2p.Security(libp2ptls.ID, libp2ptls.New),
libp2p.Security(secio.ID, secio.New),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.DefaultTransports,
); err != nil {
return nil, fmt.Errorf("autonat: %w", err)
}
return &Service{host: h}, nil
}
func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
for _, ss := range p.StreamSpecs {
id := protocol.ID(p2p.NewSwarmStreamName(p.Name, ss.Name, ss.Version))
matcher, err := helpers.MultistreamSemverMatcher(id)
if err != nil {
return fmt.Errorf("match semver %s: %w", id, err)
}
s.host.SetStreamHandlerMatch(id, matcher, func(s network.Stream) {
ss.Handler(p2p.Peer{
Addr: s.Conn().RemoteMultiaddr(),
Stream: stream{s},
})
})
}
return nil
}
func (s *Service) Addresses() (addrs []string, err error) {
// Build host multiaddress
hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", s.host.ID().Pretty()))
if err != nil {
return nil, err
}
// Now we can build a full multiaddress to reach this host
// by encapsulating both addresses:
for _, addr := range s.host.Addrs() {
addrs = append(addrs, addr.Encapsulate(hostAddr).String())
}
return addrs, nil
}
func (s *Service) Connect(ctx context.Context, addr multiaddr.Multiaddr) (peerID string, err error) {
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
return "", err
}
s.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
return info.ID.String(), nil
}
func (s *Service) NewStream(ctx context.Context, peerID, protocolName, streamName, version string) (p2p.Stream, error) {
id, err := peer.Decode(peerID)
if err != nil {
return nil, fmt.Errorf("decode peer id %q: %w", peerID, err)
}
swarmStreamName := p2p.NewSwarmStreamName(protocolName, streamName, version)
st, err := s.host.NewStream(ctx, id, protocol.ID(swarmStreamName))
if err != nil {
return nil, fmt.Errorf("create stream %q to %q: %w", swarmStreamName, peerID, err)
}
return stream{st}, nil
}
type stream struct {
network.Stream
}
func (s stream) FullClose() error {
return helpers.FullClose(s)
}
package p2p
import (
"context"
"io"
ma "github.com/multiformats/go-multiaddr"
)
type Service interface {
AddProtocol(ProtocolSpec) error
Connect(ctx context.Context, addr ma.Multiaddr) (peerID string, err error)
NewStream(ctx context.Context, peerID, protocol, stream, version string) (Stream, error)
}
type Stream interface {
io.ReadWriter
// Close closes the stream for writing. Reading will still work (that
// is, the remote side can still write).
io.Closer
// Reset closes both ends of the stream. Use this to tell the remote
// side to hang up and go away.
Reset() error
// Gracefully terminate stream on both ends.
FullClose() error
}
type Peer struct {
Addr ma.Multiaddr
Stream Stream
}
type ProtocolSpec struct {
Name string
StreamSpecs []StreamSpec
}
type StreamSpec struct {
Name string
Version string
Handler func(Peer)
}
func NewSwarmStreamName(protocol, stream, version string) string {
return "/swarm/" + protocol + "/" + stream + "/" + version
}
package protobuf
import (
ggio "github.com/gogo/protobuf/io"
"github.com/janos/bee/pkg/p2p"
)
const delimitedReaderMaxSize = 128 * 1024 // max message size
func NewRW(s p2p.Stream) (w ggio.Writer, r ggio.Reader) {
r = ggio.NewDelimitedReader(s, delimitedReaderMaxSize)
w = ggio.NewDelimitedWriter(s)
return w, r
}
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. pingpong.proto"
package pingpong
import (
"context"
"fmt"
"io"
"log"
"time"
"github.com/janos/bee/pkg/p2p"
"github.com/janos/bee/pkg/p2p/protobuf"
)
const (
protocolName = "pingpong"
streamName = "pingpong"
streamVersion = "1.0.0"
)
type Service struct {
p2p p2p.Service
}
func New(p2ps p2p.Service) (s *Service, err error) {
s = &Service{
p2p: p2ps,
}
if err := p2ps.AddProtocol(p2p.ProtocolSpec{
Name: protocolName,
StreamSpecs: []p2p.StreamSpec{
{
Name: streamName,
Version: streamVersion,
Handler: s.Handler,
},
},
}); err != nil {
return nil, err
}
return s, nil
}
func (s *Service) Handler(p p2p.Peer) {
w, r := protobuf.NewRW(p.Stream)
defer p.Stream.FullClose()
var ping Ping
for {
if err := r.ReadMsg(&ping); err != nil {
if err == io.EOF {
break
}
log.Printf("pingpong handler: read message: %v\n", err)
return
}
log.Printf("got ping: %q\n", ping.Greeting)
if err := w.WriteMsg(&Pong{
Response: ping.Greeting,
}); err != nil {
log.Printf("pingpong handler: write message: %v\n", err)
return
}
}
}
func (s *Service) Ping(ctx context.Context, peerID string, msgs ...string) (rtt time.Duration, err error) {
stream, err := s.p2p.NewStream(ctx, peerID, protocolName, streamName, streamVersion)
if err != nil {
return 0, fmt.Errorf("new stream: %w", err)
}
defer stream.FullClose()
w, r := protobuf.NewRW(stream)
var pong Pong
start := time.Now()
for _, msg := range msgs {
if err := w.WriteMsg(&Ping{
Greeting: msg,
}); err != nil {
return 0, fmt.Errorf("stream write: %w", err)
}
if err := r.ReadMsg(&pong); err != nil {
if err == io.EOF {
break
}
return 0, err
}
log.Printf("got pong: %q\n", pong.Response)
}
return time.Since(start) / time.Duration(len(msgs)), nil
}
This diff is collapsed.
syntax = "proto3";
package pingpong;
message Ping {
string Greeting = 1;
}
message Pong {
string Response = 1;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment