Commit 6583c351 authored by acud's avatar acud Committed by GitHub

feat: transact batchstore (#1793)

parent cd0fb0af
......@@ -40,13 +40,20 @@ func (db *DB) UnreserveBatch(id []byte, radius uint8) error {
oldRadius := i.Radius
var gcSizeChange int64 // number to add or subtract from gcSize
unpin := func(item shed.Item) (stop bool, err error) {
c, err := db.setUnpin(batch, swarm.NewAddress(item.Address))
addr := swarm.NewAddress(item.Address)
c, err := db.setUnpin(batch, addr)
if err != nil {
return false, fmt.Errorf("unpin: %w", err)
if !errors.Is(err, leveldb.ErrNotFound) {
return false, fmt.Errorf("unpin: %w", err)
} else {
// this is possible when we are resyncing chain data after
// a dirty shutdown
db.logger.Tracef("unreserve set unpin chunk %s: %v", addr.String(), err)
}
}
gcSizeChange += c
return false, err
return false, nil
}
// iterate over chunk in bins
......
......@@ -364,7 +364,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime)
b.listenerCloser = eventListener
batchSvc = batchservice.New(batchStore, logger, eventListener)
batchSvc = batchservice.New(stateStore, batchStore, logger, eventListener)
erc20Address, err := postagecontract.LookupERC20Address(p2pCtx, transactionService, postageContractAddress)
if err != nil {
......@@ -438,7 +438,10 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
batchStore.SetRadiusSetter(kad)
if batchSvc != nil {
syncedChan := batchSvc.Start(postageSyncStart)
syncedChan, err := batchSvc.Start(postageSyncStart)
if err != nil {
return nil, fmt.Errorf("unable to start batch service: %w", err)
}
// wait for the postage contract listener to sync
logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel")
......
......@@ -6,17 +6,22 @@ package batchservice
import (
"encoding/hex"
"errors"
"fmt"
"math/big"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
)
const dirtyDBKey = "batchservice_dirty_db"
type batchService struct {
storer postage.Storer
logger logging.Logger
listener postage.Listener
stateStore storage.StateStorer
storer postage.Storer
logger logging.Logger
listener postage.Listener
}
type Interface interface {
......@@ -24,8 +29,8 @@ type Interface interface {
}
// New will create a new BatchService.
func New(storer postage.Storer, logger logging.Logger, listener postage.Listener) Interface {
return &batchService{storer, logger, listener}
func New(stateStore storage.StateStorer, storer postage.Storer, logger logging.Logger, listener postage.Listener) Interface {
return &batchService{stateStore, storer, logger, listener}
}
// Create will create a new batch with the given ID, owner value and depth and
......@@ -110,11 +115,33 @@ func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error {
svc.logger.Debugf("batch service: updated block height to %d", blockNumber)
return nil
}
func (svc *batchService) TransactionStart() error {
return svc.stateStore.Put(dirtyDBKey, true)
}
func (svc *batchService) TransactionEnd() error {
return svc.stateStore.Delete(dirtyDBKey)
}
func (svc *batchService) Start(startBlock uint64) (<-chan struct{}, error) {
dirty := false
err := svc.stateStore.Get(dirtyDBKey, &dirty)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
if dirty {
svc.logger.Warning("batch service: dirty shutdown detected, resetting batch store")
if err := svc.storer.Reset(); err != nil {
return nil, err
}
if err := svc.stateStore.Delete(dirtyDBKey); err != nil {
return nil, err
}
svc.logger.Warning("batch service: batch store reset. your node will now resync chain data")
}
func (svc *batchService) Start(startBlock uint64) <-chan struct{} {
cs := svc.storer.GetChainState()
if cs.Block > startBlock {
startBlock = cs.Block
}
return svc.listener.Listen(startBlock+1, svc)
return svc.listener.Listen(startBlock+1, svc), nil
}
......@@ -16,6 +16,8 @@ import (
"github.com/ethersphere/bee/pkg/postage/batchservice"
"github.com/ethersphere/bee/pkg/postage/batchstore/mock"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
mocks "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
)
var (
......@@ -38,7 +40,7 @@ func TestBatchServiceCreate(t *testing.T) {
testChainState := postagetesting.NewChainState()
t.Run("expect put create put error", func(t *testing.T) {
svc, _ := newTestStoreAndService(
svc, _, _ := newTestStoreAndService(
mock.WithChainState(testChainState),
mock.WithPutErr(errTest, 0),
)
......@@ -54,7 +56,7 @@ func TestBatchServiceCreate(t *testing.T) {
})
t.Run("passes", func(t *testing.T) {
svc, batchStore := newTestStoreAndService(
svc, batchStore, _ := newTestStoreAndService(
mock.WithChainState(testChainState),
)
......@@ -96,7 +98,7 @@ func TestBatchServiceTopUp(t *testing.T) {
testNormalisedBalance := big.NewInt(2000000000000)
t.Run("expect get error", func(t *testing.T) {
svc, _ := newTestStoreAndService(
svc, _, _ := newTestStoreAndService(
mock.WithGetErr(errTest, 0),
)
......@@ -106,7 +108,7 @@ func TestBatchServiceTopUp(t *testing.T) {
})
t.Run("expect put error", func(t *testing.T) {
svc, batchStore := newTestStoreAndService(
svc, batchStore, _ := newTestStoreAndService(
mock.WithPutErr(errTest, 1),
)
putBatch(t, batchStore, testBatch)
......@@ -117,7 +119,7 @@ func TestBatchServiceTopUp(t *testing.T) {
})
t.Run("passes", func(t *testing.T) {
svc, batchStore := newTestStoreAndService()
svc, batchStore, _ := newTestStoreAndService()
putBatch(t, batchStore, testBatch)
want := testNormalisedBalance
......@@ -143,7 +145,7 @@ func TestBatchServiceUpdateDepth(t *testing.T) {
testBatch := postagetesting.MustNewBatch()
t.Run("expect get error", func(t *testing.T) {
svc, _ := newTestStoreAndService(
svc, _, _ := newTestStoreAndService(
mock.WithGetErr(errTest, 0),
)
......@@ -153,7 +155,7 @@ func TestBatchServiceUpdateDepth(t *testing.T) {
})
t.Run("expect put error", func(t *testing.T) {
svc, batchStore := newTestStoreAndService(
svc, batchStore, _ := newTestStoreAndService(
mock.WithPutErr(errTest, 1),
)
putBatch(t, batchStore, testBatch)
......@@ -164,7 +166,7 @@ func TestBatchServiceUpdateDepth(t *testing.T) {
})
t.Run("passes", func(t *testing.T) {
svc, batchStore := newTestStoreAndService()
svc, batchStore, _ := newTestStoreAndService()
putBatch(t, batchStore, testBatch)
if err := svc.UpdateDepth(testBatch.ID, testNewDepth, testNormalisedBalance); err != nil {
......@@ -188,7 +190,7 @@ func TestBatchServiceUpdatePrice(t *testing.T) {
testNewPrice := big.NewInt(20000000)
t.Run("expect put error", func(t *testing.T) {
svc, batchStore := newTestStoreAndService(
svc, batchStore, _ := newTestStoreAndService(
mock.WithChainState(testChainState),
mock.WithPutErr(errTest, 1),
)
......@@ -200,7 +202,7 @@ func TestBatchServiceUpdatePrice(t *testing.T) {
})
t.Run("passes", func(t *testing.T) {
svc, batchStore := newTestStoreAndService(
svc, batchStore, _ := newTestStoreAndService(
mock.WithChainState(testChainState),
)
......@@ -220,7 +222,7 @@ func TestBatchServiceUpdateBlockNumber(t *testing.T) {
CurrentPrice: big.NewInt(100),
TotalAmount: big.NewInt(100),
}
svc, batchStore := newTestStoreAndService(
svc, batchStore, _ := newTestStoreAndService(
mock.WithChainState(testChainState),
)
......@@ -237,10 +239,54 @@ func TestBatchServiceUpdateBlockNumber(t *testing.T) {
}
}
func newTestStoreAndService(opts ...mock.Option) (postage.EventUpdater, postage.Storer) {
func TestTransactionOk(t *testing.T) {
svc, store, s := newTestStoreAndService()
if _, err := svc.Start(10); err != nil {
t.Fatal(err)
}
if err := svc.TransactionStart(); err != nil {
t.Fatal(err)
}
if err := svc.TransactionEnd(); err != nil {
t.Fatal(err)
}
svc2 := batchservice.New(s, store, testLog, newMockListener())
if _, err := svc2.Start(10); err != nil {
t.Fatal(err)
}
if c := store.ResetCalls(); c != 0 {
t.Fatalf("expect %d reset calls got %d", 0, c)
}
}
func TestTransactionFail(t *testing.T) {
svc, store, s := newTestStoreAndService()
if _, err := svc.Start(10); err != nil {
t.Fatal(err)
}
if err := svc.TransactionStart(); err != nil {
t.Fatal(err)
}
svc2 := batchservice.New(s, store, testLog, newMockListener())
if _, err := svc2.Start(10); err != nil {
t.Fatal(err)
}
if c := store.ResetCalls(); c != 1 {
t.Fatalf("expect %d reset calls got %d", 1, c)
}
}
func newTestStoreAndService(opts ...mock.Option) (postage.EventUpdater, *mock.BatchStore, storage.StateStorer) {
s := mocks.NewStateStore()
store := mock.New(opts...)
svc := batchservice.New(store, testLog, newMockListener())
return svc, store
svc := batchservice.New(s, store, testLog, newMockListener())
return svc, store, s
}
func putBatch(t *testing.T, store postage.Storer, b *postage.Batch) {
......
......@@ -24,6 +24,7 @@ type BatchStore struct {
getErrDelayCnt int
putErr error
putErrDelayCnt int
resetCallCount int
}
// Option is a an option passed to New
......@@ -134,3 +135,12 @@ func (bs *BatchStore) GetReserveState() *postage.ReserveState {
func (bs *BatchStore) SetRadiusSetter(r postage.RadiusSetter) {
panic("not implemented")
}
func (bs *BatchStore) Reset() error {
bs.resetCallCount++
return nil
}
func (bs *BatchStore) ResetCalls() int {
return bs.resetCallCount
}
......@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"math/big"
"strings"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
......@@ -165,6 +166,32 @@ func (s *store) SetRadiusSetter(r postage.RadiusSetter) {
s.radiusSetter = r
}
func (s *store) Reset() error {
prefix := "batchstore_"
if err := s.store.Iterate(prefix, func(k, _ []byte) (bool, error) {
if strings.HasPrefix(string(k), prefix) {
if err := s.store.Delete(string(k)); err != nil {
return false, err
}
}
return false, nil
}); err != nil {
return err
}
s.cs = &postage.ChainState{
Block: 0,
TotalAmount: big.NewInt(0),
CurrentPrice: big.NewInt(0),
}
s.rs = &reserveState{
Radius: DefaultDepth,
Inner: big.NewInt(0),
Outer: big.NewInt(0),
Available: Capacity,
}
return nil
}
// batchKey returns the index key for the batch ID used in the by-ID batch index.
func batchKey(id []byte) string {
return batchKeyPrefix + string(id)
......
......@@ -5,11 +5,15 @@
package batchstore_test
import (
"io/ioutil"
"math/big"
"testing"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/postage/batchstore"
postagetest "github.com/ethersphere/bee/pkg/postage/testing"
"github.com/ethersphere/bee/pkg/statestore/leveldb"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
)
......@@ -69,6 +73,49 @@ func TestBatchStorePutChainState(t *testing.T) {
postagetest.CompareChainState(t, testChainState, &got)
}
func TestBatchStoreReset(t *testing.T) {
testChainState := postagetest.NewChainState()
testBatch := postagetest.MustNewBatch()
path := t.TempDir()
logger := logging.New(ioutil.Discard, 0)
// we use the real statestore since the mock uses a mutex,
// therefore deleting while iterating (in Reset() implementation)
// leads to a deadlock.
stateStore, err := leveldb.NewStateStore(path, logger)
if err != nil {
t.Fatal(err)
}
defer stateStore.Close()
batchStore, _ := batchstore.New(stateStore, func([]byte, uint8) error { return nil })
batchStore.SetRadiusSetter(noopRadiusSetter{})
err = batchStore.Put(testBatch, big.NewInt(15), 8)
if err != nil {
t.Fatal(err)
}
err = batchStore.PutChainState(testChainState)
if err != nil {
t.Fatal(err)
}
err = batchStore.Reset()
if err != nil {
t.Fatal(err)
}
c := 0
_ = stateStore.Iterate("", func(k, _ []byte) (bool, error) {
c++
return false, nil
})
// we expect one key in the statestore since the schema name
// will always be there.
if c != 1 {
t.Fatalf("expected only one key in statestore, got %d", c)
}
}
func stateStoreGet(t *testing.T, st storage.StateStorer, k string, v interface{}) {
if err := st.Get(k, v); err != nil {
t.Fatalf("store get batch: %v", err)
......
......@@ -17,7 +17,10 @@ type EventUpdater interface {
UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error
UpdatePrice(price *big.Int) error
UpdateBlockNumber(blockNumber uint64) error
Start(startBlock uint64) <-chan struct{}
Start(startBlock uint64) (<-chan struct{}, error)
TransactionStart() error
TransactionEnd() error
}
// Storer represents the persistence layer for batches on the current (highest
......@@ -29,6 +32,8 @@ type Storer interface {
GetChainState() *ChainState
GetReserveState() *ReserveState
SetRadiusSetter(RadiusSetter)
Reset() error
}
type RadiusSetter interface {
......
......@@ -209,6 +209,10 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
return err
}
if err := updater.TransactionStart(); err != nil {
return err
}
for _, e := range events {
startEv := time.Now()
err = updater.UpdateBlockNumber(e.BlockNumber)
......@@ -226,6 +230,10 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
return err
}
if err := updater.TransactionEnd(); err != nil {
return err
}
from = to + 1
totalTimeMetric(l.metrics.PageProcessDuration, start)
l.metrics.PagesProcessed.Inc()
......
......@@ -300,7 +300,9 @@ func (u *updater) UpdateBlockNumber(blockNumber uint64) error {
return nil
}
func (u *updater) Start(_ uint64) <-chan struct{} { return nil }
func (u *updater) Start(_ uint64) (<-chan struct{}, error) { return nil, nil }
func (u *updater) TransactionStart() error { return nil }
func (u *updater) TransactionEnd() error { return nil }
type mockFilterer struct {
filterLogEvents []types.Log
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment