Commit 772f8237 authored by luxq's avatar luxq

Merge branch 'ori-master' into master

parents 8459e39e 4f7a6228
...@@ -15,13 +15,12 @@ do ...@@ -15,13 +15,12 @@ do
done done
kubectl -n local get pods > dump/kubectl_get_pods kubectl -n local get pods > dump/kubectl_get_pods
kubectl -n local logs -l app.kubernetes.io/part-of=bee --tail -1 --prefix -c bee > dump/kubectl_logs kubectl -n local logs -l app.kubernetes.io/part-of=bee --tail -1 --prefix -c bee > dump/kubectl_logs
vertag=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 15)
endpoint=$AWS_ENDPOINT endpoint=$AWS_ENDPOINT
if [[ "$endpoint" != http* ]] if [[ "$endpoint" != http* ]]
then then
endpoint=https://$endpoint endpoint=https://$endpoint
fi fi
fname=artifacts_$vertag.tar.gz fname=artifacts_${VERTAG}.tar.gz
tar -cz dump | aws --endpoint-url "$endpoint" s3 cp - s3://"$BUCKET_NAME"/"$fname" tar -cz dump | aws --endpoint-url "$endpoint" s3 cp - s3://"$BUCKET_NAME"/"$fname"
aws --endpoint-url "$endpoint" s3api put-object-acl --bucket "$BUCKET_NAME" --acl public-read --key "$fname" aws --endpoint-url "$endpoint" s3api put-object-acl --bucket "$BUCKET_NAME" --acl public-read --key "$fname"
out="== Uploaded debugging artifacts to https://${BUCKET_NAME}.${AWS_ENDPOINT}/$fname ==" out="== Uploaded debugging artifacts to https://${BUCKET_NAME}.${AWS_ENDPOINT}/$fname =="
......
...@@ -86,6 +86,9 @@ jobs: ...@@ -86,6 +86,9 @@ jobs:
- name: Test manifest - name: Test manifest
id: manifest-1 id: manifest-1
run: beekeeper check --cluster-name local-dns --checks=ci-manifest run: beekeeper check --cluster-name local-dns --checks=ci-manifest
- name: Test content availability
id: content-availability
run: beekeeper check --cluster-name local-dns --checks=ci-content-availability
- name: Destroy the cluster - name: Destroy the cluster
run: | run: |
beekeeper delete bee-cluster --cluster-name local-dns beekeeper delete bee-cluster --cluster-name local-dns
...@@ -170,6 +173,8 @@ jobs: ...@@ -170,6 +173,8 @@ jobs:
export AWS_SECRET_ACCESS_KEY=${{ secrets.DO_AWS_SECRET_ACCESS_KEY }} export AWS_SECRET_ACCESS_KEY=${{ secrets.DO_AWS_SECRET_ACCESS_KEY }}
export AWS_EC2_METADATA_DISABLED=true export AWS_EC2_METADATA_DISABLED=true
export AWS_ENDPOINT=fra1.digitaloceanspaces.com export AWS_ENDPOINT=fra1.digitaloceanspaces.com
export VERTAG=$(< /dev/urandom tr -dc A-Z-a-z-0-9 2> /dev/null | head -c15)
bash .github/bin/beekeeper_artifacts.sh
export FAILED='no-test' export FAILED='no-test'
if ${{ steps.pingpong-1.outcome=='failure' }}; then FAILED=pingpong-1; fi if ${{ steps.pingpong-1.outcome=='failure' }}; then FAILED=pingpong-1; fi
if ${{ steps.fullconnectivity-1.outcome=='failure' }}; then FAILED=fullconnectivity-1; fi if ${{ steps.fullconnectivity-1.outcome=='failure' }}; then FAILED=fullconnectivity-1; fi
...@@ -177,6 +182,7 @@ jobs: ...@@ -177,6 +182,7 @@ jobs:
if ${{ steps.pushsync-chunks-1.outcome=='failure' }}; then FAILED=pushsync-chunks-1; fi if ${{ steps.pushsync-chunks-1.outcome=='failure' }}; then FAILED=pushsync-chunks-1; fi
if ${{ steps.retrieval-1.outcome=='failure' }}; then FAILED=retrieval-1; fi if ${{ steps.retrieval-1.outcome=='failure' }}; then FAILED=retrieval-1; fi
if ${{ steps.manifest-1.outcome=='failure' }}; then FAILED=manifest-1; fi if ${{ steps.manifest-1.outcome=='failure' }}; then FAILED=manifest-1; fi
if ${{ steps.content-availability.outcome=='failure' }}; then FAILED=content-availability; fi
if ${{ steps.pingpong-2.outcome=='failure' }}; then FAILED=pingpong-2; fi if ${{ steps.pingpong-2.outcome=='failure' }}; then FAILED=pingpong-2; fi
if ${{ steps.fullconnectivity-2.outcome=='failure' }}; then FAILED=fullconnectivity-2; fi if ${{ steps.fullconnectivity-2.outcome=='failure' }}; then FAILED=fullconnectivity-2; fi
if ${{ steps.settlements-2.outcome=='failure' }}; then FAILED=settlements-2; fi if ${{ steps.settlements-2.outcome=='failure' }}; then FAILED=settlements-2; fi
...@@ -185,7 +191,7 @@ jobs: ...@@ -185,7 +191,7 @@ jobs:
if ${{ steps.pingpong-3.outcome=='failure' }}; then FAILED=pingpong-3; fi if ${{ steps.pingpong-3.outcome=='failure' }}; then FAILED=pingpong-3; fi
if ${{ steps.gc-chunk-1.outcome=='failure' }}; then FAILED=gc-chunk-1; fi if ${{ steps.gc-chunk-1.outcome=='failure' }}; then FAILED=gc-chunk-1; fi
KEYS=$(curl -sSf -X POST https://eu.relay.tunshell.com/api/sessions) KEYS=$(curl -sSf -X POST https://eu.relay.tunshell.com/api/sessions)
curl -sSf -X POST -H "Content-Type: application/json" -d "{\"text\": \"**${RUN_TYPE}** Beekeeper Error\nBranch: ${{ github.head_ref }}\nStep failed:\n \`${FAILED}\`\nDebug shell:\n\`sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com\`\"}" https://beehive.ethswarm.org/hooks/${{ secrets.WEBHOOK_KEY }} curl -sSf -X POST -H "Content-Type: application/json" -d "{\"text\": \"**${RUN_TYPE}** Beekeeper Error\nBranch: \`${{ github.head_ref }}\`\nUser: @${{ github.event.pull_request.user.login }}\nDebugging artifacts: [click](https://$BUCKET_NAME.$AWS_ENDPOINT/artifacts_$VERTAG.tar.gz)\nStep failed: \`${FAILED}\`\nDebug shell: \`sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com\`\"}" https://beehive.ethswarm.org/hooks/${{ secrets.WEBHOOK_KEY }}
echo "run the debug shell and run the following to get debugging artifacts:" echo "run the debug shell and run the following to get debugging artifacts:"
echo "curl -s -o debug.sh https://gist.githubusercontent.com/acud/2c219531e832aafbab51feffe5b5e91f/raw/304880f1f8cc819e577d1dd3f1f45df8709c543d/beekeeper_artifacts.sh | bash" echo "curl -s -o debug.sh https://gist.githubusercontent.com/acud/2c219531e832aafbab51feffe5b5e91f/raw/304880f1f8cc819e577d1dd3f1f45df8709c543d/beekeeper_artifacts.sh | bash"
echo "Failed test: ${FAILED}" echo "Failed test: ${FAILED}"
......
GO ?= go GO ?= go
GOLANGCI_LINT ?= $$($(GO) env GOPATH)/bin/golangci-lint GOLANGCI_LINT ?= $$($(GO) env GOPATH)/bin/golangci-lint
GOLANGCI_LINT_VERSION ?= v1.30.0 GOLANGCI_LINT_VERSION ?= v1.37.0
GOGOPROTOBUF ?= protoc-gen-gogofaster GOGOPROTOBUF ?= protoc-gen-gogofaster
GOGOPROTOBUF_VERSION ?= v1.3.1 GOGOPROTOBUF_VERSION ?= v1.3.1
BEEKEEPER_INSTALL_DIR ?= $$($(GO) env GOPATH)/bin BEEKEEPER_INSTALL_DIR ?= $$($(GO) env GOPATH)/bin
...@@ -43,7 +43,7 @@ deploylocal: ...@@ -43,7 +43,7 @@ deploylocal:
.PHONY: testlocal .PHONY: testlocal
testlocal: testlocal:
export PATH=${PATH}:$$($(GO) env GOPATH)/bin export PATH=${PATH}:$$($(GO) env GOPATH)/bin
beekeeper check --cluster-name local --checks=ci-full-connectivity,ci-gc,ci-manifest,ci-pingpong,ci-pss,ci-pushsync-chunks,ci-retrieval,ci-settlements,ci-soc beekeeper check --cluster-name local --checks=ci-full-connectivity,ci-gc,ci-manifest,ci-pingpong,ci-pss,ci-pushsync-chunks,ci-retrieval,ci-content-availability,ci-settlements,ci-soc
.PHONY: testlocal-all .PHONY: testlocal-all
all: beekeeper beelocal deploylocal testlocal all: beekeeper beelocal deploylocal testlocal
......
...@@ -3,8 +3,12 @@ ...@@ -3,8 +3,12 @@
[![Go](https://github.com/ethersphere/bee/workflows/Go/badge.svg)](https://github.com/ethersphere/bee/actions) [![Go](https://github.com/ethersphere/bee/workflows/Go/badge.svg)](https://github.com/ethersphere/bee/actions)
[![Go Reference](https://pkg.go.dev/badge/github.com/ethersphere/bee.svg)](https://pkg.go.dev/github.com/ethersphere/bee) [![Go Reference](https://pkg.go.dev/badge/github.com/ethersphere/bee.svg)](https://pkg.go.dev/github.com/ethersphere/bee)
[![Coverage Status](https://coveralls.io/repos/github/ethersphere/bee/badge.svg)](https://coveralls.io/github/ethersphere/bee) [![Coverage Status](https://coveralls.io/repos/github/ethersphere/bee/badge.svg)](https://coveralls.io/github/ethersphere/bee)
[![Go Report Card](https://goreportcard.com/badge/github.com/ethersphere/bee)](https://goreportcard.com/report/github.com/ethersphere/bee)
[![API OpenAPI Specs](https://img.shields.io/badge/openapi-api-blue)](https://docs.ethswarm.org/api/) [![API OpenAPI Specs](https://img.shields.io/badge/openapi-api-blue)](https://docs.ethswarm.org/api/)
[![Debug API OpenAPI Specs](https://img.shields.io/badge/openapi-debugapi-lightblue)](https://docs.ethswarm.org/debug-api/) [![Debug API OpenAPI Specs](https://img.shields.io/badge/openapi-debugapi-lightblue)](https://docs.ethswarm.org/debug-api/)
![Docker Pulls](https://img.shields.io/docker/pulls/ethersphere/bee)
![GitHub all releases](https://img.shields.io/github/downloads/ethersphere/bee/total)
![GitHub](https://img.shields.io/github/license/ethersphere/bee)
## DISCLAIMER ## DISCLAIMER
......
...@@ -72,6 +72,9 @@ const ( ...@@ -72,6 +72,9 @@ const (
optionNameMainNet = "mainnet" optionNameMainNet = "mainnet"
optionNameRetrievalCaching = "cache-retrieval" optionNameRetrievalCaching = "cache-retrieval"
optionNameDevReserveCapacity = "dev-reserve-capacity" optionNameDevReserveCapacity = "dev-reserve-capacity"
optionNameResync = "resync"
optionNamePProfBlock = "pprof-profile"
optionNamePProfMutex = "pprof-mutex"
) )
func init() { func init() {
...@@ -254,6 +257,9 @@ func (c *command) setAllFlags(cmd *cobra.Command) { ...@@ -254,6 +257,9 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().Duration(optionWarmUpTime, time.Minute*20, "time to warmup the node before pull/push protocols can be kicked off.") cmd.Flags().Duration(optionWarmUpTime, time.Minute*20, "time to warmup the node before pull/push protocols can be kicked off.")
cmd.Flags().Bool(optionNameMainNet, false, "triggers connect to main net bootnodes.") cmd.Flags().Bool(optionNameMainNet, false, "triggers connect to main net bootnodes.")
cmd.Flags().Bool(optionNameRetrievalCaching, true, "enable forwarded content caching") cmd.Flags().Bool(optionNameRetrievalCaching, true, "enable forwarded content caching")
cmd.Flags().Bool(optionNameResync, false, "forces the node to resync postage contract data")
cmd.Flags().Bool(optionNamePProfBlock, false, "enable pprof block profile")
cmd.Flags().Bool(optionNamePProfMutex, false, "enable pprof mutex profile")
} }
func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) { func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) {
......
...@@ -199,6 +199,9 @@ inability to use, or your interaction with other nodes or the software.`) ...@@ -199,6 +199,9 @@ inability to use, or your interaction with other nodes or the software.`)
WarmupTime: c.config.GetDuration(optionWarmUpTime), WarmupTime: c.config.GetDuration(optionWarmUpTime),
ChainID: networkConfig.chainID, ChainID: networkConfig.chainID,
RetrievalCaching: c.config.GetBool(optionNameRetrievalCaching), RetrievalCaching: c.config.GetBool(optionNameRetrievalCaching),
Resync: c.config.GetBool(optionNameResync),
BlockProfile: c.config.GetBool(optionNamePProfBlock),
MutexProfile: c.config.GetBool(optionNamePProfMutex),
}) })
if err != nil { if err != nil {
return err return err
......
...@@ -5,7 +5,7 @@ go 1.15 ...@@ -5,7 +5,7 @@ go 1.15
require ( require (
github.com/btcsuite/btcd v0.22.0-beta github.com/btcsuite/btcd v0.22.0-beta
github.com/coreos/go-semver v0.3.0 github.com/coreos/go-semver v0.3.0
github.com/ethereum/go-ethereum v1.10.3 github.com/ethereum/go-ethereum v1.10.7
github.com/ethersphere/go-price-oracle-abi v0.1.0 github.com/ethersphere/go-price-oracle-abi v0.1.0
github.com/ethersphere/go-storage-incentives-abi v0.3.0 github.com/ethersphere/go-storage-incentives-abi v0.3.0
github.com/ethersphere/go-sw3-abi v0.4.0 github.com/ethersphere/go-sw3-abi v0.4.0
...@@ -17,7 +17,6 @@ require ( ...@@ -17,7 +17,6 @@ require (
github.com/gorilla/mux v1.7.4 github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-multierror v1.1.1
github.com/huin/goupnp v1.0.1 // indirect
github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.3.0 // indirect github.com/ipfs/go-log/v2 v2.3.0 // indirect
github.com/kardianos/service v1.2.0 github.com/kardianos/service v1.2.0
...@@ -56,7 +55,7 @@ require ( ...@@ -56,7 +55,7 @@ require (
github.com/uber/jaeger-client-go v2.24.0+incompatible github.com/uber/jaeger-client-go v2.24.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
github.com/vmihailenco/msgpack/v5 v5.3.4 github.com/vmihailenco/msgpack/v5 v5.3.4
github.com/wealdtech/go-ens/v3 v3.4.6 github.com/wealdtech/go-ens/v3 v3.5.1
gitlab.com/nolash/go-mockbytes v0.0.7 gitlab.com/nolash/go-mockbytes v0.0.7
go.uber.org/atomic v1.8.0 go.uber.org/atomic v1.8.0
go.uber.org/multierr v1.7.0 // indirect go.uber.org/multierr v1.7.0 // indirect
......
This diff is collapsed.
...@@ -867,6 +867,30 @@ paths: ...@@ -867,6 +867,30 @@ paths:
description: Default response description: Default response
"/stewardship/{reference}": "/stewardship/{reference}":
get:
summary: "Check if content is available"
tags:
- Stewardship
parameters:
- in: path
name: reference
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmReference"
required: true
description: "Root hash of content (can be of any type: collection, file, chunk)"
responses:
"200":
description: Returns if the content is retrievable
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/IsRetrievableResponse"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response
put: put:
summary: "Reupload a root hash to the network" summary: "Reupload a root hash to the network"
tags: tags:
...@@ -881,6 +905,8 @@ paths: ...@@ -881,6 +905,8 @@ paths:
responses: responses:
"200": "200":
description: Ok description: Ok
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
"500": "500":
$ref: "SwarmCommon.yaml#/components/responses/500" $ref: "SwarmCommon.yaml#/components/responses/500"
default: default:
......
...@@ -528,6 +528,12 @@ components: ...@@ -528,6 +528,12 @@ components:
type: string type: string
pattern: "^(sequence|epoch)$" pattern: "^(sequence|epoch)$"
IsRetrievableResponse:
type: object
properties:
isRetrievable:
type: boolean
headers: headers:
SwarmTag: SwarmTag:
description: "Tag UID" description: "Tag UID"
......
...@@ -871,3 +871,75 @@ paths: ...@@ -871,3 +871,75 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/500" $ref: "SwarmCommon.yaml#/components/responses/500"
default: default:
description: Default response description: Default response
"/stamps/topup/{id}/{amount}":
patch:
summary: Top up an existing postage batch.
description: Be aware, this endpoint creates on-chain transactions and transfers BZZ from the node's Ethereum account and hence directly manipulates the wallet balance!
tags:
- Postage Stamps
parameters:
- in: path
name: id
schema:
$ref: "SwarmCommon.yaml#/components/schemas/BatchID"
required: true
description: Batch ID to top up
- in: path
name: amount
schema:
type: integer
required: true
description: Amount of BZZ per chunk to top up to an existing postage batch.
responses:
"202":
description: Returns the postage batch ID that was topped up
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/BatchIDResponse"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"429":
$ref: "SwarmCommon.yaml#/components/responses/429"
"402":
$ref: "SwarmCommon.yaml#/components/responses/402"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response
"/stamps/dilute/{id}/{depth}":
patch:
summary: Dilute an existing postage batch.
description: Be aware, this endpoint creates on-chain transactions and transfers BZZ from the node's Ethereum account and hence directly manipulates the wallet balance!
tags:
- Postage Stamps
parameters:
- in: path
name: id
schema:
$ref: "SwarmCommon.yaml#/components/schemas/BatchID"
required: true
description: Batch ID to dilute
- in: path
name: depth
schema:
type: integer
required: true
description: New batch depth. Must be higher than the previous depth.
responses:
"202":
description: Returns the postage batch ID that was diluted.
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/BatchIDResponse"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"429":
$ref: "SwarmCommon.yaml#/components/responses/429"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response
...@@ -4,8 +4,8 @@ ...@@ -4,8 +4,8 @@
# api-addr: :1633 # api-addr: :1633
## chain block time (default 15) ## chain block time (default 15)
# block-time: 15 # block-time: 15
## initial nodes to connect to (default [/dnsaddr/bootnode.ethswarm.org]) ## initial nodes to connect to (default [/dnsaddr/testnet.ethswarm.org])
# bootnode: [/dnsaddr/bootnode.ethswarm.org] # bootnode: [/dnsaddr/testnet.ethswarm.org]
## cause the node to always accept incoming connections ## cause the node to always accept incoming connections
# bootnode-mode: false # bootnode-mode: false
## enable clef signer ## enable clef signer
......
...@@ -11,8 +11,8 @@ CLEF_CHAINID=5 ...@@ -11,8 +11,8 @@ CLEF_CHAINID=5
# BEE_API_ADDR=:1633 # BEE_API_ADDR=:1633
## chain block time (default 15) ## chain block time (default 15)
# BEE_BLOCK_TIME=15 # BEE_BLOCK_TIME=15
## initial nodes to connect to (default [/dnsaddr/bootnode.ethswarm.org]) ## initial nodes to connect to (default [/dnsaddr/testnet.ethswarm.org])
# BEE_BOOTNODE=[/dnsaddr/bootnode.ethswarm.org] # BEE_BOOTNODE=[/dnsaddr/testnet.ethswarm.org]
## cause the node to always accept incoming connections ## cause the node to always accept incoming connections
# BEE_BOOTNODE_MODE=false # BEE_BOOTNODE_MODE=false
## enable clef signer ## enable clef signer
......
...@@ -4,8 +4,8 @@ ...@@ -4,8 +4,8 @@
# api-addr: :1633 # api-addr: :1633
## chain block time (default 15) ## chain block time (default 15)
# block-time: 15 # block-time: 15
## initial nodes to connect to (default [/dnsaddr/bootnode.ethswarm.org]) ## initial nodes to connect to (default [/dnsaddr/testnet.ethswarm.org])
# bootnode: [/dnsaddr/bootnode.ethswarm.org] # bootnode: [/dnsaddr/testnet.ethswarm.org]
## cause the node to always accept incoming connections ## cause the node to always accept incoming connections
# bootnode-mode: false # bootnode-mode: false
## enable clef signer ## enable clef signer
......
...@@ -4,8 +4,8 @@ ...@@ -4,8 +4,8 @@
# api-addr: :1633 # api-addr: :1633
## chain block time (default 15) ## chain block time (default 15)
# block-time: 15 # block-time: 15
## initial nodes to connect to (default [/dnsaddr/bootnode.ethswarm.org]) ## initial nodes to connect to (default [/dnsaddr/testnet.ethswarm.org])
# bootnode: [/dnsaddr/bootnode.ethswarm.org] # bootnode: [/dnsaddr/testnet.ethswarm.org]
## enable clef signer ## enable clef signer
# clef-signer-enable: false # clef-signer-enable: false
## clef signer endpoint ## clef signer endpoint
......
...@@ -92,7 +92,7 @@ type server struct { ...@@ -92,7 +92,7 @@ type server struct {
pss pss.Interface pss pss.Interface
traversal traversal.Traverser traversal traversal.Traverser
pinning pinning.Interface pinning pinning.Interface
steward steward.Reuploader steward steward.Interface
logger logging.Logger logger logging.Logger
tracer *tracing.Tracer tracer *tracing.Tracer
feedFactory feeds.Factory feedFactory feeds.Factory
...@@ -119,7 +119,7 @@ const ( ...@@ -119,7 +119,7 @@ const (
) )
// New will create a and initialize a new API service. // New will create a and initialize a new API service.
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Traverser, pinning pinning.Interface, feedFactory feeds.Factory, post postage.Service, postageContract postagecontract.Interface, steward steward.Reuploader, signer crypto.Signer, logger logging.Logger, tracer *tracing.Tracer, o Options) Service { func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Traverser, pinning pinning.Interface, feedFactory feeds.Factory, post postage.Service, postageContract postagecontract.Interface, steward steward.Interface, signer crypto.Signer, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
s := &server{ s := &server{
tags: tags, tags: tags,
storer: storer, storer: storer,
......
...@@ -70,7 +70,7 @@ type testServerOptions struct { ...@@ -70,7 +70,7 @@ type testServerOptions struct {
CORSAllowedOrigins []string CORSAllowedOrigins []string
PostageContract postagecontract.Interface PostageContract postagecontract.Interface
Post postage.Service Post postage.Service
Steward steward.Reuploader Steward steward.Interface
WsHeaders http.Header WsHeaders http.Header
} }
......
...@@ -20,6 +20,7 @@ type ( ...@@ -20,6 +20,7 @@ type (
PostageCreateResponse = postageCreateResponse PostageCreateResponse = postageCreateResponse
PostageStampResponse = postageStampResponse PostageStampResponse = postageStampResponse
PostageStampsResponse = postageStampsResponse PostageStampsResponse = postageStampsResponse
IsRetrievableResponse = isRetrievableResponse
) )
var ( var (
......
...@@ -25,7 +25,7 @@ import ( ...@@ -25,7 +25,7 @@ import (
"github.com/ethersphere/bee/pkg/traversal" "github.com/ethersphere/bee/pkg/traversal"
) )
func checkPinHandlers(t *testing.T, client *http.Client, rootHash string) { func checkPinHandlers(t *testing.T, client *http.Client, rootHash string, createPin bool) {
t.Helper() t.Helper()
const pinsBasePath = "/pins" const pinsBasePath = "/pins"
...@@ -45,12 +45,14 @@ func checkPinHandlers(t *testing.T, client *http.Client, rootHash string) { ...@@ -45,12 +45,14 @@ func checkPinHandlers(t *testing.T, client *http.Client, rootHash string) {
}), }),
) )
if createPin {
jsonhttptest.Request(t, client, http.MethodPost, pinsReferencePath, http.StatusCreated, jsonhttptest.Request(t, client, http.MethodPost, pinsReferencePath, http.StatusCreated,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{ jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusCreated), Message: http.StatusText(http.StatusCreated),
Code: http.StatusCreated, Code: http.StatusCreated,
}), }),
) )
}
jsonhttptest.Request(t, client, http.MethodGet, pinsReferencePath, http.StatusOK, jsonhttptest.Request(t, client, http.MethodGet, pinsReferencePath, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(struct { jsonhttptest.WithExpectedJSONResponse(struct {
...@@ -100,7 +102,7 @@ func TestPinHandlers(t *testing.T) { ...@@ -100,7 +102,7 @@ func TestPinHandlers(t *testing.T) {
Reference: swarm.MustParseHexAddress(rootHash), Reference: swarm.MustParseHexAddress(rootHash),
}), }),
) )
checkPinHandlers(t, client, rootHash) checkPinHandlers(t, client, rootHash, true)
}) })
t.Run("bzz", func(t *testing.T) { t.Run("bzz", func(t *testing.T) {
...@@ -114,23 +116,24 @@ func TestPinHandlers(t *testing.T) { ...@@ -114,23 +116,24 @@ func TestPinHandlers(t *testing.T) {
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
jsonhttptest.WithRequestBody(tarReader), jsonhttptest.WithRequestBody(tarReader),
jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar), jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar),
jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "True"), jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "true"),
jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{ jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash), Reference: swarm.MustParseHexAddress(rootHash),
}), }),
) )
checkPinHandlers(t, client, rootHash) checkPinHandlers(t, client, rootHash, false)
rootHash = "dd13a5a6cc9db3ef514d645e6719178dbfb1a90b49b9262cafce35b0d27cf245" header := jsonhttptest.Request(t, client, http.MethodPost, "/bzz?name=somefile.txt", http.StatusCreated,
jsonhttptest.Request(t, client, http.MethodPost, "/bzz?name=somefile.txt", http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
jsonhttptest.WithRequestHeader("Content-Type", "text/plain"), jsonhttptest.WithRequestHeader("Content-Type", "text/plain"),
jsonhttptest.WithRequestHeader(api.SwarmEncryptHeader, "true"),
jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"),
jsonhttptest.WithRequestBody(strings.NewReader("this is a simple text")), jsonhttptest.WithRequestBody(strings.NewReader("this is a simple text")),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
) )
checkPinHandlers(t, client, rootHash)
rootHash = strings.Trim(header.Get("ETag"), "\"")
checkPinHandlers(t, client, rootHash, false)
}) })
t.Run("chunk", func(t *testing.T) { t.Run("chunk", func(t *testing.T) {
...@@ -145,6 +148,6 @@ func TestPinHandlers(t *testing.T) { ...@@ -145,6 +148,6 @@ func TestPinHandlers(t *testing.T) {
Reference: chunk.Address(), Reference: chunk.Address(),
}), }),
) )
checkPinHandlers(t, client, rootHash) checkPinHandlers(t, client, rootHash, true)
}) })
} }
...@@ -26,7 +26,7 @@ import ( ...@@ -26,7 +26,7 @@ import (
const ( const (
writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close
readDeadline = 4 * time.Second // read deadline. should be smaller than the shutdown timeout on api close readDeadline = 4 * time.Second // read deadline. should be smaller than the shutdown timeout on api close
targetMaxLength = 2 // max target length in bytes, in order to prevent grieving by excess computation targetMaxLength = 3 // max target length in bytes, in order to prevent grieving by excess computation
) )
func (s *server) pssPostHandler(w http.ResponseWriter, r *http.Request) { func (s *server) pssPostHandler(w http.ResponseWriter, r *http.Request) {
......
...@@ -183,6 +183,10 @@ func (s *server) setupRouting() { ...@@ -183,6 +183,10 @@ func (s *server) setupRouting() {
) )
handle("/stewardship/{address}", jsonhttp.MethodHandler{ handle("/stewardship/{address}", jsonhttp.MethodHandler{
"GET": web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandlerFunc(s.stewardshipGetHandler),
),
"PUT": web.ChainHandlers( "PUT": web.ChainHandlers(
s.gatewayModeForbidEndpointHandler, s.gatewayModeForbidEndpointHandler,
web.FinalHandlerFunc(s.stewardshipPutHandler), web.FinalHandlerFunc(s.stewardshipPutHandler),
......
...@@ -31,3 +31,29 @@ func (s *server) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) { ...@@ -31,3 +31,29 @@ func (s *server) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) {
} }
jsonhttp.OK(w, nil) jsonhttp.OK(w, nil)
} }
type isRetrievableResponse struct {
IsRetrievable bool `json:"isRetrievable"`
}
// stewardshipGetHandler checks whether the content on the given address is retrievable.
func (s *server) stewardshipGetHandler(w http.ResponseWriter, r *http.Request) {
nameOrHex := mux.Vars(r)["address"]
address, err := s.resolveNameOrAddress(nameOrHex)
if err != nil {
s.logger.Debugf("stewardship get: parse address %s: %v", nameOrHex, err)
s.logger.Error("stewardship get: parse address")
jsonhttp.NotFound(w, nil)
return
}
res, err := s.steward.IsRetrievable(r.Context(), address)
if err != nil {
s.logger.Debugf("stewardship get: is retrievable %s: %v", address, err)
s.logger.Error("stewardship get: is retrievable")
jsonhttp.InternalServerError(w, nil)
return
}
jsonhttp.OK(w, isRetrievableResponse{
IsRetrievable: res,
})
}
...@@ -6,10 +6,12 @@ package api_test ...@@ -6,10 +6,12 @@ package api_test
import ( import (
"context" "context"
"encoding/hex"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"testing" "testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp" "github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
...@@ -19,7 +21,7 @@ import ( ...@@ -19,7 +21,7 @@ import (
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
) )
func TestStewardshipReUpload(t *testing.T) { func TestStewardship(t *testing.T) {
var ( var (
logger = logging.New(ioutil.Discard, 0) logger = logging.New(ioutil.Discard, 0)
mockStatestore = statestore.NewStateStore() mockStatestore = statestore.NewStateStore()
...@@ -33,6 +35,8 @@ func TestStewardshipReUpload(t *testing.T) { ...@@ -33,6 +35,8 @@ func TestStewardshipReUpload(t *testing.T) {
Logger: logger, Logger: logger,
Steward: m, Steward: m,
}) })
t.Run("re-upload", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPut, "/v1/stewardship/"+addr.String(), http.StatusOK, jsonhttptest.Request(t, client, http.MethodPut, "/v1/stewardship/"+addr.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{ jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK), Message: http.StatusText(http.StatusOK),
...@@ -42,6 +46,19 @@ func TestStewardshipReUpload(t *testing.T) { ...@@ -42,6 +46,19 @@ func TestStewardshipReUpload(t *testing.T) {
if !m.addr.Equal(addr) { if !m.addr.Equal(addr) {
t.Fatalf("\nhave address: %q\nwant address: %q", m.addr.String(), addr.String()) t.Fatalf("\nhave address: %q\nwant address: %q", m.addr.String(), addr.String())
} }
})
t.Run("is-retrievable", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodGet, "/v1/stewardship/"+addr.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.IsRetrievableResponse{IsRetrievable: true}),
)
jsonhttptest.Request(t, client, http.MethodGet, "/v1/stewardship/"+hex.EncodeToString([]byte{}), http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(&jsonhttp.StatusResponse{
Code: http.StatusNotFound,
Message: http.StatusText(http.StatusNotFound),
}),
)
})
} }
type mockSteward struct { type mockSteward struct {
...@@ -52,3 +69,7 @@ func (m *mockSteward) Reupload(_ context.Context, addr swarm.Address) error { ...@@ -52,3 +69,7 @@ func (m *mockSteward) Reupload(_ context.Context, addr swarm.Address) error {
m.addr = addr m.addr = addr
return nil return nil
} }
func (m *mockSteward) IsRetrievable(_ context.Context, _ swarm.Address) (bool, error) {
return true, nil
}
...@@ -103,7 +103,14 @@ func TestDefaultSignerSignTx(t *testing.T) { ...@@ -103,7 +103,14 @@ func TestDefaultSignerSignTx(t *testing.T) {
chainID := big.NewInt(10) chainID := big.NewInt(10)
tx, err := signer.SignTx(types.NewTransaction(0, beneficiary, big.NewInt(0), 21000, big.NewInt(1), []byte{1}), chainID) tx, err := signer.SignTx(types.NewTx(&types.LegacyTx{
Nonce: 0,
To: &beneficiary,
Value: big.NewInt(0),
Gas: 21000,
GasPrice: big.NewInt(1),
Data: []byte{1},
}), chainID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -31,6 +31,7 @@ import ( ...@@ -31,6 +31,7 @@ import (
"github.com/ethersphere/bee/pkg/topology/lightnode" "github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing" "github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/transaction" "github.com/ethersphere/bee/pkg/transaction"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
) )
...@@ -61,13 +62,14 @@ type Service struct { ...@@ -61,13 +62,14 @@ type Service struct {
metricsRegistry *prometheus.Registry metricsRegistry *prometheus.Registry
lightNodes *lightnode.Container lightNodes *lightnode.Container
blockTime *big.Int blockTime *big.Int
traverser traversal.Traverser
// handler is changed in the Configure method // handler is changed in the Configure method
handler http.Handler handler http.Handler
handlerMu sync.RWMutex handlerMu sync.RWMutex
// The following are semaphores which exists to limit concurrent access // The following are semaphores which exists to limit concurrent access
// to some parts of the resources in order to avoid undefined behaviour. // to some parts of the resources in order to avoid undefined behaviour.
postageCreateSem *semaphore.Weighted postageSem *semaphore.Weighted
cashOutChequeSem *semaphore.Weighted cashOutChequeSem *semaphore.Weighted
} }
...@@ -86,7 +88,7 @@ func New(publicKey, pssPublicKey ecdsa.PublicKey, ethereumAddress common.Address ...@@ -86,7 +88,7 @@ func New(publicKey, pssPublicKey ecdsa.PublicKey, ethereumAddress common.Address
s.blockTime = blockTime s.blockTime = blockTime
s.metricsRegistry = newMetricsRegistry() s.metricsRegistry = newMetricsRegistry()
s.transaction = transaction s.transaction = transaction
s.postageCreateSem = semaphore.NewWeighted(1) s.postageSem = semaphore.NewWeighted(1)
s.cashOutChequeSem = semaphore.NewWeighted(1) s.cashOutChequeSem = semaphore.NewWeighted(1)
s.setRouter(s.newBasicRouter()) s.setRouter(s.newBasicRouter())
...@@ -97,7 +99,7 @@ func New(publicKey, pssPublicKey ecdsa.PublicKey, ethereumAddress common.Address ...@@ -97,7 +99,7 @@ func New(publicKey, pssPublicKey ecdsa.PublicKey, ethereumAddress common.Address
// Configure injects required dependencies and configuration parameters and // Configure injects required dependencies and configuration parameters and
// constructs HTTP routes that depend on them. It is intended and safe to call // constructs HTTP routes that depend on them. It is intended and safe to call
// this method only once. // this method only once.
func (s *Service) Configure(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.Driver, lightNodes *lightnode.Container, storer storage.Storer, tags *tags.Tags, accounting accounting.Interface, pseudosettle settlement.Interface, chequebookEnabled bool, swap swap.Interface, chequebook chequebook.Service, batchStore postage.Storer, post postage.Service, postageContract postagecontract.Interface) { func (s *Service) Configure(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.Driver, lightNodes *lightnode.Container, storer storage.Storer, tags *tags.Tags, accounting accounting.Interface, pseudosettle settlement.Interface, chequebookEnabled bool, swap swap.Interface, chequebook chequebook.Service, batchStore postage.Storer, post postage.Service, postageContract postagecontract.Interface, traverser traversal.Traverser) {
s.p2p = p2p s.p2p = p2p
s.pingpong = pingpong s.pingpong = pingpong
s.topologyDriver = topologyDriver s.topologyDriver = topologyDriver
...@@ -113,6 +115,7 @@ func (s *Service) Configure(overlay swarm.Address, p2p p2p.DebugService, pingpon ...@@ -113,6 +115,7 @@ func (s *Service) Configure(overlay swarm.Address, p2p p2p.DebugService, pingpon
s.overlay = &overlay s.overlay = &overlay
s.post = post s.post = post
s.postageContract = postageContract s.postageContract = postageContract
s.traverser = traverser
s.setRouter(s.newRouter()) s.setRouter(s.newRouter())
} }
......
...@@ -37,6 +37,7 @@ import ( ...@@ -37,6 +37,7 @@ import (
"github.com/ethersphere/bee/pkg/topology/lightnode" "github.com/ethersphere/bee/pkg/topology/lightnode"
topologymock "github.com/ethersphere/bee/pkg/topology/mock" topologymock "github.com/ethersphere/bee/pkg/topology/mock"
transactionmock "github.com/ethersphere/bee/pkg/transaction/mock" transactionmock "github.com/ethersphere/bee/pkg/transaction/mock"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"resenje.org/web" "resenje.org/web"
) )
...@@ -72,6 +73,7 @@ type testServerOptions struct { ...@@ -72,6 +73,7 @@ type testServerOptions struct {
TransactionOpts []transactionmock.Option TransactionOpts []transactionmock.Option
PostageContract postagecontract.Interface PostageContract postagecontract.Interface
Post postage.Service Post postage.Service
Traverser traversal.Traverser
} }
type testServer struct { type testServer struct {
...@@ -88,7 +90,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer { ...@@ -88,7 +90,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
transaction := transactionmock.New(o.TransactionOpts...) transaction := transactionmock.New(o.TransactionOpts...)
ln := lightnode.NewContainer(o.Overlay) ln := lightnode.NewContainer(o.Overlay)
s := debugapi.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, o.CORSAllowedOrigins, big.NewInt(2), transaction) s := debugapi.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, o.CORSAllowedOrigins, big.NewInt(2), transaction)
s.Configure(o.Overlay, o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, o.BatchStore, o.Post, o.PostageContract) s.Configure(o.Overlay, o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, o.BatchStore, o.Post, o.PostageContract, o.Traverser)
ts := httptest.NewServer(s) ts := httptest.NewServer(s)
t.Cleanup(ts.Close) t.Cleanup(ts.Close)
...@@ -186,7 +188,7 @@ func TestServer_Configure(t *testing.T) { ...@@ -186,7 +188,7 @@ func TestServer_Configure(t *testing.T) {
}), }),
) )
s.Configure(o.Overlay, o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, nil, mockpost.New(), nil) s.Configure(o.Overlay, o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, nil, mockpost.New(), nil, nil)
testBasicRouter(t, client) testBasicRouter(t, client)
jsonhttptest.Request(t, client, http.MethodGet, "/readiness", http.StatusOK, jsonhttptest.Request(t, client, http.MethodGet, "/readiness", http.StatusOK,
......
...@@ -21,6 +21,20 @@ import ( ...@@ -21,6 +21,20 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
func (s *Service) postageAccessHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !s.postageSem.TryAcquire(1) {
s.logger.Debug("postage access: simultaneous on-chain operations not supported")
s.logger.Error("postage access: simultaneous on-chain operations not supported")
jsonhttp.TooManyRequests(w, "simultaneous on-chain operations not supported")
return
}
defer s.postageSem.Release(1)
h.ServeHTTP(w, r)
})
}
type batchID []byte type batchID []byte
func (b batchID) MarshalJSON() ([]byte, error) { func (b batchID) MarshalJSON() ([]byte, error) {
...@@ -68,14 +82,6 @@ func (s *Service) postageCreateHandler(w http.ResponseWriter, r *http.Request) { ...@@ -68,14 +82,6 @@ func (s *Service) postageCreateHandler(w http.ResponseWriter, r *http.Request) {
immutable, _ = strconv.ParseBool(val[0]) immutable, _ = strconv.ParseBool(val[0])
} }
if !s.postageCreateSem.TryAcquire(1) {
s.logger.Debug("create batch: simultaneous on-chain operations not supported")
s.logger.Error("create batch: simultaneous on-chain operations not supported")
jsonhttp.TooManyRequests(w, "simultaneous on-chain operations not supported")
return
}
defer s.postageCreateSem.Release(1)
batchID, err := s.postageContract.CreateBatch(ctx, amount, uint8(depth), immutable, label) batchID, err := s.postageContract.CreateBatch(ctx, amount, uint8(depth), immutable, label)
if err != nil { if err != nil {
if errors.Is(err, postagecontract.ErrInsufficientFunds) { if errors.Is(err, postagecontract.ErrInsufficientFunds) {
...@@ -321,3 +327,109 @@ func (s *Service) estimateBatchTTL(id []byte) (int64, error) { ...@@ -321,3 +327,109 @@ func (s *Service) estimateBatchTTL(id []byte) (int64, error) {
return ttl.Int64(), nil return ttl.Int64(), nil
} }
func (s *Service) postageTopUpHandler(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["id"]
if len(idStr) != 64 {
s.logger.Error("topup batch: invalid batchID")
jsonhttp.BadRequest(w, "invalid batchID")
return
}
id, err := hex.DecodeString(idStr)
if err != nil {
s.logger.Debugf("topup batch: invalid batchID: %v", err)
s.logger.Error("topup batch: invalid batchID")
jsonhttp.BadRequest(w, "invalid batchID")
return
}
amount, ok := big.NewInt(0).SetString(mux.Vars(r)["amount"], 10)
if !ok {
s.logger.Error("topup batch: invalid amount")
jsonhttp.BadRequest(w, "invalid postage amount")
return
}
ctx := r.Context()
if price, ok := r.Header[gasPriceHeader]; ok {
p, ok := big.NewInt(0).SetString(price[0], 10)
if !ok {
s.logger.Error("topup batch: bad gas price")
jsonhttp.BadRequest(w, errBadGasPrice)
return
}
ctx = sctx.SetGasPrice(ctx, p)
}
err = s.postageContract.TopUpBatch(ctx, id, amount)
if err != nil {
if errors.Is(err, postagecontract.ErrInsufficientFunds) {
s.logger.Debugf("topup batch: out of funds: %v", err)
s.logger.Error("topup batch: out of funds")
jsonhttp.PaymentRequired(w, "out of funds")
return
}
s.logger.Debugf("topup batch: failed to create: %v", err)
s.logger.Error("topup batch: failed to create")
jsonhttp.InternalServerError(w, "cannot topup batch")
return
}
jsonhttp.Accepted(w, &postageCreateResponse{
BatchID: id,
})
}
func (s *Service) postageDiluteHandler(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["id"]
if len(idStr) != 64 {
s.logger.Error("dilute batch: invalid batchID")
jsonhttp.BadRequest(w, "invalid batchID")
return
}
id, err := hex.DecodeString(idStr)
if err != nil {
s.logger.Debugf("dilute batch: invalid batchID: %v", err)
s.logger.Error("dilute batch: invalid batchID")
jsonhttp.BadRequest(w, "invalid batchID")
return
}
depthStr := mux.Vars(r)["depth"]
depth, err := strconv.ParseUint(depthStr, 10, 8)
if err != nil {
s.logger.Debugf("dilute batch: invalid depth: %v", err)
s.logger.Error("dilute batch: invalid depth")
jsonhttp.BadRequest(w, "invalid depth")
return
}
ctx := r.Context()
if price, ok := r.Header[gasPriceHeader]; ok {
p, ok := big.NewInt(0).SetString(price[0], 10)
if !ok {
s.logger.Error("dilute batch: bad gas price")
jsonhttp.BadRequest(w, errBadGasPrice)
return
}
ctx = sctx.SetGasPrice(ctx, p)
}
err = s.postageContract.DiluteBatch(ctx, id, uint8(depth))
if err != nil {
if errors.Is(err, postagecontract.ErrInvalidDepth) {
s.logger.Debugf("dilute batch: invalid depth: %v", err)
s.logger.Error("dilte batch: invalid depth")
jsonhttp.BadRequest(w, "invalid depth")
return
}
s.logger.Debugf("dilute batch: failed to dilute: %v", err)
s.logger.Error("dilute batch: failed to dilute")
jsonhttp.InternalServerError(w, "cannot dilute batch")
return
}
jsonhttp.Accepted(w, &postageCreateResponse{
BatchID: id,
})
}
This diff is collapsed.
...@@ -205,11 +205,26 @@ func (s *Service) newRouter() *mux.Router { ...@@ -205,11 +205,26 @@ func (s *Service) newRouter() *mux.Router {
) )
router.Handle("/stamps/{amount}/{depth}", web.ChainHandlers( router.Handle("/stamps/{amount}/{depth}", web.ChainHandlers(
s.postageAccessHandler,
web.FinalHandler(jsonhttp.MethodHandler{ web.FinalHandler(jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.postageCreateHandler), "POST": http.HandlerFunc(s.postageCreateHandler),
})), })),
) )
router.Handle("/stamps/topup/{id}/{amount}", web.ChainHandlers(
s.postageAccessHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"PATCH": http.HandlerFunc(s.postageTopUpHandler),
})),
)
router.Handle("/stamps/dilute/{id}/{depth}", web.ChainHandlers(
s.postageAccessHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"PATCH": http.HandlerFunc(s.postageDiluteHandler),
})),
)
return router return router
} }
......
...@@ -37,6 +37,8 @@ const ( ...@@ -37,6 +37,8 @@ const (
peersStreamName = "peers" peersStreamName = "peers"
messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written. messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written.
maxBatchSize = 30 maxBatchSize = 30
pingTimeout = time.Second * 5 // time to wait for ping to succeed
batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation
) )
var ( var (
...@@ -245,7 +247,9 @@ func (s *Service) startCheckPeersHandler() { ...@@ -245,7 +247,9 @@ func (s *Service) startCheckPeersHandler() {
s.wg.Add(1) s.wg.Add(1)
go func() { go func() {
defer s.wg.Done() defer s.wg.Done()
s.checkAndAddPeers(ctx, newPeers) cctx, cancel := context.WithTimeout(ctx, batchValidationTimeout)
defer cancel()
s.checkAndAddPeers(cctx, newPeers)
}() }()
} }
} }
...@@ -277,6 +281,9 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) { ...@@ -277,6 +281,9 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
return return
} }
ctx, cancel := context.WithTimeout(ctx, pingTimeout)
defer cancel()
// check if the underlay is usable by doing a raw ping using libp2p // check if the underlay is usable by doing a raw ping using libp2p
if _, err = s.streamer.Ping(ctx, multiUnderlay); err != nil { if _, err = s.streamer.Ping(ctx, multiUnderlay); err != nil {
s.metrics.UnreachablePeers.Inc() s.metrics.UnreachablePeers.Inc()
......
...@@ -32,6 +32,7 @@ import ( ...@@ -32,6 +32,7 @@ import (
const ( const (
maxDelay = 1 * time.Minute maxDelay = 1 * time.Minute
cancellationDepth = 6 cancellationDepth = 6
additionalConfirmations = 2
) )
// InitChain will initialize the Ethereum backend at the given endpoint and // InitChain will initialize the Ethereum backend at the given endpoint and
...@@ -293,13 +294,7 @@ func GetTxNextBlock(ctx context.Context, logger logging.Logger, backend transact ...@@ -293,13 +294,7 @@ func GetTxNextBlock(ctx context.Context, logger logging.Logger, backend transact
return blockHash, nil return blockHash, nil
} }
// if not found in statestore, fetch from chain block, err := transaction.WaitBlockAfterTransaction(ctx, backend, duration, common.BytesToHash(trx), additionalConfirmations)
tx, err := backend.TransactionReceipt(ctx, common.BytesToHash(trx))
if err != nil {
return nil, err
}
block, err := transaction.WaitBlock(ctx, backend, duration, big.NewInt(0).Add(tx.BlockNumber, big.NewInt(1)))
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/postage/batchstore" "github.com/ethersphere/bee/pkg/postage/batchstore"
mockPost "github.com/ethersphere/bee/pkg/postage/mock" mockPost "github.com/ethersphere/bee/pkg/postage/mock"
"github.com/ethersphere/bee/pkg/postage/postagecontract"
mockPostContract "github.com/ethersphere/bee/pkg/postage/postagecontract/mock" mockPostContract "github.com/ethersphere/bee/pkg/postage/postagecontract/mock"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing" postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
"github.com/ethersphere/bee/pkg/pss" "github.com/ethersphere/bee/pkg/pss"
...@@ -200,7 +201,8 @@ func NewDevBee(logger logging.Logger, o *DevOptions) (b *DevBee, err error) { ...@@ -200,7 +201,8 @@ func NewDevBee(logger logging.Logger, o *DevOptions) (b *DevBee, err error) {
} }
post := mockPost.New() post := mockPost.New()
postageContract := mockPostContract.New(mockPostContract.WithCreateBatchFunc( postageContract := mockPostContract.New(
mockPostContract.WithCreateBatchFunc(
func(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error) { func(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error) {
id := postagetesting.MustNewID() id := postagetesting.MustNewID()
b := &postage.Batch{ b := &postage.Batch{
...@@ -211,17 +213,62 @@ func NewDevBee(logger logging.Logger, o *DevOptions) (b *DevBee, err error) { ...@@ -211,17 +213,62 @@ func NewDevBee(logger logging.Logger, o *DevOptions) (b *DevBee, err error) {
Immutable: immutable, Immutable: immutable,
} }
err := batchStore.Put(b, initialBalance, depth) totalAmount := big.NewInt(0).Mul(initialBalance, big.NewInt(int64(1<<depth)))
err := batchStore.Put(b, totalAmount, depth)
if err != nil { if err != nil {
return nil, err return nil, err
} }
stampIssuer := postage.NewStampIssuer(label, string(overlayEthAddress.Bytes()), id, initialBalance, depth, 0, 0, immutable) stampIssuer := postage.NewStampIssuer(label, string(overlayEthAddress.Bytes()), id, totalAmount, depth, 0, 0, immutable)
post.Add(stampIssuer) post.Add(stampIssuer)
return id, nil return id, nil
}, },
)) ),
mockPostContract.WithTopUpBatchFunc(
func(ctx context.Context, batchID []byte, topupAmount *big.Int) error {
batch, err := batchStore.Get(batchID)
if err != nil {
return err
}
totalAmount := big.NewInt(0).Mul(topupAmount, big.NewInt(int64(1<<batch.Depth)))
newBalance := big.NewInt(0).Add(totalAmount, batch.Value)
err = batchStore.Put(batch, newBalance, batch.Depth)
if err != nil {
return err
}
post.HandleTopUp(batch.ID, newBalance)
return nil
},
),
mockPostContract.WithDiluteBatchFunc(
func(ctx context.Context, batchID []byte, newDepth uint8) error {
batch, err := batchStore.Get(batchID)
if err != nil {
return err
}
if newDepth < batch.Depth {
return postagecontract.ErrInvalidDepth
}
newBalance := big.NewInt(0).Div(batch.Value, big.NewInt(int64(1<<(newDepth-batch.Depth))))
err = batchStore.Put(batch, newBalance, newDepth)
if err != nil {
return err
}
post.HandleDepthIncrease(batch.ID, newDepth, newBalance)
return nil
},
),
)
feedFactory := factory.New(storer) feedFactory := factory.New(storer)
...@@ -322,7 +369,7 @@ func NewDevBee(logger logging.Logger, o *DevOptions) (b *DevBee, err error) { ...@@ -322,7 +369,7 @@ func NewDevBee(logger logging.Logger, o *DevOptions) (b *DevBee, err error) {
) )
// inject dependencies and configure full debug api http path routes // inject dependencies and configure full debug api http path routes
debugAPIService.Configure(swarmAddress, p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudoset, true, mockSwap, mockChequebook, batchStore, post, postageContract) debugAPIService.Configure(swarmAddress, p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudoset, true, mockSwap, mockChequebook, batchStore, post, postageContract, traversalService)
} }
return b, nil return b, nil
......
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sync" "sync"
"syscall" "syscall"
"time" "time"
...@@ -155,6 +156,9 @@ type Options struct { ...@@ -155,6 +156,9 @@ type Options struct {
DeployGasPrice string DeployGasPrice string
WarmupTime time.Duration WarmupTime time.Duration
ChainID int64 ChainID int64
Resync bool
BlockProfile bool
MutexProfile bool
} }
const ( const (
...@@ -240,6 +244,15 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -240,6 +244,15 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
if err != nil { if err != nil {
return nil, fmt.Errorf("eth address: %w", err) return nil, fmt.Errorf("eth address: %w", err)
} }
if o.MutexProfile {
_ = runtime.SetMutexProfileFraction(1)
}
if o.BlockProfile {
runtime.SetBlockProfileRate(1)
}
// set up basic debug api endpoints for debugging and /health endpoint // set up basic debug api endpoints for debugging and /health endpoint
debugAPIService = debugapi.New(*publicKey, pssPrivateKey.PublicKey, overlayEthAddress, logger, tracer, o.CORSAllowedOrigins, big.NewInt(int64(o.BlockTime)), transactionService) debugAPIService = debugapi.New(*publicKey, pssPrivateKey.PublicKey, overlayEthAddress, logger, tracer, o.CORSAllowedOrigins, big.NewInt(int64(o.BlockTime)), transactionService)
...@@ -362,7 +375,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -362,7 +375,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
lightNodes := lightnode.NewContainer(swarmAddress) lightNodes := lightnode.NewContainer(swarmAddress)
senderMatcher := transaction.NewMatcher(swapBackend, types.NewEIP2930Signer(big.NewInt(chainID)), stateStore) senderMatcher := transaction.NewMatcher(swapBackend, types.NewLondonSigner(big.NewInt(chainID)), stateStore)
p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, senderMatcher, logger, tracer, libp2p.Options{ p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, senderMatcher, logger, tracer, libp2p.Options{
PrivateKey: libp2pPrivateKey, PrivateKey: libp2pPrivateKey,
...@@ -445,7 +458,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -445,7 +458,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b}) eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b})
b.listenerCloser = eventListener b.listenerCloser = eventListener
batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256) batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -461,6 +474,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -461,6 +474,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
erc20Address, erc20Address,
transactionService, transactionService,
post, post,
batchStore,
) )
if natManager := p2ps.NATManager(); natManager != nil { if natManager := p2ps.NATManager(); natManager != nil {
...@@ -700,7 +714,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -700,7 +714,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
if o.APIAddr != "" { if o.APIAddr != "" {
// API server // API server
feedFactory := factory.New(ns) feedFactory := factory.New(ns)
steward := steward.New(storer, traversalService, pushSyncProtocol) steward := steward.New(storer, traversalService, retrieve, pushSyncProtocol)
apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, pinningService, feedFactory, post, postageContractService, steward, signer, logger, tracer, api.Options{ apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, pinningService, feedFactory, post, postageContractService, steward, signer, logger, tracer, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins, CORSAllowedOrigins: o.CORSAllowedOrigins,
GatewayMode: o.GatewayMode, GatewayMode: o.GatewayMode,
...@@ -779,7 +793,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -779,7 +793,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
} }
// inject dependencies and configure full debug api http path routes // inject dependencies and configure full debug api http path routes
debugAPIService.Configure(swarmAddress, p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudosettleService, o.SwapEnable, swapService, chequebookService, batchStore, post, postageContractService) debugAPIService.Configure(swarmAddress, p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudosettleService, o.SwapEnable, swapService, chequebookService, batchStore, post, postageContractService, traversalService)
} }
if err := kad.Start(p2pCtx); err != nil { if err := kad.Start(p2pCtx); err != nil {
......
...@@ -860,6 +860,11 @@ func (s *Service) Ping(ctx context.Context, addr ma.Multiaddr) (rtt time.Duratio ...@@ -860,6 +860,11 @@ func (s *Service) Ping(ctx context.Context, addr ma.Multiaddr) (rtt time.Duratio
// Add the address to libp2p peerstore for it to be dialable // Add the address to libp2p peerstore for it to be dialable
s.pingDialer.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL) s.pingDialer.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL)
// Cleanup connection after ping is done
defer func() {
_ = s.pingDialer.Network().ClosePeer(info.ID)
}()
select { select {
case <-ctx.Done(): case <-ctx.Done():
return rtt, ctx.Err() return rtt, ctx.Err()
......
...@@ -29,9 +29,10 @@ type batchService struct { ...@@ -29,9 +29,10 @@ type batchService struct {
logger logging.Logger logger logging.Logger
listener postage.Listener listener postage.Listener
owner []byte owner []byte
batchListener postage.BatchCreationListener batchListener postage.BatchEventListener
checksum hash.Hash // checksum hasher checksum hash.Hash // checksum hasher
resync bool
} }
type Interface interface { type Interface interface {
...@@ -45,8 +46,9 @@ func New( ...@@ -45,8 +46,9 @@ func New(
logger logging.Logger, logger logging.Logger,
listener postage.Listener, listener postage.Listener,
owner []byte, owner []byte,
batchListener postage.BatchCreationListener, batchListener postage.BatchEventListener,
checksumFunc func() hash.Hash, checksumFunc func() hash.Hash,
resync bool,
) (Interface, error) { ) (Interface, error) {
if checksumFunc == nil { if checksumFunc == nil {
checksumFunc = sha3.New256 checksumFunc = sha3.New256
...@@ -56,6 +58,17 @@ func New( ...@@ -56,6 +58,17 @@ func New(
sum = checksumFunc() sum = checksumFunc()
) )
dirty := false
err := stateStore.Get(dirtyDBKey, &dirty)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
if resync {
if err := stateStore.Delete(checksumDBKey); err != nil {
return nil, err
}
} else if !dirty {
if err := stateStore.Get(checksumDBKey, &b); err != nil { if err := stateStore.Get(checksumDBKey, &b); err != nil {
if !errors.Is(err, storage.ErrNotFound) { if !errors.Is(err, storage.ErrNotFound) {
return nil, err return nil, err
...@@ -73,8 +86,9 @@ func New( ...@@ -73,8 +86,9 @@ func New(
return nil, errors.New("batchstore checksum init") return nil, errors.New("batchstore checksum init")
} }
} }
}
return &batchService{stateStore, storer, logger, listener, owner, batchListener, sum}, nil return &batchService{stateStore, storer, logger, listener, owner, batchListener, sum, resync}, nil
} }
// Create will create a new batch with the given ID, owner value and depth and // Create will create a new batch with the given ID, owner value and depth and
...@@ -96,8 +110,9 @@ func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, de ...@@ -96,8 +110,9 @@ func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, de
} }
if bytes.Equal(svc.owner, owner) && svc.batchListener != nil { if bytes.Equal(svc.owner, owner) && svc.batchListener != nil {
svc.batchListener.Handle(b) svc.batchListener.HandleCreate(b)
} }
cs, err := svc.updateChecksum(txHash) cs, err := svc.updateChecksum(txHash)
if err != nil { if err != nil {
return fmt.Errorf("update checksum: %w", err) return fmt.Errorf("update checksum: %w", err)
...@@ -119,6 +134,11 @@ func (svc *batchService) TopUp(id []byte, normalisedBalance *big.Int, txHash []b ...@@ -119,6 +134,11 @@ func (svc *batchService) TopUp(id []byte, normalisedBalance *big.Int, txHash []b
if err != nil { if err != nil {
return fmt.Errorf("put: %w", err) return fmt.Errorf("put: %w", err)
} }
if bytes.Equal(svc.owner, b.Owner) && svc.batchListener != nil {
svc.batchListener.HandleTopUp(id, normalisedBalance)
}
cs, err := svc.updateChecksum(txHash) cs, err := svc.updateChecksum(txHash)
if err != nil { if err != nil {
return fmt.Errorf("update checksum: %w", err) return fmt.Errorf("update checksum: %w", err)
...@@ -139,6 +159,11 @@ func (svc *batchService) UpdateDepth(id []byte, depth uint8, normalisedBalance * ...@@ -139,6 +159,11 @@ func (svc *batchService) UpdateDepth(id []byte, depth uint8, normalisedBalance *
if err != nil { if err != nil {
return fmt.Errorf("put: %w", err) return fmt.Errorf("put: %w", err)
} }
if bytes.Equal(svc.owner, b.Owner) && svc.batchListener != nil {
svc.batchListener.HandleDepthIncrease(id, depth, normalisedBalance)
}
cs, err := svc.updateChecksum(txHash) cs, err := svc.updateChecksum(txHash)
if err != nil { if err != nil {
return fmt.Errorf("update checksum: %w", err) return fmt.Errorf("update checksum: %w", err)
...@@ -195,15 +220,21 @@ func (svc *batchService) Start(startBlock uint64) (<-chan struct{}, error) { ...@@ -195,15 +220,21 @@ func (svc *batchService) Start(startBlock uint64) (<-chan struct{}, error) {
if err != nil && !errors.Is(err, storage.ErrNotFound) { if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, err return nil, err
} }
if dirty {
if dirty || svc.resync {
if svc.resync {
svc.logger.Warning("batch service: resync requested, resetting batch store")
} else {
svc.logger.Warning("batch service: dirty shutdown detected, resetting batch store") svc.logger.Warning("batch service: dirty shutdown detected, resetting batch store")
}
if err := svc.storer.Reset(); err != nil { if err := svc.storer.Reset(); err != nil {
return nil, err return nil, err
} }
if err := svc.stateStore.Delete(dirtyDBKey); err != nil { if err := svc.stateStore.Delete(dirtyDBKey); err != nil {
return nil, err return nil, err
} }
svc.logger.Warning("batch service: batch store reset. your node will now resync chain data") svc.logger.Warning("batch service: batch store has been reset. your node will now resync chain data. this might take a while...")
} }
cs := svc.storer.GetChainState() cs := svc.storer.GetChainState()
......
...@@ -50,6 +50,8 @@ type Listener interface { ...@@ -50,6 +50,8 @@ type Listener interface {
Listen(from uint64, updater EventUpdater) <-chan struct{} Listen(from uint64, updater EventUpdater) <-chan struct{}
} }
type BatchCreationListener interface { type BatchEventListener interface {
Handle(*Batch) HandleCreate(*Batch)
HandleTopUp(id []byte, newBalance *big.Int)
HandleDepthIncrease(id []byte, newDepth uint8, normalisedBalance *big.Int)
} }
...@@ -88,7 +88,11 @@ func (m *mockPostage) IssuerUsable(_ *postage.StampIssuer) bool { ...@@ -88,7 +88,11 @@ func (m *mockPostage) IssuerUsable(_ *postage.StampIssuer) bool {
return true return true
} }
func (m *mockPostage) Handle(_ *postage.Batch) {} func (m *mockPostage) HandleCreate(_ *postage.Batch) {}
func (m *mockPostage) HandleTopUp(_ []byte, _ *big.Int) {}
func (m *mockPostage) HandleDepthIncrease(_ []byte, _ uint8, _ *big.Int) {}
func (m *mockPostage) Close() error { func (m *mockPostage) Close() error {
return nil return nil
......
...@@ -28,14 +28,25 @@ var ( ...@@ -28,14 +28,25 @@ var (
postageStampABI = parseABI(postageabi.PostageStampABIv0_3_0) postageStampABI = parseABI(postageabi.PostageStampABIv0_3_0)
erc20ABI = parseABI(sw3abi.ERC20ABIv0_3_1) erc20ABI = parseABI(sw3abi.ERC20ABIv0_3_1)
batchCreatedTopic = postageStampABI.Events["BatchCreated"].ID batchCreatedTopic = postageStampABI.Events["BatchCreated"].ID
batchTopUpTopic = postageStampABI.Events["BatchTopUp"].ID
batchDiluteTopic = postageStampABI.Events["BatchDepthIncrease"].ID
ErrBatchCreate = errors.New("batch creation failed") ErrBatchCreate = errors.New("batch creation failed")
ErrInsufficientFunds = errors.New("insufficient token balance") ErrInsufficientFunds = errors.New("insufficient token balance")
ErrInvalidDepth = errors.New("invalid depth") ErrInvalidDepth = errors.New("invalid depth")
ErrBatchTopUp = errors.New("batch topUp failed")
ErrBatchDilute = errors.New("batch dilute failed")
approveDescription = "Approve tokens for postage operations"
createBatchDescription = "Postage batch creation"
topUpBatchDescription = "Postage batch top up"
diluteBatchDescription = "Postage batch dilute"
) )
type Interface interface { type Interface interface {
CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error)
TopUpBatch(ctx context.Context, batchID []byte, topupBalance *big.Int) error
DiluteBatch(ctx context.Context, batchID []byte, newDepth uint8) error
} }
type postageContract struct { type postageContract struct {
...@@ -44,6 +55,7 @@ type postageContract struct { ...@@ -44,6 +55,7 @@ type postageContract struct {
bzzTokenAddress common.Address bzzTokenAddress common.Address
transactionService transaction.Service transactionService transaction.Service
postageService postage.Service postageService postage.Service
postageStorer postage.Storer
} }
func New( func New(
...@@ -52,6 +64,7 @@ func New( ...@@ -52,6 +64,7 @@ func New(
bzzTokenAddress common.Address, bzzTokenAddress common.Address,
transactionService transaction.Service, transactionService transaction.Service,
postageService postage.Service, postageService postage.Service,
postageStorer postage.Storer,
) Interface { ) Interface {
return &postageContract{ return &postageContract{
owner: owner, owner: owner,
...@@ -59,6 +72,7 @@ func New( ...@@ -59,6 +72,7 @@ func New(
bzzTokenAddress: bzzTokenAddress, bzzTokenAddress: bzzTokenAddress,
transactionService: transactionService, transactionService: transactionService,
postageService: postageService, postageService: postageService,
postageStorer: postageStorer,
} }
} }
...@@ -74,6 +88,7 @@ func (c *postageContract) sendApproveTransaction(ctx context.Context, amount *bi ...@@ -74,6 +88,7 @@ func (c *postageContract) sendApproveTransaction(ctx context.Context, amount *bi
GasPrice: sctx.GetGasPrice(ctx), GasPrice: sctx.GetGasPrice(ctx),
GasLimit: 65000, GasLimit: 65000,
Value: big.NewInt(0), Value: big.NewInt(0),
Description: approveDescription,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -91,24 +106,19 @@ func (c *postageContract) sendApproveTransaction(ctx context.Context, amount *bi ...@@ -91,24 +106,19 @@ func (c *postageContract) sendApproveTransaction(ctx context.Context, amount *bi
return receipt, nil return receipt, nil
} }
func (c *postageContract) sendCreateBatchTransaction(ctx context.Context, owner common.Address, initialBalance *big.Int, depth uint8, nonce common.Hash, immutable bool) (*types.Receipt, error) { func (c *postageContract) sendTransaction(ctx context.Context, callData []byte, desc string) (*types.Receipt, error) {
callData, err := postageStampABI.Pack("createBatch", owner, initialBalance, depth, BucketDepth, nonce, immutable)
if err != nil {
return nil, err
}
request := &transaction.TxRequest{ request := &transaction.TxRequest{
To: &c.postageContractAddress, To: &c.postageContractAddress,
Data: callData, Data: callData,
GasPrice: sctx.GetGasPrice(ctx), GasPrice: sctx.GetGasPrice(ctx),
GasLimit: 160000, GasLimit: 160000,
Value: big.NewInt(0), Value: big.NewInt(0),
Description: desc,
} }
txHash, err := c.transactionService.Send(ctx, request) txHash, err := c.transactionService.Send(ctx, request)
if err != nil { if err != nil {
return nil, fmt.Errorf("send: depth %d bucketDepth %d immutable %t: %w", depth, BucketDepth, immutable, err) return nil, err
} }
receipt, err := c.transactionService.WaitForReceipt(ctx, txHash) receipt, err := c.transactionService.WaitForReceipt(ctx, txHash)
...@@ -123,6 +133,51 @@ func (c *postageContract) sendCreateBatchTransaction(ctx context.Context, owner ...@@ -123,6 +133,51 @@ func (c *postageContract) sendCreateBatchTransaction(ctx context.Context, owner
return receipt, nil return receipt, nil
} }
func (c *postageContract) sendCreateBatchTransaction(ctx context.Context, owner common.Address, initialBalance *big.Int, depth uint8, nonce common.Hash, immutable bool) (*types.Receipt, error) {
callData, err := postageStampABI.Pack("createBatch", owner, initialBalance, depth, BucketDepth, nonce, immutable)
if err != nil {
return nil, err
}
receipt, err := c.sendTransaction(ctx, callData, createBatchDescription)
if err != nil {
return nil, fmt.Errorf("create batch: depth %d bucketDepth %d immutable %t: %w", depth, BucketDepth, immutable, err)
}
return receipt, nil
}
func (c *postageContract) sendTopUpBatchTransaction(ctx context.Context, batchID []byte, topUpAmount *big.Int) (*types.Receipt, error) {
callData, err := postageStampABI.Pack("topUp", common.BytesToHash(batchID), topUpAmount)
if err != nil {
return nil, err
}
receipt, err := c.sendTransaction(ctx, callData, topUpBatchDescription)
if err != nil {
return nil, fmt.Errorf("topup batch: amount %d: %w", topUpAmount.Int64(), err)
}
return receipt, nil
}
func (c *postageContract) sendDiluteTransaction(ctx context.Context, batchID []byte, newDepth uint8) (*types.Receipt, error) {
callData, err := postageStampABI.Pack("increaseDepth", common.BytesToHash(batchID), newDepth)
if err != nil {
return nil, err
}
receipt, err := c.sendTransaction(ctx, callData, diluteBatchDescription)
if err != nil {
return nil, fmt.Errorf("dilute batch: new depth %d: %w", newDepth, err)
}
return receipt, nil
}
func (c *postageContract) getBalance(ctx context.Context) (*big.Int, error) { func (c *postageContract) getBalance(ctx context.Context) (*big.Int, error) {
callData, err := erc20ABI.Pack("balanceOf", c.owner) callData, err := erc20ABI.Pack("balanceOf", c.owner)
if err != nil { if err != nil {
...@@ -177,7 +232,7 @@ func (c *postageContract) CreateBatch(ctx context.Context, initialBalance *big.I ...@@ -177,7 +232,7 @@ func (c *postageContract) CreateBatch(ctx context.Context, initialBalance *big.I
} }
for _, ev := range receipt.Logs { for _, ev := range receipt.Logs {
if ev.Address == c.postageContractAddress && ev.Topics[0] == batchCreatedTopic { if ev.Address == c.postageContractAddress && len(ev.Topics) > 0 && ev.Topics[0] == batchCreatedTopic {
var createdEvent batchCreatedEvent var createdEvent batchCreatedEvent
err = transaction.ParseEvent(&postageStampABI, "BatchCreated", &createdEvent, *ev) err = transaction.ParseEvent(&postageStampABI, "BatchCreated", &createdEvent, *ev)
if err != nil { if err != nil {
...@@ -204,6 +259,67 @@ func (c *postageContract) CreateBatch(ctx context.Context, initialBalance *big.I ...@@ -204,6 +259,67 @@ func (c *postageContract) CreateBatch(ctx context.Context, initialBalance *big.I
return nil, ErrBatchCreate return nil, ErrBatchCreate
} }
func (c *postageContract) TopUpBatch(ctx context.Context, batchID []byte, topUpAmount *big.Int) error {
batch, err := c.postageStorer.Get(batchID)
if err != nil {
return err
}
totalAmount := big.NewInt(0).Mul(topUpAmount, big.NewInt(int64(1<<batch.Depth)))
balance, err := c.getBalance(ctx)
if err != nil {
return err
}
if balance.Cmp(totalAmount) < 0 {
return ErrInsufficientFunds
}
_, err = c.sendApproveTransaction(ctx, totalAmount)
if err != nil {
return err
}
receipt, err := c.sendTopUpBatchTransaction(ctx, batch.ID, topUpAmount)
if err != nil {
return err
}
for _, ev := range receipt.Logs {
if ev.Address == c.postageContractAddress && len(ev.Topics) > 0 && ev.Topics[0] == batchTopUpTopic {
return nil
}
}
return ErrBatchTopUp
}
func (c *postageContract) DiluteBatch(ctx context.Context, batchID []byte, newDepth uint8) error {
batch, err := c.postageStorer.Get(batchID)
if err != nil {
return err
}
if batch.Depth > newDepth {
return fmt.Errorf("new depth should be greater: %w", ErrInvalidDepth)
}
receipt, err := c.sendDiluteTransaction(ctx, batch.ID, newDepth)
if err != nil {
return err
}
for _, ev := range receipt.Logs {
if ev.Address == c.postageContractAddress && len(ev.Topics) > 0 && ev.Topics[0] == batchDiluteTopic {
return nil
}
}
return ErrBatchDilute
}
type batchCreatedEvent struct { type batchCreatedEvent struct {
BatchId [32]byte BatchId [32]byte
TotalAmount *big.Int TotalAmount *big.Int
......
...@@ -14,8 +14,11 @@ import ( ...@@ -14,8 +14,11 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethersphere/bee/pkg/postage"
postagestoreMock "github.com/ethersphere/bee/pkg/postage/batchstore/mock"
postageMock "github.com/ethersphere/bee/pkg/postage/mock" postageMock "github.com/ethersphere/bee/pkg/postage/mock"
"github.com/ethersphere/bee/pkg/postage/postagecontract" "github.com/ethersphere/bee/pkg/postage/postagecontract"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
"github.com/ethersphere/bee/pkg/transaction" "github.com/ethersphere/bee/pkg/transaction"
transactionMock "github.com/ethersphere/bee/pkg/transaction/mock" transactionMock "github.com/ethersphere/bee/pkg/transaction/mock"
) )
...@@ -85,6 +88,7 @@ func TestCreateBatch(t *testing.T) { ...@@ -85,6 +88,7 @@ func TestCreateBatch(t *testing.T) {
}), }),
), ),
postageMock, postageMock,
postagestoreMock.New(),
) )
returnedID, err := contract.CreateBatch(ctx, initialBalance, depth, false, label) returnedID, err := contract.CreateBatch(ctx, initialBalance, depth, false, label)
...@@ -115,6 +119,7 @@ func TestCreateBatch(t *testing.T) { ...@@ -115,6 +119,7 @@ func TestCreateBatch(t *testing.T) {
bzzTokenAddress, bzzTokenAddress,
transactionMock.New(), transactionMock.New(),
postageMock.New(), postageMock.New(),
postagestoreMock.New(),
) )
_, err := contract.CreateBatch(ctx, initialBalance, depth, false, label) _, err := contract.CreateBatch(ctx, initialBalance, depth, false, label)
...@@ -140,6 +145,7 @@ func TestCreateBatch(t *testing.T) { ...@@ -140,6 +145,7 @@ func TestCreateBatch(t *testing.T) {
}), }),
), ),
postageMock.New(), postageMock.New(),
postagestoreMock.New(),
) )
_, err := contract.CreateBatch(ctx, initialBalance, depth, false, label) _, err := contract.CreateBatch(ctx, initialBalance, depth, false, label)
...@@ -192,3 +198,291 @@ func TestLookupERC20Address(t *testing.T) { ...@@ -192,3 +198,291 @@ func TestLookupERC20Address(t *testing.T) {
t.Fatalf("got wrong erc20 address. wanted %v, got %v", erc20Address, addr) t.Fatalf("got wrong erc20 address. wanted %v, got %v", erc20Address, addr)
} }
} }
func TestTopUpBatch(t *testing.T) {
defer func(b uint8) {
postagecontract.BucketDepth = b
}(postagecontract.BucketDepth)
postagecontract.BucketDepth = 9
owner := common.HexToAddress("abcd")
postageStampAddress := common.HexToAddress("ffff")
bzzTokenAddress := common.HexToAddress("eeee")
ctx := context.Background()
topupBalance := big.NewInt(100)
t.Run("ok", func(t *testing.T) {
totalAmount := big.NewInt(102400)
txHashApprove := common.HexToHash("abb0")
txHashTopup := common.HexToHash("c3a7")
batch := postagetesting.MustNewBatch(postagetesting.WithOwner(owner.Bytes()))
batch.Depth = uint8(10)
batch.BucketDepth = uint8(9)
postageMock := postageMock.New(postageMock.WithIssuer(postage.NewStampIssuer(
"label",
"keyID",
batch.ID,
batch.Value,
batch.Depth,
batch.BucketDepth,
batch.Start,
batch.Immutable,
)))
batchStoreMock := postagestoreMock.New(postagestoreMock.WithBatch(batch))
expectedCallData, err := postagecontract.PostageStampABI.Pack("topUp", common.BytesToHash(batch.ID), topupBalance)
if err != nil {
t.Fatal(err)
}
contract := postagecontract.New(
owner,
postageStampAddress,
bzzTokenAddress,
transactionMock.New(
transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, err error) {
if *request.To == bzzTokenAddress {
return txHashApprove, nil
} else if *request.To == postageStampAddress {
if !bytes.Equal(expectedCallData[:64], request.Data[:64]) {
return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data)
}
return txHashTopup, nil
}
return common.Hash{}, errors.New("sent to wrong contract")
}),
transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) {
if txHash == txHashApprove {
return &types.Receipt{
Status: 1,
}, nil
} else if txHash == txHashTopup {
return &types.Receipt{
Logs: []*types.Log{
newTopUpEvent(postageStampAddress, batch),
},
Status: 1,
}, nil
}
return nil, errors.New("unknown tx hash")
}),
transactionMock.WithCallFunc(func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) {
if *request.To == bzzTokenAddress {
return totalAmount.FillBytes(make([]byte, 32)), nil
}
return nil, errors.New("unexpected call")
}),
),
postageMock,
batchStoreMock,
)
err = contract.TopUpBatch(ctx, batch.ID, topupBalance)
if err != nil {
t.Fatal(err)
}
si, err := postageMock.GetStampIssuer(batch.ID)
if err != nil {
t.Fatal(err)
}
if si == nil {
t.Fatal("stamp issuer not set")
}
})
t.Run("batch doesnt exist", func(t *testing.T) {
errNotFound := errors.New("not found")
contract := postagecontract.New(
owner,
postageStampAddress,
bzzTokenAddress,
transactionMock.New(),
postageMock.New(),
postagestoreMock.New(postagestoreMock.WithGetErr(errNotFound, 0)),
)
err := contract.TopUpBatch(ctx, postagetesting.MustNewID(), topupBalance)
if !errors.Is(err, errNotFound) {
t.Fatal("expected error on topup of non existent batch")
}
})
t.Run("insufficient funds", func(t *testing.T) {
totalAmount := big.NewInt(102399)
batch := postagetesting.MustNewBatch(postagetesting.WithOwner(owner.Bytes()))
batchStoreMock := postagestoreMock.New(postagestoreMock.WithBatch(batch))
contract := postagecontract.New(
owner,
postageStampAddress,
bzzTokenAddress,
transactionMock.New(
transactionMock.WithCallFunc(func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) {
if *request.To == bzzTokenAddress {
return big.NewInt(0).Sub(totalAmount, big.NewInt(1)).FillBytes(make([]byte, 32)), nil
}
return nil, errors.New("unexpected call")
}),
),
postageMock.New(),
batchStoreMock,
)
err := contract.TopUpBatch(ctx, batch.ID, topupBalance)
if !errors.Is(err, postagecontract.ErrInsufficientFunds) {
t.Fatalf("expected error %v. got %v", postagecontract.ErrInsufficientFunds, err)
}
})
}
func newTopUpEvent(postageContractAddress common.Address, batch *postage.Batch) *types.Log {
b, err := postagecontract.PostageStampABI.Events["BatchTopUp"].Inputs.NonIndexed().Pack(
big.NewInt(0),
big.NewInt(0),
)
if err != nil {
panic(err)
}
return &types.Log{
Address: postageContractAddress,
Data: b,
Topics: []common.Hash{postagecontract.BatchTopUpTopic, common.BytesToHash(batch.ID)},
BlockNumber: batch.Start + 1,
}
}
func TestDiluteBatch(t *testing.T) {
defer func(b uint8) {
postagecontract.BucketDepth = b
}(postagecontract.BucketDepth)
postagecontract.BucketDepth = 9
owner := common.HexToAddress("abcd")
postageStampAddress := common.HexToAddress("ffff")
bzzTokenAddress := common.HexToAddress("eeee")
ctx := context.Background()
t.Run("ok", func(t *testing.T) {
txHashDilute := common.HexToHash("c3a7")
batch := postagetesting.MustNewBatch(postagetesting.WithOwner(owner.Bytes()))
batch.Depth = uint8(10)
batch.BucketDepth = uint8(9)
batch.Value = big.NewInt(100)
newDepth := batch.Depth + 1
postageMock := postageMock.New(postageMock.WithIssuer(postage.NewStampIssuer(
"label",
"keyID",
batch.ID,
batch.Value,
batch.Depth,
batch.BucketDepth,
batch.Start,
batch.Immutable,
)))
batchStoreMock := postagestoreMock.New(postagestoreMock.WithBatch(batch))
expectedCallData, err := postagecontract.PostageStampABI.Pack("increaseDepth", common.BytesToHash(batch.ID), newDepth)
if err != nil {
t.Fatal(err)
}
contract := postagecontract.New(
owner,
postageStampAddress,
bzzTokenAddress,
transactionMock.New(
transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, err error) {
if *request.To == postageStampAddress {
if !bytes.Equal(expectedCallData[:64], request.Data[:64]) {
return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data)
}
return txHashDilute, nil
}
return common.Hash{}, errors.New("sent to wrong contract")
}),
transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) {
if txHash == txHashDilute {
return &types.Receipt{
Logs: []*types.Log{
newDiluteEvent(postageStampAddress, batch),
},
Status: 1,
}, nil
}
return nil, errors.New("unknown tx hash")
}),
),
postageMock,
batchStoreMock,
)
err = contract.DiluteBatch(ctx, batch.ID, newDepth)
if err != nil {
t.Fatal(err)
}
si, err := postageMock.GetStampIssuer(batch.ID)
if err != nil {
t.Fatal(err)
}
if si == nil {
t.Fatal("stamp issuer not set")
}
})
t.Run("batch doesnt exist", func(t *testing.T) {
errNotFound := errors.New("not found")
contract := postagecontract.New(
owner,
postageStampAddress,
bzzTokenAddress,
transactionMock.New(),
postageMock.New(),
postagestoreMock.New(postagestoreMock.WithGetErr(errNotFound, 0)),
)
err := contract.DiluteBatch(ctx, postagetesting.MustNewID(), uint8(17))
if !errors.Is(err, errNotFound) {
t.Fatal("expected error on topup of non existent batch")
}
})
t.Run("invalid depth", func(t *testing.T) {
batch := postagetesting.MustNewBatch(postagetesting.WithOwner(owner.Bytes()))
batch.Depth = uint8(16)
batchStoreMock := postagestoreMock.New(postagestoreMock.WithBatch(batch))
contract := postagecontract.New(
owner,
postageStampAddress,
bzzTokenAddress,
transactionMock.New(),
postageMock.New(),
batchStoreMock,
)
err := contract.DiluteBatch(ctx, batch.ID, batch.Depth-1)
if !errors.Is(err, postagecontract.ErrInvalidDepth) {
t.Fatalf("expected error %v. got %v", postagecontract.ErrInvalidDepth, err)
}
})
}
func newDiluteEvent(postageContractAddress common.Address, batch *postage.Batch) *types.Log {
b, err := postagecontract.PostageStampABI.Events["BatchDepthIncrease"].Inputs.NonIndexed().Pack(
uint8(0),
big.NewInt(0),
)
if err != nil {
panic(err)
}
return &types.Log{
Address: postageContractAddress,
Data: b,
Topics: []common.Hash{postagecontract.BatchDiluteTopic, common.BytesToHash(batch.ID)},
BlockNumber: batch.Start + 1,
}
}
...@@ -7,4 +7,6 @@ package postagecontract ...@@ -7,4 +7,6 @@ package postagecontract
var ( var (
PostageStampABI = postageStampABI PostageStampABI = postageStampABI
BatchCreatedTopic = batchCreatedTopic BatchCreatedTopic = batchCreatedTopic
BatchTopUpTopic = batchTopUpTopic
BatchDiluteTopic = batchDiluteTopic
) )
...@@ -13,12 +13,22 @@ import ( ...@@ -13,12 +13,22 @@ import (
type contractMock struct { type contractMock struct {
createBatch func(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error) createBatch func(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error)
topupBatch func(ctx context.Context, id []byte, amount *big.Int) error
diluteBatch func(ctx context.Context, id []byte, newDepth uint8) error
} }
func (c *contractMock) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error) { func (c *contractMock) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error) {
return c.createBatch(ctx, initialBalance, depth, immutable, label) return c.createBatch(ctx, initialBalance, depth, immutable, label)
} }
func (c *contractMock) TopUpBatch(ctx context.Context, batchID []byte, amount *big.Int) error {
return c.topupBatch(ctx, batchID, amount)
}
func (c *contractMock) DiluteBatch(ctx context.Context, batchID []byte, newDepth uint8) error {
return c.diluteBatch(ctx, batchID, newDepth)
}
// Option is a an option passed to New // Option is a an option passed to New
type Option func(*contractMock) type Option func(*contractMock)
...@@ -38,3 +48,15 @@ func WithCreateBatchFunc(f func(ctx context.Context, initialBalance *big.Int, de ...@@ -38,3 +48,15 @@ func WithCreateBatchFunc(f func(ctx context.Context, initialBalance *big.Int, de
m.createBatch = f m.createBatch = f
} }
} }
func WithTopUpBatchFunc(f func(ctx context.Context, batchID []byte, amount *big.Int) error) Option {
return func(m *contractMock) {
m.topupBatch = f
}
}
func WithDiluteBatchFunc(f func(ctx context.Context, batchID []byte, newDepth uint8) error) Option {
return func(m *contractMock) {
m.diluteBatch = f
}
}
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math/big"
"sync" "sync"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
...@@ -34,7 +35,7 @@ type Service interface { ...@@ -34,7 +35,7 @@ type Service interface {
StampIssuers() []*StampIssuer StampIssuers() []*StampIssuer
GetStampIssuer([]byte) (*StampIssuer, error) GetStampIssuer([]byte) (*StampIssuer, error)
IssuerUsable(*StampIssuer) bool IssuerUsable(*StampIssuer) bool
BatchCreationListener BatchEventListener
io.Closer io.Closer
} }
...@@ -87,10 +88,10 @@ func (ps *service) Add(st *StampIssuer) { ...@@ -87,10 +88,10 @@ func (ps *service) Add(st *StampIssuer) {
ps.issuers = append(ps.issuers, st) ps.issuers = append(ps.issuers, st)
} }
// Handle implements the BatchCreationListener interface. This is fired on receiving // HandleCreate implements the BatchEventListener interface. This is fired on receiving
// a batch creation event from the blockchain listener to ensure that if a stamp // a batch creation event from the blockchain listener to ensure that if a stamp
// issuer was not created initially, we will create it here. // issuer was not created initially, we will create it here.
func (ps *service) Handle(b *Batch) { func (ps *service) HandleCreate(b *Batch) {
ps.Add(NewStampIssuer( ps.Add(NewStampIssuer(
"recovered", "recovered",
string(b.Owner), string(b.Owner),
...@@ -103,6 +104,37 @@ func (ps *service) Handle(b *Batch) { ...@@ -103,6 +104,37 @@ func (ps *service) Handle(b *Batch) {
)) ))
} }
// HandleTopUp implements the BatchEventListener interface. This is fired on receiving
// a batch topup event from the blockchain to update stampissuer details
func (ps *service) HandleTopUp(batchID []byte, newValue *big.Int) {
ps.lock.Lock()
defer ps.lock.Unlock()
for _, v := range ps.issuers {
if bytes.Equal(batchID, v.data.BatchID) {
if newValue.Cmp(v.data.BatchAmount) > 0 {
v.data.BatchAmount = newValue
}
return
}
}
}
func (ps *service) HandleDepthIncrease(batchID []byte, newDepth uint8, normalisedBalance *big.Int) {
ps.lock.Lock()
defer ps.lock.Unlock()
for _, v := range ps.issuers {
if bytes.Equal(batchID, v.data.BatchID) {
if newDepth > v.data.BatchDepth {
v.data.BatchDepth = newDepth
v.data.BatchAmount = normalisedBalance
}
return
}
}
}
// StampIssuers returns the currently active stamp issuers. // StampIssuers returns the currently active stamp issuers.
func (ps *service) StampIssuers() []*StampIssuer { func (ps *service) StampIssuers() []*StampIssuer {
ps.lock.Lock() ps.lock.Lock()
......
...@@ -83,9 +83,6 @@ func TestGetStampIssuer(t *testing.T) { ...@@ -83,9 +83,6 @@ func TestGetStampIssuer(t *testing.T) {
} }
ps.Add(postage.NewStampIssuer(string(id), "", id, big.NewInt(3), 16, 8, validBlockNumber+shift, true)) ps.Add(postage.NewStampIssuer(string(id), "", id, big.NewInt(3), 16, 8, validBlockNumber+shift, true))
} }
b := postagetesting.MustNewBatch()
b.Start = validBlockNumber
ps.Handle(b)
t.Run("found", func(t *testing.T) { t.Run("found", func(t *testing.T) {
for _, id := range ids[1:4] { for _, id := range ids[1:4] {
st, err := ps.GetStampIssuer(id) st, err := ps.GetStampIssuer(id)
...@@ -112,6 +109,9 @@ func TestGetStampIssuer(t *testing.T) { ...@@ -112,6 +109,9 @@ func TestGetStampIssuer(t *testing.T) {
} }
}) })
t.Run("recovered", func(t *testing.T) { t.Run("recovered", func(t *testing.T) {
b := postagetesting.MustNewBatch()
b.Start = validBlockNumber
ps.HandleCreate(b)
st, err := ps.GetStampIssuer(b.ID) st, err := ps.GetStampIssuer(b.ID)
if err != nil { if err != nil {
t.Fatalf("expected no error, got %v", err) t.Fatalf("expected no error, got %v", err)
...@@ -120,4 +120,27 @@ func TestGetStampIssuer(t *testing.T) { ...@@ -120,4 +120,27 @@ func TestGetStampIssuer(t *testing.T) {
t.Fatal("wrong issuer returned") t.Fatal("wrong issuer returned")
} }
}) })
t.Run("topup", func(t *testing.T) {
ps.HandleTopUp(ids[1], big.NewInt(10))
_, err := ps.GetStampIssuer(ids[1])
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if ps.StampIssuers()[0].Amount().Cmp(big.NewInt(10)) != 0 {
t.Fatalf("expected amount %d got %d", 10, ps.StampIssuers()[0].Amount().Int64())
}
})
t.Run("dilute", func(t *testing.T) {
ps.HandleDepthIncrease(ids[2], 17, big.NewInt(1))
_, err := ps.GetStampIssuer(ids[2])
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if ps.StampIssuers()[1].Amount().Cmp(big.NewInt(1)) != 0 {
t.Fatalf("expected amount %d got %d", 1, ps.StampIssuers()[1].Amount().Int64())
}
if ps.StampIssuers()[1].Depth() != 17 {
t.Fatalf("expected depth %d got %d", 17, ps.StampIssuers()[1].Depth())
}
})
} }
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"fmt" "fmt"
"github.com/ethersphere/bee/pkg/pushsync" "github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
...@@ -22,20 +23,30 @@ import ( ...@@ -22,20 +23,30 @@ import (
// how many parallel push operations // how many parallel push operations
const parallelPush = 5 const parallelPush = 5
type Reuploader interface { type Interface interface {
// Reupload root hash and all of its underlying // Reupload root hash and all of its underlying
// associated chunks to the network. // associated chunks to the network.
Reupload(context.Context, swarm.Address) error Reupload(context.Context, swarm.Address) error
// IsRetrievable checks whether the content
// on the given address is retrievable.
IsRetrievable(context.Context, swarm.Address) (bool, error)
} }
type steward struct { type steward struct {
getter storage.Getter getter storage.Getter
push pushsync.PushSyncer push pushsync.PushSyncer
traverser traversal.Traverser traverser traversal.Traverser
netTraverser traversal.Traverser
} }
func New(getter storage.Getter, t traversal.Traverser, p pushsync.PushSyncer) Reuploader { func New(getter storage.Getter, t traversal.Traverser, r retrieval.Interface, p pushsync.PushSyncer) Interface {
return &steward{getter: getter, push: p, traverser: t} return &steward{
getter: getter,
push: p,
traverser: t,
netTraverser: traversal.New(&netGetter{r}),
}
} }
// Reupload content with the given root hash to the network. // Reupload content with the given root hash to the network.
...@@ -76,3 +87,32 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address) error { ...@@ -76,3 +87,32 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address) error {
} }
return nil return nil
} }
// IsRetrievable implements Interface.IsRetrievable method.
func (s *steward) IsRetrievable(ctx context.Context, root swarm.Address) (bool, error) {
noop := func(leaf swarm.Address) error { return nil }
switch err := s.netTraverser.Traverse(ctx, root, noop); {
case errors.Is(err, storage.ErrNotFound):
return false, nil
case err != nil:
return false, fmt.Errorf("traversal of %q failed: %w", root, err)
default:
return true, nil
}
}
// netGetter implements the storage Getter.Get method in a way
// that it will try to retrieve the chunk only from the network.
type netGetter struct {
retrieval retrieval.Interface
}
// Get implements the storage Getter.Get interface.
func (ng *netGetter) Get(ctx context.Context, _ storage.ModeGet, addr swarm.Address) (swarm.Chunk, error) {
return ng.retrieval.RetrieveChunk(ctx, addr, true)
}
// Put implements the storage Putter.Put interface.
func (ng *netGetter) Put(_ context.Context, _ storage.ModePut, _ ...swarm.Chunk) ([]bool, error) {
return nil, errors.New("operation is not supported")
}
...@@ -29,6 +29,7 @@ func TestSteward(t *testing.T) { ...@@ -29,6 +29,7 @@ func TestSteward(t *testing.T) {
data = make([]byte, chunks*4096) //1k chunks data = make([]byte, chunks*4096) //1k chunks
store = mock.NewStorer() store = mock.NewStorer()
traverser = traversal.New(store) traverser = traversal.New(store)
loggingStorer = &loggingStore{Storer: store}
traversedAddrs = make(map[string]struct{}) traversedAddrs = make(map[string]struct{})
mu sync.Mutex mu sync.Mutex
fn = func(_ context.Context, ch swarm.Chunk) (*pushsync.Receipt, error) { fn = func(_ context.Context, ch swarm.Chunk) (*pushsync.Receipt, error) {
...@@ -38,7 +39,7 @@ func TestSteward(t *testing.T) { ...@@ -38,7 +39,7 @@ func TestSteward(t *testing.T) {
return nil, nil return nil, nil
} }
ps = psmock.New(fn) ps = psmock.New(fn)
s = steward.New(store, traverser, ps) s = steward.New(store, traverser, loggingStorer, ps)
) )
n, err := rand.Read(data) n, err := rand.Read(data)
if n != cap(data) { if n != cap(data) {
...@@ -48,8 +49,7 @@ func TestSteward(t *testing.T) { ...@@ -48,8 +49,7 @@ func TestSteward(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
l := &loggingStore{Storer: store} pipe := builder.NewPipelineBuilder(ctx, loggingStorer, storage.ModePutUpload, false)
pipe := builder.NewPipelineBuilder(ctx, l, storage.ModePutUpload, false)
addr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data)) addr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -62,8 +62,16 @@ func TestSteward(t *testing.T) { ...@@ -62,8 +62,16 @@ func TestSteward(t *testing.T) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
isRetrievable, err := s.IsRetrievable(ctx, addr)
if err != nil {
t.Fatal(err)
}
if !isRetrievable {
t.Fatalf("re-uploaded content on %q should be retrievable", addr)
}
// check that everything that was stored is also traversed // check that everything that was stored is also traversed
for _, a := range l.addrs { for _, a := range loggingStorer.addrs {
if _, ok := traversedAddrs[a.String()]; !ok { if _, ok := traversedAddrs[a.String()]; !ok {
t.Fatalf("expected address %s to be traversed", a.String()) t.Fatalf("expected address %s to be traversed", a.String())
} }
...@@ -77,11 +85,12 @@ func TestSteward_ErrWantSelf(t *testing.T) { ...@@ -77,11 +85,12 @@ func TestSteward_ErrWantSelf(t *testing.T) {
data = make([]byte, chunks*4096) data = make([]byte, chunks*4096)
store = mock.NewStorer() store = mock.NewStorer()
traverser = traversal.New(store) traverser = traversal.New(store)
loggingStorer = &loggingStore{Storer: store}
fn = func(_ context.Context, ch swarm.Chunk) (*pushsync.Receipt, error) { fn = func(_ context.Context, ch swarm.Chunk) (*pushsync.Receipt, error) {
return nil, topology.ErrWantSelf return nil, topology.ErrWantSelf
} }
ps = psmock.New(fn) ps = psmock.New(fn)
s = steward.New(store, traverser, ps) s = steward.New(store, traverser, loggingStorer, ps)
) )
n, err := rand.Read(data) n, err := rand.Read(data)
if n != cap(data) { if n != cap(data) {
...@@ -91,8 +100,7 @@ func TestSteward_ErrWantSelf(t *testing.T) { ...@@ -91,8 +100,7 @@ func TestSteward_ErrWantSelf(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
l := &loggingStore{Storer: store} pipe := builder.NewPipelineBuilder(ctx, loggingStorer, storage.ModePutUpload, false)
pipe := builder.NewPipelineBuilder(ctx, l, storage.ModePutUpload, false)
addr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data)) addr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -109,9 +117,13 @@ type loggingStore struct { ...@@ -109,9 +117,13 @@ type loggingStore struct {
addrs []swarm.Address addrs []swarm.Address
} }
func (l *loggingStore) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) { func (ls *loggingStore) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
for _, c := range chs { for _, c := range chs {
l.addrs = append(l.addrs, c.Address()) ls.addrs = append(ls.addrs, c.Address())
} }
return l.Storer.Put(ctx, mode, chs...) return ls.Storer.Put(ctx, mode, chs...)
}
func (ls *loggingStore) RetrieveChunk(ctx context.Context, addr swarm.Address, _ bool) (chunk swarm.Chunk, err error) {
return ls.Get(ctx, storage.ModeGetRequest, addr)
} }
...@@ -156,7 +156,7 @@ func (t *Tag) Get(state State) int64 { ...@@ -156,7 +156,7 @@ func (t *Tag) Get(state State) int64 {
return atomic.LoadInt64(v) return atomic.LoadInt64(v)
} }
// GetTotal returns the total count // TotalCounter returns the total count
func (t *Tag) TotalCounter() int64 { func (t *Tag) TotalCounter() int64 {
return atomic.LoadInt64(&t.Total) return atomic.LoadInt64(&t.Total)
} }
...@@ -169,6 +169,8 @@ func (t *Tag) WaitTillDone(ctx context.Context, s State) error { ...@@ -169,6 +169,8 @@ func (t *Tag) WaitTillDone(ctx context.Context, s State) error {
return nil return nil
} }
ticker := time.NewTicker(100 * time.Millisecond) ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
package metrics package metrics
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
"time" "time"
...@@ -164,7 +165,7 @@ type Counters struct { ...@@ -164,7 +165,7 @@ type Counters struct {
// flush writes the current state of in memory counters into the given db. // flush writes the current state of in memory counters into the given db.
func (cs *Counters) flush(db *shed.DB, batch *leveldb.Batch) error { func (cs *Counters) flush(db *shed.DB, batch *leveldb.Batch) error {
if cs.dirty.Load() > 1 { if cs.dirty.Load() < 3 {
return nil return nil
} }
cs.dirty.CAS(3, 2) cs.dirty.CAS(3, 2)
...@@ -341,15 +342,19 @@ func (c *Collector) Flush(addresses ...swarm.Address) error { ...@@ -341,15 +342,19 @@ func (c *Collector) Flush(addresses ...swarm.Address) error {
return mErr return mErr
} }
// Finalize logs out all ongoing peer sessions // Finalize tries to logs out all ongoing peer sessions.
// and flushes all in-memory metrics counters. func (c *Collector) Finalize(ctx context.Context, t time.Time) error {
func (c *Collector) Finalize(t time.Time) error {
var ( var (
mErr error mErr error
batch = new(leveldb.Batch) batch = new(leveldb.Batch)
) )
c.counters.Range(func(_, val interface{}) bool { c.counters.Range(func(_, val interface{}) bool {
select {
case <-ctx.Done():
return false
default:
}
cs := val.(*Counters) cs := val.(*Counters)
PeerLogOut(t)(cs) PeerLogOut(t)(cs)
if err := cs.flush(c.db, batch); err != nil { if err := cs.flush(c.db, batch); err != nil {
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package metrics_test package metrics_test
import ( import (
"context"
"testing" "testing"
"time" "time"
...@@ -125,7 +126,7 @@ func TestPeerMetricsCollector(t *testing.T) { ...@@ -125,7 +126,7 @@ func TestPeerMetricsCollector(t *testing.T) {
// Finalize. // Finalize.
mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound)) mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound))
if err := mc.Finalize(t3); err != nil { if err := mc.Finalize(context.Background(), t3); err != nil {
t.Fatalf("Finalize(%s): unexpected error: %v", t3, err) t.Fatalf("Finalize(%s): unexpected error: %v", t3, err)
} }
if have, want := len(mc.Snapshot(t2, addr)), 0; have != want { if have, want := len(mc.Snapshot(t2, addr)), 0; have != want {
......
...@@ -506,18 +506,33 @@ func (k *Kad) manage() { ...@@ -506,18 +506,33 @@ func (k *Kad) manage() {
peerConnChan2 := make(chan *peerConnInfo) peerConnChan2 := make(chan *peerConnInfo)
go k.connectionAttemptsHandler(ctx, &wg, peerConnChan, peerConnChan2) go k.connectionAttemptsHandler(ctx, &wg, peerConnChan, peerConnChan2)
k.wg.Add(1)
go func() {
defer k.wg.Done()
for { for {
select { select {
case <-k.halt:
return
case <-k.quit: case <-k.quit:
return return
case <-time.After(15 * time.Second): case <-time.After(5 * time.Minute):
start := time.Now() start := time.Now()
if err := k.collector.Flush(); err != nil { if err := k.collector.Flush(); err != nil {
k.metrics.InternalMetricsFlushTotalErrors.Inc() k.metrics.InternalMetricsFlushTotalErrors.Inc()
k.logger.Debugf("kademlia: unable to flush metrics counters to the persistent store: %v", err) k.logger.Debugf("kademlia: took %s unable to flush metrics counters to the persistent store: %v", time.Since(start), err)
} else { } else {
k.metrics.InternalMetricsFlushTime.Observe(float64(time.Since(start).Nanoseconds())) k.metrics.InternalMetricsFlushTime.Observe(float64(time.Since(start).Nanoseconds()))
k.logger.Tracef("kademlia took %s to flush", time.Since(start))
}
}
} }
}()
for {
select {
case <-k.quit:
return
case <-time.After(15 * time.Second):
k.notifyManageLoop() k.notifyManageLoop()
case <-k.manageC: case <-k.manageC:
start := time.Now() start := time.Now()
...@@ -1321,9 +1336,11 @@ func (k *Kad) Close() error { ...@@ -1321,9 +1336,11 @@ func (k *Kad) Close() error {
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
k.logger.Warning("kademlia manage loop did not shut down properly") k.logger.Warning("kademlia manage loop did not shut down properly")
} }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
k.logger.Info("kademlia persisting peer metrics") k.logger.Info("kademlia persisting peer metrics")
if err := k.collector.Finalize(time.Now()); err != nil { if err := k.collector.Finalize(ctx, time.Now()); err != nil {
k.logger.Debugf("kademlia: unable to finalize open sessions: %v", err) k.logger.Debugf("kademlia: unable to finalize open sessions: %v", err)
} }
......
...@@ -76,16 +76,34 @@ func WaitSynced(ctx context.Context, logger logging.Logger, backend Backend, max ...@@ -76,16 +76,34 @@ func WaitSynced(ctx context.Context, logger logging.Logger, backend Backend, max
} }
} }
func WaitBlock(ctx context.Context, backend Backend, pollingInterval time.Duration, block *big.Int) (*types.Header, error) { func WaitBlockAfterTransaction(ctx context.Context, backend Backend, pollingInterval time.Duration, txHash common.Hash, additionalConfirmations uint64) (*types.Header, error) {
for { for {
header, err := backend.HeaderByNumber(ctx, block) receipt, err := backend.TransactionReceipt(ctx, txHash)
if err != nil { if err != nil {
if !errors.Is(err, ethereum.NotFound) { if !errors.Is(err, ethereum.NotFound) {
return nil, err return nil, err
} }
continue
}
bn, err := backend.BlockNumber(ctx)
if err != nil {
return nil, err
}
nextBlock := receipt.BlockNumber.Uint64() + 1
if bn >= nextBlock+additionalConfirmations {
header, err := backend.HeaderByNumber(ctx, new(big.Int).SetUint64(nextBlock))
if err != nil {
if !errors.Is(err, ethereum.NotFound) {
return nil, err
}
// in the case where we cannot find the block even though we already saw a higher number we keep on trying
} else { } else {
return header, nil return header, nil
} }
}
select { select {
case <-time.After(pollingInterval): case <-time.After(pollingInterval):
......
...@@ -129,6 +129,10 @@ func (m *backendMock) NonceAt(ctx context.Context, account common.Address, block ...@@ -129,6 +129,10 @@ func (m *backendMock) NonceAt(ctx context.Context, account common.Address, block
return 0, errors.New("not implemented") return 0, errors.New("not implemented")
} }
func (m *backendMock) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
return nil, errors.New("not implemented")
}
func New(opts ...Option) transaction.Backend { func New(opts ...Option) transaction.Backend {
mock := new(backendMock) mock := new(backendMock)
for _, o := range opts { for _, o := range opts {
......
...@@ -159,3 +159,7 @@ func (m *simulatedBackend) NonceAt(ctx context.Context, account common.Address, ...@@ -159,3 +159,7 @@ func (m *simulatedBackend) NonceAt(ctx context.Context, account common.Address,
return 0, nil return 0, nil
} }
} }
func (m *simulatedBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
return nil, errors.New("not implemented")
}
...@@ -94,7 +94,15 @@ func (m *Matcher) Matches(ctx context.Context, tx []byte, networkID uint64, send ...@@ -94,7 +94,15 @@ func (m *Matcher) Matches(ctx context.Context, tx []byte, networkID uint64, send
return nil, ErrTransactionPending return nil, ErrTransactionPending
} }
sender, err := types.Sender(m.signer, nTx) // if transaction data is exactly one word and starts with 4 0-bytes this is a transaction data type proof
// we check for the starting 0-bytes so we don't mistake a 28 byte data solidity call for this
// otherwise this is considered as a signer based proof. a transaction can be only one of the two.
var attestedOverlay common.Address
txData := nTx.Data()
if len(txData) == 32 && bytes.Equal(txData[0:4], []byte{0, 0, 0, 0}) {
attestedOverlay = common.BytesToAddress(nTx.Data())
} else {
attestedOverlay, err = types.Sender(m.signer, nTx)
if err != nil { if err != nil {
err2 := m.storage.Put(peerOverlayKey(senderOverlay, incomingTx), &overlayVerification{ err2 := m.storage.Put(peerOverlayKey(senderOverlay, incomingTx), &overlayVerification{
TimeStamp: m.timeNow(), TimeStamp: m.timeNow(),
...@@ -105,6 +113,7 @@ func (m *Matcher) Matches(ctx context.Context, tx []byte, networkID uint64, send ...@@ -105,6 +113,7 @@ func (m *Matcher) Matches(ctx context.Context, tx []byte, networkID uint64, send
} }
return nil, fmt.Errorf("%v: %w", err, ErrTransactionSenderInvalid) return nil, fmt.Errorf("%v: %w", err, ErrTransactionSenderInvalid)
} }
}
receipt, err := m.backend.TransactionReceipt(ctx, incomingTx) receipt, err := m.backend.TransactionReceipt(ctx, incomingTx)
if err != nil { if err != nil {
...@@ -145,7 +154,7 @@ func (m *Matcher) Matches(ctx context.Context, tx []byte, networkID uint64, send ...@@ -145,7 +154,7 @@ func (m *Matcher) Matches(ctx context.Context, tx []byte, networkID uint64, send
return nil, fmt.Errorf("receipt hash %x does not match block's parent hash %x: %w", receiptBlockHash, nextBlockParentHash, ErrBlockHashMismatch) return nil, fmt.Errorf("receipt hash %x does not match block's parent hash %x: %w", receiptBlockHash, nextBlockParentHash, ErrBlockHashMismatch)
} }
expectedRemoteBzzAddress := crypto.NewOverlayFromEthereumAddress(sender.Bytes(), networkID, nextBlockHash) expectedRemoteBzzAddress := crypto.NewOverlayFromEthereumAddress(attestedOverlay.Bytes(), networkID, nextBlockHash)
if !expectedRemoteBzzAddress.Equal(senderOverlay) { if !expectedRemoteBzzAddress.Equal(senderOverlay) {
err2 := m.storage.Put(peerOverlayKey(senderOverlay, incomingTx), &overlayVerification{ err2 := m.storage.Put(peerOverlayKey(senderOverlay, incomingTx), &overlayVerification{
......
...@@ -24,7 +24,14 @@ func TestMatchesSender(t *testing.T) { ...@@ -24,7 +24,14 @@ func TestMatchesSender(t *testing.T) {
nonce := uint64(2) nonce := uint64(2)
trx := common.HexToAddress("0x1").Bytes() trx := common.HexToAddress("0x1").Bytes()
signedTx := types.NewTransaction(nonce, recipient, value, estimatedGasLimit, suggestedGasPrice, txData) signedTx := types.NewTx(&types.LegacyTx{
Nonce: nonce,
To: &recipient,
Value: value,
Gas: estimatedGasLimit,
GasPrice: suggestedGasPrice,
Data: txData,
})
t.Run("fail to retrieve tx from backend", func(t *testing.T) { t.Run("fail to retrieve tx from backend", func(t *testing.T) {
txByHash := backendmock.WithTransactionByHashFunc(func(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) { txByHash := backendmock.WithTransactionByHashFunc(func(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) {
...@@ -102,7 +109,7 @@ func TestMatchesSender(t *testing.T) { ...@@ -102,7 +109,7 @@ func TestMatchesSender(t *testing.T) {
} }
}) })
t.Run("sender matches", func(t *testing.T) { t.Run("sender matches signer type", func(t *testing.T) {
trxBlock := common.HexToHash("0x2") trxBlock := common.HexToHash("0x2")
nextBlockHeader := &types.Header{ nextBlockHeader := &types.Header{
...@@ -138,6 +145,52 @@ func TestMatchesSender(t *testing.T) { ...@@ -138,6 +145,52 @@ func TestMatchesSender(t *testing.T) {
} }
}) })
t.Run("sender matches data type", func(t *testing.T) {
trxBlock := common.HexToHash("0x2")
nextBlockHeader := &types.Header{
ParentHash: trxBlock,
}
overlayEth := common.HexToAddress("0xff")
signedTx := types.NewTransaction(nonce, recipient, value, estimatedGasLimit, suggestedGasPrice, overlayEth.Hash().Bytes())
trxReceipt := backendmock.WithTransactionReceiptFunc(func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
return &types.Receipt{
BlockNumber: big.NewInt(0),
BlockHash: trxBlock,
}, nil
})
headerByNum := backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) {
return nextBlockHeader, nil
})
txByHash := backendmock.WithTransactionByHashFunc(func(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) {
return signedTx, false, nil
})
signer := &mockSigner{
addr: common.HexToAddress("0xee"),
}
matcher := transaction.NewMatcher(backendmock.New(trxReceipt, headerByNum, txByHash), signer, statestore.NewStateStore())
senderOverlay := crypto.NewOverlayFromEthereumAddress(overlayEth.Bytes(), 0, nextBlockHeader.Hash().Bytes())
_, err := matcher.Matches(context.Background(), trx, 0, senderOverlay)
if err != nil {
t.Fatalf("expected match. got %v", err)
}
senderOverlay = crypto.NewOverlayFromEthereumAddress(signer.addr.Bytes(), 0, nextBlockHeader.Hash().Bytes())
_, err = matcher.Matches(context.Background(), trx, 0, senderOverlay)
if err == nil {
t.Fatalf("matched signer for data tx")
}
})
t.Run("cached", func(t *testing.T) { t.Run("cached", func(t *testing.T) {
trxBlock := common.HexToHash("0x2") trxBlock := common.HexToHash("0x2")
......
...@@ -142,7 +142,7 @@ func (t *transactionService) Send(ctx context.Context, request *TxRequest) (txHa ...@@ -142,7 +142,7 @@ func (t *transactionService) Send(ctx context.Context, request *TxRequest) (txHa
return common.Hash{}, err return common.Hash{}, err
} }
tx, err := prepareTransaction(ctx, request, t.sender, t.backend, nonce) tx, err := t.prepareTransaction(ctx, request, nonce)
if err != nil { if err != nil {
return common.Hash{}, err return common.Hash{}, err
} }
...@@ -243,11 +243,11 @@ func (t *transactionService) StoredTransaction(txHash common.Hash) (*StoredTrans ...@@ -243,11 +243,11 @@ func (t *transactionService) StoredTransaction(txHash common.Hash) (*StoredTrans
} }
// prepareTransaction creates a signable transaction based on a request. // prepareTransaction creates a signable transaction based on a request.
func prepareTransaction(ctx context.Context, request *TxRequest, from common.Address, backend Backend, nonce uint64) (tx *types.Transaction, err error) { func (t *transactionService) prepareTransaction(ctx context.Context, request *TxRequest, nonce uint64) (tx *types.Transaction, err error) {
var gasLimit uint64 var gasLimit uint64
if request.GasLimit == 0 { if request.GasLimit == 0 {
gasLimit, err = backend.EstimateGas(ctx, ethereum.CallMsg{ gasLimit, err = t.backend.EstimateGas(ctx, ethereum.CallMsg{
From: from, From: t.sender,
To: request.To, To: request.To,
Data: request.Data, Data: request.Data,
}) })
...@@ -263,7 +263,7 @@ func prepareTransaction(ctx context.Context, request *TxRequest, from common.Add ...@@ -263,7 +263,7 @@ func prepareTransaction(ctx context.Context, request *TxRequest, from common.Add
var gasPrice *big.Int var gasPrice *big.Int
if request.GasPrice == nil { if request.GasPrice == nil {
gasPrice, err = backend.SuggestGasPrice(ctx) gasPrice, err = t.backend.SuggestGasPrice(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -271,24 +271,14 @@ func prepareTransaction(ctx context.Context, request *TxRequest, from common.Add ...@@ -271,24 +271,14 @@ func prepareTransaction(ctx context.Context, request *TxRequest, from common.Add
gasPrice = request.GasPrice gasPrice = request.GasPrice
} }
if request.To != nil { return types.NewTx(&types.LegacyTx{
return types.NewTransaction( Nonce: nonce,
nonce, To: request.To,
*request.To, Value: request.Value,
request.Value, Gas: gasLimit,
gasLimit, GasPrice: gasPrice,
gasPrice, Data: request.Data,
request.Data, }), nil
), nil
}
return types.NewContractCreation(
nonce,
request.Value,
gasLimit,
gasPrice,
request.Data,
), nil
} }
func (t *transactionService) nonceKey() string { func (t *transactionService) nonceKey() string {
...@@ -382,25 +372,14 @@ func (t *transactionService) ResendTransaction(ctx context.Context, txHash commo ...@@ -382,25 +372,14 @@ func (t *transactionService) ResendTransaction(ctx context.Context, txHash commo
return err return err
} }
var tx *types.Transaction tx := types.NewTx(&types.LegacyTx{
if storedTransaction.To != nil { Nonce: storedTransaction.Nonce,
tx = types.NewTransaction( To: storedTransaction.To,
storedTransaction.Nonce, Value: storedTransaction.Value,
*storedTransaction.To, Gas: storedTransaction.GasLimit,
storedTransaction.Value, GasPrice: storedTransaction.GasPrice,
storedTransaction.GasLimit, Data: storedTransaction.Data,
storedTransaction.GasPrice, })
storedTransaction.Data,
)
} else {
tx = types.NewContractCreation(
storedTransaction.Nonce,
storedTransaction.Value,
storedTransaction.GasLimit,
storedTransaction.GasPrice,
storedTransaction.Data,
)
}
signedTx, err := t.signer.SignTx(tx, t.chainID) signedTx, err := t.signer.SignTx(tx, t.chainID)
if err != nil { if err != nil {
...@@ -433,14 +412,14 @@ func (t *transactionService) CancelTransaction(ctx context.Context, originalTxHa ...@@ -433,14 +412,14 @@ func (t *transactionService) CancelTransaction(ctx context.Context, originalTxHa
return common.Hash{}, ErrGasPriceTooLow return common.Hash{}, ErrGasPriceTooLow
} }
signedTx, err := t.signer.SignTx(types.NewTransaction( signedTx, err := t.signer.SignTx(types.NewTx(&types.AccessListTx{
storedTransaction.Nonce, Nonce: storedTransaction.Nonce,
t.sender, To: &t.sender,
big.NewInt(0), Value: big.NewInt(0),
21000, Gas: 21000,
gasPrice, GasPrice: gasPrice,
[]byte{}, Data: []byte{},
), t.chainID) }), t.chainID)
if err != nil { if err != nil {
return common.Hash{}, err return common.Hash{}, err
} }
......
...@@ -82,7 +82,14 @@ func TestTransactionSend(t *testing.T) { ...@@ -82,7 +82,14 @@ func TestTransactionSend(t *testing.T) {
chainID := big.NewInt(5) chainID := big.NewInt(5)
t.Run("send", func(t *testing.T) { t.Run("send", func(t *testing.T) {
signedTx := types.NewTransaction(nonce, recipient, value, estimatedGasLimit, suggestedGasPrice, txData) signedTx := types.NewTx(&types.LegacyTx{
Nonce: nonce,
To: &recipient,
Value: value,
Gas: estimatedGasLimit,
GasPrice: suggestedGasPrice,
Data: txData,
})
request := &transaction.TxRequest{ request := &transaction.TxRequest{
To: &recipient, To: &recipient,
Data: txData, Data: txData,
...@@ -193,7 +200,14 @@ func TestTransactionSend(t *testing.T) { ...@@ -193,7 +200,14 @@ func TestTransactionSend(t *testing.T) {
}) })
t.Run("send_no_nonce", func(t *testing.T) { t.Run("send_no_nonce", func(t *testing.T) {
signedTx := types.NewTransaction(nonce, recipient, value, estimatedGasLimit, suggestedGasPrice, txData) signedTx := types.NewTx(&types.LegacyTx{
Nonce: nonce,
To: &recipient,
Value: value,
Gas: estimatedGasLimit,
GasPrice: suggestedGasPrice,
Data: txData,
})
request := &transaction.TxRequest{ request := &transaction.TxRequest{
To: &recipient, To: &recipient,
Data: txData, Data: txData,
...@@ -256,7 +270,14 @@ func TestTransactionSend(t *testing.T) { ...@@ -256,7 +270,14 @@ func TestTransactionSend(t *testing.T) {
t.Run("send_skipped_nonce", func(t *testing.T) { t.Run("send_skipped_nonce", func(t *testing.T) {
nextNonce := nonce + 5 nextNonce := nonce + 5
signedTx := types.NewTransaction(nextNonce, recipient, value, estimatedGasLimit, suggestedGasPrice, txData) signedTx := types.NewTx(&types.LegacyTx{
Nonce: nextNonce,
To: &recipient,
Value: value,
Gas: estimatedGasLimit,
GasPrice: suggestedGasPrice,
Data: txData,
})
request := &transaction.TxRequest{ request := &transaction.TxRequest{
To: &recipient, To: &recipient,
Data: txData, Data: txData,
...@@ -319,58 +340,6 @@ func TestTransactionSend(t *testing.T) { ...@@ -319,58 +340,6 @@ func TestTransactionSend(t *testing.T) {
t.Fatalf("did not store nonce correctly. wanted %d, got %d", nextNonce+1, storedNonce) t.Fatalf("did not store nonce correctly. wanted %d, got %d", nextNonce+1, storedNonce)
} }
}) })
t.Run("deploy", func(t *testing.T) {
signedTx := types.NewContractCreation(nonce, value, estimatedGasLimit, suggestedGasPrice, txData)
request := &transaction.TxRequest{
To: nil,
Data: txData,
Value: value,
}
transactionService, err := transaction.NewService(logger,
backendmock.New(
backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error {
if tx != signedTx {
t.Fatal("not sending signed transaction")
}
return nil
}),
backendmock.WithEstimateGasFunc(func(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) {
if call.To != nil {
t.Fatalf("estimating with recipient. wanted nil, got %x", call.To)
}
if !bytes.Equal(call.Data, txData) {
t.Fatal("estimating with wrong data")
}
return estimatedGasLimit, nil
}),
backendmock.WithSuggestGasPriceFunc(func(ctx context.Context) (*big.Int, error) {
return suggestedGasPrice, nil
}),
backendmock.WithPendingNonceAtFunc(func(ctx context.Context, account common.Address) (uint64, error) {
return nonce, nil
}),
),
signerMockForTransaction(signedTx, sender, chainID, t),
storemock.NewStateStore(),
chainID,
monitormock.New(),
)
if err != nil {
t.Fatal(err)
}
defer transactionService.Close()
txHash, err := transactionService.Send(context.Background(), request)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(txHash.Bytes(), signedTx.Hash().Bytes()) {
t.Fatal("returning wrong transaction hash")
}
})
} }
func TestTransactionWaitForReceipt(t *testing.T) { func TestTransactionWaitForReceipt(t *testing.T) {
...@@ -444,7 +413,14 @@ func TestTransactionResend(t *testing.T) { ...@@ -444,7 +413,14 @@ func TestTransactionResend(t *testing.T) {
store := storemock.NewStateStore() store := storemock.NewStateStore()
defer store.Close() defer store.Close()
signedTx := types.NewTransaction(nonce, recipient, value, gasLimit, gasPrice, data) signedTx := types.NewTx(&types.LegacyTx{
Nonce: nonce,
To: &recipient,
Value: value,
Gas: gasLimit,
GasPrice: gasPrice,
Data: data,
})
err := store.Put(transaction.StoredTransactionKey(signedTx.Hash()), transaction.StoredTransaction{ err := store.Put(transaction.StoredTransactionKey(signedTx.Hash()), transaction.StoredTransaction{
Nonce: nonce, Nonce: nonce,
...@@ -496,7 +472,14 @@ func TestTransactionCancel(t *testing.T) { ...@@ -496,7 +472,14 @@ func TestTransactionCancel(t *testing.T) {
store := storemock.NewStateStore() store := storemock.NewStateStore()
defer store.Close() defer store.Close()
signedTx := types.NewTransaction(nonce, recipient, value, gasLimit, gasPrice, data) signedTx := types.NewTx(&types.LegacyTx{
Nonce: nonce,
To: &recipient,
Value: value,
Gas: gasLimit,
GasPrice: gasPrice,
Data: data,
})
err := store.Put(transaction.StoredTransactionKey(signedTx.Hash()), transaction.StoredTransaction{ err := store.Put(transaction.StoredTransactionKey(signedTx.Hash()), transaction.StoredTransaction{
Nonce: nonce, Nonce: nonce,
To: &recipient, To: &recipient,
...@@ -510,14 +493,14 @@ func TestTransactionCancel(t *testing.T) { ...@@ -510,14 +493,14 @@ func TestTransactionCancel(t *testing.T) {
} }
t.Run("ok", func(t *testing.T) { t.Run("ok", func(t *testing.T) {
cancelTx := types.NewTransaction( cancelTx := types.NewTx(&types.LegacyTx{
nonce, Nonce: nonce,
recipient, To: &recipient,
big.NewInt(0), Value: big.NewInt(0),
21000, Gas: 21000,
new(big.Int).Add(gasPrice, big.NewInt(1)), GasPrice: new(big.Int).Add(gasPrice, big.NewInt(1)),
[]byte{}, Data: []byte{},
) })
transactionService, err := transaction.NewService(logger, transactionService, err := transaction.NewService(logger,
backendmock.New( backendmock.New(
...@@ -550,15 +533,14 @@ func TestTransactionCancel(t *testing.T) { ...@@ -550,15 +533,14 @@ func TestTransactionCancel(t *testing.T) {
t.Run("custom gas price", func(t *testing.T) { t.Run("custom gas price", func(t *testing.T) {
customGasPrice := big.NewInt(5) customGasPrice := big.NewInt(5)
cancelTx := types.NewTx(&types.LegacyTx{
cancelTx := types.NewTransaction( Nonce: nonce,
nonce, To: &recipient,
recipient, Value: big.NewInt(0),
big.NewInt(0), Gas: 21000,
21000, GasPrice: customGasPrice,
customGasPrice, Data: []byte{},
[]byte{}, })
)
transactionService, err := transaction.NewService(logger, transactionService, err := transaction.NewService(logger,
backendmock.New( backendmock.New(
...@@ -592,15 +574,14 @@ func TestTransactionCancel(t *testing.T) { ...@@ -592,15 +574,14 @@ func TestTransactionCancel(t *testing.T) {
t.Run("too low gas price", func(t *testing.T) { t.Run("too low gas price", func(t *testing.T) {
customGasPrice := big.NewInt(0) customGasPrice := big.NewInt(0)
cancelTx := types.NewTx(&types.LegacyTx{
cancelTx := types.NewTransaction( Nonce: nonce,
nonce, To: &recipient,
recipient, Value: big.NewInt(0),
big.NewInt(0), Gas: 21000,
21000, GasPrice: customGasPrice,
customGasPrice, Data: []byte{},
[]byte{}, })
)
transactionService, err := transaction.NewService(logger, transactionService, err := transaction.NewService(logger,
backendmock.New( backendmock.New(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment