Commit 8459e39e authored by luxq's avatar luxq

Merge branch 'ori-master' into master

parents 24e7b114 11d393e2
......@@ -2,7 +2,7 @@
name: Bug report
about: Use this template to create a new bug report
title: General problem description
labels: 'bug'
labels: 'needs-triaging'
assignees: ''
---
......@@ -20,6 +20,9 @@ account of what happened and disclose any possible piece of information related
<!-- How did you expect the application to behave -->
#### Actual behavior
<!-- How did the application behave -->
<!-- Extras: If the problem involves a specific file, providing that file would be helpful; screenshots are welcome too -->
<!-- How did the application behave? -->
<!-- Please help us help you:
- if the problem involves a specific file/dir, providing it might be helpful
- if the issue is related to an API behavior, please provide the exact command (curl, Postman, etc.) used to call the API
- please try to always provide the node console output, preferably at TRACE level
- screenshots are welcome -->
#!/bin/bash
# dump the debug-API state of every node in the local cluster
nodes="bootnode-0 bee-0 bee-1 light-0 light-1"
for i in $nodes
do
mkdir -p dump/"$i"
curl -s -o dump/"$i"/addresses.json "$i"-debug.localhost/addresses
curl -s -o dump/"$i"/metrics "$i"-debug.localhost/metrics
curl -s -o dump/"$i"/topology.json "$i"-debug.localhost/topology
curl -s -o dump/"$i"/settlements.json "$i"-debug.localhost/settlements
curl -s -o dump/"$i"/balances.json "$i"-debug.localhost/balances
curl -s -o dump/"$i"/timesettlements.json "$i"-debug.localhost/timesettlements
curl -s -o dump/"$i"/stamps.json "$i"-debug.localhost/stamps
done
kubectl -n local get pods > dump/kubectl_get_pods
kubectl -n local logs -l app.kubernetes.io/part-of=bee --tail -1 --prefix -c bee > dump/kubectl_logs
# upload the dump as a tarball under a random version tag and make it public
vertag=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 15)
endpoint=$AWS_ENDPOINT
if [[ "$endpoint" != http* ]]
then
endpoint=https://$endpoint
fi
fname=artifacts_$vertag.tar.gz
tar -cz dump | aws --endpoint-url "$endpoint" s3 cp - s3://"$BUCKET_NAME"/"$fname"
aws --endpoint-url "$endpoint" s3api put-object-acl --bucket "$BUCKET_NAME" --acl public-read --key "$fname"
out="== Uploaded debugging artifacts to https://${BUCKET_NAME}.${AWS_ENDPOINT}/$fname =="
# frame the message with lines of '=' matching its length
ln=${#out}
while [ "$ln" -gt 0 ]; do printf '='; ((ln--)); done
echo ""
echo "$out"
ln=${#out}
while [ "$ln" -gt 0 ]; do printf '='; ((ln--)); done
echo ""
26c26
< BucketDepth = uint8(16)
---
> BucketDepth = uint8(10)
> BucketDepth = uint8(2)
43c43
< var DefaultDepth = uint8(12) // 12 is the testnet depth at the time of merging to master
---
> var DefaultDepth = uint8(5) // 12 is the testnet depth at the time of merging to master
> var DefaultDepth = uint8(2) // 12 is the testnet depth at the time of merging to master
48c48
< var Capacity = exp2(22)
---
> var Capacity = exp2(6)
> var Capacity = exp2(4)
......@@ -113,6 +113,29 @@ jobs:
run: |
beekeeper delete bee-cluster --cluster-name local-clef
make beelocal ACTION=uninstall
- name: Apply patches
run: |
patch pkg/postage/batchstore/reserve.go .github/patches/postagereserve_gc.patch
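# note: postagereserve_gc.patch presumably shrinks the reserve constants
# (BucketDepth, DefaultDepth, Capacity — see the reserve.go diff above)
# so that garbage collection triggers quickly on a small local cluster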
- name: Prepare testing cluster (storage incentives setup)
run: |
timeout 10m make beelocal OPTS='ci skip-vet'
- name: Set kube config
run: |
mkdir -p ~/.kube
cp /etc/rancher/k3s/k3s.yaml ~/.kube/config
- name: Set testing cluster (storage incentives setup)
run: |
timeout 10m make deploylocal BEEKEEPER_CLUSTER=local-gc
- name: Test pingpong
id: pingpong-3
run: until beekeeper check --cluster-name local-gc --checks ci-pingpong; do echo "waiting for pingpong..."; sleep .3; done
- name: Test gc
id: gc-chunk-1
run: beekeeper check --cluster-name local-gc --checks=ci-gc
- name: Destroy the cluster
run: |
beekeeper delete bee-cluster --cluster-name local-gc
make beelocal ACTION=uninstall
- name: Retag Docker image and push for cache
if: success()
run: |
......@@ -142,6 +165,11 @@ jobs:
- name: Debug workflow if failed
if: failure()
run: |
export BUCKET_NAME=beekeeper-artifacts
export AWS_ACCESS_KEY_ID=${{ secrets.DO_AWS_ACCESS_KEY_ID }}
export AWS_SECRET_ACCESS_KEY=${{ secrets.DO_AWS_SECRET_ACCESS_KEY }}
export AWS_EC2_METADATA_DISABLED=true
export AWS_ENDPOINT=fra1.digitaloceanspaces.com
export FAILED='no-test'
if ${{ steps.pingpong-1.outcome=='failure' }}; then FAILED=pingpong-1; fi
if ${{ steps.fullconnectivity-1.outcome=='failure' }}; then FAILED=fullconnectivity-1; fi
......@@ -154,8 +182,12 @@ jobs:
if ${{ steps.settlements-2.outcome=='failure' }}; then FAILED=settlements-2; fi
if ${{ steps.pss.outcome=='failure' }}; then FAILED=pss; fi
if ${{ steps.soc.outcome=='failure' }}; then FAILED=soc; fi
if ${{ steps.pingpong-3.outcome=='failure' }}; then FAILED=pingpong-3; fi
if ${{ steps.gc-chunk-1.outcome=='failure' }}; then FAILED=gc-chunk-1; fi
KEYS=$(curl -sSf -X POST https://eu.relay.tunshell.com/api/sessions)
curl -sSf -X POST -H "Content-Type: application/json" -d "{\"text\": \"**${RUN_TYPE}** ${{ github.head_ref }}\nFailed -> \`${FAILED}\`\nDebug -> \`sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com\`\"}" https://beehive.ethswarm.org/hooks/${{ secrets.WEBHOOK_KEY }}
curl -sSf -X POST -H "Content-Type: application/json" -d "{\"text\": \"**${RUN_TYPE}** Beekeeper Error\nBranch: ${{ github.head_ref }}\nStep failed:\n \`${FAILED}\`\nDebug shell:\n\`sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com\`\"}" https://beehive.ethswarm.org/hooks/${{ secrets.WEBHOOK_KEY }}
echo "run the debug shell and run the following to get debugging artifacts:"
echo "curl -s -o debug.sh https://gist.githubusercontent.com/acud/2c219531e832aafbab51feffe5b5e91f/raw/304880f1f8cc819e577d1dd3f1f45df8709c543d/beekeeper_artifacts.sh | bash"
echo "Failed test: ${FAILED}"
echo "Connect to github actions node using"
echo "sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com"
......
......@@ -276,7 +276,7 @@ dockers:
- image_templates:
- "ethersphere/bee:{{ .Version }}-amd64"
- "quay.io/ethersphere/bee:{{ .Version }}-amd64"
use_buildx: true
use: buildx
ids:
- linux
goarch: amd64
......@@ -291,7 +291,7 @@ dockers:
- image_templates:
- "ethersphere/bee:{{ .Version }}-armv7"
- "quay.io/ethersphere/bee:{{ .Version }}-armv7"
use_buildx: true
use: buildx
ids:
- linux
goarch: arm
......@@ -307,7 +307,7 @@ dockers:
- image_templates:
- "ethersphere/bee:{{ .Version }}-arm64v8"
- "quay.io/ethersphere/bee:{{ .Version }}-arm64v8"
use_buildx: true
use: buildx
ids:
- linux
goarch: arm64
......@@ -322,7 +322,7 @@ dockers:
- image_templates:
- "ethersphere/bee:{{ .Version }}-amd64-slim"
- "quay.io/ethersphere/bee:{{ .Version }}-amd64-slim"
use_buildx: true
use: buildx
ids:
- linux-slim
goarch: amd64
......@@ -337,7 +337,7 @@ dockers:
- image_templates:
- "ethersphere/bee:{{ .Version }}-armv7-slim"
- "quay.io/ethersphere/bee:{{ .Version }}-armv7-slim"
use_buildx: true
use: buildx
ids:
- linux-slim
goarch: arm
......@@ -353,7 +353,7 @@ dockers:
- image_templates:
- "ethersphere/bee:{{ .Version }}-arm64v8-slim"
- "quay.io/ethersphere/bee:{{ .Version }}-arm64v8-slim"
use_buildx: true
use: buildx
ids:
- linux-slim
goarch: arm64
......@@ -371,12 +371,14 @@ docker_manifests:
- ethersphere/bee:{{ .Version }}-amd64
- ethersphere/bee:{{ .Version }}-armv7
- ethersphere/bee:{{ .Version }}-arm64v8
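# 'auto' skips pushing the manifest when the release is a prerelease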
skip_push: auto
- name_template: ethersphere/bee:{{ .Major }}.{{ .Minor }}
image_templates:
- ethersphere/bee:{{ .Version }}-amd64
- ethersphere/bee:{{ .Version }}-armv7
- ethersphere/bee:{{ .Version }}-arm64v8
- name_template: ethersphere/bee:{{ .Major }}.{{ .Minor }}.{{ .Patch }}
skip_push: auto
- name_template: ethersphere/bee:{{ .Major }}.{{ .Minor }}.{{ .Patch }}{{ with .Prerelease }}-{{ . }}{{ end }}
image_templates:
- ethersphere/bee:{{ .Version }}-amd64
- ethersphere/bee:{{ .Version }}-armv7
......@@ -391,17 +393,20 @@ docker_manifests:
- ethersphere/bee:{{ .Version }}-amd64
- ethersphere/bee:{{ .Version }}-armv7
- ethersphere/bee:{{ .Version }}-arm64v8
skip_push: auto
- name_template: quay.io/ethersphere/bee:{{ .Major }}
image_templates:
- quay.io/ethersphere/bee:{{ .Version }}-amd64
- quay.io/ethersphere/bee:{{ .Version }}-armv7
- quay.io/ethersphere/bee:{{ .Version }}-arm64v8
skip_push: auto
- name_template: quay.io/ethersphere/bee:{{ .Major }}.{{ .Minor }}
image_templates:
- quay.io/ethersphere/bee:{{ .Version }}-amd64
- quay.io/ethersphere/bee:{{ .Version }}-armv7
- quay.io/ethersphere/bee:{{ .Version }}-arm64v8
- name_template: quay.io/ethersphere/bee:{{ .Major }}.{{ .Minor }}.{{ .Patch }}
skip_push: auto
- name_template: quay.io/ethersphere/bee:{{ .Major }}.{{ .Minor }}.{{ .Patch }}{{ with .Prerelease }}-{{ . }}{{ end }}
image_templates:
- quay.io/ethersphere/bee:{{ .Version }}-amd64
- quay.io/ethersphere/bee:{{ .Version }}-armv7
......@@ -416,17 +421,20 @@ docker_manifests:
- quay.io/ethersphere/bee:{{ .Version }}-amd64
- quay.io/ethersphere/bee:{{ .Version }}-armv7
- quay.io/ethersphere/bee:{{ .Version }}-arm64v8
skip_push: auto
- name_template: ethersphere/bee:{{ .Major }}-slim
image_templates:
- ethersphere/bee:{{ .Version }}-amd64-slim
- ethersphere/bee:{{ .Version }}-armv7-slim
- ethersphere/bee:{{ .Version }}-arm64v8-slim
skip_push: auto
- name_template: ethersphere/bee:{{ .Major }}.{{ .Minor }}-slim
image_templates:
- ethersphere/bee:{{ .Version }}-amd64-slim
- ethersphere/bee:{{ .Version }}-armv7-slim
- ethersphere/bee:{{ .Version }}-arm64v8-slim
- name_template: ethersphere/bee:{{ .Major }}.{{ .Minor }}.{{ .Patch }}-slim
skip_push: auto
- name_template: ethersphere/bee:{{ .Major }}.{{ .Minor }}.{{ .Patch }}{{ with .Prerelease }}-{{ . }}{{ end }}-slim
image_templates:
- ethersphere/bee:{{ .Version }}-amd64-slim
- ethersphere/bee:{{ .Version }}-armv7-slim
......@@ -441,17 +449,20 @@ docker_manifests:
- ethersphere/bee:{{ .Version }}-amd64-slim
- ethersphere/bee:{{ .Version }}-armv7-slim
- ethersphere/bee:{{ .Version }}-arm64v8-slim
skip_push: auto
- name_template: quay.io/ethersphere/bee:{{ .Major }}-slim
image_templates:
- quay.io/ethersphere/bee:{{ .Version }}-amd64-slim
- quay.io/ethersphere/bee:{{ .Version }}-armv7-slim
- quay.io/ethersphere/bee:{{ .Version }}-arm64v8-slim
skip_push: auto
- name_template: quay.io/ethersphere/bee:{{ .Major }}.{{ .Minor }}-slim
image_templates:
- quay.io/ethersphere/bee:{{ .Version }}-amd64-slim
- quay.io/ethersphere/bee:{{ .Version }}-armv7-slim
- quay.io/ethersphere/bee:{{ .Version }}-arm64v8-slim
- name_template: quay.io/ethersphere/bee:{{ .Major }}.{{ .Minor }}.{{ .Patch }}-slim
skip_push: auto
- name_template: quay.io/ethersphere/bee:{{ .Major }}.{{ .Minor }}.{{ .Patch }}{{ with .Prerelease }}-{{ . }}{{ end }}-slim
image_templates:
- quay.io/ethersphere/bee:{{ .Version }}-amd64-slim
- quay.io/ethersphere/bee:{{ .Version }}-armv7-slim
......@@ -466,3 +477,4 @@ docker_manifests:
- quay.io/ethersphere/bee:{{ .Version }}-amd64-slim
- quay.io/ethersphere/bee:{{ .Version }}-armv7-slim
- quay.io/ethersphere/bee:{{ .Version }}-arm64v8-slim
skip_push: auto
......@@ -71,6 +71,7 @@ const (
optionWarmUpTime = "warmup-time"
optionNameMainNet = "mainnet"
optionNameRetrievalCaching = "cache-retrieval"
optionNameDevReserveCapacity = "dev-reserve-capacity"
)
func init() {
......@@ -118,6 +119,10 @@ func newCommand(opts ...option) (c *command, err error) {
return nil, err
}
if err := c.initStartDevCmd(); err != nil {
return nil, err
}
if err := c.initInitCmd(); err != nil {
return nil, err
}
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package cmd
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/ethersphere/bee/pkg/node"
"github.com/kardianos/service"
"github.com/spf13/cobra"
)
func (c *command) initStartDevCmd() (err error) {
cmd := &cobra.Command{
Use: "dev",
Short: "Start a Swarm node in development mode",
RunE: func(cmd *cobra.Command, args []string) (err error) {
if len(args) > 0 {
return cmd.Help()
}
v := strings.ToLower(c.config.GetString(optionNameVerbosity))
logger, err := newLogger(cmd, v)
if err != nil {
return fmt.Errorf("new logger: %v", err)
}
isWindowsService, err := isWindowsService()
if err != nil {
return fmt.Errorf("failed to determine if we are running in service: %w", err)
}
if isWindowsService {
var err error
logger, err = createWindowsEventLogger(serviceName, logger)
if err != nil {
return fmt.Errorf("failed to create windows logger %w", err)
}
}
beeASCII := `
( * ) (
)\ ) ( * ( /( )\ )
(()/( ( ( ( )\))( )\())(()/( (
/(_)) )\ )\ )\ ((_)()\ ((_)\ /(_)) )\
(_))_ ((_) ((_)((_) (_()((_) ((_)(_))_ ((_)
| \ | __|\ \ / / | \/ | / _ \ | \ | __|
| |) || _| \ V / | |\/| || (_) || |) || _|
|___/ |___| \_/ |_| |_| \___/ |___/ |___|
`
fmt.Println(beeASCII)
fmt.Println()
fmt.Println("Starting in development mode")
fmt.Println()
debugAPIAddr := c.config.GetString(optionNameDebugAPIAddr)
if !c.config.GetBool(optionNameDebugAPIEnable) {
debugAPIAddr = ""
}
// generate signer in here
b, err := node.NewDevBee(logger, &node.DevOptions{
APIAddr: c.config.GetString(optionNameAPIAddr),
DebugAPIAddr: debugAPIAddr,
Logger: logger,
DBOpenFilesLimit: c.config.GetUint64(optionNameDBOpenFilesLimit),
DBBlockCacheCapacity: c.config.GetUint64(optionNameDBBlockCacheCapacity),
DBWriteBufferSize: c.config.GetUint64(optionNameDBWriteBufferSize),
DBDisableSeeksCompaction: c.config.GetBool(optionNameDBDisableSeeksCompaction),
CORSAllowedOrigins: c.config.GetStringSlice(optionCORSAllowedOrigins),
ReserveCapacity: c.config.GetUint64(optionNameDevReserveCapacity),
})
if err != nil {
return err
}
// Wait for termination or interrupt signals.
// We want to clean up things at the end.
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, syscall.SIGINT, syscall.SIGTERM)
p := &program{
start: func() {
// Block main goroutine until it is interrupted
sig := <-interruptChannel
logger.Debugf("received signal: %v", sig)
logger.Info("shutting down")
},
stop: func() {
// Shutdown
done := make(chan struct{})
go func() {
defer close(done)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := b.Shutdown(ctx); err != nil {
logger.Errorf("shutdown: %v", err)
}
}()
// If shutdown function is blocking too long,
// allow process termination by receiving another signal.
select {
case sig := <-interruptChannel:
logger.Debugf("received signal: %v", sig)
case <-done:
}
},
}
if isWindowsService {
s, err := service.New(p, &service.Config{
Name: serviceName,
DisplayName: "Bee",
Description: "Bee, Swarm client.",
})
if err != nil {
return err
}
if err = s.Run(); err != nil {
return err
}
} else {
// start blocks until some interrupt is received
p.start()
p.stop()
}
return nil
},
PreRunE: func(cmd *cobra.Command, args []string) error {
return c.config.BindPFlags(cmd.Flags())
},
}
cmd.Flags().Bool(optionNameDebugAPIEnable, true, "enable debug HTTP API")
cmd.Flags().String(optionNameAPIAddr, ":1633", "HTTP API listen address")
cmd.Flags().String(optionNameDebugAPIAddr, ":1635", "debug HTTP API listen address")
cmd.Flags().String(optionNameVerbosity, "info", "log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace")
cmd.Flags().Uint64(optionNameDevReserveCapacity, 4194304, "cache reserve capacity")
cmd.Flags().StringSlice(optionCORSAllowedOrigins, []string{}, "origins with CORS headers enabled")
cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 32*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 32*1024*1024, "size of the database write buffer in bytes")
cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, false, "disables db compactions triggered by seeks")
c.root.AddCommand(cmd)
return nil
}
......@@ -5,15 +5,13 @@ go 1.15
require (
github.com/btcsuite/btcd v0.22.0-beta
github.com/coreos/go-semver v0.3.0
github.com/ethereum/go-ethereum v1.9.23
github.com/ethereum/go-ethereum v1.10.3
github.com/ethersphere/go-price-oracle-abi v0.1.0
github.com/ethersphere/go-storage-incentives-abi v0.3.0
github.com/ethersphere/go-sw3-abi v0.4.0
github.com/ethersphere/langos v1.0.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.5
github.com/google/uuid v1.1.4 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.4
......@@ -25,13 +23,13 @@ require (
github.com/kardianos/service v1.2.0
github.com/klauspost/cpuid/v2 v2.0.8 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/libp2p/go-libp2p v0.14.3
github.com/libp2p/go-libp2p-autonat v0.4.2
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-discovery v0.5.1 // indirect
github.com/libp2p/go-libp2p-peerstore v0.2.7
github.com/libp2p/go-libp2p-quic-transport v0.10.0
github.com/libp2p/go-libp2p-swarm v0.5.0
github.com/libp2p/go-libp2p-transport-upgrader v0.4.2
github.com/libp2p/go-tcp-transport v0.2.3
github.com/libp2p/go-ws-transport v0.4.0
......@@ -54,11 +52,11 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.7.0
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/uber/jaeger-client-go v2.24.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
github.com/vmihailenco/msgpack/v5 v5.3.4
github.com/wealdtech/go-ens/v3 v3.4.4
github.com/wealdtech/go-ens/v3 v3.4.6
gitlab.com/nolash/go-mockbytes v0.0.7
go.uber.org/atomic v1.8.0
go.uber.org/multierr v1.7.0 // indirect
......@@ -68,7 +66,7 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/ini.v1 v1.57.0 // indirect
gopkg.in/yaml.v2 v2.4.0
......
openapi: 3.0.3
info:
version: 1.0.0
title: Swarm API
version: 1.1.1
title: Bee API
description: "A list of the currently provided Interfaces to interact with the swarm, implementing file operations and sending messages"
security:
......@@ -164,6 +164,22 @@ paths:
default:
description: Default response
"/chunks/stream":
get:
summary: "Upload stream of chunks"
tags:
- Chunk
parameters:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmTagParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
responses:
"200":
description: "Returns a Websocket connection on which stream of chunks can be uploaded. Each chunk sent is acknowledged using a binary response `0` which serves as confirmation of upload of single chunk. Chunks should be packaged as binary messages for uploading."
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
description: Default response
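For orientation, a minimal Go client for this endpoint could look like the sketch below. It is illustrative only: the node address, port, and batch ID are assumptions, and the headers mirror the Swarm-Tag / Swarm-Postage-Batch-Id headers used elsewhere in this diff.

package main

import (
	"log"
	"net/http"

	"github.com/gorilla/websocket"
)

func main() {
	headers := http.Header{}
	// hypothetical batch ID; any usable postage batch known to the node works
	headers.Set("Swarm-Postage-Batch-Id", "<batch-id-hex>")

	// assumes a node serving the API on the default address
	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:1633/chunks/stream", headers)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// a chunk is an 8-byte span followed by up to 4096 bytes of data;
	// for a real upload the span must encode the payload length
	chunk := make([]byte, 8+4096)
	if err := conn.WriteMessage(websocket.BinaryMessage, chunk); err != nil {
		log.Fatal(err)
	}

	// the node acknowledges every chunk with an (empty) binary message
	mt, msg, err := conn.ReadMessage()
	if err != nil || mt != websocket.BinaryMessage {
		log.Fatalf("no ack: %v", err)
	}
	log.Printf("chunk acknowledged (%d byte ack)", len(msg))
}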
"/bzz":
post:
summary: "Upload file or a collection of files"
......@@ -232,7 +248,8 @@ paths:
"/bzz/{reference}":
patch:
summary: "Reupload a root hash to the network"
summary: "Reupload a root hash to the network; deprecated: use /stewardship/{reference} instead"
deprecated: true
tags:
- BZZ
parameters:
......@@ -849,3 +866,22 @@ paths:
default:
description: Default response
"/stewardship/{reference}":
put:
summary: "Reupload a root hash to the network"
tags:
- Stewardship
parameters:
- in: path
name: reference
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmReference"
required: true
description: "Root hash of content (can be of any type: collection, file, chunk)"
responses:
"200":
description: Ok
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response
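A hedged usage sketch in Go follows; the node address and the reference value are assumptions, and any previously uploaded root hash will do.

package main

import (
	"log"
	"net/http"
)

func main() {
	// hypothetical reference: the root hash of previously uploaded content
	ref := "<swarm-reference-hex>"

	req, err := http.NewRequest(http.MethodPut, "http://localhost:1633/stewardship/"+ref, nil)
	if err != nil {
		log.Fatal(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println(resp.Status) // expect "200 OK" once the re-upload was triggered
}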
......@@ -286,6 +286,15 @@ components:
items:
$ref: "#/components/schemas/PostageBatch"
DebugPostageBatchesResponse:
type: object
properties:
stamps:
type: array
nullable: true
items:
$ref: "#/components/schemas/DebugPostageBatch"
BatchIDResponse:
type: object
properties:
......@@ -320,6 +329,7 @@ components:
utilization:
type: integer
usable:
description: Indicates whether the batch is usable. A batch discovered by the Bee node awaits enough on-chain confirmations before it is declared usable.
type: boolean
label:
type: string
......@@ -333,8 +343,18 @@ components:
type: integer
immutableFlag:
type: boolean
DebugPostageBatch:
allOf:
- $ref: '#/components/schemas/PostageBatch'
- type: object
properties:
exists:
description: Internal debugging property. It indicates whether the batch is expired.
type: boolean
batchTTL:
description: The time (in seconds) remaining until the batch expires; -1 signals that the batch never expires; 0 signals that the batch has already expired.
type: integer
StampBucketData:
type: object
......@@ -663,6 +683,12 @@ components:
application/problem+json:
schema:
$ref: "#/components/schemas/ProblemDetails"
"429":
description: Too many requests
content:
application/problem+json:
schema:
$ref: "#/components/schemas/ProblemDetails"
"500":
description: Internal Server Error
content:
......
openapi: 3.0.3
info:
version: 1.0.0
version: 1.1.1
title: Bee Debug API
description: "A list of the currently provided debug interfaces to interact with the bee node"
......@@ -438,7 +438,6 @@ paths:
default:
description: Default response
"/topology":
get:
description: Get topology of known network
......@@ -538,6 +537,8 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/TransactionResponse"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
"429":
$ref: "SwarmCommon.yaml#/components/responses/429"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
......@@ -668,7 +669,7 @@ paths:
default:
description: Default response
"/transaction":
"/transactions":
get:
summary: Get list of pending transactions
tags:
......@@ -685,7 +686,7 @@ paths:
default:
description: Default response
"/transaction/{txHash}":
"/transactions/{txHash}":
get:
summary: Get information about a sent transaction
parameters:
......@@ -771,7 +772,7 @@ paths:
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/PostageBatchesResponse"
$ref: "SwarmCommon.yaml#/components/schemas/DebugPostageBatchesResponse"
default:
description: Default response
......@@ -794,7 +795,7 @@ paths:
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/PostageBatch"
$ref: "SwarmCommon.yaml#/components/schemas/DebugPostageBatch"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
......@@ -864,6 +865,8 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/BatchIDResponse"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"429":
$ref: "SwarmCommon.yaml#/components/responses/429"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
......
......@@ -2,7 +2,7 @@ version: "3"
services:
clef-1:
image: ethersphere/clef:0.4.12
image: ethersphere/clef:0.6.0
restart: unless-stopped
environment:
- CLEF_CHAINID
......@@ -11,7 +11,7 @@ services:
command: full
bee-1:
image: ethersphere/bee:beta
image: ethersphere/bee:stable
restart: unless-stopped
environment:
- BEE_API_ADDR
......
......@@ -2,7 +2,7 @@
### CLEF
## chain id to use for signing (1=mainnet, 3=ropsten, 4=rinkeby, 5=goerli) (default: 12345)
## chain id to use for signing (100=mainnet(xdai), 5=testnet(goerli)) (default: 12345)
CLEF_CHAINID=5
### BEE
......
......@@ -71,6 +71,7 @@ type testServerOptions struct {
PostageContract postagecontract.Interface
Post postage.Service
Steward steward.Reuploader
WsHeaders http.Header
}
func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) {
......@@ -115,7 +116,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
if o.WsPath != "" {
u := url.URL{Scheme: "ws", Host: ts.Listener.Addr().String(), Path: o.WsPath}
conn, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
conn, _, err = websocket.DefaultDialer.Dial(u.String(), o.WsHeaders)
if err != nil {
t.Fatalf("dial: %v. url %v", err, u.String())
}
......
......@@ -177,7 +177,7 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request, store
return
}
logger.Debugf("Uploading file Encrypt: %v Filename: %s Filehash: %s FileMtdt: %v",
logger.Debugf("bzz upload file: encrypt %v filename: %s hash: %s metadata: %v",
encrypt, fileName, fr.String(), fileMtdt)
storeSizeFn := []manifest.StoreSizeFunc{}
......@@ -207,7 +207,7 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request, store
}
return
}
logger.Debugf("Manifest Reference: %s", manifestReference.String())
logger.Debugf("bzz upload file: manifest reference: %s", manifestReference.String())
if created {
_, err = tag.DoneSplit(manifestReference)
......@@ -522,6 +522,7 @@ func (s *server) manifestFeed(
return s.feedFactory.NewLookup(*t, f)
}
// bzzPatchHandler endpoint has been deprecated; use stewardship endpoint instead.
func (s *server) bzzPatchHandler(w http.ResponseWriter, r *http.Request) {
nameOrHex := mux.Vars(r)["address"]
address, err := s.resolveNameOrAddress(nameOrHex)
......
......@@ -625,12 +625,3 @@ func TestBzzReupload(t *testing.T) {
t.Fatalf("got address %s want %s", m.addr.String(), addr.String())
}
}
type mockSteward struct {
addr swarm.Address
}
func (m *mockSteward) Reupload(_ context.Context, addr swarm.Address) error {
m.addr = addr
return nil
}
......@@ -6,6 +6,7 @@ package api
import (
"bytes"
"context"
"errors"
"fmt"
"io"
......@@ -29,27 +30,55 @@ type chunkAddressResponse struct {
Reference swarm.Address `json:"reference"`
}
func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
var (
tag *tags.Tag
ctx = r.Context()
err error
)
func (s *server) processUploadRequest(
r *http.Request,
) (ctx context.Context, tag *tags.Tag, putter storage.Putter, err error) {
if h := r.Header.Get(SwarmTagHeader); h != "" {
tag, err = s.getTag(h)
if err != nil {
s.logger.Debugf("chunk upload: get tag: %v", err)
s.logger.Error("chunk upload: get tag")
jsonhttp.BadRequest(w, "cannot get tag")
return
return nil, nil, nil, errors.New("cannot get tag")
}
// add the tag to the context if it exists
ctx = sctx.SetTag(r.Context(), tag)
} else {
ctx = r.Context()
}
batch, err := requestPostageBatchId(r)
if err != nil {
s.logger.Debugf("chunk upload: postage batch id: %v", err)
s.logger.Error("chunk upload: postage batch id")
return nil, nil, nil, errors.New("invalid postage batch id")
}
putter, err = newStamperPutter(s.storer, s.post, s.signer, batch)
if err != nil {
s.logger.Debugf("chunk upload: putter: %v", err)
s.logger.Error("chunk upload: putter")
switch {
case errors.Is(err, postage.ErrNotFound):
return nil, nil, nil, errors.New("batch not found")
case errors.Is(err, postage.ErrNotUsable):
return nil, nil, nil, errors.New("batch not usable")
}
return nil, nil, nil, err
}
return ctx, tag, putter, nil
}
func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
ctx, tag, putter, err := s.processUploadRequest(r)
if err != nil {
jsonhttp.BadRequest(w, err.Error())
return
}
if tag != nil {
err = tag.Inc(tags.StateSplit)
if err != nil {
s.logger.Debugf("chunk upload: increment tag: %v", err)
......@@ -85,29 +114,6 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
batch, err := requestPostageBatchId(r)
if err != nil {
s.logger.Debugf("chunk upload: postage batch id: %v", err)
s.logger.Error("chunk upload: postage batch id")
jsonhttp.BadRequest(w, "invalid postage batch id")
return
}
putter, err := newStamperPutter(s.storer, s.post, s.signer, batch)
if err != nil {
s.logger.Debugf("chunk upload: putter:%v", err)
s.logger.Error("chunk upload: putter")
switch {
case errors.Is(err, postage.ErrNotFound):
jsonhttp.BadRequest(w, "batch not found")
case errors.Is(err, postage.ErrNotUsable):
jsonhttp.BadRequest(w, "batch not usable yet")
default:
jsonhttp.BadRequest(w, nil)
}
return
}
seen, err := putter.Put(ctx, requestModePut(r), chunk)
if err != nil {
s.logger.Debugf("chunk upload: chunk write error: %v, addr %s", err, chunk.Address())
......@@ -145,6 +151,11 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil {
s.logger.Debugf("chunk upload: creation of pin for %q failed: %v", chunk.Address(), err)
s.logger.Error("chunk upload: creation of pin failed")
err = s.storer.Set(ctx, storage.ModeSetUnpin, chunk.Address())
if err != nil {
s.logger.Debugf("chunk upload: deletion of pin for %s failed: %v", chunk.Address(), err)
s.logger.Error("chunk upload: deletion of pin failed")
}
jsonhttp.InternalServerError(w, nil)
return
}
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"context"
"errors"
"net/http"
"strings"
"time"
"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/websocket"
)
var successWsMsg = []byte{}
func (s *server) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Request) {
ctx, tag, putter, err := s.processUploadRequest(r)
if err != nil {
jsonhttp.BadRequest(w, err.Error())
return
}
upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkSize,
WriteBufferSize: swarm.ChunkSize,
CheckOrigin: s.checkOrigin,
}
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
s.logger.Debugf("chunk stream handler failed upgrading: %v", err)
s.logger.Error("chunk stream handler: upgrading")
jsonhttp.BadRequest(w, "not a websocket connection")
return
}
s.wsWg.Add(1)
go s.handleUploadStream(
ctx,
c,
tag,
putter,
requestModePut(r),
strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true",
)
}
func (s *server) handleUploadStream(
ctx context.Context,
conn *websocket.Conn,
tag *tags.Tag,
putter storage.Putter,
mode storage.ModePut,
pin bool,
) {
defer s.wsWg.Done()
var (
gone = make(chan struct{})
err error
)
defer func() {
_ = conn.Close()
}()
conn.SetCloseHandler(func(code int, text string) error {
s.logger.Debugf("chunk stream handler: client gone. code %d message %s", code, text)
close(gone)
return nil
})
sendMsg := func(msgType int, buf []byte) error {
err := conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
return err
}
err = conn.WriteMessage(msgType, buf)
if err != nil {
return err
}
return nil
}
sendErrorClose := func(code int, errmsg string) {
err := conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(code, errmsg),
time.Now().Add(writeDeadline),
)
if err != nil {
s.logger.Errorf("chunk stream handler: failed sending close msg")
}
}
for {
select {
case <-s.quit:
// shutdown
sendErrorClose(websocket.CloseGoingAway, "node shutting down")
return
case <-gone:
// client gone
return
default:
// if there is no indication to stop, go ahead and read the next message
}
err = conn.SetReadDeadline(time.Now().Add(readDeadline))
if err != nil {
s.logger.Debugf("chunk stream handler: set read deadline: %v", err)
s.logger.Error("chunk stream handler: set read deadline")
return
}
mt, msg, err := conn.ReadMessage()
if err != nil {
s.logger.Debugf("chunk stream handler: read message error: %v", err)
s.logger.Error("chunk stream handler: read message error")
return
}
if mt != websocket.BinaryMessage {
s.logger.Debug("chunk stream handler: unexpected message received from client", mt)
s.logger.Error("chunk stream handler: unexpected message received from client")
sendErrorClose(websocket.CloseUnsupportedData, "invalid message")
return
}
if tag != nil {
err = tag.Inc(tags.StateSplit)
if err != nil {
s.logger.Debug("chunk stream handler: failed incrementing tag", err)
s.logger.Error("chunk stream handler: failed incrementing tag")
sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag")
return
}
}
if len(msg) < swarm.SpanSize {
s.logger.Debug("chunk stream handler: not enough data")
s.logger.Error("chunk stream handler: not enough data")
return
}
chunk, err := cac.NewWithDataSpan(msg)
if err != nil {
s.logger.Debugf("chunk stream handler: create chunk error: %v", err)
s.logger.Error("chunk stream handler: failed creating chunk")
return
}
seen, err := putter.Put(ctx, mode, chunk)
if err != nil {
s.logger.Debugf("chunk stream handler: chunk write error: %v, addr %s", err, chunk.Address())
s.logger.Error("chunk stream handler: chunk write error")
switch {
case errors.Is(err, postage.ErrBucketFull):
sendErrorClose(websocket.CloseInternalServerErr, "batch is overissued")
default:
sendErrorClose(websocket.CloseInternalServerErr, "chunk write error")
}
return
} else if len(seen) > 0 && seen[0] && tag != nil {
err := tag.Inc(tags.StateSeen)
if err != nil {
s.logger.Debugf("chunk stream handler: increment tag", err)
s.logger.Error("chunk stream handler: increment tag")
sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag")
return
}
}
if tag != nil {
// indicate that the chunk is stored
err = tag.Inc(tags.StateStored)
if err != nil {
s.logger.Debugf("chunk stream handler: increment tag", err)
s.logger.Error("chunk stream handler: increment tag")
sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag")
return
}
}
if pin {
if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil {
s.logger.Debugf("chunk stream handler: creation of pin for %q failed: %v", chunk.Address(), err)
s.logger.Error("chunk stream handler: creation of pin failed")
// since we already increment the pin counter because of the ModePut, we need
// to delete the pin here to prevent the pin counter from never going to 0
err = s.storer.Set(ctx, storage.ModeSetUnpin, chunk.Address())
if err != nil {
s.logger.Debugf("chunk stream handler: deletion of pin for %s failed: %v", chunk.Address(), err)
s.logger.Error("chunk stream handler: deletion of pin failed")
}
sendErrorClose(websocket.CloseInternalServerErr, "failed creating pin")
return
}
}
err = sendMsg(websocket.BinaryMessage, successWsMsg)
if err != nil {
s.logger.Debugf("chunk stream handler: failed sending success msg: %v", err)
s.logger.Error("chunk stream handler: failed sending confirmation")
return
}
}
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"context"
"io/ioutil"
"net/http"
"testing"
"time"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/logging"
pinning "github.com/ethersphere/bee/pkg/pinning/mock"
mockpost "github.com/ethersphere/bee/pkg/postage/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/websocket"
)
func TestChunkUploadStream(t *testing.T) {
wsHeaders := http.Header{}
wsHeaders.Set("Content-Type", "application/octet-stream")
wsHeaders.Set("Swarm-Postage-Batch-Id", batchOkStr)
var (
statestoreMock = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(statestoreMock, logger)
storerMock = mock.NewStorer()
pinningMock = pinning.NewServiceMock()
_, wsConn, _ = newTestServer(t, testServerOptions{
Storer: storerMock,
Pinning: pinningMock,
Tags: tag,
Post: mockpost.New(mockpost.WithAcceptAll()),
WsPath: "/chunks/stream",
WsHeaders: wsHeaders,
})
)
t.Run("upload and verify", func(t *testing.T) {
chsToGet := []swarm.Chunk{}
for i := 0; i < 5; i++ {
ch := testingc.GenerateTestRandomChunk()
err := wsConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
err = wsConn.WriteMessage(websocket.BinaryMessage, ch.Data())
if err != nil {
t.Fatal(err)
}
err = wsConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
mt, msg, err := wsConn.ReadMessage()
if err != nil {
t.Fatal(err)
}
if mt != websocket.BinaryMessage || !bytes.Equal(msg, api.SuccessWsMsg) {
t.Fatal("invalid response", mt, string(msg))
}
chsToGet = append(chsToGet, ch)
}
for _, c := range chsToGet {
ch, err := storerMock.Get(context.Background(), storage.ModeGetRequest, c.Address())
if err != nil {
t.Fatal("failed to get chunk after upload", err)
}
if !ch.Equal(c) {
t.Fatal("invalid chunk read")
}
}
})
t.Run("close on incorrect msg", func(t *testing.T) {
err := wsConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
err = wsConn.WriteMessage(websocket.TextMessage, []byte("incorrect msg"))
if err != nil {
t.Fatal(err)
}
err = wsConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
_, _, err = wsConn.ReadMessage()
if err == nil {
t.Fatal("expected failure on read")
}
if cerr, ok := err.(*websocket.CloseError); !ok {
t.Fatal("invalid error on read")
} else if cerr.Text != "invalid message" {
t.Fatalf("incorrect response on error, exp: (invalid message) got (%s)", cerr.Text)
}
})
}
......@@ -42,6 +42,8 @@ var (
FeedMetadataEntryOwner = feedMetadataEntryOwner
FeedMetadataEntryTopic = feedMetadataEntryTopic
FeedMetadataEntryType = feedMetadataEntryType
SuccessWsMsg = successWsMsg
)
func (s *Server) ResolveNameOrAddress(str string) (swarm.Address, error) {
......
......@@ -23,8 +23,9 @@ import (
"github.com/gorilla/websocket"
)
var (
const (
writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close
readDeadline = 4 * time.Second // read deadline. should be smaller than the shutdown timeout on api close
targetMaxLength = 2 // max target length in bytes, in order to prevent griefing by excess computation
)
......
......@@ -63,6 +63,11 @@ func (s *server) setupRouting() {
),
})
handle("/chunks/stream", web.ChainHandlers(
s.newTracingHandler("chunks-stream-upload"),
web.FinalHandlerFunc(s.chunkUploadStreamHandler),
))
handle("/chunks/{addr}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.chunkGetHandler),
})
......@@ -177,6 +182,13 @@ func (s *server) setupRouting() {
})),
)
handle("/stewardship/{address}", jsonhttp.MethodHandler{
"PUT": web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandlerFunc(s.stewardshipPutHandler),
),
})
s.Handler = web.ChainHandlers(
httpaccess.NewHTTPAccessLogHandler(s.logger, logrus.InfoLevel, s.tracer, "api access"),
handlers.CompressHandler,
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/gorilla/mux"
)
// stewardshipPutHandler re-uploads root hash and all of its underlying
// associated chunks to the network.
func (s *server) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) {
nameOrHex := mux.Vars(r)["address"]
address, err := s.resolveNameOrAddress(nameOrHex)
if err != nil {
s.logger.Debugf("stewardship put: parse address %s: %v", nameOrHex, err)
s.logger.Error("stewardship put: parse address")
jsonhttp.NotFound(w, nil)
return
}
err = s.steward.Reupload(r.Context(), address)
if err != nil {
s.logger.Debugf("stewardship put: re-upload %s: %v", address, err)
s.logger.Error("stewardship put: re-upload")
jsonhttp.InternalServerError(w, nil)
return
}
jsonhttp.OK(w, nil)
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"context"
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
smock "github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)
func TestStewardshipReUpload(t *testing.T) {
var (
logger = logging.New(ioutil.Discard, 0)
mockStatestore = statestore.NewStateStore()
m = &mockSteward{}
storer = smock.NewStorer()
addr = swarm.NewAddress([]byte{31: 128})
)
client, _, _ := newTestServer(t, testServerOptions{
Storer: storer,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logger,
Steward: m,
})
jsonhttptest.Request(t, client, http.MethodPut, "/v1/stewardship/"+addr.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
if !m.addr.Equal(addr) {
t.Fatalf("\nhave address: %q\nwant address: %q", m.addr.String(), addr.String())
}
}
type mockSteward struct {
addr swarm.Address
}
func (m *mockSteward) Reupload(_ context.Context, addr swarm.Address) error {
m.addr = addr
return nil
}
......@@ -9,9 +9,11 @@ import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sort"
"strconv"
"testing"
"time"
"github.com/ethersphere/bee/pkg/logging"
mockpost "github.com/ethersphere/bee/pkg/postage/mock"
......@@ -25,6 +27,7 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/websocket"
"gitlab.com/nolash/go-mockbytes"
)
......@@ -35,6 +38,7 @@ type fileUploadResponse struct {
func tagsWithIdResource(id uint32) string { return fmt.Sprintf("/tags/%d", id) }
func TestTags(t *testing.T) {
var (
bzzResource = "/bzz"
bytesResource = "/bytes"
......@@ -44,7 +48,7 @@ func TestTags(t *testing.T) {
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger)
client, _, _ = newTestServer(t, testServerOptions{
client, _, listenAddr = newTestServer(t, testServerOptions{
Storer: mock.NewStorer(),
Tags: tag,
Logger: logger,
......@@ -123,6 +127,56 @@ func TestTags(t *testing.T) {
tagValueTest(t, tr.Uid, 1, 1, 1, 0, 0, 0, swarm.ZeroAddress, client)
})
t.Run("create tag upload chunk stream", func(t *testing.T) {
// create a tag using the API
tr := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tr),
)
wsHeaders := http.Header{}
wsHeaders.Set("Content-Type", "application/octet-stream")
wsHeaders.Set(api.SwarmPostageBatchIdHeader, batchOkStr)
wsHeaders.Set(api.SwarmTagHeader, strconv.FormatUint(uint64(tr.Uid), 10))
u := url.URL{Scheme: "ws", Host: listenAddr, Path: "/chunks/stream"}
wsConn, _, err := websocket.DefaultDialer.Dial(u.String(), wsHeaders)
if err != nil {
t.Fatalf("dial: %v. url %v", err, u.String())
}
for i := 0; i < 5; i++ {
ch := testingc.GenerateTestRandomChunk()
err := wsConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
err = wsConn.WriteMessage(websocket.BinaryMessage, ch.Data())
if err != nil {
t.Fatal(err)
}
err = wsConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
mt, msg, err := wsConn.ReadMessage()
if err != nil {
t.Fatal(err)
}
if mt != websocket.BinaryMessage || !bytes.Equal(msg, api.SuccessWsMsg) {
t.Fatal("invalid response", mt, string(msg))
}
}
tagValueTest(t, tr.Uid, 5, 5, 0, 0, 0, 0, swarm.ZeroAddress, client)
})
t.Run("list tags", func(t *testing.T) {
// list all current tags
var resp api.ListTagsResponse
......
......@@ -235,6 +235,14 @@ func (s *Service) swapCashoutHandler(w http.ResponseWriter, r *http.Request) {
ctx = sctx.SetGasLimit(ctx, l)
}
if !s.cashOutChequeSem.TryAcquire(1) {
s.logger.Debug("debug api: simultaneous on-chain operations not supported")
s.logger.Error("debug api: simultaneous on-chain operations not supported")
jsonhttp.TooManyRequests(w, "simultaneous on-chain operations not supported")
return
}
defer s.cashOutChequeSem.Release(1)
txHash, err := s.swap.CashCheque(ctx, peer)
if err != nil {
s.logger.Debugf("debug api: cashout peer: cannot cash %s: %v", addr, err)
......
......@@ -9,10 +9,12 @@ package debugapi
import (
"crypto/ecdsa"
"math/big"
"net/http"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/accounting"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
......@@ -30,6 +32,7 @@ import (
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/transaction"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore"
)
// Service implements http.Handler interface to be used in HTTP server.
......@@ -57,16 +60,22 @@ type Service struct {
corsAllowedOrigins []string
metricsRegistry *prometheus.Registry
lightNodes *lightnode.Container
blockTime *big.Int
// handler is changed in the Configure method
handler http.Handler
handlerMu sync.RWMutex
// The following semaphores exist to limit concurrent access
// to parts of the resources in order to avoid undefined behaviour.
postageCreateSem *semaphore.Weighted
cashOutChequeSem *semaphore.Weighted
}
// New creates a new Debug API Service with only basic routers enabled in order
// to expose /addresses, /health endpoints, Go metrics and pprof. It is useful to expose
// these endpoints before all dependencies are configured and injected to have
// access to basic debugging tools and /health endpoint.
func New(publicKey, pssPublicKey ecdsa.PublicKey, ethereumAddress common.Address, logger logging.Logger, tracer *tracing.Tracer, corsAllowedOrigins []string, transaction transaction.Service) *Service {
func New(publicKey, pssPublicKey ecdsa.PublicKey, ethereumAddress common.Address, logger logging.Logger, tracer *tracing.Tracer, corsAllowedOrigins []string, blockTime *big.Int, transaction transaction.Service) *Service {
s := new(Service)
s.publicKey = publicKey
s.pssPublicKey = pssPublicKey
......@@ -74,8 +83,11 @@ func New(publicKey, pssPublicKey ecdsa.PublicKey, ethereumAddress common.Address
s.logger = logger
s.tracer = tracer
s.corsAllowedOrigins = corsAllowedOrigins
s.blockTime = blockTime
s.metricsRegistry = newMetricsRegistry()
s.transaction = transaction
s.postageCreateSem = semaphore.NewWeighted(1)
s.cashOutChequeSem = semaphore.NewWeighted(1)
s.setRouter(s.newBasicRouter())
......
......@@ -9,6 +9,7 @@ import (
"crypto/rand"
"encoding/hex"
"io/ioutil"
"math/big"
"net/http"
"net/http/httptest"
"net/url"
......@@ -86,7 +87,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
swapserv := swapmock.New(o.SwapOpts...)
transaction := transactionmock.New(o.TransactionOpts...)
ln := lightnode.NewContainer(o.Overlay)
s := debugapi.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, o.CORSAllowedOrigins, transaction)
s := debugapi.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, o.CORSAllowedOrigins, big.NewInt(2), transaction)
s.Configure(o.Overlay, o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, o.BatchStore, o.Post, o.PostageContract)
ts := httptest.NewServer(s)
t.Cleanup(ts.Close)
......@@ -154,7 +155,7 @@ func TestServer_Configure(t *testing.T) {
swapserv := swapmock.New(o.SwapOpts...)
ln := lightnode.NewContainer(o.Overlay)
transaction := transactionmock.New(o.TransactionOpts...)
s := debugapi.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, nil, transaction)
s := debugapi.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, nil, big.NewInt(2), transaction)
ts := httptest.NewServer(s)
t.Cleanup(ts.Close)
......
......@@ -14,8 +14,10 @@ import (
"github.com/ethersphere/bee/pkg/bigint"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/postage/postagecontract"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
"github.com/gorilla/mux"
)
......@@ -66,6 +68,14 @@ func (s *Service) postageCreateHandler(w http.ResponseWriter, r *http.Request) {
immutable, _ = strconv.ParseBool(val[0])
}
if !s.postageCreateSem.TryAcquire(1) {
s.logger.Debug("create batch: simultaneous on-chain operations not supported")
s.logger.Error("create batch: simultaneous on-chain operations not supported")
jsonhttp.TooManyRequests(w, "simultaneous on-chain operations not supported")
return
}
defer s.postageCreateSem.Release(1)
batchID, err := s.postageContract.CreateBatch(ctx, amount, uint8(depth), immutable, label)
if err != nil {
if errors.Is(err, postagecontract.ErrInsufficientFunds) {
......@@ -102,6 +112,7 @@ type postageStampResponse struct {
BlockNumber uint64 `json:"blockNumber"`
ImmutableFlag bool `json:"immutableFlag"`
Exists bool `json:"exists"`
BatchTTL int64 `json:"batchTTL"`
}
type postageStampsResponse struct {
......@@ -123,17 +134,24 @@ type bucketData struct {
func (s *Service) postageGetStampsHandler(w http.ResponseWriter, _ *http.Request) {
resp := postageStampsResponse{}
for _, v := range s.post.StampIssuers() {
exists, err := s.post.BatchExists(v.ID())
exists, err := s.batchStore.Exists(v.ID())
if err != nil {
s.logger.Errorf("get stamp issuer: check batch: %v", err)
s.logger.Debugf("get stamp issuer: check batch: %v", err)
s.logger.Error("get stamp issuer: check batch")
jsonhttp.InternalServerError(w, "unable to check batch")
return
}
batchTTL, err := s.estimateBatchTTL(v.ID())
if err != nil {
s.logger.Debugf("get stamp issuer: estimate batch expiration: %v", err)
s.logger.Error("get stamp issuer: estimate batch expiration")
jsonhttp.InternalServerError(w, "unable to estimate batch expiration")
return
}
resp.Stamps = append(resp.Stamps, postageStampResponse{
BatchID: v.ID(),
Utilization: v.Utilization(),
Usable: s.post.IssuerUsable(v),
Usable: exists && s.post.IssuerUsable(v),
Label: v.Label(),
Depth: v.Depth(),
Amount: bigint.Wrap(v.Amount()),
......@@ -141,6 +159,7 @@ func (s *Service) postageGetStampsHandler(w http.ResponseWriter, _ *http.Request
BlockNumber: v.BlockNumber(),
ImmutableFlag: v.ImmutableFlag(),
Exists: exists,
BatchTTL: batchTTL,
})
}
jsonhttp.OK(w, resp)
......@@ -155,7 +174,7 @@ func (s *Service) postageGetStampBucketsHandler(w http.ResponseWriter, r *http.R
}
id, err := hex.DecodeString(idStr)
if err != nil {
s.logger.Error("get stamp issuer: invalid batchID: %v", err)
s.logger.Debugf("get stamp issuer: invalid batchID: %v", err)
s.logger.Error("get stamp issuer: invalid batchID")
jsonhttp.BadRequest(w, "invalid batchID")
return
......@@ -163,7 +182,7 @@ func (s *Service) postageGetStampBucketsHandler(w http.ResponseWriter, r *http.R
issuer, err := s.post.GetStampIssuer(id)
if err != nil {
s.logger.Error("get stamp issuer: get issuer: %v", err)
s.logger.Debugf("get stamp issuer: get issuer: %v", err)
s.logger.Error("get stamp issuer: get issuer")
jsonhttp.BadRequest(w, "cannot get batch")
return
......@@ -193,38 +212,52 @@ func (s *Service) postageGetStampHandler(w http.ResponseWriter, r *http.Request)
}
id, err := hex.DecodeString(idStr)
if err != nil {
s.logger.Errorf("get stamp issuer: invalid batchID: %v", err)
s.logger.Debugf("get stamp issuer: invalid batchID: %v", err)
s.logger.Error("get stamp issuer: invalid batchID")
jsonhttp.BadRequest(w, "invalid batchID")
return
}
issuer, err := s.post.GetStampIssuer(id)
if err != nil {
s.logger.Errorf("get stamp issuer: get issuer: %v", err)
if err != nil && !errors.Is(err, postage.ErrNotUsable) {
s.logger.Debugf("get stamp issuer: get issuer: %v", err)
s.logger.Error("get stamp issuer: get issuer")
jsonhttp.BadRequest(w, "cannot get issuer")
return
}
exists, err := s.post.BatchExists(id)
exists, err := s.batchStore.Exists(id)
if err != nil {
s.logger.Errorf("get stamp issuer: check batch: %v", err)
s.logger.Debugf("get stamp issuer: check batch: %v", err)
s.logger.Error("get stamp issuer: check batch")
jsonhttp.InternalServerError(w, "unable to check batch")
return
}
batchTTL, err := s.estimateBatchTTL(id)
if err != nil {
s.logger.Debugf("get stamp issuer: estimate batch expiration: %v", err)
s.logger.Error("get stamp issuer: estimate batch expiration")
jsonhttp.InternalServerError(w, "unable to estimate batch expiration")
return
}
resp := postageStampResponse{
BatchID: id,
Utilization: issuer.Utilization(),
Usable: s.post.IssuerUsable(issuer),
Label: issuer.Label(),
Depth: issuer.Depth(),
Amount: bigint.Wrap(issuer.Amount()),
BucketDepth: issuer.BucketDepth(),
BlockNumber: issuer.BlockNumber(),
ImmutableFlag: issuer.ImmutableFlag(),
Exists: exists,
BatchTTL: batchTTL,
}
if issuer != nil {
resp.Utilization = issuer.Utilization()
resp.Usable = exists && s.post.IssuerUsable(issuer)
resp.Label = issuer.Label()
resp.Depth = issuer.Depth()
resp.Amount = bigint.Wrap(issuer.Amount())
resp.BucketDepth = issuer.BucketDepth()
resp.BlockNumber = issuer.BlockNumber()
resp.ImmutableFlag = issuer.ImmutableFlag()
}
jsonhttp.OK(w, &resp)
}
......@@ -247,6 +280,7 @@ func (s *Service) reserveStateHandler(w http.ResponseWriter, _ *http.Request) {
jsonhttp.OK(w, reserveStateResponse{
Radius: state.Radius,
StorageRadius: state.StorageRadius,
Available: state.Available,
Outer: bigint.Wrap(state.Outer),
Inner: bigint.Wrap(state.Inner),
......@@ -263,3 +297,27 @@ func (s *Service) chainStateHandler(w http.ResponseWriter, _ *http.Request) {
CurrentPrice: bigint.Wrap(state.CurrentPrice),
})
}
// estimateBatchTTL estimates the time remaining until the batch expires.
// The -1 signals that the batch never expires.
func (s *Service) estimateBatchTTL(id []byte) (int64, error) {
state := s.batchStore.GetChainState()
batch, err := s.batchStore.Get(id)
switch {
case errors.Is(err, storage.ErrNotFound), len(state.CurrentPrice.Bits()) == 0:
return -1, nil
case err != nil:
return 0, err
}
var (
normalizedBalance = batch.Value
cumulativePayout = state.TotalAmount
pricePerBlock = state.CurrentPrice
)
ttl := new(big.Int).Sub(normalizedBalance, cumulativePayout)
ttl = ttl.Mul(ttl, s.blockTime)
ttl = ttl.Div(ttl, pricePerBlock)
return ttl.Int64(), nil
}
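To make the arithmetic concrete, here is the same computation stand-alone, using the fixture values from the tests later in this diff (value 20, cumulative payout 5, price 2 per block, 2-second block time):

package main

import (
	"fmt"
	"math/big"
)

func main() {
	value, totalAmount := big.NewInt(20), big.NewInt(5)
	blockTime, pricePerBlock := big.NewInt(2), big.NewInt(2)

	ttl := new(big.Int).Sub(value, totalAmount) // 20 - 5 = 15
	ttl.Mul(ttl, blockTime)                     // 15 * 2 = 30
	ttl.Div(ttl, pricePerBlock)                 // 30 / 2 = 15
	fmt.Println(ttl) // 15 seconds until the batch expires
}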
......@@ -22,6 +22,7 @@ import (
mockpost "github.com/ethersphere/bee/pkg/postage/mock"
"github.com/ethersphere/bee/pkg/postage/postagecontract"
contractMock "github.com/ethersphere/bee/pkg/postage/postagecontract/mock"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
"github.com/ethersphere/bee/pkg/sctx"
)
......@@ -194,15 +195,19 @@ func TestPostageCreateStamp(t *testing.T) {
}
func TestPostageGetStamps(t *testing.T) {
si := postage.NewStampIssuer("", "", batchOk, big.NewInt(3), 11, 10, 1000, true)
b := postagetesting.MustNewBatch()
b.Value = big.NewInt(20)
si := postage.NewStampIssuer("", "", b.ID, big.NewInt(3), 11, 10, 1000, true)
mp := mockpost.New(mockpost.WithIssuer(si))
ts := newTestServer(t, testServerOptions{Post: mp})
cs := &postage.ChainState{Block: 10, TotalAmount: big.NewInt(5), CurrentPrice: big.NewInt(2)}
bs := mock.New(mock.WithChainState(cs), mock.WithBatch(b))
ts := newTestServer(t, testServerOptions{Post: mp, BatchStore: bs})
jsonhttptest.Request(t, ts.Client, http.MethodGet, "/stamps", http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(&debugapi.PostageStampsResponse{
Stamps: []debugapi.PostageStampResponse{
{
BatchID: batchOk,
BatchID: b.ID,
Utilization: si.Utilization(),
Usable: true,
Label: si.Label(),
......@@ -212,6 +217,7 @@ func TestPostageGetStamps(t *testing.T) {
BlockNumber: si.BlockNumber(),
ImmutableFlag: si.ImmutableFlag(),
Exists: true,
BatchTTL: 15, // ((value-totalAmount)/pricePerBlock)*blockTime=((20-5)/2)*2.
},
},
}),
......@@ -219,14 +225,18 @@ func TestPostageGetStamps(t *testing.T) {
}
func TestPostageGetStamp(t *testing.T) {
si := postage.NewStampIssuer("", "", batchOk, big.NewInt(3), 11, 10, 1000, true)
b := postagetesting.MustNewBatch()
b.Value = big.NewInt(20)
si := postage.NewStampIssuer("", "", b.ID, big.NewInt(3), 11, 10, 1000, true)
mp := mockpost.New(mockpost.WithIssuer(si))
ts := newTestServer(t, testServerOptions{Post: mp})
cs := &postage.ChainState{Block: 10, TotalAmount: big.NewInt(5), CurrentPrice: big.NewInt(2)}
bs := mock.New(mock.WithChainState(cs), mock.WithBatch(b))
ts := newTestServer(t, testServerOptions{Post: mp, BatchStore: bs})
t.Run("ok", func(t *testing.T) {
jsonhttptest.Request(t, ts.Client, http.MethodGet, "/stamps/"+batchOkStr, http.StatusOK,
jsonhttptest.Request(t, ts.Client, http.MethodGet, "/stamps/"+hex.EncodeToString(b.ID), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(&debugapi.PostageStampResponse{
BatchID: batchOk,
BatchID: b.ID,
Utilization: si.Utilization(),
Usable: true,
Label: si.Label(),
......@@ -236,6 +246,7 @@ func TestPostageGetStamp(t *testing.T) {
BlockNumber: si.BlockNumber(),
ImmutableFlag: si.ImmutableFlag(),
Exists: true,
BatchTTL: 15, // ((value-totalAmount)*blockTime)/pricePerBlock=((20-5)*2)/2.
}),
)
})
......
......@@ -12,12 +12,14 @@ package hive
import (
"context"
"encoding/hex"
"errors"
"fmt"
"golang.org/x/sync/semaphore"
"sync"
"time"
"golang.org/x/sync/semaphore"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/hive/pb"
......@@ -276,10 +278,9 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
}
// check if the underlay is usable by doing a raw ping using libp2p
_, err = s.streamer.Ping(ctx, multiUnderlay)
if err != nil {
if _, err = s.streamer.Ping(ctx, multiUnderlay); err != nil {
s.metrics.UnreachablePeers.Inc()
s.logger.Warningf("hive: multi address underlay %s not reachable err: %w", multiUnderlay, err)
s.logger.Debugf("hive: peer %s: underlay %s not reachable", hex.EncodeToString(newPeer.Overlay), multiUnderlay)
return
}
......
......@@ -54,7 +54,7 @@ var (
// values needed to adjust subscription trigger
// buffer time.
flipFlopBufferDuration = 150 * time.Millisecond
flipFlopWorstCaseDuration = 20 * time.Second
flipFlopWorstCaseDuration = 10 * time.Second
)
// DB is the local store implementation and holds
......
......@@ -68,16 +68,8 @@ func (db *DB) get(mode storage.ModeGet, addr swarm.Address) (out shed.Item, err
case storage.ModeGetRequest:
db.updateGCItems(out)
case storage.ModeGetPin:
pinnedItem, err := db.pinIndex.Get(item)
if err != nil {
return out, err
}
return pinnedItem, nil
// no updates to indexes
case storage.ModeGetSync:
case storage.ModeGetLookup:
case storage.ModeGetSync, storage.ModeGetLookup:
default:
return out, ErrInvalidMode
}
......
......@@ -75,15 +75,8 @@ func (db *DB) getMulti(mode storage.ModeGet, addrs ...swarm.Address) (out []shed
case storage.ModeGetRequest:
db.updateGCItems(out...)
case storage.ModeGetPin:
err := db.pinIndex.Fill(out)
if err != nil {
return nil, err
}
// no updates to indexes
case storage.ModeGetSync:
case storage.ModeGetLookup:
case storage.ModeGetSync, storage.ModeGetLookup:
default:
return out, ErrInvalidMode
}
......
......@@ -34,7 +34,6 @@ func TestModeGetMulti(t *testing.T) {
storage.ModeGetRequest,
storage.ModeGetSync,
storage.ModeGetLookup,
storage.ModeGetPin,
} {
t.Run(mode.String(), func(t *testing.T) {
db := newTestDB(t, nil)
......@@ -46,17 +45,6 @@ func TestModeGetMulti(t *testing.T) {
t.Fatal(err)
}
if mode == storage.ModeGetPin {
// pin chunks so that it is not returned as not found by pinIndex
for i, ch := range chunks {
err := db.Set(context.Background(), storage.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
chunks[i] = ch
}
}
addrs := chunkAddresses(chunks)
got, err := db.GetMulti(context.Background(), mode, addrs...)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mantaray
func (n *Node) NodeType() uint8 {
return n.nodeType
}
......@@ -57,11 +57,13 @@ const (
var (
version01HashBytes []byte
version02HashBytes []byte
zero32 []byte
)
func init() {
initVersion(version01HashString, &version01HashBytes)
initVersion(version02HashString, &version02HashBytes)
zero32 = make([]byte, 32)
}
func initVersion(hash string, bytes *[]byte) {
......@@ -274,8 +276,8 @@ func (n *Node) UnmarshalBinary(data []byte) error {
// the root nodeType information is lost on Unmarshal. This causes issues when we want to
// perform a path 'Walk' on the root. If there is more than 1 fork, the root node type
// is an edge, so we will deduce this information from the index byte array
if !bytes.Equal(data[offset:offset+32], make([]byte, 32)) {
n.nodeType = nodeTypeEdge
if !bytes.Equal(data[offset:offset+32], zero32) && !n.IsEdgeType() {
n.makeEdge()
}
n.forks = make(map[byte]*fork)
bb := &bitsForBytes{}
......
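Two small things happen in this hunk: the 32-byte zero reference is allocated once as zero32 during init rather than on every UnmarshalBinary call, and the promotion to an edge node is guarded so it only runs when the node is not an edge already. A tiny sketch of the hoisting pattern, with illustrative names that are not part of the package:
package main
import (
	"bytes"
	"fmt"
)
// zeroRef stands in for zero32: allocated once at package init and
// reused for every comparison instead of a fresh make([]byte, 32).
var zeroRef = make([]byte, 32)
func isZeroRef(ref []byte) bool {
	return bytes.Equal(ref, zeroRef)
}
func main() {
	fmt.Println(isZeroRef(make([]byte, 32)))   // true
	fmt.Println(isZeroRef([]byte{0xde, 0xad})) // false
}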
......@@ -20,27 +20,33 @@ const testMarshalOutput01 = "52fdfc072182654f163f5f0f9a621d729566c74d10037c4d7bb
const testMarshalOutput02 = "52fdfc072182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64905954fb18659339d0b25e0fb9723d3cd5d528fb3c8d495fd157bd7b7a210496952fdfc072182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64952fdfc072102654f163f5f0fa0621d729566c74d10037c4d7bbb0407d1e2c64940fcd3072182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64952fdfc072182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64952e3872548ec012a6e123b60f9177017fb12e57732621d2c1ada267adbe8cc4350f89d6640e3044f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64952fdfc072182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64850ff9f642182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64952fdfc072182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64b50fc98072182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64952fdfc072182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64a50ff99622182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64952fdfc072182654f163f5f0f9a621d729566c74d10037c4d7bbb0407d1e2c64d"
var testEntries = []nodeEntry{
var testEntries = []NodeEntry{
{
path: []byte("/"),
metadata: map[string]string{
Path: []byte("/"),
Metadata: map[string]string{
"index-document": "aaaaa",
},
},
{
path: []byte("aaaaa"),
Path: []byte("aaaaa"),
},
{
path: []byte("cc"),
Path: []byte("cc"),
},
{
path: []byte("d"),
Path: []byte("d"),
},
{
path: []byte("ee"),
Path: []byte("ee"),
},
}
type NodeEntry struct {
Path []byte
Entry []byte
Metadata map[string]string
}
func init() {
obfuscationKeyFn = mrand.Read
}
......@@ -98,7 +104,7 @@ func TestUnmarshal01(t *testing.T) {
t.Fatalf("expected %d forks, got %d", len(testEntries), len(n.forks))
}
for _, entry := range testEntries {
prefix := entry.path
prefix := entry.Path
f := n.forks[prefix[0]]
if f == nil {
t.Fatalf("expected to have fork on byte %x", prefix[:1])
......@@ -130,7 +136,7 @@ func TestUnmarshal02(t *testing.T) {
t.Fatalf("expected %d forks, got %d", len(testEntries), len(n.forks))
}
for _, entry := range testEntries {
prefix := entry.path
prefix := entry.Path
f := n.forks[prefix[0]]
if f == nil {
t.Fatalf("expected to have fork on byte %x", prefix[:1])
......@@ -138,9 +144,9 @@ func TestUnmarshal02(t *testing.T) {
if !bytes.Equal(f.prefix, prefix) {
t.Fatalf("expected prefix for byte %x to match %s, got %s", prefix[:1], prefix, f.prefix)
}
if len(entry.metadata) > 0 {
if !reflect.DeepEqual(entry.metadata, f.metadata) {
t.Fatalf("expected metadata for byte %x to match %s, got %s", prefix[:1], entry.metadata, f.metadata)
if len(entry.Metadata) > 0 {
if !reflect.DeepEqual(entry.Metadata, f.metadata) {
t.Fatalf("expected metadata for byte %x to match %s, got %s", prefix[:1], entry.Metadata, f.metadata)
}
}
}
......@@ -159,12 +165,12 @@ func TestMarshal(t *testing.T) {
return b
}
for i := 0; i < len(testEntries); i++ {
c := testEntries[i].path
e := testEntries[i].entry
c := testEntries[i].Path
e := testEntries[i].Entry
if len(e) == 0 {
e = append(make([]byte, 32-len(c)), c...)
}
m := testEntries[i].metadata
m := testEntries[i].Metadata
err := n.Add(ctx, c, e, m, nil)
if err != nil {
t.Fatalf("expected no error, got %v", err)
......
......@@ -181,6 +181,9 @@ func (n *Node) Lookup(ctx context.Context, path []byte, l Loader) ([]byte, error
if err != nil {
return nil, err
}
if !node.IsValueType() && len(path) > 0 {
return nil, notFound(path)
}
return node.entry, nil
}
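With this guard, resolving a path that ends on an edge-only fork — a branch point that was never added as its own entry — reports not-found instead of returning the intermediate node's empty entry. A hedged sketch of the new behavior; the paths are arbitrary and chosen only to force a fork at the shared prefix:
package main
import (
	"context"
	"errors"
	"fmt"
	"github.com/ethersphere/bee/pkg/manifest/mantaray"
)
func main() {
	ctx := context.Background()
	n := mantaray.New()
	// Two entries sharing the prefix "img/" create an edge-only fork there.
	for _, p := range [][]byte{[]byte("img/1.png"), []byte("img/2.png")} {
		e := append(make([]byte, 32-len(p)), p...)
		if err := n.Add(ctx, p, e, nil, nil); err != nil {
			panic(err)
		}
	}
	// "img/" resolves to an edge, not a value, so Lookup now refuses it.
	_, err := n.Lookup(ctx, []byte("img/"), nil)
	fmt.Println(errors.Is(err, mantaray.ErrNotFound)) // expected: true
}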
......@@ -205,6 +208,7 @@ func (n *Node) Add(ctx context.Context, path, entry []byte, metadata map[string]
if len(path) == 0 {
n.entry = entry
n.makeValue()
if len(metadata) > 0 {
n.metadata = metadata
n.makeWithMetadata()
......
......@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mantaray
package mantaray_test
import (
"bytes"
......@@ -10,17 +10,13 @@ import (
"errors"
"strconv"
"testing"
)
type nodeEntry struct {
path []byte
entry []byte
metadata map[string]string
}
"github.com/ethersphere/bee/pkg/manifest/mantaray"
)
func TestNilPath(t *testing.T) {
ctx := context.Background()
n := New()
n := mantaray.New()
_, err := n.Lookup(ctx, nil, nil)
if err != nil {
t.Fatalf("expected no error, got %v", err)
......@@ -29,7 +25,7 @@ func TestNilPath(t *testing.T) {
func TestAddAndLookup(t *testing.T) {
ctx := context.Background()
n := New()
n := mantaray.New()
testCases := [][]byte{
[]byte("aaaaaa"),
[]byte("aaaaab"),
......@@ -91,6 +87,14 @@ func TestAddAndLookupNode(t *testing.T) {
[]byte("robots.txt"),
},
},
{
// mantaray.nodePrefixMaxSize number of '.'
name: "nested-value-node-is-recognized",
toAdd: [][]byte{
[]byte("..............................@"),
[]byte(".............................."),
},
},
{
name: "nested-prefix-is-not-collapsed",
toAdd: [][]byte{
......@@ -127,7 +131,7 @@ func TestAddAndLookupNode(t *testing.T) {
} {
ctx := context.Background()
t.Run(tc.name, func(t *testing.T) {
n := New()
n := mantaray.New()
for i := 0; i < len(tc.toAdd); i++ {
c := tc.toAdd[i]
......@@ -142,13 +146,47 @@ func TestAddAndLookupNode(t *testing.T) {
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if !node.IsValueType() {
t.Fatalf("expected value type, got %v", strconv.FormatInt(int64(node.NodeType()), 2))
}
de := append(make([]byte, 32-len(d)), d...)
if !bytes.Equal(node.entry, de) {
t.Fatalf("expected value %x, got %x", d, node.entry)
if !bytes.Equal(node.Entry(), de) {
t.Fatalf("expected value %x, got %x", d, node.Entry())
}
}
}
})
t.Run(tc.name+"/with load save", func(t *testing.T) {
n := mantaray.New()
for i := 0; i < len(tc.toAdd); i++ {
c := tc.toAdd[i]
e := append(make([]byte, 32-len(c)), c...)
err := n.Add(ctx, c, e, nil, nil)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
}
ls := newMockLoadSaver()
err := n.Save(ctx, ls)
if err != nil {
t.Fatal(err)
}
n2 := mantaray.NewNodeRef(n.Reference())
for j := 0; j < len(tc.toAdd); j++ {
d := tc.toAdd[j]
node, err := n2.LookupNode(ctx, d, ls)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if !node.IsValueType() {
t.Fatalf("expected value type, got %v", strconv.FormatInt(int64(node.nodeType), 2))
t.Fatalf("expected value type, got %v", strconv.FormatInt(int64(node.NodeType()), 2))
}
de := append(make([]byte, 32-len(d)), d...)
if !bytes.Equal(node.Entry(), de) {
t.Fatalf("expected value %x, got %x", d, node.Entry())
}
}
})
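newMockLoadSaver is referenced throughout these subtests but its definition is not part of this diff. A minimal in-memory stand-in might look like the sketch below; the Load/Save signatures are inferred from the call sites (Save(ctx, ls), LookupNode(ctx, d, ls)), and the counter-based reference scheme is purely illustrative:
package mantaray_test
import (
	"context"
	"encoding/binary"
	"errors"
	"sync"
)
// mockLoadSaver is a hypothetical in-memory LoadSaver for the tests above.
type mockLoadSaver struct {
	mtx   sync.Mutex
	calls uint64
	store map[string][]byte
}
func newMockLoadSaver() *mockLoadSaver {
	return &mockLoadSaver{store: make(map[string][]byte)}
}
// Save stores data under a fresh 32-byte reference and returns the reference.
func (m *mockLoadSaver) Save(_ context.Context, data []byte) ([]byte, error) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	ref := make([]byte, 32)
	binary.BigEndian.PutUint64(ref, m.calls)
	m.calls++
	m.store[string(ref)] = data
	return ref, nil
}
// Load returns the bytes previously stored under ref.
func (m *mockLoadSaver) Load(_ context.Context, ref []byte) ([]byte, error) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	b, ok := m.store[string(ref)]
	if !ok {
		return nil, errors.New("mock loadsaver: not found")
	}
	return b, nil
}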
......@@ -158,29 +196,29 @@ func TestAddAndLookupNode(t *testing.T) {
func TestRemove(t *testing.T) {
for _, tc := range []struct {
name string
toAdd []nodeEntry
toAdd []mantaray.NodeEntry
toRemove [][]byte
}{
{
name: "simple",
toAdd: []nodeEntry{
toAdd: []mantaray.NodeEntry{
{
path: []byte("/"),
metadata: map[string]string{
Path: []byte("/"),
Metadata: map[string]string{
"index-document": "index.html",
},
},
{
path: []byte("index.html"),
Path: []byte("index.html"),
},
{
path: []byte("img/1.png"),
Path: []byte("img/1.png"),
},
{
path: []byte("img/2.png"),
Path: []byte("img/2.png"),
},
{
path: []byte("robots.txt"),
Path: []byte("robots.txt"),
},
},
toRemove: [][]byte{
......@@ -189,21 +227,21 @@ func TestRemove(t *testing.T) {
},
{
name: "nested-prefix-is-not-collapsed",
toAdd: []nodeEntry{
toAdd: []mantaray.NodeEntry{
{
path: []byte("index.html"),
Path: []byte("index.html"),
},
{
path: []byte("img/1.png"),
Path: []byte("img/1.png"),
},
{
path: []byte("img/2/test1.png"),
Path: []byte("img/2/test1.png"),
},
{
path: []byte("img/2/test2.png"),
Path: []byte("img/2/test2.png"),
},
{
path: []byte("robots.txt"),
Path: []byte("robots.txt"),
},
},
toRemove: [][]byte{
......@@ -213,21 +251,21 @@ func TestRemove(t *testing.T) {
} {
ctx := context.Background()
t.Run(tc.name, func(t *testing.T) {
n := New()
n := mantaray.New()
for i := 0; i < len(tc.toAdd); i++ {
c := tc.toAdd[i].path
e := tc.toAdd[i].entry
c := tc.toAdd[i].Path
e := tc.toAdd[i].Entry
if len(e) == 0 {
e = append(make([]byte, 32-len(c)), c...)
}
m := tc.toAdd[i].metadata
m := tc.toAdd[i].Metadata
err := n.Add(ctx, c, e, m, nil)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
for j := 0; j < i; j++ {
d := tc.toAdd[j].path
d := tc.toAdd[j].Path
m, err := n.Lookup(ctx, d, nil)
if err != nil {
t.Fatalf("expected no error, got %v", err)
......@@ -246,7 +284,7 @@ func TestRemove(t *testing.T) {
t.Fatalf("expected no error, got %v", err)
}
_, err = n.Lookup(ctx, c, nil)
if !errors.Is(err, ErrNotFound) {
if !errors.Is(err, mantaray.ErrNotFound) {
t.Fatalf("expected not found error, got %v", err)
}
}
......@@ -298,7 +336,7 @@ func TestHasPrefix(t *testing.T) {
} {
ctx := context.Background()
t.Run(tc.name, func(t *testing.T) {
n := New()
n := mantaray.New()
for i := 0; i < len(tc.toAdd); i++ {
c := tc.toAdd[i]
......
......@@ -2,13 +2,15 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mantaray
package mantaray_test
import (
"bytes"
"context"
"fmt"
"testing"
"github.com/ethersphere/bee/pkg/manifest/mantaray"
)
func TestWalkNode(t *testing.T) {
......@@ -37,37 +39,45 @@ func TestWalkNode(t *testing.T) {
},
} {
ctx := context.Background()
t.Run(tc.name, func(t *testing.T) {
n := New()
for i := 0; i < len(tc.toAdd); i++ {
c := tc.toAdd[i]
createTree := func(t *testing.T, toAdd [][]byte) *mantaray.Node {
n := mantaray.New()
for i := 0; i < len(toAdd); i++ {
c := toAdd[i]
e := append(make([]byte, 32-len(c)), c...)
err := n.Add(ctx, c, e, nil, nil)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
}
return n
}
walkedCount := 0
walker := func(path []byte, node *Node, err error) error {
walkedCount++
pathExists := func(found []byte, expected [][]byte) bool {
pathFound := false
for i := 0; i < len(tc.expected); i++ {
c := tc.expected[i]
if bytes.Equal(path, c) {
if bytes.Equal(found, c) {
pathFound = true
break
}
}
return pathFound
}
t.Run(tc.name, func(t *testing.T) {
n := createTree(t, tc.toAdd)
if !pathFound {
walkedCount := 0
walker := func(path []byte, node *mantaray.Node, err error) error {
walkedCount++
if !pathExists(path, tc.expected) {
return fmt.Errorf("walkFn returned unknown path: %s", path)
}
return nil
}
// Expect no errors.
......@@ -79,7 +89,40 @@ func TestWalkNode(t *testing.T) {
if len(tc.expected) != walkedCount {
t.Errorf("expected %d nodes, got %d", len(tc.expected), walkedCount)
}
})
t.Run(tc.name+"/with load save", func(t *testing.T) {
n := createTree(t, tc.toAdd)
ls := newMockLoadSaver()
err := n.Save(ctx, ls)
if err != nil {
t.Fatal(err)
}
n2 := mantaray.NewNodeRef(n.Reference())
walkedCount := 0
walker := func(path []byte, node *mantaray.Node, err error) error {
walkedCount++
if !pathExists(path, tc.expected) {
return fmt.Errorf("walkFn returned unknown path: %s", path)
}
return nil
}
// Expect no errors.
err = n2.WalkNode(ctx, []byte{}, ls, walker)
if err != nil {
t.Fatalf("no error expected, found: %s", err)
}
if len(tc.expected) != walkedCount {
t.Errorf("expected %d nodes, got %d", len(tc.expected), walkedCount)
}
})
}
}
......@@ -97,6 +140,8 @@ func TestWalk(t *testing.T) {
[]byte("img/test/"),
[]byte("img/test/oho.png"),
[]byte("img/test/old/test.png"),
// file with same prefix but not a directory prefix
[]byte("img/test/old/test.png.backup"),
[]byte("robots.txt"),
},
expected: [][]byte{
......@@ -106,39 +151,50 @@ func TestWalk(t *testing.T) {
[]byte("img/test/oho.png"),
[]byte("img/test/old"),
[]byte("img/test/old/test.png"),
[]byte("img/test/old/test.png.backup"),
[]byte("robots.txt"),
},
},
} {
ctx := context.Background()
t.Run(tc.name, func(t *testing.T) {
n := New()
for i := 0; i < len(tc.toAdd); i++ {
c := tc.toAdd[i]
createTree := func(t *testing.T, toAdd [][]byte) *mantaray.Node {
n := mantaray.New()
for i := 0; i < len(toAdd); i++ {
c := toAdd[i]
e := append(make([]byte, 32-len(c)), c...)
err := n.Add(ctx, c, e, nil, nil)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
}
return n
}
walkedCount := 0
walker := func(path []byte, isDir bool, err error) error {
walkedCount++
pathExists := func(found []byte, expected [][]byte) bool {
pathFound := false
for i := 0; i < len(tc.expected); i++ {
c := tc.expected[i]
if bytes.Equal(path, c) {
if bytes.Equal(found, c) {
pathFound = true
break
}
}
return pathFound
}
t.Run(tc.name, func(t *testing.T) {
n := createTree(t, tc.toAdd)
walkedCount := 0
if !pathFound {
walker := func(path []byte, isDir bool, err error) error {
walkedCount++
if !pathExists(path, tc.expected) {
return fmt.Errorf("walkFn returned unknown path: %s", path)
}
......@@ -155,5 +211,41 @@ func TestWalk(t *testing.T) {
}
})
t.Run(tc.name+"/with load save", func(t *testing.T) {
n := createTree(t, tc.toAdd)
ls := newMockLoadSaver()
err := n.Save(ctx, ls)
if err != nil {
t.Fatal(err)
}
n2 := mantaray.NewNodeRef(n.Reference())
walkedCount := 0
walker := func(path []byte, isDir bool, err error) error {
walkedCount++
if !pathExists(path, tc.expected) {
return fmt.Errorf("walkFn returned unknown path: %s", path)
}
return nil
}
// Expect no errors.
err = n2.Walk(ctx, []byte{}, ls, walker)
if err != nil {
t.Fatalf("no error expected, found: %s", err)
}
if len(tc.expected) != walkedCount {
t.Errorf("expected %d nodes, got %d", len(tc.expected), walkedCount)
}
})
}
}
......@@ -241,7 +241,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
return nil, fmt.Errorf("eth address: %w", err)
}
// set up basic debug api endpoints for debugging and /health endpoint
debugAPIService = debugapi.New(*publicKey, pssPrivateKey.PublicKey, overlayEthAddress, logger, tracer, o.CORSAllowedOrigins, transactionService)
debugAPIService = debugapi.New(*publicKey, pssPrivateKey.PublicKey, overlayEthAddress, logger, tracer, o.CORSAllowedOrigins, big.NewInt(int64(o.BlockTime)), transactionService)
debugAPIListener, err := net.Listen("tcp", o.DebugAPIAddr)
if err != nil {
......@@ -362,7 +362,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
lightNodes := lightnode.NewContainer(swarmAddress)
senderMatcher := transaction.NewMatcher(swapBackend, types.NewEIP155Signer(big.NewInt(chainID)), stateStore)
senderMatcher := transaction.NewMatcher(swapBackend, types.NewEIP2930Signer(big.NewInt(chainID)), stateStore)
p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, senderMatcher, logger, tracer, libp2p.Options{
PrivateKey: libp2pPrivateKey,
......@@ -653,7 +653,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
b.recoveryHandleCleanup = pssService.Register(recovery.Topic, chunkRepairHandler)
}
pusherService := pusher.New(networkID, storer, kad, pushSyncProtocol, tagService, logger, tracer, warmupTime)
pusherService := pusher.New(networkID, storer, kad, pushSyncProtocol, validStamp, tagService, logger, tracer, warmupTime)
b.pusherCloser = pusherService
pullStorage := pullstorage.New(storer)
......
......@@ -545,6 +545,124 @@ func TestTopologyNotifier(t *testing.T) {
waitAddrSet(t, &n2disconnectedPeer.Address, &mtx, overlay1)
}
// TestTopologyAnnounce checks that announcement
// works correctly for full nodes and light nodes.
func TestTopologyAnnounce(t *testing.T) {
var (
mtx sync.Mutex
ctx = context.Background()
ab1, ab2, ab3 = addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore())
announceCalled = false
announceToCalled = false
n1a = func(context.Context, swarm.Address, bool) error {
mtx.Lock()
announceCalled = true
mtx.Unlock()
return nil
}
n1at = func(context.Context, swarm.Address, swarm.Address, bool) error {
mtx.Lock()
announceToCalled = true
mtx.Unlock()
return nil
}
)
// test setup: 2 full nodes and one light
// light connects to full(1), then full(2)
// connects to full(1), check that full(1)
// tried to announce full(2) to light.
notifier1 := mockAnnouncingNotifier(n1a, n1at)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab1,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})
s1.SetPickyNotifier(notifier1)
s2, overlay2 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab2,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})
s3, overlay3 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab3,
libp2pOpts: libp2p.Options{
FullNode: false,
},
})
addr := serviceUnderlayAddress(t, s1)
// s3 (light) connects to s1 (full)
_, err := s3.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s3, overlay1)
expectPeersEventually(t, s1, overlay3)
called := false
for i := 0; i < 20; i++ {
mtx.Lock()
called = announceCalled
mtx.Unlock()
if called {
break
}
time.Sleep(50 * time.Millisecond)
}
if !called {
t.Error("expected announce to be called")
}
for i := 0; i < 10; i++ {
mtx.Lock()
called = announceToCalled
mtx.Unlock()
if called {
break
}
time.Sleep(50 * time.Millisecond)
}
if announceToCalled {
t.Error("announceTo called but should not")
}
// check address book entries are there
checkAddressbook(t, ab3, overlay1, addr)
// s2 (full) connects to s1 (full)
_, err = s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2, overlay3)
for i := 0; i < 20; i++ {
mtx.Lock()
called = announceToCalled
mtx.Unlock()
if called {
break
}
time.Sleep(50 * time.Millisecond)
}
if !called {
t.Error("expected announceTo to be called")
}
}
func TestTopologyOverSaturated(t *testing.T) {
var (
mtx sync.Mutex
......@@ -773,9 +891,11 @@ func checkAddressbook(t *testing.T, ab addressbook.Getter, overlay swarm.Address
}
type notifiee struct {
connected func(context.Context, p2p.Peer, bool) error
disconnected func(p2p.Peer)
connected cFunc
disconnected dFunc
pick bool
announce announceFunc
announceTo announceToFunc
}
func (n *notifiee) Connected(c context.Context, p p2p.Peer, f bool) error {
......@@ -790,21 +910,30 @@ func (n *notifiee) Pick(p p2p.Peer) bool {
return n.pick
}
func (n *notifiee) Announce(context.Context, swarm.Address, bool) error {
return nil
func (n *notifiee) Announce(ctx context.Context, a swarm.Address, full bool) error {
return n.announce(ctx, a, full)
}
func (n *notifiee) AnnounceTo(ctx context.Context, a, b swarm.Address, full bool) error {
return n.announceTo(ctx, a, b, full)
}
func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier {
return &notifiee{connected: c, disconnected: d, pick: pick}
return &notifiee{connected: c, disconnected: d, pick: pick, announce: noopAnnounce, announceTo: noopAnnounceTo}
}
func mockAnnouncingNotifier(a announceFunc, at announceToFunc) p2p.PickyNotifier {
return &notifiee{connected: noopCf, disconnected: noopDf, pick: true, announce: a, announceTo: at}
}
type (
cFunc func(context.Context, p2p.Peer, bool) error
dFunc func(p2p.Peer)
announceFunc func(context.Context, swarm.Address, bool) error
announceToFunc func(context.Context, swarm.Address, swarm.Address, bool) error
)
var noopCf = func(_ context.Context, _ p2p.Peer, _ bool) error {
return nil
}
var noopDf = func(p p2p.Peer) {}
var noopCf = func(context.Context, p2p.Peer, bool) error { return nil }
var noopDf = func(p2p.Peer) {}
var noopAnnounce = func(context.Context, swarm.Address, bool) error { return nil }
var noopAnnounceTo = func(context.Context, swarm.Address, swarm.Address, bool) error { return nil }
......@@ -8,6 +8,8 @@ import (
"context"
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
libp2pm "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
)
......@@ -23,3 +25,9 @@ func (s *Service) NewStreamForPeerID(peerID libp2ppeer.ID, protocolName, protoco
type StaticAddressResolver = staticAddressResolver
var NewStaticAddressResolver = newStaticAddressResolver
func WithHostFactory(factory func(context.Context, ...libp2pm.Option) (host.Host, error)) Options {
return Options{
hostFactory: factory,
}
}
......@@ -23,6 +23,7 @@ import (
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/libp2p/go-libp2p"
......@@ -82,6 +83,7 @@ type lightnodes interface {
Disconnected(p2p.Peer)
Count() int
RandomPeer(swarm.Address) (swarm.Address, error)
EachPeer(pf topology.EachPeerFunc) error
}
type Options struct {
......@@ -93,6 +95,7 @@ type Options struct {
LightNodeLimit int
WelcomeMessage string
Transaction []byte
hostFactory func(context.Context, ...libp2p.Option) (host.Host, error)
}
func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, lightNodes *lightnode.Container, swapBackend handshake.SenderMatcher, logger logging.Logger, tracer *tracing.Tracer, o Options) (*Service, error) {
......@@ -181,14 +184,19 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
opts = append(opts, transports...)
h, err := libp2p.New(ctx, opts...)
if o.hostFactory == nil {
// Use the default libp2p host creation
o.hostFactory = libp2p.New
}
h, err := o.hostFactory(ctx, opts...)
if err != nil {
return nil, err
}
// Support same non default security and transport options as
// original host.
dialer, err := libp2p.New(ctx, append(transports, security)...)
dialer, err := o.hostFactory(ctx, append(transports, security)...)
if err != nil {
return nil, err
}
......@@ -224,7 +232,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
// the addresses used are not dialable and hence should be cleaned up. We should create
// this host with the same transports and security options to be able to dial to other
// peers.
pingDialer, err := libp2p.New(ctx, append(transports, security, libp2p.NoListenAddrs)...)
pingDialer, err := o.hostFactory(ctx, append(transports, security, libp2p.NoListenAddrs)...)
if err != nil {
return nil, err
}
......@@ -390,8 +398,8 @@ func (s *Service) handleIncoming(stream network.Stream) {
return
}
}
} else if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly
} else {
if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
......@@ -405,6 +413,18 @@ func (s *Service) handleIncoming(stream network.Stream) {
_ = s.Disconnect(overlay)
return
}
// when a full node connects, we gossip about it to the
// light nodes so that they can also have a chance at building
// a solid topology.
_ = s.lightNodes.EachPeer(func(addr swarm.Address, _ uint8) (bool, bool, error) {
go func(addressee, peer swarm.Address, fullnode bool) {
if err := s.notifier.AnnounceTo(s.ctx, addressee, peer, fullnode); err != nil {
s.logger.Debugf("stream handler: notifier.Announce to light node %s %s: %v", addressee.String(), peer.String(), err)
}
}(addr, peer.Address, i.FullNode)
return false, false, nil
})
}
}
s.metrics.HandledStreamCount.Inc()
......@@ -475,8 +495,10 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
if err := ss.Handler(ctx, p2p.Peer{Address: overlay, FullNode: full}, stream); err != nil {
var de *p2p.DisconnectError
if errors.As(err, &de) {
logger.Tracef("libp2p handler(%s): disconnecting %s", p.Name, overlay.String())
_ = stream.Reset()
_ = s.Disconnect(overlay)
logger.Tracef("handler(%s): disconnecting %s due to disconnect error", p.Name, overlay.String())
}
var bpe *p2p.BlockPeerError
......@@ -486,7 +508,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err)
logger.Errorf("unable to blocklist peer %v", peerID)
}
logger.Tracef("blocklisted a peer %s", peerID)
logger.Tracef("handler(%s): blocklisted %s", p.Name, overlay.String())
}
// count unexpected requests
if errors.Is(err, p2p.ErrUnexpected) {
......@@ -529,6 +551,7 @@ func (s *Service) NATManager() basichost.NATManager {
}
func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration) error {
s.logger.Tracef("libp2p blocklist: peer %s for %v", overlay.String(), duration)
if err := s.blocklist.Add(overlay, duration); err != nil {
s.metrics.BlocklistedPeerErrCount.Inc()
_ = s.Disconnect(overlay)
......
......@@ -14,6 +14,10 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
libp2pm "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/multiformats/go-multistream"
)
......@@ -379,21 +383,26 @@ func TestConnectDisconnectEvents(t *testing.T) {
}
func TestPing(t *testing.T) {
t.Skip("test flaking")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
s1, _ := newService(t, 1, libp2pServiceOpts{})
s2, _ := newService(t, 1, libp2pServiceOpts{})
s1, _ := newService(t, 1, libp2pServiceOpts{
libp2pOpts: libp2p.WithHostFactory(
func(ctx context.Context, _ ...libp2pm.Option) (host.Host, error) {
return bhost.NewHost(ctx, swarmt.GenSwarm(t, ctx), &bhost.HostOpts{EnablePing: true})
},
),
})
defer s1.Close()
// Wait for listeners to start. There are times when the test fails unexpectedly
// during CI and we suspect it is due to the listeners not starting in time. The
// sleep here ensures CPU is given up for any goroutines which are not getting
// scheduled. Ideally we should explicitly check the TCP status on the port
// where the libp2p.Host is started before assuming the host is up. This seems like
// a bit of an overkill here unless the test starts flaking.
time.Sleep(time.Second)
s2, _ := newService(t, 1, libp2pServiceOpts{
libp2pOpts: libp2p.WithHostFactory(
func(ctx context.Context, _ ...libp2pm.Option) (host.Host, error) {
return bhost.NewHost(ctx, swarmt.GenSwarm(t, ctx), &bhost.HostOpts{EnablePing: true})
},
),
})
defer s2.Close()
addr := serviceUnderlayAddress(t, s1)
......
......@@ -50,7 +50,8 @@ type PickyNotifier interface {
type Notifier interface {
Connected(context.Context, Peer, bool) error
Disconnected(Peer)
Announce(context.Context, swarm.Address, bool) error
Announce(ctx context.Context, peer swarm.Address, fullnode bool) error
AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error
}
// DebugService extends the Service with method used for debugging.
......
......@@ -39,9 +39,6 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
// ErrBatchNotFound is returned when the postage batch is not found or expired
var ErrBatchNotFound = errors.New("postage batch not found or expired")
// DefaultDepth is the initial depth for the reserve
var DefaultDepth = uint8(12) // 12 is the testnet depth at the time of merging to master
......@@ -179,7 +176,7 @@ func (s *store) evictExpired() error {
return true, err
}
s.rs.Available += multiplier * exp2(b.Radius-s.rs.Radius-1)
s.rs.Available += multiplier * exp2(uint(b.Radius-s.rs.Radius-1))
// if batch has no value then delete it
if b.Value.Cmp(s.cs.TotalAmount) <= 0 {
......@@ -236,7 +233,7 @@ func (rs *reserveState) change(oldv, newv *big.Int, oldDepth, newDepth uint8) (i
// size returns the number of chunks the local node is responsible
// for storing in its reserve.
func (rs *reserveState) size(depth uint8, t tier) int64 {
size := exp2(depth - rs.Radius - 1)
size := exp2(uint(depth - rs.Radius - 1))
switch t {
case inner:
return size
......@@ -354,7 +351,7 @@ func (s *store) evictOuter(last *postage.Batch) error {
return true, nil
}
// unreserve outer PO of the lowest priority batch until capacity is back to positive
s.rs.Available += exp2(b.Depth - s.rs.Radius - 1)
s.rs.Available += exp2(uint(b.Depth) - uint(s.rs.Radius) - 1)
s.rs.Outer.Set(b.Value)
return false, s.unreserveFn(b.ID, s.rs.Radius)
})
......@@ -409,13 +406,6 @@ func (u *UnreserveItem) UnmarshalBinary(b []byte) error {
}
// exp2 returns the e-th power of 2
func exp2(e uint8) int64 {
if e == 0 {
return 1
}
b := int64(2)
for i := uint8(1); i < e; i++ {
b *= 2
}
return b
func exp2(e uint) int64 {
return 1 << e
}
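The shift form is equivalent to the removed loop for the exponents the reserve works with, but it is worth noting that both versions silently overflow int64 once the exponent reaches 63 — a caution about the boundary rather than a claim that such inputs occur:
package main
import "fmt"
func exp2(e uint) int64 {
	return 1 << e
}
func main() {
	fmt.Println(exp2(0))  // 1
	fmt.Println(exp2(10)) // 1024
	fmt.Println(exp2(62)) // 4611686018427387904, the largest safe exponent
	fmt.Println(exp2(63)) // -9223372036854775808: int64 overflow
	fmt.Println(exp2(64)) // 0: shifted past the word size
}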
......@@ -7,6 +7,7 @@ package mock
import (
"errors"
"math/big"
"sync"
"github.com/ethersphere/bee/pkg/postage"
)
......@@ -22,7 +23,9 @@ func (f optionFunc) apply(r *mockPostage) { f(r) }
// New creates a new mock postage service.
func New(o ...Option) postage.Service {
m := &mockPostage{}
m := &mockPostage{
issuersMap: make(map[string]*postage.StampIssuer),
}
for _, v := range o {
v.apply(m)
}
......@@ -37,20 +40,33 @@ func WithAcceptAll() Option {
}
func WithIssuer(s *postage.StampIssuer) Option {
return optionFunc(func(m *mockPostage) { m.i = s })
return optionFunc(func(m *mockPostage) {
m.issuersMap = map[string]*postage.StampIssuer{string(s.ID()): s}
})
}
type mockPostage struct {
i *postage.StampIssuer
issuersMap map[string]*postage.StampIssuer
issuerLock sync.Mutex
acceptAll bool
}
func (m *mockPostage) Add(s *postage.StampIssuer) {
m.i = s
m.issuerLock.Lock()
defer m.issuerLock.Unlock()
m.issuersMap[string(s.ID())] = s
}
func (m *mockPostage) StampIssuers() []*postage.StampIssuer {
return []*postage.StampIssuer{m.i}
m.issuerLock.Lock()
defer m.issuerLock.Unlock()
issuers := []*postage.StampIssuer{}
for _, v := range m.issuersMap {
issuers = append(issuers, v)
}
return issuers
}
func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, error) {
......@@ -58,22 +74,20 @@ func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, error) {
return postage.NewStampIssuer("test fallback", "test identity", id, big.NewInt(3), 24, 6, 1000, true), nil
}
if m.i != nil {
return m.i, nil
}
m.issuerLock.Lock()
defer m.issuerLock.Unlock()
i, exists := m.issuersMap[string(id)]
if !exists {
return nil, errors.New("stampissuer not found")
}
return i, nil
}
func (m *mockPostage) IssuerUsable(_ *postage.StampIssuer) bool {
return true
}
// BatchExists returns always true.
func (m *mockPostage) BatchExists(_ []byte) (bool, error) {
return true, nil
}
func (m *mockPostage) Handle(_ *postage.Batch) {}
func (m *mockPostage) Close() error {
......
......@@ -34,7 +34,6 @@ type Service interface {
StampIssuers() []*StampIssuer
GetStampIssuer([]byte) (*StampIssuer, error)
IssuerUsable(*StampIssuer) bool
BatchExists([]byte) (bool, error)
BatchCreationListener
io.Closer
}
......@@ -124,11 +123,6 @@ func (ps *service) IssuerUsable(st *StampIssuer) bool {
return true
}
// BatchExists returns true if the batch referenced by the given id exists.
func (ps *service) BatchExists(id []byte) (bool, error) {
return ps.postageStore.Exists(id)
}
// GetStampIssuer finds a stamp issuer by batch ID.
func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, error) {
ps.lock.Lock()
......
......@@ -229,7 +229,7 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
chunk := swarm.NewChunk(addr, delivery.Data)
if chunk, err = s.validStamp(chunk, delivery.Stamp); err != nil {
s.logger.Debugf("unverified chunk: %w", err)
s.logger.Debugf("unverified chunk: %v", err)
continue
}
......
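This one-character change matters more than it looks: %w is only interpreted by fmt.Errorf, where it wraps the error for errors.Is/errors.As; in Sprintf-style formatting — which is what loggers typically use — it renders as a %!w(...) artifact. A quick demonstration:
package main
import (
	"errors"
	"fmt"
)
func main() {
	err := errors.New("boom")
	// fmt.Errorf understands %w and records err for errors.Is/errors.As.
	wrapped := fmt.Errorf("unverified chunk: %w", err)
	fmt.Println(errors.Is(wrapped, err)) // true
	// Sprintf-style formatting does not understand %w:
	fmt.Println(fmt.Sprintf("unverified chunk: %w", err))
	// unverified chunk: %!w(*errors.errorString=&{boom})
	// %v is the right verb when the error is only being printed.
	fmt.Println(fmt.Sprintf("unverified chunk: %v", err)) // unverified chunk: boom
}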
......@@ -19,6 +19,7 @@ import (
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -34,6 +35,7 @@ type Service struct {
networkID uint64
storer storage.Storer
pushSyncer pushsync.PushSyncer
validStamp postage.ValidStampFn
depther topology.NeighborhoodDepther
logger logging.Logger
tag *tags.Tags
......@@ -54,11 +56,12 @@ var (
ErrShallowReceipt = errors.New("shallow receipt")
)
func New(networkID uint64, storer storage.Storer, depther topology.NeighborhoodDepther, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service {
func New(networkID uint64, storer storage.Storer, depther topology.NeighborhoodDepther, pushSyncer pushsync.PushSyncer, validStamp postage.ValidStampFn, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service {
service := &Service{
networkID: networkID,
storer: storer,
pushSyncer: pushSyncer,
validStamp: validStamp,
depther: depther,
tag: tagger,
logger: logger,
......@@ -121,6 +124,27 @@ LOOP:
break
}
// If the stamp is invalid, the chunk is not synced with the network,
// since other nodes would reject it. Instead, the chunk is marked as
// synced, which keeps it available to the node but not to the network.
stampBytes, err := ch.Stamp().MarshalBinary()
if err != nil {
s.logger.Errorf("pusher: stamp marshal: %w", err)
if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
s.logger.Errorf("pusher: set sync: %w", err)
}
continue
}
_, err = s.validStamp(ch, stampBytes)
if err != nil {
s.logger.Warningf("pusher: stamp with batch ID %x is no longer valid, skipping syncing for chunk %s: %v", ch.Stamp().BatchID(), ch.Address().String(), err)
if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
s.logger.Errorf("pusher: set sync: %w", err)
}
continue
}
if span == nil {
mtx.Lock()
span, logger, ctx = s.tracer.StartSpanFromContext(cctx, "pusher-sync-batch", s.logger)
......@@ -167,10 +191,13 @@ LOOP:
if err == nil {
s.metrics.TotalSynced.Inc()
s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
// only print this if there was no error while sending the chunk
if wantSelf {
logger.Tracef("pusher: chunk %s stays here, i'm the closest node", ch.Address().String())
} else {
po := swarm.Proximity(ch.Address().Bytes(), storerPeer.Bytes())
logger.Tracef("pusher: pushed chunk %s to node %s, receipt depth %d", ch.Address().String(), storerPeer.String(), po)
s.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
}
delete(retryCounter, ch.Address().ByteString())
} else {
s.metrics.TotalErrors.Inc()
......
......@@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/postage"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/localstore"
......@@ -32,6 +33,9 @@ import (
// no of times to retry to see if we have received response from pushsync
var noOfRetries = 20
var block = common.HexToHash("0x1").Bytes()
var defaultMockValidStamp = func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
return ch, nil
}
// Wrap the actual storer to intercept the modeSet that the pusher will call when a valid receipt is received
type Store struct {
......@@ -91,7 +95,7 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
return receipt, nil
})
mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()
......@@ -107,7 +111,7 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
t.Fatal(err)
}
// Check is the chunk is set as synced in the DB.
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(50 * time.Millisecond)
......@@ -148,7 +152,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
return receipt, nil
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()
......@@ -157,7 +161,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
t.Fatal(err)
}
// Check is the chunk is set as synced in the DB.
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(50 * time.Millisecond)
......@@ -186,7 +190,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
return nil, errors.New("invalid receipt")
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer))
defer storer.Close()
defer p.Close()
......@@ -195,7 +199,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
t.Fatal(err)
}
// Check is the chunk is set as synced in the DB.
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond)
......@@ -234,7 +238,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
return receipt, nil
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()
......@@ -243,7 +247,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
t.Fatal(err)
}
// Check is the chunk is set as synced in the DB.
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond)
......@@ -290,7 +294,7 @@ func TestPusherClose(t *testing.T) {
return receipt, nil
})
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
chunk := testingc.GenerateTestRandomChunk()
......@@ -397,7 +401,7 @@ func TestPusherRetryShallow(t *testing.T) {
// create the pivot peer pusher with depth 31, this makes
// sure that virtually any receipt generated by the random
// key will be considered too shallow
_, ps, storer := createPusher(t, pivotPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(31))
_, ps, storer := createPusher(t, pivotPeer, pushSyncService, defaultMockValidStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(31))
defer ps.Close()
// generate a chunk at PO 1 with closestPeer, meaning that we get a
......@@ -424,7 +428,56 @@ func TestPusherRetryShallow(t *testing.T) {
t.Fatalf("timed out waiting for retries. got %d want %d", c, *pusher.RetryCount)
}
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
// TestChunkWithInvalidStampSkipped tests that chunks with invalid stamps are skipped in pusher
func TestChunkWithInvalidStampSkipped(t *testing.T) {
// create a trigger and a closestpeer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
signature, _ := signer.Sign(chunk.Address().Bytes())
receipt := &pushsync.Receipt{
Address: swarm.NewAddress(chunk.Address().Bytes()),
Signature: signature,
BlockHash: block,
}
return receipt, nil
})
validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
return nil, errors.New("valid stamp error")
}
_, p, storer := createPusher(t, triggerPeer, pushSyncService, validStamp, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()
chunk := testingc.GenerateTestRandomChunk()
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(50 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSync, storer)
if err == nil {
break
}
}
if err != nil {
t.Fatal(err)
}
}
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, validStamp postage.ValidStampFn, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
storer, err := localstore.New("", addr.Bytes(), nil, nil, logger)
......@@ -442,7 +495,7 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
}
peerSuggester := mock.NewTopologyDriver(mockOpts...)
pusherService := pusher.New(1, pusherStorer, peerSuggester, pushSyncService, mtags, logger, nil, 0)
pusherService := pusher.New(1, pusherStorer, peerSuggester, pushSyncService, validStamp, mtags, logger, nil, 0)
return mtags, pusherService, pusherStorer
}
......
......@@ -40,13 +40,17 @@ const (
const (
maxPeers = 3
maxAttempts = 16
skipPeerExpiration = time.Minute
)
var (
ErrOutOfDepthReplication = errors.New("replication outside of the neighborhood")
ErrNoPush = errors.New("could not push chunk")
ErrWarmup = errors.New("node warmup time not complete")
defaultTTL = 20 * time.Second // request time to live
sanctionWait = 5 * time.Minute
timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk
nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream
)
type PushSyncer interface {
......@@ -79,10 +83,6 @@ type PushSync struct {
skipList *peerSkipList
}
var defaultTTL = 20 * time.Second // request time to live
var timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk
var nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream
func New(address swarm.Address, blockHash []byte, streamer p2p.StreamerDisconnecter, storer storage.Putter, topology topology.Driver, tagger *tags.Tags, isFullNode bool, unwrap func(swarm.Chunk), validStamp postage.ValidStampFn, logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, signer crypto.Signer, tracer *tracing.Tracer, warmupTime time.Duration) *PushSync {
ps := &PushSync{
address: address,
......@@ -287,22 +287,15 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
BlockHash: r.BlockHash}, nil
}
type pushResult struct {
receipt *pb.Receipt
err error
attempted bool
}
func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllowed bool, origin swarm.Address) (*pb.Receipt, error) {
span, logger, ctx := ps.tracer.StartSpanFromContext(ctx, "push-closest", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
defer span.Finish()
defer ps.skipList.PruneExpired()
var (
skipPeers []swarm.Address
allowedRetries = 1
resultC = make(chan *pushResult)
includeSelf = ps.isFullNode
skipPeers []swarm.Address
)
if retryAllowed {
......@@ -312,7 +305,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
for i := maxAttempts; allowedRetries > 0 && i > 0; i-- {
// find the next closest peer
peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), includeSelf, skipPeers...)
peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), includeSelf, append(append([]swarm.Address{}, ps.skipList.ChunkSkipPeers(ch.Address())...), skipPeers...)...)
if err != nil {
// ClosestPeer can return ErrNotFound in case we are not connected to any peers
// in which case we should return immediately.
......@@ -352,21 +345,13 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
}
return nil, fmt.Errorf("closest peer: %w", err)
}
skipPeers = append(skipPeers, peer)
if ps.skipList.ShouldSkip(peer) {
ps.metrics.TotalSkippedPeers.Inc()
continue
}
ps.metrics.TotalSendAttempts.Inc()
go func(peer swarm.Address, ch swarm.Chunk) {
ctxd, canceld := context.WithTimeout(ctx, defaultTTL)
defer canceld()
r, attempted, err := ps.pushPeer(ctxd, peer, ch, retryAllowed)
// attempted is true if we get past accounting and actually attempt
// to send the request to the peer. If we don't get past accounting, we
// should not count the retry and try with a different peer again
......@@ -374,38 +359,29 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
allowedRetries--
}
if err != nil {
logger.Debugf("could not push to peer %s: %v", peer, err)
// if the node has warmed up AND no other closer peer has been tried
if ps.warmedUp() && !ps.skipList.HasChunk(ch.Address()) {
ps.skipList.Add(peer, ch.Address(), skipPeerExpiration)
var timeToSkip time.Duration
switch {
case errors.Is(err, accounting.ErrOverdraft):
skipPeers = append(skipPeers, peer)
default:
timeToSkip = sanctionWait
}
select {
case resultC <- &pushResult{err: err, attempted: attempted}:
case <-ctx.Done():
}
return
}
select {
case resultC <- &pushResult{receipt: r}:
case <-ctx.Done():
}
}(peer, ch)
logger.Debugf("pushsync: could not push to peer %s: %v", peer, err)
select {
case r := <-resultC:
// receipt received for chunk
if r.receipt != nil {
ps.skipList.PruneChunk(ch.Address())
return r.receipt, nil
// if the node has warmed up AND no other closer peer has been tried
if ps.warmedUp() && timeToSkip > 0 {
ps.skipList.Add(ch.Address(), peer, timeToSkip)
}
if r.err != nil && r.attempted {
ps.metrics.TotalFailedSendAttempts.Inc()
if allowedRetries > 0 {
continue
}
case <-ctx.Done():
return nil, ctx.Err()
return nil, err
}
ps.skipList.PruneChunk(ch.Address())
return r, nil
}
return nil, ErrNoPush
......@@ -537,7 +513,9 @@ func (ps *PushSync) pushToNeighbour(peer swarm.Address, ch swarm.Chunk, origin b
return
}
err = ps.accounting.Credit(peer, receiptPrice, origin)
if err = ps.accounting.Credit(peer, receiptPrice, origin); err != nil {
return
}
}
func (ps *PushSync) warmedUp() bool {
......@@ -546,56 +524,45 @@ func (ps *PushSync) warmedUp() bool {
type peerSkipList struct {
sync.Mutex
chunks map[string]struct{}
skipExpiration map[string]time.Time
// key is chunk address, value is map of peer address to expiration
skip map[string]map[string]time.Time
}
func newPeerSkipList() *peerSkipList {
return &peerSkipList{
chunks: make(map[string]struct{}),
skipExpiration: make(map[string]time.Time),
skip: make(map[string]map[string]time.Time),
}
}
func (l *peerSkipList) Add(peer, chunk swarm.Address, expire time.Duration) {
func (l *peerSkipList) Add(chunk, peer swarm.Address, expire time.Duration) {
l.Lock()
defer l.Unlock()
l.skipExpiration[peer.ByteString()] = time.Now().Add(expire)
l.chunks[chunk.ByteString()] = struct{}{}
if _, ok := l.skip[chunk.ByteString()]; !ok {
l.skip[chunk.ByteString()] = make(map[string]time.Time)
}
l.skip[chunk.ByteString()][peer.ByteString()] = time.Now().Add(expire)
}
func (l *peerSkipList) ShouldSkip(peer swarm.Address) bool {
func (l *peerSkipList) ChunkSkipPeers(ch swarm.Address) (peers []swarm.Address) {
l.Lock()
defer l.Unlock()
peerStr := peer.ByteString()
if exp, has := l.skipExpiration[peerStr]; has {
// entry is expired
if exp.Before(time.Now()) {
delete(l.skipExpiration, peerStr)
return false
} else {
return true
if p, ok := l.skip[ch.ByteString()]; ok {
for peer, exp := range p {
if time.Now().Before(exp) {
peers = append(peers, swarm.NewAddress([]byte(peer)))
}
}
return false
}
func (l *peerSkipList) HasChunk(chunk swarm.Address) bool {
l.Lock()
defer l.Unlock()
_, has := l.chunks[chunk.ByteString()]
return has
}
return peers
}
func (l *peerSkipList) PruneChunk(chunk swarm.Address) {
l.Lock()
defer l.Unlock()
delete(l.chunks, chunk.ByteString())
delete(l.skip, chunk.ByteString())
}
func (l *peerSkipList) PruneExpired() {
......@@ -604,9 +571,17 @@ func (l *peerSkipList) PruneExpired() {
now := time.Now()
for k, v := range l.skipExpiration {
if v.Before(now) {
delete(l.skipExpiration, k)
for k, v := range l.skip {
kc := len(v)
for kk, vv := range v {
if vv.Before(now) {
delete(v, kk)
kc--
}
}
if kc == 0 {
// prune the chunk too
delete(l.skip, k)
}
}
}
......@@ -486,8 +486,12 @@ func TestPushChunkToNextClosest(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if ta2.Get(tags.StateSent) != 2 {
t.Fatalf("tags error")
// the write to the first peer might succeed or
// fail, so it is not guaranteed that two increments
// are made to Sent. expect >= 1
if tg := ta2.Get(tags.StateSent); tg == 0 {
t.Fatalf("tags error got %d want >= 1", tg)
}
balance, err := pivotAccounting.Balance(peer2)
......@@ -779,8 +783,8 @@ func TestSignsReceipt(t *testing.T) {
t.Fatal("receipt block hash do not match")
}
}
func TestPeerSkipList(t *testing.T) {
skipList := pushsync.NewPeerSkipList()
addr1 := testingc.GenerateTestRandomChunk().Address()
......@@ -788,25 +792,16 @@ func TestPeerSkipList(t *testing.T) {
skipList.Add(addr1, addr2, time.Millisecond*10)
if !skipList.ShouldSkip(addr1) {
if !skipList.ChunkSkipPeers(addr1)[0].Equal(addr2) {
t.Fatal("peer should be skipped")
}
if !skipList.HasChunk(addr2) {
t.Fatal("chunk is missing")
}
time.Sleep(time.Millisecond * 11)
skipList.PruneExpired()
if skipList.ShouldSkip(addr1) {
t.Fatal("peer should be not be skipped")
}
skipList.PruneChunk(addr2)
if skipList.HasChunk(addr2) {
t.Fatal("chunk should be missing")
if len(skipList.ChunkSkipPeers(addr1)) != 0 {
t.Fatal("entry should be pruned")
}
}
......
......@@ -262,7 +262,7 @@ func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount, checkAllo
}
currentTime := s.timeNow().Unix()
if currentTime == lastTime.Timestamp {
if currentTime == lastTime.CheckTimestamp {
return nil, 0, ErrSettlementTooSoon
}
......
......@@ -198,7 +198,7 @@ func peerKey(peer swarm.Address) string {
// chequebookPeerKey computes the key where to store the peer for a chequebook.
func chequebookPeerKey(chequebook common.Address) string {
return fmt.Sprintf("%s%s", peerChequebookPrefix, chequebook)
return fmt.Sprintf("%s%x", peerChequebookPrefix, chequebook)
}
// peerBeneficiaryKey computes the key where to store the beneficiary for a peer.
......@@ -208,7 +208,7 @@ func peerBeneficiaryKey(peer swarm.Address) string {
// beneficiaryPeerKey computes the key where to store the peer for a beneficiary.
func beneficiaryPeerKey(peer common.Address) string {
return fmt.Sprintf("%s%s", beneficiaryPeerPrefix, peer)
return fmt.Sprintf("%s%x", beneficiaryPeerPrefix, peer)
}
func peerDeductedByKey(peer swarm.Address) string {
......
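The %s-to-%x switch changes the shape of the generated keys: %s goes through common.Address's Stringer and yields the 0x-prefixed, EIP-55 checksum-cased form, while %x yields bare lowercase hex — the stable shape the tests below assert. A small sketch:
package main
import (
	"fmt"
	"github.com/ethereum/go-ethereum/common"
)
func main() {
	addr := common.HexToAddress("0xabcd")
	// %s uses Address.String(): 0x-prefixed, mixed case per the EIP-55 checksum.
	fmt.Println(fmt.Sprintf("swap_peer_chequebook_%s", addr))
	// %x: bare lowercase hex, matching the expected keys in the tests.
	fmt.Println(fmt.Sprintf("swap_peer_chequebook_%x", addr))
	// swap_peer_chequebook_000000000000000000000000000000000000abcd
}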
......@@ -470,3 +470,22 @@ func TestChequebookWithdrawInsufficientFunds(t *testing.T) {
t.Fatalf("got wrong error. wanted %v, got %v", chequebook.ErrInsufficientFunds, err)
}
}
func TestStateStoreKeys(t *testing.T) {
address := common.HexToAddress("0xabcd")
expected := "swap_cashout_000000000000000000000000000000000000abcd"
if chequebook.CashoutActionKey(address) != expected {
t.Fatalf("wrong cashout action key. wanted %s, got %s", expected, chequebook.CashoutActionKey(address))
}
expected = "swap_chequebook_last_issued_cheque_000000000000000000000000000000000000abcd"
if chequebook.LastIssuedChequeKey(address) != expected {
t.Fatalf("wrong last issued cheque key. wanted %s, got %s", expected, chequebook.LastIssuedChequeKey(address))
}
expected = "swap_chequebook_last_received_cheque__000000000000000000000000000000000000abcd"
if chequebook.LastReceivedChequeKey(address) != expected {
t.Fatalf("wrong last received cheque key. wanted %s, got %s", expected, chequebook.LastReceivedChequeKey(address))
}
}
package chequebook
var (
LastIssuedChequeKey = lastIssuedChequeKey
LastReceivedChequeKey = lastReceivedChequeKey
CashoutActionKey = cashoutActionKey
)
package swap
var (
PeerKey = peerKey
ChequebookPeerKey = chequebookPeerKey
PeerBeneficiaryKey = peerBeneficiaryKey
BeneficiaryPeerKey = beneficiaryPeerKey
PeerDeductedByKey = peerDeductedByKey
PeerDeductedForKey = peerDeductedForKey
)
......@@ -750,3 +750,38 @@ func TestCashoutStatus(t *testing.T) {
t.Fatalf("go wrong status. wanted %v, got %v", expectedStatus, returnedStatus)
}
}
func TestStateStoreKeys(t *testing.T) {
address := common.HexToAddress("0xabcd")
swarmAddress := swarm.MustParseHexAddress("deff")
expected := "swap_chequebook_peer_deff"
if swap.PeerKey(swarmAddress) != expected {
t.Fatalf("wrong peer key. wanted %s, got %s", expected, swap.PeerKey(swarmAddress))
}
expected = "swap_peer_chequebook_000000000000000000000000000000000000abcd"
if swap.ChequebookPeerKey(address) != expected {
t.Fatalf("wrong peer key. wanted %s, got %s", expected, swap.ChequebookPeerKey(address))
}
expected = "swap_peer_beneficiary_deff"
if swap.PeerBeneficiaryKey(swarmAddress) != expected {
t.Fatalf("wrong peer beneficiary key. wanted %s, got %s", expected, swap.PeerBeneficiaryKey(swarmAddress))
}
expected = "swap_beneficiary_peer_000000000000000000000000000000000000abcd"
if swap.BeneficiaryPeerKey(address) != expected {
t.Fatalf("wrong beneficiary peer key. wanted %s, got %s", expected, swap.BeneficiaryPeerKey(address))
}
expected = "swap_deducted_by_peer_deff"
if swap.PeerDeductedByKey(swarmAddress) != expected {
t.Fatalf("wrong peer deducted by key. wanted %s, got %s", expected, swap.PeerDeductedByKey(swarmAddress))
}
expected = "swap_deducted_for_peer_deff"
if swap.PeerDeductedForKey(swarmAddress) != expected {
t.Fatalf("wrong peer deducted for key. wanted %s, got %s", expected, swap.PeerDeductedForKey(swarmAddress))
}
}
......@@ -14,6 +14,10 @@ import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/syndtr/goleveldb/leveldb"
ldberr "github.com/syndtr/goleveldb/leveldb/errors"
ldb "github.com/syndtr/goleveldb/leveldb"
ldbs "github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
......@@ -25,6 +29,24 @@ type store struct {
logger logging.Logger
}
func NewInMemoryStateStore(l logging.Logger) (storage.StateStorer, error) {
db, err := ldb.Open(ldbs.NewMemStorage(), nil)
if err != nil {
return nil, err
}
s := &store{
db: db,
logger: l,
}
if err := migrate(s); err != nil {
return nil, err
}
return s, nil
}
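NewInMemoryStateStore swaps leveldb.OpenFile for goleveldb's storage.NewMemStorage(), so the full statestore, migration chain included, runs without touching disk. A sketch of how a test might use it (the import path is assumed from this diff; Put and Get serialize the value for you):

package example_test

import (
	"io/ioutil"
	"testing"

	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/statestore/leveldb"
)

func TestInMemoryStore(t *testing.T) {
	st, err := leveldb.NewInMemoryStateStore(logging.New(ioutil.Discard, 0))
	if err != nil {
		t.Fatal(err)
	}
	defer st.Close()

	if err := st.Put("greeting", "hello"); err != nil {
		t.Fatal(err)
	}
	var got string
	if err := st.Get("greeting", &got); err != nil {
		t.Fatal(err)
	}
	if got != "hello" {
		t.Fatalf("wanted %q, got %q", "hello", got)
	}
}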
// NewStateStore creates a new persistent state storage.
func NewStateStore(path string, l logging.Logger) (storage.StateStorer, error) {
db, err := leveldb.OpenFile(path, nil)
......@@ -46,26 +68,34 @@ func NewStateStore(path string, l logging.Logger) (storage.StateStorer, error) {
logger: l,
}
if err := migrate(s); err != nil {
return nil, err
}
return s, nil
}
func migrate(s *store) error {
sn, err := s.getSchemaName()
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
_ = s.Close()
return nil, fmt.Errorf("get schema name: %w", err)
return fmt.Errorf("get schema name: %w", err)
}
// new statestore - put schema key with current name
if err := s.putSchemaName(dbSchemaCurrent); err != nil {
_ = s.Close()
return nil, fmt.Errorf("put schema name: %w", err)
return fmt.Errorf("put schema name: %w", err)
}
sn = dbSchemaCurrent
}
if err = s.migrate(sn); err != nil {
_ = s.Close()
return nil, fmt.Errorf("migrate: %w", err)
return fmt.Errorf("migrate: %w", err)
}
-return s, nil
+return nil
}
// Get retrieves a value of the requested key. If no results are found,
......
......@@ -20,6 +20,8 @@ import (
"errors"
"fmt"
"strings"
"github.com/ethereum/go-ethereum/common"
)
var (
......@@ -35,6 +37,7 @@ const (
dbSchemaCleanInterval = "clean-interval"
dbSchemaNoStamp = "no-stamp"
dbSchemaFlushBlock = "flushblock"
dbSchemaSwapAddr = "swapaddr"
)
var (
......@@ -54,6 +57,7 @@ var schemaMigrations = []migration{
{name: dbSchemaCleanInterval, fn: migrateGrace},
{name: dbSchemaNoStamp, fn: migrateStamp},
{name: dbSchemaFlushBlock, fn: migrateFB},
{name: dbSchemaSwapAddr, fn: migrateSwap},
}
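Each schema name marks one step in an ordered chain: on open, the store reads the schema name it last persisted and replays only the migrations registered after it, with dbSchemaSwapAddr now the newest step. A hypothetical sketch of that selection logic (the real getMigrations lives elsewhere in this package and may differ):

// pendingMigrations returns the migrations registered after the
// currently persisted schema name (nil if the name is unknown).
func pendingMigrations(current string, all []migration) []migration {
	for i, m := range all {
		if m.name == current {
			return all[i+1:]
		}
	}
	return nil
}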
func migrateFB(s *store) error {
......@@ -108,6 +112,45 @@ func migrateGrace(s *store) error {
return nil
}
func migrateSwap(s *store) error {
migratePrefix := func(prefix string) error {
keys, err := collectKeys(s, prefix)
if err != nil {
return err
}
for _, key := range keys {
split := strings.SplitAfter(key, prefix)
if len(split) != 2 {
return errors.New("no peer in key")
}
addr := common.BytesToAddress([]byte(split[1]))
fixed := fmt.Sprintf("%s%x", prefix, addr)
var chequebookAddress common.Address
if err = s.Get(key, &chequebookAddress); err != nil {
return err
}
if err = s.Put(fixed, chequebookAddress); err != nil {
return err
}
if err = s.Delete(key); err != nil {
return err
}
}
return nil
}
if err := migratePrefix("swap_peer_chequebook_"); err != nil {
return err
}
return migratePrefix("swap_beneficiary_peer_")
}
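Concretely, the legacy keys carried the 20 raw address bytes appended with %s; migrateSwap re-reads that suffix, re-encodes it with %x, stores the value under the fixed key, and deletes the old entry. A sketch of the two key shapes (the address and the resulting key match TestMigrationSwap below):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func main() {
	addr := common.HexToAddress("0xabcd")
	// Legacy shape: raw, mostly non-printable address bytes after the prefix.
	oldKey := fmt.Sprintf("swap_peer_chequebook_%s", addr[:])
	// Fixed shape: the same address as 40 hex characters.
	newKey := fmt.Sprintf("swap_peer_chequebook_%x", addr)
	fmt.Printf("%q\n%s\n", oldKey, newKey)
}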
func (s *store) migrate(schemaName string) error {
migrations, err := getMigrations(schemaName, dbSchemaCurrent, schemaMigrations, s)
if err != nil {
......
......@@ -18,10 +18,13 @@ package leveldb
import (
"errors"
"fmt"
"io/ioutil"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/storage"
)
func TestOneMigration(t *testing.T) {
......@@ -282,3 +285,58 @@ func TestMigrationErrorTo(t *testing.T) {
t.Errorf("migration ran but shouldnt have")
}
}
func TestMigrationSwap(t *testing.T) {
dir := t.TempDir()
logger := logging.New(ioutil.Discard, 0)
// start a fresh statestore with the current schema name
db, err := NewStateStore(dir, logger)
if err != nil {
t.Fatal(err)
}
defer db.Close()
address := common.HexToAddress("0xabcd")
storedAddress := common.HexToAddress("0xffff")
legacyKey1 := fmt.Sprintf("swap_peer_chequebook_%s", address[:])
legacyKey2 := fmt.Sprintf("swap_beneficiary_peer_%s", address[:])
if err = db.Put(legacyKey1, storedAddress); err != nil {
t.Fatal(err)
}
if err = db.Put(legacyKey2, storedAddress); err != nil {
t.Fatal(err)
}
if err = migrateSwap(db.(*store)); err != nil {
t.Fatal(err)
}
var retrievedAddress common.Address
if err = db.Get("swap_peer_chequebook_000000000000000000000000000000000000abcd", &retrievedAddress); err != nil {
t.Fatal(err)
}
if retrievedAddress != storedAddress {
t.Fatalf("got wrong address. wanted %x, got %x", storedAddress, retrievedAddress)
}
if err = db.Get("swap_beneficiary_peer_000000000000000000000000000000000000abcd", &retrievedAddress); err != nil {
t.Fatal(err)
}
if retrievedAddress != storedAddress {
t.Fatalf("got wrong address. wanted %x, got %x", storedAddress, retrievedAddress)
}
if err = db.Get(legacyKey1, &retrievedAddress); err != storage.ErrNotFound {
t.Fatalf("legacyKey1 not deleted. got error %v", err)
}
if err = db.Get(legacyKey2, &retrievedAddress); err != storage.ErrNotFound {
t.Fatalf("legacyKey2 not deleted. got error %v", err)
}
}
......@@ -33,8 +33,6 @@ func (m ModeGet) String() string {
return "Sync"
case ModeGetLookup:
return "Lookup"
-case ModeGetPin:
-return "PinLookup"
case ModeGetRequestPin:
return "RequestPin"
default:
......@@ -50,8 +48,6 @@ const (
ModeGetSync
// ModeGetLookup: when accessed to look up a chunk in feeds or other places
ModeGetLookup
-// ModeGetPin: used when a pinned chunk is accessed
-ModeGetPin
// ModeGetRequestPin represents request for retrieval of pinned chunk.
ModeGetRequestPin
)
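A side note on removals like ModeGetPin: if these constants are iota-based, as mode enums of this shape usually are, deleting a middle member renumbers every member after it, which is only safe while the numeric values are never persisted or sent over the wire. Illustration (names mirror this block; the first member is an assumption, since it sits outside the hunk):

package main

import "fmt"

type ModeGet int

const (
	ModeGetRequest ModeGet = iota // 0
	ModeGetSync                   // 1
	ModeGetLookup                 // 2
	ModeGetRequestPin             // 3 (was 4 while ModeGetPin preceded it)
)

func main() {
	fmt.Println(ModeGetLookup, ModeGetRequestPin) // 2 3
}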
......
......@@ -671,6 +671,32 @@ func TestDiscoveryHooks(t *testing.T) {
waitBcast(t, disc, p3, p1, p2)
}
func TestAnnounceTo(t *testing.T) {
var (
conns int32
_, kad, ab, disc, signer = newTestKademlia(t, &conns, nil, kademlia.Options{})
p1, p2 = test.RandomAddress(), test.RandomAddress()
)
if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
defer kad.Close()
// first add a peer from AddPeers, wait for the connection
addOne(t, signer, kad, ab, p1)
waitConn(t, &conns)
if err := kad.AnnounceTo(context.Background(), p1, p2, true); err != nil {
t.Fatal(err)
}
waitBcast(t, disc, p1, p2)
if err := kad.AnnounceTo(context.Background(), p1, p2, false); err == nil {
t.Fatal("expected error")
}
}
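TestAnnounceTo drives the new AnnounceTo entry point. Judging by the mock in the later hunk, its shape is roughly the method below; the parameter names are guesses, and the test suggests the trailing flag gates whether the announcement is permitted, hence the expected error on the second call:

// Assumed shape, inferred from the mock and from the calls above.
type announcer interface {
	AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error
}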
func TestBackoff(t *testing.T) {
// cheat and decrease the timer
defer func(t time.Duration) {
......
......@@ -152,6 +152,10 @@ func (m *Mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil
}
func (m *Mock) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}
func (m *Mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
channel := make(chan struct{}, 1)
var closeOnce sync.Once
......
......@@ -96,6 +96,10 @@ PICKPEER:
return addr, nil
}
func (c *Container) EachPeer(pf topology.EachPeerFunc) error {
return c.connectedPeers.EachBin(pf)
}
func (c *Container) PeerInfo() topology.BinInfo {
return topology.BinInfo{
BinPopulation: uint(c.connectedPeers.Length()),
......
......@@ -237,3 +237,7 @@ func (*mockSigner) Hash(tx *types.Transaction) common.Hash {
func (*mockSigner) Equal(types.Signer) bool {
return false
}
func (*mockSigner) ChainID() *big.Int {
return big.NewInt(0)
}
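The ChainID addition tracks go-ethereum's types.Signer interface, which in the v1.10 line includes a ChainID method; a hand-rolled mock must implement the full method set or it stops satisfying the interface. For reference, the interface is approximately the following (consult the vendored go-ethereum version for the authoritative definition):

// Approximate shape of go-ethereum's types.Signer, v1.10.x era.
type Signer interface {
	Sender(tx *types.Transaction) (common.Address, error)
	SignatureValues(tx *types.Transaction, sig []byte) (r, s, v *big.Int, err error)
	ChainID() *big.Int
	Hash(tx *types.Transaction) common.Hash
	Equal(types.Signer) bool
}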