Commit a7789208 authored by acud's avatar acud Committed by GitHub

feat: batchstore checksums (#2227)

parent 95f7bb17
......@@ -75,6 +75,7 @@ import (
"github.com/hashicorp/go-multierror"
ma "github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3"
"golang.org/x/sync/errgroup"
)
......@@ -432,7 +433,10 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b})
b.listenerCloser = eventListener
batchSvc = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post)
batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256)
if err != nil {
return nil, err
}
erc20Address, err := postagecontract.LookupERC20Address(p2pCtx, transactionService, postageContractAddress)
if err != nil {
......
......@@ -9,14 +9,19 @@ import (
"encoding/hex"
"errors"
"fmt"
"hash"
"math/big"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
"golang.org/x/crypto/sha3"
)
const dirtyDBKey = "batchservice_dirty_db"
const (
dirtyDBKey = "batchservice_dirty_db"
checksumDBKey = "batchservice_checksum"
)
type batchService struct {
stateStore storage.StateStorer
......@@ -25,6 +30,8 @@ type batchService struct {
listener postage.Listener
owner []byte
batchListener postage.BatchCreationListener
checksum hash.Hash // checksum hasher
}
type Interface interface {
......@@ -39,13 +46,40 @@ func New(
listener postage.Listener,
owner []byte,
batchListener postage.BatchCreationListener,
) Interface {
return &batchService{stateStore, storer, logger, listener, owner, batchListener}
checksumFunc func() hash.Hash,
) (Interface, error) {
if checksumFunc == nil {
checksumFunc = sha3.New256
}
var (
b string
sum = checksumFunc()
)
if err := stateStore.Get(checksumDBKey, &b); err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
} else {
s, err := hex.DecodeString(b)
if err != nil {
return nil, err
}
n, err := sum.Write(s)
if err != nil {
return nil, err
}
if n != len(s) {
return nil, errors.New("batchstore checksum init")
}
}
return &batchService{stateStore, storer, logger, listener, owner, batchListener, sum}, nil
}
// Create will create a new batch with the given ID, owner value and depth and
// stores it in the BatchStore.
func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool) error {
func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash []byte) error {
b := &postage.Batch{
ID: id,
Owner: owner,
......@@ -64,14 +98,18 @@ func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, de
if bytes.Equal(svc.owner, owner) && svc.batchListener != nil {
svc.batchListener.Handle(b)
}
cs, err := svc.updateChecksum(txHash)
if err != nil {
return fmt.Errorf("update checksum: %w", err)
}
svc.logger.Debugf("batch service: created batch id %s", hex.EncodeToString(b.ID))
svc.logger.Debugf("batch service: created batch id %s, tx %x, checksum %x", hex.EncodeToString(b.ID), txHash, cs)
return nil
}
// TopUp implements the EventUpdater interface. It tops ups a batch with the
// given ID with the given amount.
func (svc *batchService) TopUp(id []byte, normalisedBalance *big.Int) error {
func (svc *batchService) TopUp(id []byte, normalisedBalance *big.Int, txHash []byte) error {
b, err := svc.storer.Get(id)
if err != nil {
return fmt.Errorf("get: %w", err)
......@@ -81,14 +119,18 @@ func (svc *batchService) TopUp(id []byte, normalisedBalance *big.Int) error {
if err != nil {
return fmt.Errorf("put: %w", err)
}
cs, err := svc.updateChecksum(txHash)
if err != nil {
return fmt.Errorf("update checksum: %w", err)
}
svc.logger.Debugf("batch service: topped up batch id %s from %v to %v", hex.EncodeToString(b.ID), b.Value, normalisedBalance)
svc.logger.Debugf("batch service: topped up batch id %s from %v to %v, tx %x, checksum %x", hex.EncodeToString(b.ID), b.Value, normalisedBalance, txHash, cs)
return nil
}
// UpdateDepth implements the EventUpdater inteface. It sets the new depth of a
// batch with the given ID.
func (svc *batchService) UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error {
func (svc *batchService) UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int, txHash []byte) error {
b, err := svc.storer.Get(id)
if err != nil {
return fmt.Errorf("get: %w", err)
......@@ -97,21 +139,30 @@ func (svc *batchService) UpdateDepth(id []byte, depth uint8, normalisedBalance *
if err != nil {
return fmt.Errorf("put: %w", err)
}
cs, err := svc.updateChecksum(txHash)
if err != nil {
return fmt.Errorf("update checksum: %w", err)
}
svc.logger.Debugf("batch service: updated depth of batch id %s from %d to %d", hex.EncodeToString(b.ID), b.Depth, depth)
svc.logger.Debugf("batch service: updated depth of batch id %s from %d to %d, tx %x, checksum %x", hex.EncodeToString(b.ID), b.Depth, depth, txHash, cs)
return nil
}
// UpdatePrice implements the EventUpdater interface. It sets the current
// price from the chain in the service chain state.
func (svc *batchService) UpdatePrice(price *big.Int) error {
func (svc *batchService) UpdatePrice(price *big.Int, txHash []byte) error {
cs := svc.storer.GetChainState()
cs.CurrentPrice = price
if err := svc.storer.PutChainState(cs); err != nil {
return fmt.Errorf("put chain state: %w", err)
}
svc.logger.Debugf("batch service: updated chain price to %s", price)
sum, err := svc.updateChecksum(txHash)
if err != nil {
return fmt.Errorf("update checksum: %w", err)
}
svc.logger.Debugf("batch service: updated chain price to %s, tx %x, checksum %x", price, txHash, sum)
return nil
}
......@@ -161,3 +212,29 @@ func (svc *batchService) Start(startBlock uint64) (<-chan struct{}, error) {
}
return svc.listener.Listen(startBlock+1, svc), nil
}
// updateChecksum updates the batchservice checksum once an event gets
// processed. It swaps the existing checksum which is in the hasher
// with the new checksum and persists it in the statestore.
func (svc *batchService) updateChecksum(txHash []byte) ([]byte, error) {
n, err := svc.checksum.Write(txHash)
if err != nil {
return nil, err
}
if l := len(txHash); l != n {
return nil, fmt.Errorf("update checksum wrote %d bytes but want %d bytes", n, l)
}
s := svc.checksum.Sum(nil)
svc.checksum.Reset()
n, err = svc.checksum.Write(s)
if err != nil {
return nil, err
}
if l := len(s); l != n {
return nil, fmt.Errorf("swap checksum wrote %d bytes but want %d bytes", n, l)
}
b := hex.EncodeToString(s)
return s, svc.stateStore.Put(checksumDBKey, b)
}
......@@ -7,6 +7,7 @@ package batchservice_test
import (
"bytes"
"errors"
"hash"
"io/ioutil"
"math/big"
"math/rand"
......@@ -24,6 +25,7 @@ import (
var (
testLog = logging.New(ioutil.Discard, 0)
errTest = errors.New("fails")
testTxHash = make([]byte, 32)
)
type mockListener struct {
......@@ -51,6 +53,7 @@ func TestBatchServiceCreate(t *testing.T) {
testBatch := postagetesting.MustNewBatch()
testBatchListener := &mockBatchCreationHandler{}
svc, _, _ := newTestStoreAndServiceWithListener(
t,
testBatch.Owner,
testBatchListener,
mock.WithChainState(testChainState),
......@@ -64,6 +67,7 @@ func TestBatchServiceCreate(t *testing.T) {
testBatch.Depth,
testBatch.BucketDepth,
testBatch.Immutable,
testTxHash,
); err == nil {
t.Fatalf("expected error")
}
......@@ -105,6 +109,7 @@ func TestBatchServiceCreate(t *testing.T) {
testBatch := postagetesting.MustNewBatch()
testBatchListener := &mockBatchCreationHandler{}
svc, batchStore, _ := newTestStoreAndServiceWithListener(
t,
testBatch.Owner,
testBatchListener,
mock.WithChainState(testChainState),
......@@ -117,6 +122,7 @@ func TestBatchServiceCreate(t *testing.T) {
testBatch.Depth,
testBatch.BucketDepth,
testBatch.Immutable,
testTxHash,
); err != nil {
t.Fatalf("got error %v", err)
}
......@@ -135,6 +141,7 @@ func TestBatchServiceCreate(t *testing.T) {
rand.Read(owner)
svc, batchStore, _ := newTestStoreAndServiceWithListener(
t,
owner,
testBatchListener,
mock.WithChainState(testChainState),
......@@ -147,6 +154,7 @@ func TestBatchServiceCreate(t *testing.T) {
testBatch.Depth,
testBatch.BucketDepth,
testBatch.Immutable,
testTxHash,
); err != nil {
t.Fatalf("got error %v", err)
}
......@@ -164,32 +172,34 @@ func TestBatchServiceTopUp(t *testing.T) {
t.Run("expect get error", func(t *testing.T) {
svc, _, _ := newTestStoreAndService(
t,
mock.WithGetErr(errTest, 0),
)
if err := svc.TopUp(testBatch.ID, testNormalisedBalance); err == nil {
if err := svc.TopUp(testBatch.ID, testNormalisedBalance, testTxHash); err == nil {
t.Fatal("expected error")
}
})
t.Run("expect put error", func(t *testing.T) {
svc, batchStore, _ := newTestStoreAndService(
t,
mock.WithPutErr(errTest, 1),
)
putBatch(t, batchStore, testBatch)
if err := svc.TopUp(testBatch.ID, testNormalisedBalance); err == nil {
if err := svc.TopUp(testBatch.ID, testNormalisedBalance, testTxHash); err == nil {
t.Fatal("expected error")
}
})
t.Run("passes", func(t *testing.T) {
svc, batchStore, _ := newTestStoreAndService()
svc, batchStore, _ := newTestStoreAndService(t)
putBatch(t, batchStore, testBatch)
want := testNormalisedBalance
if err := svc.TopUp(testBatch.ID, testNormalisedBalance); err != nil {
if err := svc.TopUp(testBatch.ID, testNormalisedBalance, testTxHash); err != nil {
t.Fatalf("top up: %v", err)
}
......@@ -211,30 +221,32 @@ func TestBatchServiceUpdateDepth(t *testing.T) {
t.Run("expect get error", func(t *testing.T) {
svc, _, _ := newTestStoreAndService(
t,
mock.WithGetErr(errTest, 0),
)
if err := svc.UpdateDepth(testBatch.ID, testNewDepth, testNormalisedBalance); err == nil {
if err := svc.UpdateDepth(testBatch.ID, testNewDepth, testNormalisedBalance, testTxHash); err == nil {
t.Fatal("expected get error")
}
})
t.Run("expect put error", func(t *testing.T) {
svc, batchStore, _ := newTestStoreAndService(
t,
mock.WithPutErr(errTest, 1),
)
putBatch(t, batchStore, testBatch)
if err := svc.UpdateDepth(testBatch.ID, testNewDepth, testNormalisedBalance); err == nil {
if err := svc.UpdateDepth(testBatch.ID, testNewDepth, testNormalisedBalance, testTxHash); err == nil {
t.Fatal("expected put error")
}
})
t.Run("passes", func(t *testing.T) {
svc, batchStore, _ := newTestStoreAndService()
svc, batchStore, _ := newTestStoreAndService(t)
putBatch(t, batchStore, testBatch)
if err := svc.UpdateDepth(testBatch.ID, testNewDepth, testNormalisedBalance); err != nil {
if err := svc.UpdateDepth(testBatch.ID, testNewDepth, testNormalisedBalance, testTxHash); err != nil {
t.Fatalf("update depth: %v", err)
}
......@@ -256,22 +268,24 @@ func TestBatchServiceUpdatePrice(t *testing.T) {
t.Run("expect put error", func(t *testing.T) {
svc, batchStore, _ := newTestStoreAndService(
t,
mock.WithChainState(testChainState),
mock.WithPutErr(errTest, 1),
)
putChainState(t, batchStore, testChainState)
if err := svc.UpdatePrice(testNewPrice); err == nil {
if err := svc.UpdatePrice(testNewPrice, testTxHash); err == nil {
t.Fatal("expected error")
}
})
t.Run("passes", func(t *testing.T) {
svc, batchStore, _ := newTestStoreAndService(
t,
mock.WithChainState(testChainState),
)
if err := svc.UpdatePrice(testNewPrice); err != nil {
if err := svc.UpdatePrice(testNewPrice, testTxHash); err != nil {
t.Fatalf("update price: %v", err)
}
......@@ -288,6 +302,7 @@ func TestBatchServiceUpdateBlockNumber(t *testing.T) {
TotalAmount: big.NewInt(100),
}
svc, batchStore, _ := newTestStoreAndService(
t,
mock.WithChainState(testChainState),
)
......@@ -305,7 +320,7 @@ func TestBatchServiceUpdateBlockNumber(t *testing.T) {
}
func TestTransactionOk(t *testing.T) {
svc, store, s := newTestStoreAndService()
svc, store, s := newTestStoreAndService(t)
if _, err := svc.Start(10); err != nil {
t.Fatal(err)
}
......@@ -318,7 +333,10 @@ func TestTransactionOk(t *testing.T) {
t.Fatal(err)
}
svc2 := batchservice.New(s, store, testLog, newMockListener(), nil, nil)
svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil)
if err != nil {
t.Fatal(err)
}
if _, err := svc2.Start(10); err != nil {
t.Fatal(err)
}
......@@ -329,7 +347,7 @@ func TestTransactionOk(t *testing.T) {
}
func TestTransactionFail(t *testing.T) {
svc, store, s := newTestStoreAndService()
svc, store, s := newTestStoreAndService(t)
if _, err := svc.Start(10); err != nil {
t.Fatal(err)
}
......@@ -338,7 +356,10 @@ func TestTransactionFail(t *testing.T) {
t.Fatal(err)
}
svc2 := batchservice.New(s, store, testLog, newMockListener(), nil, nil)
svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil)
if err != nil {
t.Fatal(err)
}
if _, err := svc2.Start(10); err != nil {
t.Fatal(err)
}
......@@ -347,19 +368,47 @@ func TestTransactionFail(t *testing.T) {
t.Fatalf("expect %d reset calls got %d", 1, c)
}
}
func TestChecksum(t *testing.T) {
s := mocks.NewStateStore()
store := mock.New()
mockHash := &hs{}
svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash })
if err != nil {
t.Fatal(err)
}
testNormalisedBalance := big.NewInt(2000000000000)
testBatch := postagetesting.MustNewBatch()
putBatch(t, store, testBatch)
if err := svc.TopUp(testBatch.ID, testNormalisedBalance, testTxHash); err != nil {
t.Fatalf("top up: %v", err)
}
if m := mockHash.ctr; m != 2 {
t.Fatalf("expected %d calls got %d", 2, m)
}
}
func newTestStoreAndServiceWithListener(
t *testing.T,
owner []byte,
batchListener postage.BatchCreationListener,
opts ...mock.Option,
) (postage.EventUpdater, *mock.BatchStore, storage.StateStorer) {
t.Helper()
s := mocks.NewStateStore()
store := mock.New(opts...)
svc := batchservice.New(s, store, testLog, newMockListener(), owner, batchListener)
svc, err := batchservice.New(s, store, testLog, newMockListener(), owner, batchListener, nil)
if err != nil {
t.Fatal(err)
}
return svc, store, s
}
func newTestStoreAndService(opts ...mock.Option) (postage.EventUpdater, *mock.BatchStore, storage.StateStorer) {
return newTestStoreAndServiceWithListener(nil, nil, opts...)
func newTestStoreAndService(t *testing.T, opts ...mock.Option) (postage.EventUpdater, *mock.BatchStore, storage.StateStorer) {
t.Helper()
return newTestStoreAndServiceWithListener(t, nil, nil, opts...)
}
func putBatch(t *testing.T, store postage.Storer, b *postage.Batch) {
......@@ -377,3 +426,11 @@ func putChainState(t *testing.T, store postage.Storer, cs *postage.ChainState) {
t.Fatalf("store put chain state: %v", err)
}
}
type hs struct{ ctr uint8 }
func (h *hs) Write(p []byte) (n int, err error) { h.ctr++; return len(p), nil }
func (h *hs) Sum(b []byte) []byte { return []byte{h.ctr} }
func (h *hs) Reset() {}
func (h *hs) Size() int { panic("not implemented") }
func (h *hs) BlockSize() int { panic("not implemented") }
......@@ -12,10 +12,10 @@ import (
// EventUpdater interface definitions reflect the updates triggered by events
// emitted by the postage contract on the blockchain.
type EventUpdater interface {
Create(id []byte, owner []byte, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool) error
TopUp(id []byte, normalisedBalance *big.Int) error
UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error
UpdatePrice(price *big.Int) error
Create(id []byte, owner []byte, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash []byte) error
TopUp(id []byte, normalisedBalance *big.Int, txHash []byte) error
UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int, txHash []byte) error
UpdatePrice(price *big.Int, txHash []byte) error
UpdateBlockNumber(blockNumber uint64) error
Start(startBlock uint64) (<-chan struct{}, error)
......
......@@ -118,6 +118,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
c.Depth,
c.BucketDepth,
c.ImmutableFlag,
e.TxHash.Bytes(),
)
case batchTopupTopic:
c := &batchTopUpEvent{}
......@@ -129,6 +130,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
return updater.TopUp(
c.BatchId[:],
c.NormalisedBalance,
e.TxHash.Bytes(),
)
case batchDepthIncreaseTopic:
c := &batchDepthIncreaseEvent{}
......@@ -141,6 +143,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
c.BatchId[:],
c.NewDepth,
c.NormalisedBalance,
e.TxHash.Bytes(),
)
case priceUpdateTopic:
c := &priceUpdateEvent{}
......@@ -151,6 +154,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
l.metrics.PriceCounter.Inc()
return updater.UpdatePrice(
c.Price,
e.TxHash.Bytes(),
)
default:
l.metrics.EventErrors.Inc()
......
......@@ -307,7 +307,7 @@ type updater struct {
eventC chan interface{}
}
func (u *updater) Create(id, owner []byte, normalisedAmount *big.Int, depth, bucketDepth uint8, immutable bool) error {
func (u *updater) Create(id, owner []byte, normalisedAmount *big.Int, depth, bucketDepth uint8, immutable bool, _ []byte) error {
u.eventC <- createArgs{
id: id,
owner: owner,
......@@ -319,7 +319,7 @@ func (u *updater) Create(id, owner []byte, normalisedAmount *big.Int, depth, buc
return nil
}
func (u *updater) TopUp(id []byte, normalisedBalance *big.Int) error {
func (u *updater) TopUp(id []byte, normalisedBalance *big.Int, _ []byte) error {
u.eventC <- topupArgs{
id: id,
normalisedBalance: normalisedBalance,
......@@ -327,7 +327,7 @@ func (u *updater) TopUp(id []byte, normalisedBalance *big.Int) error {
return nil
}
func (u *updater) UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error {
func (u *updater) UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int, _ []byte) error {
u.eventC <- depthArgs{
id: id,
depth: depth,
......@@ -336,7 +336,7 @@ func (u *updater) UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int
return nil
}
func (u *updater) UpdatePrice(price *big.Int) error {
func (u *updater) UpdatePrice(price *big.Int, _ []byte) error {
u.eventC <- priceArgs{price}
return nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment