Commit 1e58ecfc authored by aloknerurkar's avatar aloknerurkar Committed by GitHub

feat(postage): stampissuer usability (#2063)

parent d73dfe30
21c21
< blockThreshold = 10
---
> blockThreshold = 0
......@@ -43,6 +43,7 @@ jobs:
patch pkg/postage/batchstore/reserve.go .github/patches/postagereserve.patch
patch pkg/postage/postagecontract/contract.go .github/patches/postagecontract.patch
patch pkg/postage/listener/listener.go .github/patches/listener.patch
patch pkg/postage/service.go .github/patches/postageservice.patch
- name: Prepare local cluster
run: |
printf ${{ secrets.CR_PAT }} | docker login ghcr.io -u bee-worker --password-stdin
......
......@@ -319,6 +319,8 @@ components:
$ref: "#/components/schemas/BatchID"
utilization:
type: integer
usable:
type: boolean
Settlement:
type: object
......
......@@ -267,7 +267,7 @@ func TestPostageHeaderError(t *testing.T) {
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 5)
mp = mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10)))
mp = mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10, 1000)))
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tags.NewTags(mockStatestore, logger),
......
......@@ -57,7 +57,14 @@ func (s *server) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
logger.Debugf("bzz upload: putter: %v", err)
logger.Error("bzz upload: putter")
jsonhttp.BadRequest(w, nil)
switch {
case errors.Is(err, postage.ErrNotFound):
jsonhttp.BadRequest(w, "batch not found")
case errors.Is(err, postage.ErrNotUsable):
jsonhttp.BadRequest(w, "batch not usable yet")
default:
jsonhttp.BadRequest(w, nil)
}
return
}
......
......@@ -19,6 +19,7 @@ import (
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
......@@ -152,7 +153,14 @@ func (s *server) feedPostHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
s.logger.Debugf("feed put: putter: %v", err)
s.logger.Error("feed put: putter")
jsonhttp.BadRequest(w, nil)
switch {
case errors.Is(err, postage.ErrNotFound):
jsonhttp.BadRequest(w, "batch not found")
case errors.Is(err, postage.ErrNotUsable):
jsonhttp.BadRequest(w, "batch not usable yet")
default:
jsonhttp.BadRequest(w, nil)
}
return
}
......@@ -185,7 +193,12 @@ func (s *server) feedPostHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
s.logger.Debugf("feed post: store manifest: %v", err)
s.logger.Error("feed post: store manifest")
jsonhttp.InternalServerError(w, nil)
switch {
case errors.Is(err, postage.ErrBucketFull):
jsonhttp.PaymentRequired(w, "batch is overissued")
default:
jsonhttp.InternalServerError(w, nil)
}
return
}
......
......@@ -154,7 +154,7 @@ func TestFeed_Post(t *testing.T) {
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger)
topic = "aabbcc"
mp = mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10)))
mp = mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10, 1000)))
mockStorer = mock.NewStorer()
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
......
......@@ -99,6 +99,7 @@ func (s *server) postageCreateHandler(w http.ResponseWriter, r *http.Request) {
type postageStampResponse struct {
BatchID batchID `json:"batchID"`
Utilization uint32 `json:"utilization"`
Usable bool `json:"usable"`
}
type postageStampsResponse struct {
......@@ -109,7 +110,11 @@ func (s *server) postageGetStampsHandler(w http.ResponseWriter, r *http.Request)
issuers := s.post.StampIssuers()
resp := postageStampsResponse{}
for _, v := range issuers {
issuer := postageStampResponse{BatchID: v.ID(), Utilization: v.Utilization()}
issuer := postageStampResponse{
BatchID: v.ID(),
Utilization: v.Utilization(),
Usable: s.post.IssuerUsable(v),
}
resp.Stamps = append(resp.Stamps, issuer)
}
jsonhttp.OK(w, resp)
......@@ -140,6 +145,7 @@ func (s *server) postageGetStampHandler(w http.ResponseWriter, r *http.Request)
resp := postageStampResponse{
BatchID: id,
Utilization: issuer.Utilization(),
Usable: s.post.IssuerUsable(issuer),
}
jsonhttp.OK(w, &resp)
}
......@@ -192,7 +192,7 @@ func TestPostageCreateStamp(t *testing.T) {
}
func TestPostageGetStamps(t *testing.T) {
mp := mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10)))
mp := mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10, 1000)))
client, _, _ := newTestServer(t, testServerOptions{Post: mp})
jsonhttptest.Request(t, client, http.MethodGet, "/stamps", http.StatusOK,
......@@ -201,6 +201,7 @@ func TestPostageGetStamps(t *testing.T) {
{
BatchID: batchOk,
Utilization: 0,
Usable: true,
},
},
}),
......@@ -208,7 +209,7 @@ func TestPostageGetStamps(t *testing.T) {
}
func TestPostageGetStamp(t *testing.T) {
mp := mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10)))
mp := mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10, 1000)))
client, _, _ := newTestServer(t, testServerOptions{Post: mp})
t.Run("ok", func(t *testing.T) {
......@@ -216,6 +217,7 @@ func TestPostageGetStamp(t *testing.T) {
jsonhttptest.WithExpectedJSONResponse(&api.PostageStampResponse{
BatchID: batchOk,
Utilization: 0,
Usable: true,
}),
)
})
......
......@@ -88,7 +88,14 @@ func (s *server) pssPostHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
s.logger.Debugf("pss: postage batch issuer: %v", err)
s.logger.Error("pss: postage batch issue")
jsonhttp.BadRequest(w, "postage stamp issuer")
switch {
case errors.Is(err, postage.ErrNotFound):
jsonhttp.BadRequest(w, "batch not found")
case errors.Is(err, postage.ErrNotUsable):
jsonhttp.BadRequest(w, "batch not usable yet")
default:
jsonhttp.BadRequest(w, "postage stamp issuer")
}
return
}
stamper := postage.NewStamper(i, s.signer)
......
......@@ -185,7 +185,7 @@ func TestPssSend(t *testing.T) {
mtx.Unlock()
return err
}
mp = mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10)))
mp = mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10, 1000)))
p = newMockPss(sendFn)
client, _, _ = newTestServer(t, testServerOptions{
Pss: p,
......
......@@ -139,7 +139,14 @@ func (s *server) socUploadHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
s.logger.Debugf("soc upload: postage batch issuer: %v", err)
s.logger.Error("soc upload: postage batch issue")
jsonhttp.BadRequest(w, "postage stamp issuer")
switch {
case errors.Is(err, postage.ErrNotFound):
jsonhttp.BadRequest(w, "batch not found")
case errors.Is(err, postage.ErrNotUsable):
jsonhttp.BadRequest(w, "batch not usable yet")
default:
jsonhttp.BadRequest(w, "postage stamp issuer")
}
return
}
stamper := postage.NewStamper(i, s.signer)
......@@ -147,7 +154,12 @@ func (s *server) socUploadHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
s.logger.Debugf("soc upload: stamp: %v", err)
s.logger.Error("soc upload: stamp error")
jsonhttp.InternalServerError(w, "stamp error")
switch {
case errors.Is(err, postage.ErrBucketFull):
jsonhttp.PaymentRequired(w, "batch is overissued")
default:
jsonhttp.InternalServerError(w, "stamp error")
}
return
}
sch = sch.WithStamp(stamp)
......
......@@ -32,7 +32,7 @@ func TestSOC(t *testing.T) {
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger)
mp = mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10)))
mp = mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, 11, 10, 1000)))
mockStorer = mock.NewStorer()
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
......
......@@ -357,7 +357,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
return nil, fmt.Errorf("batchstore: %w", err)
}
validStamp := postage.ValidStamp(batchStore)
post, err := postage.NewService(stateStore, chainID)
post, err := postage.NewService(stateStore, batchStore, chainID)
if err != nil {
return nil, fmt.Errorf("postage service load: %w", err)
}
......
......@@ -5,6 +5,7 @@
package postage
var (
IndexToBytes = indexToBytes
BytesToIndex = bytesToIndex
IndexToBytes = indexToBytes
BytesToIndex = bytesToIndex
BlockThreshold = blockThreshold
)
......@@ -54,7 +54,7 @@ func (m *mockPostage) StampIssuers() []*postage.StampIssuer {
func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, error) {
if m.acceptAll {
return postage.NewStampIssuer("test fallback", "test identity", id, 24, 6), nil
return postage.NewStampIssuer("test fallback", "test identity", id, 24, 6, 1000), nil
}
if m.i != nil {
......@@ -64,6 +64,10 @@ func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, error) {
return nil, errors.New("stampissuer not found")
}
func (m *mockPostage) IssuerUsable(_ *postage.StampIssuer) bool {
return true
}
func (m *mockPostage) Close() error {
return nil
}
......@@ -192,6 +192,7 @@ func (c *postageContract) CreateBatch(ctx context.Context, initialBalance *big.I
batchID,
depth,
createdEvent.BucketDepth,
ev.BlockNumber,
))
return createdEvent.BatchId[:], nil
......
......@@ -16,11 +16,16 @@ import (
const (
postagePrefix = "postage"
// blockThreshold is used to allow threshold no of blocks to be synced before a
// batch is usable.
blockThreshold = 10
)
var (
// ErrNotFound is the error returned when issuer with given batch ID does not exist.
ErrNotFound = errors.New("not found")
// ErrNotUsable is the error returned when issuer with given batch ID is not usable.
ErrNotUsable = errors.New("not usable")
)
// Service is the postage service interface.
......@@ -28,23 +33,26 @@ type Service interface {
Add(*StampIssuer)
StampIssuers() []*StampIssuer
GetStampIssuer([]byte) (*StampIssuer, error)
IssuerUsable(*StampIssuer) bool
io.Closer
}
// service handles postage batches
// stores the active batches.
type service struct {
lock sync.Mutex
store storage.StateStorer
chainID int64
issuers []*StampIssuer
lock sync.Mutex
store storage.StateStorer
postageStore Storer
chainID int64
issuers []*StampIssuer
}
// NewService constructs a new Service.
func NewService(store storage.StateStorer, chainID int64) (Service, error) {
func NewService(store storage.StateStorer, postageStore Storer, chainID int64) (Service, error) {
s := &service{
store: store,
chainID: chainID,
store: store,
postageStore: postageStore,
chainID: chainID,
}
n := 0
......@@ -79,12 +87,28 @@ func (ps *service) StampIssuers() []*StampIssuer {
return ps.issuers
}
func (ps *service) IssuerUsable(st *StampIssuer) bool {
cs := ps.postageStore.GetChainState()
// this checks atleast threshold blocks are seen on the blockchain after
// the batch creation, before we start using a stamp issuer. The threshold
// is meant to allow enough time for upstream peers to see the batch and
// hence validate the stamps issued
if cs.Block < st.blockNumber || (cs.Block-st.blockNumber) < blockThreshold {
return false
}
return true
}
// GetStampIssuer finds a stamp issuer by batch ID.
func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, error) {
ps.lock.Lock()
defer ps.lock.Unlock()
for _, st := range ps.issuers {
if bytes.Equal(batchID, st.batchID) {
if !ps.IssuerUsable(st) {
return nil, ErrNotUsable
}
return st, nil
}
}
......
......@@ -11,6 +11,8 @@ import (
"testing"
"github.com/ethersphere/bee/pkg/postage"
pstoremock "github.com/ethersphere/bee/pkg/postage/batchstore/mock"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
storemock "github.com/ethersphere/bee/pkg/statestore/mock"
)
......@@ -18,13 +20,14 @@ import (
// with all the active stamp issuers.
func TestSaveLoad(t *testing.T) {
store := storemock.NewStateStore()
pstore := pstoremock.New()
saved := func(id int64) postage.Service {
ps, err := postage.NewService(store, id)
ps, err := postage.NewService(store, pstore, id)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 16; i++ {
ps.Add(newTestStampIssuer(t))
ps.Add(newTestStampIssuer(t, 1000))
}
if err := ps.Close(); err != nil {
t.Fatal(err)
......@@ -32,7 +35,7 @@ func TestSaveLoad(t *testing.T) {
return ps
}
loaded := func(id int64) postage.Service {
ps, err := postage.NewService(store, id)
ps, err := postage.NewService(store, pstore, id)
if err != nil {
t.Fatal(err)
}
......@@ -51,7 +54,13 @@ func TestSaveLoad(t *testing.T) {
func TestGetStampIssuer(t *testing.T) {
store := storemock.NewStateStore()
ps, err := postage.NewService(store, int64(0))
testChainState := postagetesting.NewChainState()
if testChainState.Block < uint64(postage.BlockThreshold) {
testChainState.Block += uint64(postage.BlockThreshold + 1)
}
validBlockNumber := testChainState.Block - uint64(postage.BlockThreshold+1)
pstore := pstoremock.New(pstoremock.WithChainState(testChainState))
ps, err := postage.NewService(store, pstore, int64(0))
if err != nil {
t.Fatal(err)
}
......@@ -66,10 +75,14 @@ func TestGetStampIssuer(t *testing.T) {
if i == 0 {
continue
}
ps.Add(postage.NewStampIssuer(string(id), "", id, 16, 8))
if i < 4 {
ps.Add(postage.NewStampIssuer(string(id), "", id, 16, 8, validBlockNumber))
} else {
ps.Add(postage.NewStampIssuer(string(id), "", id, 16, 8, validBlockNumber+uint64(i)))
}
}
t.Run("found", func(t *testing.T) {
for _, id := range ids[1:] {
for _, id := range ids[1:4] {
st, err := ps.GetStampIssuer(id)
if err != nil {
t.Fatalf("expected no error, got %v", err)
......@@ -85,4 +98,12 @@ func TestGetStampIssuer(t *testing.T) {
t.Fatalf("expected ErrNotFound, got %v", err)
}
})
t.Run("not usable", func(t *testing.T) {
for _, id := range ids[4:] {
_, err := ps.GetStampIssuer(id)
if err != postage.ErrNotUsable {
t.Fatalf("expected ErrNotUsable, got %v", err)
}
}
})
}
......@@ -74,7 +74,7 @@ func TestValidStamp(t *testing.T) {
b := postagetesting.MustNewBatch(postagetesting.WithOwner(owner))
bs := mock.New(mock.WithBatch(b))
signer := crypto.NewDefaultSigner(privKey)
issuer := postage.NewStampIssuer("label", "keyID", b.ID, b.Depth, b.BucketDepth)
issuer := postage.NewStampIssuer("label", "keyID", b.ID, b.Depth, b.BucketDepth, 1000)
stamper := postage.NewStamper(issuer, signer)
// this creates a chunk with a mocked stamp. ValidStamp will override this
......
......@@ -44,7 +44,7 @@ func TestStamperStamping(t *testing.T) {
// tests a valid stamp
t.Run("valid stamp", func(t *testing.T) {
st := newTestStampIssuer(t)
st := newTestStampIssuer(t, 1000)
stamper := postage.NewStamper(st, signer)
chunkAddr, stamp := createStamp(t, stamper)
if err := stamp.Valid(chunkAddr, owner, 12, 8, true); err != nil {
......@@ -54,7 +54,7 @@ func TestStamperStamping(t *testing.T) {
// tests that Stamps returns with postage.ErrBucketMismatch
t.Run("bucket mismatch", func(t *testing.T) {
st := newTestStampIssuer(t)
st := newTestStampIssuer(t, 1000)
stamper := postage.NewStamper(st, signer)
chunkAddr, stamp := createStamp(t, stamper)
a := chunkAddr.Bytes()
......@@ -66,7 +66,7 @@ func TestStamperStamping(t *testing.T) {
// tests that Stamps returns with postage.ErrInvalidIndex
t.Run("invalid index", func(t *testing.T) {
st := newTestStampIssuer(t)
st := newTestStampIssuer(t, 1000)
stamper := postage.NewStamper(st, signer)
// issue 1 stamp
chunkAddr, _ := createStamp(t, stamper)
......@@ -90,8 +90,8 @@ func TestStamperStamping(t *testing.T) {
// tests that Stamps returns with postage.ErrBucketFull iff
// issuer has the corresponding collision bucket filled]
t.Run("bucket full", func(t *testing.T) {
st := newTestStampIssuer(t)
st = postage.NewStampIssuer("", "", st.ID(), 12, 8)
st := newTestStampIssuer(t, 1000)
st = postage.NewStampIssuer("", "", st.ID(), 12, 8, 1000)
stamper := postage.NewStamper(st, signer)
// issue 1 stamp
chunkAddr, _ := createStamp(t, stamper)
......@@ -112,7 +112,7 @@ func TestStamperStamping(t *testing.T) {
// tests return with ErrOwnerMismatch
t.Run("owner mismatch", func(t *testing.T) {
owner[0] ^= 0xff // bitflip the owner first byte, this case must come last!
st := newTestStampIssuer(t)
st := newTestStampIssuer(t, 1000)
stamper := postage.NewStamper(st, signer)
chunkAddr, stamp := createStamp(t, stamper)
if err := stamp.Valid(chunkAddr, owner, 12, 8, true); !errors.Is(err, postage.ErrOwnerMismatch) {
......
......@@ -23,13 +23,14 @@ type StampIssuer struct {
mu sync.Mutex // Mutex for buckets.
buckets []uint32 // Collision buckets: counts per neighbourhoods (limited to 2^{batchdepth-bucketdepth}).
maxBucketCount uint32 // the count of the fullest bucket
blockNumber uint64 // blockNumber when this batch was created
}
// NewStampIssuer constructs a StampIssuer as an extension of a batch for local
// upload.
//
// bucketDepth must always be smaller than batchDepth otherwise inc() panics.
func NewStampIssuer(label, keyID string, batchID []byte, batchDepth, bucketDepth uint8) *StampIssuer {
func NewStampIssuer(label, keyID string, batchID []byte, batchDepth, bucketDepth uint8, blockNumber uint64) *StampIssuer {
return &StampIssuer{
label: label,
keyID: keyID,
......@@ -37,6 +38,7 @@ func NewStampIssuer(label, keyID string, batchID []byte, batchDepth, bucketDepth
batchDepth: batchDepth,
bucketDepth: bucketDepth,
buckets: make([]uint32, 1<<bucketDepth),
blockNumber: blockNumber,
}
}
......@@ -87,9 +89,9 @@ func (st *StampIssuer) Label() string {
}
// MarshalBinary gives the byte slice serialisation of a StampIssuer:
// = label[32]|keyID[32]|batchID[32]|batchDepth[1]|bucketDepth[1]|size_0[4]|size_1[4]|....
// = label[32]|keyID[32]|batchID[32]|batchDepth[1]|bucketDepth[1]|blockNumber[8]|size_0[4]|size_1[4]|....
func (st *StampIssuer) MarshalBinary() ([]byte, error) {
buf := make([]byte, 32+32+32+1+1+4*(1<<st.bucketDepth))
buf := make([]byte, 32+32+32+1+1+8+4*(1<<st.bucketDepth))
label := []byte(st.label)
copy(buf[32-len(label):32], label)
keyID := []byte(st.keyID)
......@@ -97,10 +99,11 @@ func (st *StampIssuer) MarshalBinary() ([]byte, error) {
copy(buf[64:96], st.batchID)
buf[96] = st.batchDepth
buf[97] = st.bucketDepth
binary.BigEndian.PutUint64(buf[98:106], st.blockNumber)
st.mu.Lock()
defer st.mu.Unlock()
for i, addr := range st.buckets {
offset := 98 + i*4
offset := 106 + i*4
binary.BigEndian.PutUint32(buf[offset:offset+4], addr)
}
return buf, nil
......@@ -113,10 +116,11 @@ func (st *StampIssuer) UnmarshalBinary(buf []byte) error {
st.batchID = buf[64:96]
st.batchDepth = buf[96]
st.bucketDepth = buf[97]
st.blockNumber = binary.BigEndian.Uint64(buf[98:106])
st.buckets = make([]uint32, 1<<st.bucketDepth)
// not using lock as unmarshal is init
for i := range st.buckets {
offset := 98 + i*4
offset := 106 + i*4
st.buckets[i] = binary.BigEndian.Uint32(buf[offset : offset+4])
}
return nil
......
......@@ -15,7 +15,7 @@ import (
// TestStampIssuerMarshalling tests the idempotence of binary marshal/unmarshal.
func TestStampIssuerMarshalling(t *testing.T) {
st := newTestStampIssuer(t)
st := newTestStampIssuer(t, 1000)
buf, err := st.MarshalBinary()
if err != nil {
t.Fatal(err)
......@@ -30,12 +30,12 @@ func TestStampIssuerMarshalling(t *testing.T) {
}
}
func newTestStampIssuer(t *testing.T) *postage.StampIssuer {
func newTestStampIssuer(t *testing.T, block uint64) *postage.StampIssuer {
t.Helper()
id := make([]byte, 32)
_, err := io.ReadFull(crand.Reader, id)
if err != nil {
t.Fatal(err)
}
return postage.NewStampIssuer("label", "keyID", id, 12, 8)
return postage.NewStampIssuer("label", "keyID", id, 12, 8, block)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment