Commit ebbaf404 authored by aloknerurkar's avatar aloknerurkar Committed by GitHub

fix: recover stampissuer on batch creation (#2080)

parent f5a460b6
...@@ -658,6 +658,8 @@ paths: ...@@ -658,6 +658,8 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/400" $ref: "SwarmCommon.yaml#/components/responses/400"
"401": "401":
$ref: "SwarmCommon.yaml#/components/responses/401" $ref: "SwarmCommon.yaml#/components/responses/401"
"402":
$ref: "SwarmCommon.yaml#/components/responses/402"
"500": "500":
$ref: "SwarmCommon.yaml#/components/responses/500" $ref: "SwarmCommon.yaml#/components/responses/500"
default: default:
...@@ -700,6 +702,8 @@ paths: ...@@ -700,6 +702,8 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/400" $ref: "SwarmCommon.yaml#/components/responses/400"
"401": "401":
$ref: "SwarmCommon.yaml#/components/responses/401" $ref: "SwarmCommon.yaml#/components/responses/401"
"402":
$ref: "SwarmCommon.yaml#/components/responses/402"
"500": "500":
$ref: "SwarmCommon.yaml#/components/responses/500" $ref: "SwarmCommon.yaml#/components/responses/500"
default: default:
......
...@@ -97,7 +97,14 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -97,7 +97,14 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
s.logger.Debugf("chunk upload: putter:%v", err) s.logger.Debugf("chunk upload: putter:%v", err)
s.logger.Error("chunk upload: putter") s.logger.Error("chunk upload: putter")
jsonhttp.BadRequest(w, nil) switch {
case errors.Is(err, postage.ErrNotFound):
jsonhttp.BadRequest(w, "batch not found")
case errors.Is(err, postage.ErrNotUsable):
jsonhttp.BadRequest(w, "batch not usable yet")
default:
jsonhttp.BadRequest(w, nil)
}
return return
} }
......
...@@ -387,7 +387,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -387,7 +387,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b}) eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b})
b.listenerCloser = eventListener b.listenerCloser = eventListener
batchSvc = batchservice.New(stateStore, batchStore, logger, eventListener) batchSvc = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post)
erc20Address, err := postagecontract.LookupERC20Address(p2pCtx, transactionService, postageContractAddress) erc20Address, err := postagecontract.LookupERC20Address(p2pCtx, transactionService, postageContractAddress)
if err != nil { if err != nil {
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package batchservice package batchservice
import ( import (
"bytes"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
...@@ -18,10 +19,12 @@ import ( ...@@ -18,10 +19,12 @@ import (
const dirtyDBKey = "batchservice_dirty_db" const dirtyDBKey = "batchservice_dirty_db"
type batchService struct { type batchService struct {
stateStore storage.StateStorer stateStore storage.StateStorer
storer postage.Storer storer postage.Storer
logger logging.Logger logger logging.Logger
listener postage.Listener listener postage.Listener
owner []byte
batchListener postage.BatchCreationListener
} }
type Interface interface { type Interface interface {
...@@ -29,8 +32,15 @@ type Interface interface { ...@@ -29,8 +32,15 @@ type Interface interface {
} }
// New will create a new BatchService. // New will create a new BatchService.
func New(stateStore storage.StateStorer, storer postage.Storer, logger logging.Logger, listener postage.Listener) Interface { func New(
return &batchService{stateStore, storer, logger, listener} stateStore storage.StateStorer,
storer postage.Storer,
logger logging.Logger,
listener postage.Listener,
owner []byte,
batchListener postage.BatchCreationListener,
) Interface {
return &batchService{stateStore, storer, logger, listener, owner, batchListener}
} }
// Create will create a new batch with the given ID, owner value and depth and // Create will create a new batch with the given ID, owner value and depth and
...@@ -51,6 +61,10 @@ func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, de ...@@ -51,6 +61,10 @@ func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, de
return fmt.Errorf("put: %w", err) return fmt.Errorf("put: %w", err)
} }
if bytes.Equal(svc.owner, owner) && svc.batchListener != nil {
svc.batchListener.Handle(b)
}
svc.logger.Debugf("batch service: created batch id %s", hex.EncodeToString(b.ID)) svc.logger.Debugf("batch service: created batch id %s", hex.EncodeToString(b.ID))
return nil return nil
} }
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"errors" "errors"
"io/ioutil" "io/ioutil"
"math/big" "math/big"
"math/rand"
"testing" "testing"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
...@@ -35,12 +36,23 @@ func newMockListener() *mockListener { ...@@ -35,12 +36,23 @@ func newMockListener() *mockListener {
return &mockListener{} return &mockListener{}
} }
type mockBatchCreationHandler struct {
count int
}
func (m *mockBatchCreationHandler) Handle(b *postage.Batch) {
m.count++
}
func TestBatchServiceCreate(t *testing.T) { func TestBatchServiceCreate(t *testing.T) {
testBatch := postagetesting.MustNewBatch()
testChainState := postagetesting.NewChainState() testChainState := postagetesting.NewChainState()
t.Run("expect put create put error", func(t *testing.T) { t.Run("expect put create put error", func(t *testing.T) {
svc, _, _ := newTestStoreAndService( testBatch := postagetesting.MustNewBatch()
testBatchListener := &mockBatchCreationHandler{}
svc, _, _ := newTestStoreAndServiceWithListener(
testBatch.Owner,
testBatchListener,
mock.WithChainState(testChainState), mock.WithChainState(testChainState),
mock.WithPutErr(errTest, 0), mock.WithPutErr(errTest, 0),
) )
...@@ -55,25 +67,13 @@ func TestBatchServiceCreate(t *testing.T) { ...@@ -55,25 +67,13 @@ func TestBatchServiceCreate(t *testing.T) {
); err == nil { ); err == nil {
t.Fatalf("expected error") t.Fatalf("expected error")
} }
}) if testBatchListener.count != 0 {
t.Fatalf("unexpected batch listener count, exp %d found %d", 0, testBatchListener.count)
t.Run("passes", func(t *testing.T) {
svc, batchStore, _ := newTestStoreAndService(
mock.WithChainState(testChainState),
)
if err := svc.Create(
testBatch.ID,
testBatch.Owner,
testBatch.Value,
testBatch.Depth,
testBatch.BucketDepth,
testBatch.Immutable,
); err != nil {
t.Fatalf("got error %v", err)
} }
})
got, err := batchStore.Get(testBatch.ID) validateBatch := func(t *testing.T, testBatch *postage.Batch, st *mock.BatchStore) {
got, err := st.Get(testBatch.ID)
if err != nil { if err != nil {
t.Fatalf("batch store get: %v", err) t.Fatalf("batch store get: %v", err)
} }
...@@ -99,8 +99,63 @@ func TestBatchServiceCreate(t *testing.T) { ...@@ -99,8 +99,63 @@ func TestBatchServiceCreate(t *testing.T) {
if got.Start != testChainState.Block { if got.Start != testChainState.Block {
t.Fatalf("batch start block different form chain state: want %v, got %v", got.Start, testChainState.Block) t.Fatalf("batch start block different form chain state: want %v, got %v", got.Start, testChainState.Block)
} }
}
t.Run("passes", func(t *testing.T) {
testBatch := postagetesting.MustNewBatch()
testBatchListener := &mockBatchCreationHandler{}
svc, batchStore, _ := newTestStoreAndServiceWithListener(
testBatch.Owner,
testBatchListener,
mock.WithChainState(testChainState),
)
if err := svc.Create(
testBatch.ID,
testBatch.Owner,
testBatch.Value,
testBatch.Depth,
testBatch.BucketDepth,
testBatch.Immutable,
); err != nil {
t.Fatalf("got error %v", err)
}
if testBatchListener.count != 1 {
t.Fatalf("unexpected batch listener count, exp %d found %d", 1, testBatchListener.count)
}
validateBatch(t, testBatch, batchStore)
}) })
t.Run("passes without recovery", func(t *testing.T) {
testBatch := postagetesting.MustNewBatch()
testBatchListener := &mockBatchCreationHandler{}
// create a owner different from the batch owner
owner := make([]byte, 32)
rand.Read(owner)
svc, batchStore, _ := newTestStoreAndServiceWithListener(
owner,
testBatchListener,
mock.WithChainState(testChainState),
)
if err := svc.Create(
testBatch.ID,
testBatch.Owner,
testBatch.Value,
testBatch.Depth,
testBatch.BucketDepth,
testBatch.Immutable,
); err != nil {
t.Fatalf("got error %v", err)
}
if testBatchListener.count != 0 {
t.Fatalf("unexpected batch listener count, exp %d found %d", 1, testBatchListener.count)
}
validateBatch(t, testBatch, batchStore)
})
} }
func TestBatchServiceTopUp(t *testing.T) { func TestBatchServiceTopUp(t *testing.T) {
...@@ -263,7 +318,7 @@ func TestTransactionOk(t *testing.T) { ...@@ -263,7 +318,7 @@ func TestTransactionOk(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
svc2 := batchservice.New(s, store, testLog, newMockListener()) svc2 := batchservice.New(s, store, testLog, newMockListener(), nil, nil)
if _, err := svc2.Start(10); err != nil { if _, err := svc2.Start(10); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -283,7 +338,7 @@ func TestTransactionFail(t *testing.T) { ...@@ -283,7 +338,7 @@ func TestTransactionFail(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
svc2 := batchservice.New(s, store, testLog, newMockListener()) svc2 := batchservice.New(s, store, testLog, newMockListener(), nil, nil)
if _, err := svc2.Start(10); err != nil { if _, err := svc2.Start(10); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -292,13 +347,21 @@ func TestTransactionFail(t *testing.T) { ...@@ -292,13 +347,21 @@ func TestTransactionFail(t *testing.T) {
t.Fatalf("expect %d reset calls got %d", 1, c) t.Fatalf("expect %d reset calls got %d", 1, c)
} }
} }
func newTestStoreAndService(opts ...mock.Option) (postage.EventUpdater, *mock.BatchStore, storage.StateStorer) { func newTestStoreAndServiceWithListener(
owner []byte,
batchListener postage.BatchCreationListener,
opts ...mock.Option,
) (postage.EventUpdater, *mock.BatchStore, storage.StateStorer) {
s := mocks.NewStateStore() s := mocks.NewStateStore()
store := mock.New(opts...) store := mock.New(opts...)
svc := batchservice.New(s, store, testLog, newMockListener()) svc := batchservice.New(s, store, testLog, newMockListener(), owner, batchListener)
return svc, store, s return svc, store, s
} }
func newTestStoreAndService(opts ...mock.Option) (postage.EventUpdater, *mock.BatchStore, storage.StateStorer) {
return newTestStoreAndServiceWithListener(nil, nil, opts...)
}
func putBatch(t *testing.T, store postage.Storer, b *postage.Batch) { func putBatch(t *testing.T, store postage.Storer, b *postage.Batch) {
t.Helper() t.Helper()
......
...@@ -45,3 +45,7 @@ type Listener interface { ...@@ -45,3 +45,7 @@ type Listener interface {
io.Closer io.Closer
Listen(from uint64, updater EventUpdater) <-chan struct{} Listen(from uint64, updater EventUpdater) <-chan struct{}
} }
type BatchCreationListener interface {
Handle(*Batch)
}
...@@ -69,6 +69,8 @@ func (m *mockPostage) IssuerUsable(_ *postage.StampIssuer) bool { ...@@ -69,6 +69,8 @@ func (m *mockPostage) IssuerUsable(_ *postage.StampIssuer) bool {
return true return true
} }
func (m *mockPostage) Handle(_ *postage.Batch) {}
func (m *mockPostage) Close() error { func (m *mockPostage) Close() error {
return nil return nil
} }
...@@ -34,6 +34,7 @@ type Service interface { ...@@ -34,6 +34,7 @@ type Service interface {
StampIssuers() []*StampIssuer StampIssuers() []*StampIssuer
GetStampIssuer([]byte) (*StampIssuer, error) GetStampIssuer([]byte) (*StampIssuer, error)
IssuerUsable(*StampIssuer) bool IssuerUsable(*StampIssuer) bool
BatchCreationListener
io.Closer io.Closer
} }
...@@ -80,6 +81,23 @@ func (ps *service) Add(st *StampIssuer) { ...@@ -80,6 +81,23 @@ func (ps *service) Add(st *StampIssuer) {
ps.issuers = append(ps.issuers, st) ps.issuers = append(ps.issuers, st)
} }
// Handle implements the BatchCreationListener interface. This is fired on receiving
// a batch creation event from the blockchain listener to ensure that if a stamp
// issuer was not created initially, we will create it here.
func (ps *service) Handle(b *Batch) {
_, err := ps.GetStampIssuer(b.ID)
if errors.Is(err, ErrNotFound) {
ps.Add(NewStampIssuer(
"recovered",
string(b.Owner),
b.ID,
b.Depth,
b.BucketDepth,
b.Start,
))
}
}
// StampIssuers returns the currently active stamp issuers. // StampIssuers returns the currently active stamp issuers.
func (ps *service) StampIssuers() []*StampIssuer { func (ps *service) StampIssuers() []*StampIssuer {
ps.lock.Lock() ps.lock.Lock()
......
...@@ -83,6 +83,9 @@ func TestGetStampIssuer(t *testing.T) { ...@@ -83,6 +83,9 @@ func TestGetStampIssuer(t *testing.T) {
} }
ps.Add(postage.NewStampIssuer(string(id), "", id, big.NewInt(3), 16, 8, validBlockNumber+shift, true)) ps.Add(postage.NewStampIssuer(string(id), "", id, big.NewInt(3), 16, 8, validBlockNumber+shift, true))
} }
b := postagetesting.MustNewBatch()
b.Start = validBlockNumber
ps.Handle(b)
t.Run("found", func(t *testing.T) { t.Run("found", func(t *testing.T) {
for _, id := range ids[1:4] { for _, id := range ids[1:4] {
st, err := ps.GetStampIssuer(id) st, err := ps.GetStampIssuer(id)
...@@ -108,4 +111,13 @@ func TestGetStampIssuer(t *testing.T) { ...@@ -108,4 +111,13 @@ func TestGetStampIssuer(t *testing.T) {
} }
} }
}) })
t.Run("recovered", func(t *testing.T) {
st, err := ps.GetStampIssuer(b.ID)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if st.Label() != "recovered" {
t.Fatal("wrong issuer returned")
}
})
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment