Commit 3709726a authored by Viktor Trón's avatar Viktor Trón Committed by GitHub

feat: postage stamp indexing (#1625)

This PR adds postage stamp indexing to enforce over-issuance limits
on the storer node side. This allows for slot overriding and reuse.
It also adds immutable stamp functionality.
Co-authored-by: Esad Akar <esadakar@gmail.com>
parent 6d2b2709
166c166
168c168
< chainUpdateInterval := (time.Duration(l.blockTime) * time.Second) / 2
---
> chainUpdateInterval := (time.Duration(l.blockTime) * time.Second) / 5
25c25
26c26
< BucketDepth = uint8(16)
---
> BucketDepth = uint8(10)
......@@ -13,7 +13,7 @@ jobs:
env:
REPLICA: 3
RUN_TYPE: "PR RUN"
SETUP_CONTRACT_IMAGE_TAG: "0.1.0"
SETUP_CONTRACT_IMAGE_TAG: "0.2.0"
runs-on: ubuntu-latest
steps:
- name: Checkout
......
......@@ -8,7 +8,7 @@ require (
github.com/coreos/go-semver v0.3.0
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/ethereum/go-ethereum v1.10.3
github.com/ethersphere/go-storage-incentives-abi v0.2.0
github.com/ethersphere/go-storage-incentives-abi v0.3.0
github.com/ethersphere/go-sw3-abi v0.4.0
github.com/ethersphere/langos v1.0.0
github.com/gogo/protobuf v1.3.1
......
......@@ -176,8 +176,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ethereum/go-ethereum v1.10.3 h1:SEYOYARvbWnoDl1hOSks3ZJQpRiiRJe8ubaQGJQwq0s=
github.com/ethereum/go-ethereum v1.10.3/go.mod h1:99onQmSd1GRGOziyGldI41YQb7EESX3Q4H41IfJgIQQ=
github.com/ethersphere/go-storage-incentives-abi v0.2.0 h1:TZ15auzGsdzuzUR2b5dLAMpFixorb4uKUDGF0QnVmmU=
github.com/ethersphere/go-storage-incentives-abi v0.2.0/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc=
github.com/ethersphere/go-storage-incentives-abi v0.3.0 h1:Y1OyNMI1JjqOmVJlgzR70PPe2Czuh4BglCV/nD3UHIA=
github.com/ethersphere/go-storage-incentives-abi v0.3.0/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc=
github.com/ethersphere/go-sw3-abi v0.4.0 h1:T3ANY+ktWrPAwe2U0tZi+DILpkHzto5ym/XwV/Bbz8g=
github.com/ethersphere/go-sw3-abi v0.4.0/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU=
github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc=
......
......@@ -20,6 +20,7 @@ import (
const (
	// gasPriceHeader is the HTTP request header used to supply a custom gas price.
	gasPriceHeader = "Gas-Price"
	// immutableHeader is the HTTP request header flagging that the postage
	// batch to be created is immutable.
	immutableHeader = "Immutable"
	// errBadGasPrice is the error message returned when the gas price header
	// cannot be parsed.
	errBadGasPrice = "bad gas price"
)
......@@ -65,7 +66,12 @@ func (s *server) postageCreateHandler(w http.ResponseWriter, r *http.Request) {
ctx = sctx.SetGasPrice(ctx, p)
}
batchID, err := s.postageContract.CreateBatch(ctx, amount, uint8(depth), label)
var immutable bool
if val, ok := r.Header[immutableHeader]; ok {
immutable, _ = strconv.ParseBool(val[0])
}
batchID, err := s.postageContract.CreateBatch(ctx, amount, uint8(depth), immutable, label)
if err != nil {
if errors.Is(err, postagecontract.ErrInsufficientFunds) {
s.logger.Debugf("create batch: out of funds: %v", err)
......
......@@ -34,7 +34,7 @@ func TestPostageCreateStamp(t *testing.T) {
t.Run("ok", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
if ib.Cmp(big.NewInt(initialBalance)) != 0 {
return nil, fmt.Errorf("called with wrong initial balance. wanted %d, got %d", initialBalance, ib)
}
......@@ -60,7 +60,7 @@ func TestPostageCreateStamp(t *testing.T) {
t.Run("with-custom-gas", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
if ib.Cmp(big.NewInt(initialBalance)) != 0 {
return nil, fmt.Errorf("called with wrong initial balance. wanted %d, got %d", initialBalance, ib)
}
......@@ -90,7 +90,7 @@ func TestPostageCreateStamp(t *testing.T) {
t.Run("with-error", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
return nil, errors.New("err")
}),
)
......@@ -108,7 +108,7 @@ func TestPostageCreateStamp(t *testing.T) {
t.Run("out-of-funds", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
return nil, postagecontract.ErrInsufficientFunds
}),
)
......@@ -137,7 +137,7 @@ func TestPostageCreateStamp(t *testing.T) {
t.Run("depth less than bucket depth", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
return nil, postagecontract.ErrInvalidDepth
}),
)
......@@ -163,6 +163,32 @@ func TestPostageCreateStamp(t *testing.T) {
}),
)
})
t.Run("immutable header", func(t *testing.T) {
var immutable bool
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, _ *big.Int, _ uint8, i bool, _ string) ([]byte, error) {
immutable = i
return batchID, nil
}),
)
client, _, _ := newTestServer(t, testServerOptions{
PostageContract: contract,
})
jsonhttptest.Request(t, client, http.MethodPost, "/stamps/1000/24", http.StatusCreated,
jsonhttptest.WithRequestHeader("Immutable", "true"),
jsonhttptest.WithExpectedJSONResponse(&api.PostageCreateResponse{
BatchID: batchID,
}),
)
if !immutable {
t.Fatalf("want true, got %v", immutable)
}
})
}
func TestPostageGetStamps(t *testing.T) {
......
......@@ -36,7 +36,7 @@ const (
// about exported data format version
exportVersionFilename = ".swarm-export-version"
// current export format version
currentExportVersion = "2"
currentExportVersion = "3"
)
// Export writes a tar structured data to the writer of
......@@ -71,6 +71,12 @@ func (db *DB) Export(w io.Writer) (count int64, err error) {
if _, err := tw.Write(item.BatchID); err != nil {
return false, err
}
if _, err := tw.Write(item.Index); err != nil {
return false, err
}
if _, err := tw.Write(item.Timestamp); err != nil {
return false, err
}
if _, err := tw.Write(item.Sig); err != nil {
return false, err
}
......
......@@ -81,7 +81,7 @@ func TestExportImport(t *testing.T) {
}
got := append(stamp, ch.Data()...)
if !bytes.Equal(got, want) {
t.Fatalf("chunk %s: got stamp+data %x, want %x", addr, got, want)
t.Fatalf("chunk %s: got stamp+data %x, want %x", addr, got[:256], want[:256])
}
}
}
......@@ -181,6 +181,11 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil {
return 0, false, err
}
err = db.postageIndexIndex.DeleteInBatch(batch, item)
if err != nil {
return 0, false, err
}
}
if gcSize-collectedCount > target {
done = false
......
......@@ -121,6 +121,8 @@ func testDBCollectGarbageWorker(t *testing.T) {
t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)))
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
t.Run("gc size", newIndexGCSizeTest(db))
......@@ -231,6 +233,8 @@ func TestPinGC(t *testing.T) {
t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)+pinChunksCount))
t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)+pinChunksCount))
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
t.Run("gc size", newIndexGCSizeTest(db))
......@@ -310,6 +314,8 @@ func TestGCAfterPin(t *testing.T) {
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(0)))
t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, chunkCount))
for _, hash := range pinAddrs {
_, err := db.Get(context.Background(), storage.ModeGetRequest, hash)
if err != nil {
......@@ -436,6 +442,8 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)))
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
t.Run("gc size", newIndexGCSizeTest(db))
......@@ -898,6 +906,8 @@ func TestGC_NoEvictDirty(t *testing.T) {
t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)))
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
t.Run("gc size", newIndexGCSizeTest(db))
......
......@@ -93,9 +93,12 @@ type DB struct {
// postage chunks index
postageChunksIndex shed.Index
// postage chunks index
// postage radius index
postageRadiusIndex shed.Index
// postage index index
postageIndexIndex shed.Index
// field that stores the number of items in the gc index
gcSize shed.Uint64Field
......@@ -258,7 +261,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
// Index storing actual chunk address, data and bin id.
headerSize := 16 + postage.StampSize
db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|Sig|Data", shed.IndexFuncs{
db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
......@@ -270,7 +273,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
b := make([]byte, headerSize)
binary.BigEndian.PutUint64(b[:8], fields.BinID)
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
stamp, err := postage.NewStamp(fields.BatchID, fields.Sig).MarshalBinary()
stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary()
if err != nil {
return nil, err
}
......@@ -286,6 +289,8 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return e, err
}
e.BatchID = stamp.BatchID()
e.Index = stamp.Index()
e.Timestamp = stamp.Timestamp()
e.Sig = stamp.Sig()
e.Data = value[headerSize:]
return e, nil
......@@ -382,7 +387,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
// create a push syncing triggers used by SubscribePush function
db.pushTriggers = make([]chan<- struct{}, 0)
// gc index for removable chunk ordered by ascending last access time
db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->BatchID", shed.IndexFuncs{
db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->BatchID|BatchIndex", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
b := make([]byte, 16, 16+len(fields.Address))
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
......@@ -397,14 +402,16 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
value = make([]byte, 32)
value = make([]byte, 40)
copy(value, fields.BatchID)
copy(value[32:], fields.Index)
return value, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e = keyItem
e.BatchID = make([]byte, 32)
copy(e.BatchID, value)
copy(e.BatchID, value[:32])
e.Index = make([]byte, postage.IndexSize)
copy(e.Index, value[32:])
return e, nil
},
})
......@@ -481,6 +488,34 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return nil, err
}
db.postageIndexIndex, err = db.shed.NewIndex("BatchID|BatchIndex->Hash|Timestamp", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 40)
copy(key[:32], fields.BatchID)
copy(key[32:40], fields.Index)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.BatchID = key[:32]
e.Index = key[32:40]
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
value = make([]byte, 40)
copy(value, fields.Address)
copy(value[32:], fields.Timestamp)
return value, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.Address = value[:32]
e.Timestamp = value[32:]
return e, nil
},
})
if err != nil {
return nil, err
}
// start garbage collection worker
go db.collectGarbageWorker()
return db, nil
......@@ -557,9 +592,13 @@ func chunkToItem(ch swarm.Chunk) shed.Item {
Data: ch.Data(),
Tag: ch.TagID(),
BatchID: ch.Stamp().BatchID(),
Index: ch.Stamp().Index(),
Timestamp: ch.Stamp().Timestamp(),
Sig: ch.Stamp().Sig(),
Depth: ch.Depth(),
Radius: ch.Radius(),
BucketDepth: ch.BucketDepth(),
Immutable: ch.Immutable(),
}
}
......
......@@ -299,7 +299,7 @@ func newPullIndexTest(db *DB, ch swarm.Chunk, binID uint64, wantError error) fun
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, ch.Address().Bytes(), nil, 0, 0, postage.NewStamp(ch.Stamp().BatchID(), nil))
validateItem(t, item, ch.Address().Bytes(), nil, 0, 0, postage.NewStamp(ch.Stamp().BatchID(), nil, nil, nil))
}
}
}
......@@ -318,7 +318,7 @@ func newPushIndexTest(db *DB, ch swarm.Chunk, storeTimestamp int64, wantError er
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, ch.Address().Bytes(), nil, storeTimestamp, 0, postage.NewStamp(nil, nil))
validateItem(t, item, ch.Address().Bytes(), nil, storeTimestamp, 0, postage.NewStamp(nil, nil, nil, nil))
}
}
}
......@@ -356,7 +356,7 @@ func newPinIndexTest(db *DB, chunk swarm.Chunk, wantError error) func(t *testing
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, chunk.Address().Bytes(), nil, 0, 0, postage.NewStamp(nil, nil))
validateItem(t, item, chunk.Address().Bytes(), nil, 0, 0, postage.NewStamp(nil, nil, nil, nil))
}
}
}
......
......@@ -51,7 +51,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address)
return nil, err
}
return swarm.NewChunk(swarm.NewAddress(out.Address), out.Data).
WithStamp(postage.NewStamp(out.BatchID, out.Sig)), nil
WithStamp(postage.NewStamp(out.BatchID, out.Index, out.Timestamp, out.Sig)), nil
}
// get returns Item from the retrieval index
......
......@@ -52,7 +52,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm
chunks = make([]swarm.Chunk, len(out))
for i, ch := range out {
chunks[i] = swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data).
WithStamp(postage.NewStamp(ch.BatchID, ch.Sig))
WithStamp(postage.NewStamp(ch.BatchID, ch.Index, ch.Timestamp, ch.Sig))
}
return chunks, nil
}
......
......@@ -105,7 +105,7 @@ func TestModeGetRequest(t *testing.T) {
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, uploadTimestamp))
t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1, nil, postage.NewStamp(ch.Stamp().BatchID(), nil)))
t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1, nil, postage.NewStamp(ch.Stamp().BatchID(), nil, nil, nil)))
t.Run("access count", newItemsCountTest(db.retrievalAccessIndex, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
......@@ -136,7 +136,7 @@ func TestModeGetRequest(t *testing.T) {
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, accessTimestamp))
t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, accessTimestamp, 1, nil, postage.NewStamp(ch.Stamp().BatchID(), nil)))
t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, accessTimestamp, 1, nil, postage.NewStamp(ch.Stamp().BatchID(), nil, nil, nil)))
t.Run("access count", newItemsCountTest(db.retrievalAccessIndex, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
......@@ -162,7 +162,7 @@ func TestModeGetRequest(t *testing.T) {
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, uploadTimestamp))
t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1, nil, postage.NewStamp(ch.Stamp().BatchID(), nil)))
t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1, nil, postage.NewStamp(ch.Stamp().BatchID(), nil, nil, nil)))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
......
......@@ -18,6 +18,7 @@ package localstore
import (
"context"
"encoding/binary"
"errors"
"time"
......@@ -27,6 +28,10 @@ import (
"github.com/syndtr/goleveldb/leveldb"
)
var (
	// ErrOverwrite is returned when a chunk's stamp re-uses an already
	// occupied postage stamp index (batch ID + within-batch index) of an
	// immutable batch, i.e. a double issuance of the same stamp slot.
	ErrOverwrite = errors.New("index already exists - double issuance on immutable batch")
)
// Put stores Chunks to database and depending
// on the Putter mode, it updates required indexes.
// Put is required to implement storage.Store
......@@ -196,6 +201,26 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
return true, 0, nil
}
previous, err := db.postageIndexIndex.Get(item)
if err != nil {
if !errors.Is(err, leveldb.ErrNotFound) {
return false, 0, err
}
} else {
if item.Immutable {
return false, 0, ErrOverwrite
}
// if a chunk is found with the same postage stamp index,
// replace it with the new one only if timestamp is later
if !later(previous, item) {
return false, 0, nil
}
gcSizeChange, err = db.setRemove(batch, previous, true)
if err != nil {
return false, 0, err
}
}
item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address)))
if err != nil {
......@@ -209,14 +234,17 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
if err != nil {
return false, 0, err
}
err = db.postageIndexIndex.PutInBatch(batch, item)
if err != nil {
return false, 0, err
}
item.AccessTimestamp = now()
err = db.retrievalAccessIndex.PutInBatch(batch, item)
if err != nil {
return false, 0, err
}
gcSizeChange, err = db.preserveOrCache(batch, item, forcePin, forceCache)
gcSizeChangeNew, err := db.preserveOrCache(batch, item, forcePin, forceCache)
if err != nil {
return false, 0, err
}
......@@ -230,7 +258,7 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
}
}
return false, gcSizeChange, nil
return false, gcSizeChange + gcSizeChangeNew, nil
}
// putUpload adds an Item to the batch by updating required indexes:
......@@ -246,6 +274,26 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed
return true, 0, nil
}
previous, err := db.postageIndexIndex.Get(item)
if err != nil {
if !errors.Is(err, leveldb.ErrNotFound) {
return false, 0, err
}
} else {
if item.Immutable {
return false, 0, ErrOverwrite
}
// if a chunk is found with the same postage stamp index,
// replace it with the new one only if timestamp is later
if !later(previous, item) {
return false, 0, nil
}
_, err = db.setRemove(batch, previous, true)
if err != nil {
return false, 0, err
}
}
item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address)))
if err != nil {
......@@ -263,7 +311,10 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed
if err != nil {
return false, 0, err
}
err = db.postageIndexIndex.PutInBatch(batch, item)
if err != nil {
return false, 0, err
}
err = db.postageChunksIndex.PutInBatch(batch, item)
if err != nil {
return false, 0, err
......@@ -284,6 +335,26 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I
return true, 0, nil
}
previous, err := db.postageIndexIndex.Get(item)
if err != nil {
if !errors.Is(err, leveldb.ErrNotFound) {
return false, 0, err
}
} else {
if item.Immutable {
return false, 0, ErrOverwrite
}
// if a chunk is found with the same postage stamp index,
// replace it with the new one only if timestamp is later
if !later(previous, item) {
return false, 0, nil
}
_, err = db.setRemove(batch, previous, true)
if err != nil {
return false, 0, err
}
}
item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address)))
if err != nil {
......@@ -301,19 +372,22 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I
if err != nil {
return false, 0, err
}
err = db.postageIndexIndex.PutInBatch(batch, item)
if err != nil {
return false, 0, err
}
item.AccessTimestamp = now()
err = db.retrievalAccessIndex.PutInBatch(batch, item)
if err != nil {
return false, 0, err
}
gcSizeChange, err = db.preserveOrCache(batch, item, false, false)
gcSizeChangeNew, err := db.preserveOrCache(batch, item, false, false)
if err != nil {
return false, 0, err
}
return false, gcSizeChange, nil
return false, gcSizeChange + gcSizeChangeNew, nil
}
// preserveOrCache is a helper function used to add chunks to either a pinned reserve or gc cache
......@@ -327,7 +401,6 @@ func (db *DB) preserveOrCache(batch *leveldb.Batch, item shed.Item, forcePin, fo
} else {
item.Radius = item2.Radius
}
if !forceCache && (withinRadiusFn(db, item) || forcePin) {
return db.setPin(batch, item)
}
......@@ -380,3 +453,9 @@ func containsChunk(addr swarm.Address, chs ...swarm.Chunk) bool {
}
return false
}
// later reports whether current carries a strictly newer stamp timestamp
// than previous. Timestamps are compared as big-endian uint64 values; on
// equal timestamps it returns false, so the previously stored item wins.
func later(previous, current shed.Item) bool {
	return binary.BigEndian.Uint64(current.Timestamp) > binary.BigEndian.Uint64(previous.Timestamp)
}
......@@ -19,17 +19,30 @@ package localstore
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/ethersphere/bee/pkg/postage"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)
// putModes enumerates every put mode exercised by the postage stamp
// collision tests below, so that each pairwise mode combination is covered.
var putModes = []storage.ModePut{
	storage.ModePutRequest,
	storage.ModePutRequestPin,
	storage.ModePutSync,
	storage.ModePutUpload,
	storage.ModePutUploadPin,
	storage.ModePutRequestCache,
}
// TestModePutRequest validates ModePutRequest index values on the provided DB.
func TestModePutRequest(t *testing.T) {
t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return false }))
......@@ -65,6 +78,7 @@ func TestModePutRequest(t *testing.T) {
newItemsCountTest(db.gcIndex, tc.count)(t)
newItemsCountTest(db.pullIndex, tc.count)(t)
newItemsCountTest(db.postageIndexIndex, tc.count)(t)
newIndexGCSizeTest(db)(t)
})
......@@ -85,6 +99,7 @@ func TestModePutRequest(t *testing.T) {
newItemsCountTest(db.gcIndex, tc.count)(t)
newItemsCountTest(db.pullIndex, tc.count)(t)
newItemsCountTest(db.postageIndexIndex, tc.count)(t)
newIndexGCSizeTest(db)(t)
})
})
......@@ -118,8 +133,11 @@ func TestModePutRequestPin(t *testing.T) {
newPinIndexTest(db, ch, nil)(t)
}
newItemsCountTest(db.postageChunksIndex, tc.count)(t)
newItemsCountTest(db.postageIndexIndex, tc.count)(t)
// gc index should be always 0 since we're pinning
newItemsCountTest(db.gcIndex, 0)(t)
newIndexGCSizeTest(db)(t)
})
}
}
......@@ -158,7 +176,10 @@ func TestModePutRequestCache(t *testing.T) {
newPinIndexTest(db, ch, leveldb.ErrNotFound)(t)
}
newItemsCountTest(db.postageChunksIndex, tc.count)(t)
newItemsCountTest(db.postageIndexIndex, tc.count)(t)
newItemsCountTest(db.gcIndex, tc.count)(t)
newIndexGCSizeTest(db)(t)
})
}
}
......@@ -195,9 +216,10 @@ func TestModePutSync(t *testing.T) {
newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)(t)
newPullIndexTest(db, ch, binIDs[po], nil)(t)
newPinIndexTest(db, ch, leveldb.ErrNotFound)(t)
newItemsCountTest(db.gcIndex, tc.count)(t)
newIndexGCSizeTest(db)(t)
}
newItemsCountTest(db.postageChunksIndex, tc.count)(t)
newItemsCountTest(db.postageIndexIndex, tc.count)(t)
newItemsCountTest(db.gcIndex, tc.count)(t)
newIndexGCSizeTest(db)(t)
})
......@@ -237,6 +259,7 @@ func TestModePutUpload(t *testing.T) {
newPushIndexTest(db, ch, wantTimestamp, nil)(t)
newPinIndexTest(db, ch, leveldb.ErrNotFound)(t)
}
newItemsCountTest(db.postageIndexIndex, tc.count)(t)
})
}
}
......@@ -274,6 +297,7 @@ func TestModePutUploadPin(t *testing.T) {
newPushIndexTest(db, ch, wantTimestamp, nil)(t)
newPinIndexTest(db, ch, nil)(t)
}
newItemsCountTest(db.postageIndexIndex, tc.count)(t)
})
}
}
......@@ -467,6 +491,147 @@ func TestModePut_sameChunk(t *testing.T) {
}
}
// TestModePut_SameStamp puts two different chunks carrying the same
// postage stamp (same batch ID and within-batch index) under every
// combination of put modes, and validates that exactly one chunk is
// persisted: the one with the later stamp timestamp, or — on a
// timestamp tie — the chunk that was stored first.
func TestModePut_SameStamp(t *testing.T) {
	ctx := context.Background()
	stamp := postagetesting.MustNewStamp()
	ts := time.Now().Unix()
	for _, modeTc1 := range putModes {
		for _, modeTc2 := range putModes {
			for _, tc := range []struct {
				persistChunk swarm.Chunk
				discardChunk swarm.Chunk
			}{
				{
					// equal timestamps: the first stored chunk is kept
					persistChunk: generateChunkWithTimestamp(stamp, ts),
					discardChunk: generateChunkWithTimestamp(stamp, ts),
				},
				{
					// the already-stored chunk has the later timestamp
					persistChunk: generateChunkWithTimestamp(stamp, ts+1),
					discardChunk: generateChunkWithTimestamp(stamp, ts),
				},
				{
					// the incoming chunk has the earlier timestamp
					persistChunk: generateChunkWithTimestamp(stamp, ts),
					discardChunk: generateChunkWithTimestamp(stamp, ts-1),
				},
			} {
				t.Run(modeTc1.String()+modeTc2.String(), func(t *testing.T) {
					db := newTestDB(t, nil)
					unreserveChunkBatch(t, db, 0, tc.persistChunk, tc.discardChunk)
					_, err := db.Put(ctx, modeTc1, tc.persistChunk)
					if err != nil {
						t.Fatal(err)
					}
					_, err = db.Put(ctx, modeTc2, tc.discardChunk)
					if err != nil {
						t.Fatal(err)
					}
					// all relevant indexes must hold exactly one item
					newItemsCountTest(db.retrievalDataIndex, 1)(t)
					newItemsCountTest(db.postageChunksIndex, 1)(t)
					newItemsCountTest(db.postageRadiusIndex, 1)(t)
					newItemsCountTest(db.postageIndexIndex, 1)(t)
					if modeTc1 != storage.ModePutRequestCache {
						newItemsCountTest(db.pullIndex, 1)(t)
					}
					// the winning chunk is retrievable ...
					_, err = db.Get(ctx, storage.ModeGetLookup, tc.persistChunk.Address())
					if err != nil {
						t.Fatalf("expected no error, got %v", err)
					}
					// ... while the losing chunk must not be found
					_, err = db.Get(ctx, storage.ModeGetLookup, tc.discardChunk.Address())
					if !errors.Is(err, storage.ErrNotFound) {
						t.Fatalf("expected %v, got %v", storage.ErrNotFound, err)
					}
				})
			}
		}
	}
}
// TestModePut_ImmutableStamp puts pairs of chunks carrying the same
// postage stamp of an immutable batch under every combination of put
// modes, and validates that the second put always fails with
// ErrOverwrite — regardless of the stamps' relative timestamps — and
// that only the first chunk remains retrievable.
func TestModePut_ImmutableStamp(t *testing.T) {
	ctx := context.Background()
	stamp := postagetesting.MustNewStamp()
	ts := time.Now().Unix()
	for _, modeTc1 := range putModes {
		for _, modeTc2 := range putModes {
			for _, tc := range []struct {
				name         string
				persistChunk swarm.Chunk
				discardChunk swarm.Chunk
			}{
				{
					name:         "same timestamps",
					persistChunk: generateImmutableChunkWithTimestamp(stamp, ts),
					discardChunk: generateImmutableChunkWithTimestamp(stamp, ts),
				},
				{
					// a later timestamp does not permit overwriting an immutable slot
					name:         "higher timestamp",
					persistChunk: generateImmutableChunkWithTimestamp(stamp, ts),
					discardChunk: generateImmutableChunkWithTimestamp(stamp, ts+1),
				},
				{
					name:         "higher timestamp first",
					persistChunk: generateImmutableChunkWithTimestamp(stamp, ts+1),
					discardChunk: generateImmutableChunkWithTimestamp(stamp, ts),
				},
			} {
				testName := fmt.Sprintf("%s %s %s", modeTc1.String(), modeTc2.String(), tc.name)
				t.Run(testName, func(t *testing.T) {
					db := newTestDB(t, nil)
					unreserveChunkBatch(t, db, 0, tc.persistChunk, tc.discardChunk)
					_, err := db.Put(ctx, modeTc1, tc.persistChunk)
					if err != nil {
						t.Fatal(err)
					}
					// second put re-uses the stamp slot on an immutable batch
					_, err = db.Put(ctx, modeTc2, tc.discardChunk)
					if !errors.Is(err, ErrOverwrite) {
						t.Fatalf("expected overwrite error on immutable stamp got %v", err)
					}
					// all relevant indexes must hold exactly one item
					newItemsCountTest(db.retrievalDataIndex, 1)(t)
					newItemsCountTest(db.postageChunksIndex, 1)(t)
					newItemsCountTest(db.postageRadiusIndex, 1)(t)
					newItemsCountTest(db.postageIndexIndex, 1)(t)
					if modeTc1 != storage.ModePutRequestCache {
						newItemsCountTest(db.pullIndex, 1)(t)
					}
					// the first chunk is retrievable ...
					_, err = db.Get(ctx, storage.ModeGetLookup, tc.persistChunk.Address())
					if err != nil {
						t.Fatalf("expected no error, got %v", err)
					}
					// ... while the rejected chunk must not be found
					_, err = db.Get(ctx, storage.ModeGetLookup, tc.discardChunk.Address())
					if !errors.Is(err, storage.ErrNotFound) {
						t.Fatalf("expected %v, got %v", storage.ErrNotFound, err)
					}
				})
			}
		}
	}
}
// generateChunkWithTimestamp returns a fresh random test chunk stamped
// with stamp's batch ID, index and signature, but carrying the given
// unix timestamp encoded as a big-endian uint64.
func generateChunkWithTimestamp(stamp *postage.Stamp, timestamp int64) swarm.Chunk {
	ts := make([]byte, 8)
	binary.BigEndian.PutUint64(ts, uint64(timestamp))
	newStamp := postage.NewStamp(stamp.BatchID(), stamp.Index(), ts, stamp.Sig())
	return generateTestRandomChunk().WithStamp(newStamp)
}
// generateImmutableChunkWithTimestamp is like generateChunkWithTimestamp
// but additionally associates the chunk with an immutable batch via
// WithBatch. The numeric batch parameters (4, 12, 8) are arbitrary test
// fixtures — NOTE(review): confirm their meaning/order against the
// swarm.Chunk WithBatch signature.
func generateImmutableChunkWithTimestamp(stamp *postage.Stamp, timestamp int64) swarm.Chunk {
	return generateChunkWithTimestamp(stamp, timestamp).WithBatch(4, 12, 8, true)
}
// TestPutDuplicateChunks validates the expected behaviour for
// passing duplicate chunks to the Put method.
func TestPutDuplicateChunks(t *testing.T) {
......
......@@ -147,6 +147,8 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange in
item.StoreTimestamp = i.StoreTimestamp
item.BinID = i.BinID
item.BatchID = i.BatchID
item.Index = i.Index
item.Timestamp = i.Timestamp
i, err = db.pushIndex.Get(item)
if err != nil {
......@@ -240,6 +242,7 @@ func (db *DB) setRemove(batch *leveldb.Batch, item shed.Item, check bool) (gcSiz
if err != nil {
return 0, err
}
// unless called by GC which iterates through the gcIndex
// a check is needed for decrementing gcSize
// as delete is not reporting if the key/value pair is deleted or not
......
......@@ -18,10 +18,10 @@ package localstore
import (
"context"
"errors"
"testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)
......@@ -44,28 +44,63 @@ func TestModeSetRemove(t *testing.T) {
}
t.Run("retrieve indexes", func(t *testing.T) {
t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0))
t.Run("retrieve access index count", newItemsCountTest(db.retrievalAccessIndex, 0))
})
for _, ch := range chunks {
wantErr := leveldb.ErrNotFound
_, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
if !errors.Is(err, wantErr) {
t.Errorf("got error %v, want %v", err, wantErr)
newPullIndexTest(db, ch, 0, leveldb.ErrNotFound)(t)
}
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
t.Run("gc size", newIndexGCSizeTest(db))
})
}
}
// TestModeSetRemove_WithSync validates ModeSetRemove index values on the provided DB
// with the syncing flow for a reserved chunk that has been marked for removal.
func TestModeSetRemove_WithSync(t *testing.T) {
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
db := newTestDB(t, nil)
var chs []swarm.Chunk
for i := 0; i < tc.count; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
err := db.UnreserveBatch(ch.Stamp().BatchID(), 2)
if err != nil {
t.Fatal(err)
}
_, err = db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
// access index should not be set
_, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
if !errors.Is(err, wantErr) {
t.Errorf("got error %v, want %v", err, wantErr)
chs = append(chs, ch)
}
err := db.Set(context.Background(), storage.ModeSetRemove, chunkAddresses(chs)...)
if err != nil {
t.Fatal(err)
}
t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0))
t.Run("retrieve indexes", func(t *testing.T) {
t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0))
t.Run("retrieve access index count", newItemsCountTest(db.retrievalAccessIndex, 0))
})
for _, ch := range chunks {
newPullIndexTest(db, ch, 0, leveldb.ErrNotFound)(t)
}
t.Run("postage chunks index count", newItemsCountTest(db.postageChunksIndex, 0))
t.Run("postage index index count", newItemsCountTest(db.postageIndexIndex, tc.count))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))
......
......@@ -39,7 +39,7 @@ func TestDB_ReserveGC_AllOutOfRadius(t *testing.T) {
addrs := make([]swarm.Address, 0)
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(3, 3)
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(3, 3, 2, false)
err := db.UnreserveBatch(ch.Stamp().BatchID(), 4)
if err != nil {
t.Fatal(err)
......@@ -133,7 +133,7 @@ func TestDB_ReserveGC_AllWithinRadius(t *testing.T) {
addrs := make([]swarm.Address, 0)
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3)
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
err := db.UnreserveBatch(ch.Stamp().BatchID(), 2)
if err != nil {
t.Fatal(err)
......@@ -199,7 +199,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
// put the first chunkCount chunks within radius
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3)
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
err := db.UnreserveBatch(ch.Stamp().BatchID(), 2)
if err != nil {
t.Fatal(err)
......@@ -216,7 +216,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
var po4Chs []swarm.Chunk
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 4).WithBatch(2, 3)
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 4).WithBatch(2, 3, 2, false)
err := db.UnreserveBatch(ch.Stamp().BatchID(), 2)
if err != nil {
t.Fatal(err)
......@@ -234,7 +234,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
var gcChs []swarm.Chunk
for i := 0; i < 100; i++ {
gcch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3)
gcch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
err := db.UnreserveBatch(gcch.Stamp().BatchID(), 2)
if err != nil {
t.Fatal(err)
......@@ -336,7 +336,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
// put the first chunkCount chunks within radius
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3)
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
err := db.UnreserveBatch(ch.Stamp().BatchID(), 2)
if err != nil {
t.Fatal(err)
......@@ -353,7 +353,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
var gcChs []swarm.Chunk
for i := 0; i < 100; i++ {
gcch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3)
gcch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
err := db.UnreserveBatch(gcch.Stamp().BatchID(), 2)
if err != nil {
t.Fatal(err)
......
......@@ -76,7 +76,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
return true, err
}
stamp := postage.NewStamp(dataItem.BatchID, dataItem.Sig)
stamp := postage.NewStamp(dataItem.BatchID, dataItem.Index, dataItem.Timestamp, dataItem.Sig)
select {
case chunks <- swarm.NewChunk(swarm.NewAddress(dataItem.Address), dataItem.Data).WithTagID(item.Tag).WithStamp(stamp):
count++
......
......@@ -16,20 +16,26 @@ type Batch struct {
Start uint64 // block number the batch was created
Owner []byte // owner's ethereum address
Depth uint8 // batch depth, i.e., size = 2^{depth}
BucketDepth uint8 // the depth of neighbourhoods t
Immutable bool // if the batch allows adding new capacity (dilution)
Radius uint8 // reserve radius, non-serialised
}
// MarshalBinary implements BinaryMarshaller. It will attempt to serialize the
// postage batch to a byte slice.
// serialised as ID(32)|big endian value(32)|start block(8)|owner addr(20)|bucketDepth(1)|depth(1)|immutable(1)
func (b *Batch) MarshalBinary() ([]byte, error) {
	out := make([]byte, 95)
	copy(out, b.ID)
	value := b.Value.Bytes()
	// the value is right-aligned (big endian) within its 32-byte slot
	copy(out[64-len(value):], value)
	binary.BigEndian.PutUint64(out[64:72], b.Start)
	copy(out[72:], b.Owner)
	out[92] = b.BucketDepth
	out[93] = b.Depth
	if b.Immutable {
		out[94] = 1
	}
	return out, nil
}
......@@ -40,6 +46,8 @@ func (b *Batch) UnmarshalBinary(buf []byte) error {
b.Value = big.NewInt(0).SetBytes(buf[32:64])
b.Start = binary.BigEndian.Uint64(buf[64:72])
b.Owner = buf[72:92]
b.Depth = buf[92]
b.BucketDepth = buf[92]
b.Depth = buf[93]
b.Immutable = buf[94] > 0
return nil
}
......@@ -20,8 +20,8 @@ func TestBatchMarshalling(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if len(buf) != 93 {
t.Fatalf("invalid length for serialised batch. expected 93, got %d", len(buf))
if len(buf) != 95 {
t.Fatalf("invalid length for serialised batch. expected 95, got %d", len(buf))
}
b := &postage.Batch{}
if err := b.UnmarshalBinary(buf); err != nil {
......@@ -42,4 +42,10 @@ func TestBatchMarshalling(t *testing.T) {
if a.Depth != b.Depth {
t.Fatalf("depth mismatch, expected %d, got %d", a.Depth, b.Depth)
}
if a.BucketDepth != b.BucketDepth {
t.Fatalf("bucket depth mismatch, expected %d, got %d", a.BucketDepth, b.BucketDepth)
}
if a.Immutable != b.Immutable {
t.Fatalf("depth mismatch, expected %v, got %v", a.Immutable, b.Immutable)
}
}
......@@ -35,13 +35,15 @@ func New(stateStore storage.StateStorer, storer postage.Storer, logger logging.L
// Create will create a new batch with the given ID, owner value and depth and
// stores it in the BatchStore.
func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, depth uint8) error {
func (svc *batchService) Create(id, owner []byte, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool) error {
b := &postage.Batch{
ID: id,
Owner: owner,
Value: big.NewInt(0),
Start: svc.storer.GetChainState().Block,
Depth: depth,
BucketDepth: bucketDepth,
Immutable: immutable,
}
err := svc.storer.Put(b, normalisedBalance, depth)
......
......@@ -50,6 +50,8 @@ func TestBatchServiceCreate(t *testing.T) {
testBatch.Owner,
testBatch.Value,
testBatch.Depth,
testBatch.BucketDepth,
testBatch.Immutable,
); err == nil {
t.Fatalf("expected error")
}
......@@ -65,6 +67,8 @@ func TestBatchServiceCreate(t *testing.T) {
testBatch.Owner,
testBatch.Value,
testBatch.Depth,
testBatch.BucketDepth,
testBatch.Immutable,
); err != nil {
t.Fatalf("got error %v", err)
}
......@@ -83,9 +87,15 @@ func TestBatchServiceCreate(t *testing.T) {
if got.Value.Cmp(testBatch.Value) != 0 {
t.Fatalf("batch value: want %v, got %v", testBatch.Value.String(), got.Value.String())
}
if got.BucketDepth != testBatch.BucketDepth {
t.Fatalf("bucket depth: want %v, got %v", got.BucketDepth, testBatch.BucketDepth)
}
if got.Depth != testBatch.Depth {
t.Fatalf("batch depth: want %v, got %v", got.Depth, testBatch.Depth)
}
if got.Immutable != testBatch.Immutable {
t.Fatalf("immutable: want %v, got %v", got.Immutable, testBatch.Immutable)
}
if got.Start != testChainState.Block {
t.Fatalf("batch start block different form chain state: want %v, got %v", got.Start, testChainState.Block)
}
......
......@@ -41,7 +41,7 @@ func New(opts ...Option) *BatchStore {
return bs
}
// WithChainState will set the initial chainstate in the ChainStore mock.
// WithReserveState will set the initial reservestate in the ChainStore mock.
func WithReserveState(rs *postage.ReserveState) Option {
return func(bs *BatchStore) {
bs.rs = rs
......@@ -75,6 +75,15 @@ func WithPutErr(err error, delayCnt int) Option {
}
}
// WithBatch pre-loads the mock store with the batch provided by the user.
// A subsequent Get for its ID will return it.
func WithBatch(b *postage.Batch) Option {
	return func(bs *BatchStore) {
		bs.id = b.ID
		bs.batch = b
	}
}
// Get mocks the Get method from the BatchStore
func (bs *BatchStore) Get(id []byte) (*postage.Batch, error) {
if bs.getErr != nil {
......
......@@ -65,3 +65,17 @@ func TestBatchStorePutChainState(t *testing.T) {
t.Fatal("expected error")
}
}
// TestBatchStoreWithBatch checks that a batch injected via the WithBatch
// option is returned by a subsequent Get for its ID.
func TestBatchStoreWithBatch(t *testing.T) {
	want := postagetesting.MustNewBatch()
	batchStore := mock.New(mock.WithBatch(want))

	got, err := batchStore.Get(want.ID)
	if err != nil {
		t.Fatal(err)
	}
	postagetesting.CompareBatches(t, want, got)
}
......@@ -4,10 +4,7 @@
package postage
import (
"github.com/ethersphere/bee/pkg/swarm"
var (
IndexToBytes = indexToBytes
BytesToIndex = bytesToIndex
)
// Inc exposes the unexported inc to tests. The issued index is discarded;
// only the error (e.g. ErrBucketFull) is reported.
// NOTE(review): inc now returns ([]byte, error), so the previous
// `return st.inc(a)` no longer type-checks against an `error` result.
func (st *StampIssuer) Inc(a swarm.Address) error {
	_, err := st.inc(a)
	return err
}
......@@ -12,7 +12,7 @@ import (
// EventUpdater interface definitions reflect the updates triggered by events
// emitted by the postage contract on the blockchain.
type EventUpdater interface {
Create(id []byte, owner []byte, normalisedBalance *big.Int, depth uint8) error
Create(id []byte, owner []byte, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool) error
TopUp(id []byte, normalisedBalance *big.Int) error
UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int) error
UpdatePrice(price *big.Int) error
......
......@@ -31,7 +31,7 @@ const (
)
var (
postageStampABI = parseABI(postageabi.PostageStampABIv0_2_0)
postageStampABI = parseABI(postageabi.PostageStampABIv0_3_0)
// batchCreatedTopic is the postage contract's batch created event topic
batchCreatedTopic = postageStampABI.Events["BatchCreated"].ID
// batchTopupTopic is the postage contract's batch topup event topic
......@@ -116,6 +116,8 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
c.Owner.Bytes(),
c.NormalisedBalance,
c.Depth,
c.BucketDepth,
c.ImmutableFlag,
)
case batchTopupTopic:
c := &batchTopUpEvent{}
......@@ -301,6 +303,8 @@ type batchCreatedEvent struct {
NormalisedBalance *big.Int
Owner common.Address
Depth uint8
BucketDepth uint8
ImmutableFlag bool
}
type batchTopUpEvent struct {
......@@ -320,8 +324,8 @@ type priceUpdateEvent struct {
}
var (
GoerliPostageStampContractAddress = common.HexToAddress("0xB3B7f2eD97B735893316aEeA849235de5e8972a2")
GoerliStartBlock = uint64(4818979)
GoerliPostageStampContractAddress = common.HexToAddress("0x621e455C4a139f5C4e4A8122Ce55Dc21630769E4")
GoerliStartBlock = uint64(4933174)
)
// DiscoverAddresses returns the canonical contracts for this chainID
......
......@@ -47,6 +47,7 @@ func TestListener(t *testing.T) {
c.toLog(496),
),
)
l := listener.New(logger, mf, postageStampAddress, 1, nil)
l.Listen(0, ev)
......@@ -306,12 +307,14 @@ type updater struct {
eventC chan interface{}
}
func (u *updater) Create(id, owner []byte, normalisedAmount *big.Int, depth uint8) error {
func (u *updater) Create(id, owner []byte, normalisedAmount *big.Int, depth, bucketDepth uint8, immutable bool) error {
u.eventC <- createArgs{
id: id,
owner: owner,
normalisedAmount: normalisedAmount,
bucketDepth: bucketDepth,
depth: depth,
immutable: immutable,
}
return nil
}
......@@ -428,7 +431,9 @@ type createArgs struct {
owner []byte
amount *big.Int
normalisedAmount *big.Int
bucketDepth uint8
depth uint8
immutable bool
}
func (c createArgs) compare(t *testing.T, want createArgs) {
......@@ -444,7 +449,7 @@ func (c createArgs) compare(t *testing.T, want createArgs) {
}
func (c createArgs) toLog(blockNumber uint64) types.Log {
b, err := listener.PostageStampABI.Events["BatchCreated"].Inputs.NonIndexed().Pack(c.amount, c.normalisedAmount, common.BytesToAddress(c.owner), c.depth)
b, err := listener.PostageStampABI.Events["BatchCreated"].Inputs.NonIndexed().Pack(c.amount, c.normalisedAmount, common.BytesToAddress(c.owner), c.bucketDepth, c.depth, c.immutable)
if err != nil {
panic(err)
}
......
......@@ -25,7 +25,7 @@ import (
var (
BucketDepth = uint8(16)
postageStampABI = parseABI(postageabi.PostageStampABIv0_1_0)
postageStampABI = parseABI(postageabi.PostageStampABIv0_3_0)
erc20ABI = parseABI(sw3abi.ERC20ABIv0_3_1)
batchCreatedTopic = postageStampABI.Events["BatchCreated"].ID
......@@ -35,7 +35,7 @@ var (
)
type Interface interface {
CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, label string) ([]byte, error)
CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error)
}
type postageContract struct {
......@@ -91,8 +91,9 @@ func (c *postageContract) sendApproveTransaction(ctx context.Context, amount *bi
return receipt, nil
}
func (c *postageContract) sendCreateBatchTransaction(ctx context.Context, owner common.Address, initialBalance *big.Int, depth uint8, nonce common.Hash) (*types.Receipt, error) {
callData, err := postageStampABI.Pack("createBatch", owner, initialBalance, depth, nonce)
func (c *postageContract) sendCreateBatchTransaction(ctx context.Context, owner common.Address, initialBalance *big.Int, depth uint8, nonce common.Hash, immutable bool) (*types.Receipt, error) {
callData, err := postageStampABI.Pack("createBatch", owner, initialBalance, depth, BucketDepth, nonce, immutable)
if err != nil {
return nil, err
}
......@@ -107,7 +108,7 @@ func (c *postageContract) sendCreateBatchTransaction(ctx context.Context, owner
txHash, err := c.transactionService.Send(ctx, request)
if err != nil {
return nil, err
return nil, fmt.Errorf("send: depth %d bucketDepth %d immutable %t: %w", depth, BucketDepth, immutable, err)
}
receipt, err := c.transactionService.WaitForReceipt(ctx, txHash)
......@@ -143,7 +144,7 @@ func (c *postageContract) getBalance(ctx context.Context) (*big.Int, error) {
return abi.ConvertType(results[0], new(big.Int)).(*big.Int), nil
}
func (c *postageContract) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, label string) ([]byte, error) {
func (c *postageContract) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error) {
if depth < BucketDepth {
return nil, ErrInvalidDepth
......@@ -170,7 +171,7 @@ func (c *postageContract) CreateBatch(ctx context.Context, initialBalance *big.I
return nil, err
}
receipt, err := c.sendCreateBatchTransaction(ctx, c.owner, initialBalance, depth, common.BytesToHash(nonce))
receipt, err := c.sendCreateBatchTransaction(ctx, c.owner, initialBalance, depth, common.BytesToHash(nonce), immutable)
if err != nil {
return nil, err
}
......@@ -190,7 +191,7 @@ func (c *postageContract) CreateBatch(ctx context.Context, initialBalance *big.I
c.owner.Hex(),
batchID,
depth,
BucketDepth,
createdEvent.BucketDepth,
))
return createdEvent.BatchId[:], nil
......@@ -206,6 +207,8 @@ type batchCreatedEvent struct {
NormalisedBalance *big.Int
Owner common.Address
Depth uint8
BucketDepth uint8
ImmutableFlag bool
}
func parseABI(json string) abi.ABI {
......
......@@ -33,6 +33,7 @@ func TestCreateBatch(t *testing.T) {
initialBalance := big.NewInt(100)
t.Run("ok", func(t *testing.T) {
depth := uint8(10)
totalAmount := big.NewInt(102400)
txHashApprove := common.HexToHash("abb0")
......@@ -40,7 +41,7 @@ func TestCreateBatch(t *testing.T) {
batchID := common.HexToHash("dddd")
postageMock := postageMock.New()
expectedCallData, err := postagecontract.PostageStampABI.Pack("createBatch", owner, initialBalance, depth, common.Hash{})
expectedCallData, err := postagecontract.PostageStampABI.Pack("createBatch", owner, initialBalance, depth, postagecontract.BucketDepth, common.Hash{}, false)
if err != nil {
t.Fatal(err)
}
......@@ -86,7 +87,7 @@ func TestCreateBatch(t *testing.T) {
postageMock,
)
returnedID, err := contract.CreateBatch(ctx, initialBalance, depth, label)
returnedID, err := contract.CreateBatch(ctx, initialBalance, depth, false, label)
if err != nil {
t.Fatal(err)
}
......@@ -116,7 +117,7 @@ func TestCreateBatch(t *testing.T) {
postageMock.New(),
)
_, err := contract.CreateBatch(ctx, initialBalance, depth, label)
_, err := contract.CreateBatch(ctx, initialBalance, depth, false, label)
if !errors.Is(err, postagecontract.ErrInvalidDepth) {
t.Fatalf("expected error %v. got %v", postagecontract.ErrInvalidDepth, err)
}
......@@ -141,7 +142,7 @@ func TestCreateBatch(t *testing.T) {
postageMock.New(),
)
_, err := contract.CreateBatch(ctx, initialBalance, depth, label)
_, err := contract.CreateBatch(ctx, initialBalance, depth, false, label)
if !errors.Is(err, postagecontract.ErrInsufficientFunds) {
t.Fatalf("expected error %v. got %v", postagecontract.ErrInsufficientFunds, err)
}
......@@ -154,6 +155,8 @@ func newCreateEvent(postageContractAddress common.Address, batchId common.Hash)
big.NewInt(0),
common.Address{},
uint8(1),
uint8(2),
false,
)
if err != nil {
panic(err)
......
......@@ -12,11 +12,11 @@ import (
)
type contractMock struct {
createBatch func(ctx context.Context, initialBalance *big.Int, depth uint8, label string) ([]byte, error)
createBatch func(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error)
}
func (c *contractMock) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, label string) ([]byte, error) {
return c.createBatch(ctx, initialBalance, depth, label)
func (c *contractMock) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error) {
return c.createBatch(ctx, initialBalance, depth, immutable, label)
}
// Option is a an option passed to New
......@@ -33,7 +33,7 @@ func New(opts ...Option) postagecontract.Interface {
return bs
}
func WithCreateBatchFunc(f func(ctx context.Context, initialBalance *big.Int, depth uint8, label string) ([]byte, error)) Option {
func WithCreateBatchFunc(f func(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) ([]byte, error)) Option {
return func(m *contractMock) {
m.createBatch = f
}
......
......@@ -15,50 +15,36 @@ import (
)
// StampSize is the number of bytes in the serialisation of a stamp
const StampSize = 97
const (
StampSize = 113
IndexSize = 8
BucketDepth = 16
)
var (
// ErrOwnerMismatch is the error given for invalid signatures.
ErrOwnerMismatch = errors.New("owner mismatch")
// ErrInvalidIndex the error given for invalid stamp index.
ErrInvalidIndex = errors.New("invalid index")
// ErrStampInvalid is the error given if stamp cannot deserialise.
ErrStampInvalid = errors.New("invalid stamp")
// ErrBucketMismatch is the error given if stamp index bucket verification fails.
ErrBucketMismatch = errors.New("bucket mismatch")
)
// Valid checks the validity of the postage stamp; in particular:
// - authenticity - check batch is valid on the blockchain
// - authorisation - the batch owner is the stamp signer
// the validity check is only meaningful in its association of a chunk
// this chunk address needs to be given as argument
func (s *Stamp) Valid(chunkAddr swarm.Address, ownerAddr []byte) error {
toSign, err := toSignDigest(chunkAddr, s.batchID)
if err != nil {
return err
}
signerPubkey, err := crypto.Recover(s.sig, toSign)
if err != nil {
return err
}
signerAddr, err := crypto.NewEthereumAddress(*signerPubkey)
if err != nil {
return err
}
if !bytes.Equal(signerAddr, ownerAddr) {
return ErrOwnerMismatch
}
return nil
}
var _ swarm.Stamp = (*Stamp)(nil)
// Stamp represents a postage stamp as attached to a chunk.
// Field order matters: NewStamp constructs it with a positional literal.
type Stamp struct {
	batchID   []byte // postage batch ID
	index     []byte // index of the batch
	timestamp []byte // to signal order when assigning the indexes to multiple chunks
	sig       []byte // common r[32]s[32]v[1]-style 65 byte ECDSA signature of batchID|index|address by owner or grantee
}
// NewStamp constructs a new stamp from a given batch ID and signature.
func NewStamp(batchID, sig []byte) *Stamp {
return &Stamp{batchID, sig}
// NewStamp constructs a new stamp from a given batch ID, index and signatures.
func NewStamp(batchID, index, timestamp, sig []byte) *Stamp {
return &Stamp{batchID, index, timestamp, sig}
}
// BatchID returns the batch ID of the stamp.
......@@ -66,17 +52,29 @@ func (s *Stamp) BatchID() []byte {
return s.batchID
}
// Sig returns the signature of the stamp.
// Index returns the within-batch index of the stamp.
// The returned slice aliases the stamp's internal buffer; callers must not mutate it.
func (s *Stamp) Index() []byte {
	return s.index
}
// Sig returns the signature of the stamp by the user.
// The returned slice aliases the stamp's internal buffer; callers must not mutate it.
func (s *Stamp) Sig() []byte {
	return s.sig
}
// Timestamp returns the timestamp of the stamp.
// The returned slice aliases the stamp's internal buffer; callers must not mutate it.
func (s *Stamp) Timestamp() []byte {
	return s.timestamp
}
// MarshalBinary gives the byte slice serialisation of a stamp:
// batchID[32]|index[8]|timestamp[8]|Signature[65].
func (s *Stamp) MarshalBinary() ([]byte, error) {
	buf := make([]byte, StampSize)
	copy(buf, s.batchID)
	copy(buf[32:40], s.index)
	copy(buf[40:48], s.timestamp)
	copy(buf[48:], s.sig)
	return buf, nil
}
......@@ -86,19 +84,29 @@ func (s *Stamp) UnmarshalBinary(buf []byte) error {
return ErrStampInvalid
}
s.batchID = buf[:32]
s.sig = buf[32:]
s.index = buf[32:40]
s.timestamp = buf[40:48]
s.sig = buf[48:]
return nil
}
// toSignDigest creates a digest to represent the stamp which is to be signed by
// the owner.
func toSignDigest(addr swarm.Address, id []byte) ([]byte, error) {
func toSignDigest(addr, batchId, index, timestamp []byte) ([]byte, error) {
h := swarm.NewHasher()
_, err := h.Write(addr.Bytes())
_, err := h.Write(addr)
if err != nil {
return nil, err
}
_, err = h.Write(batchId)
if err != nil {
return nil, err
}
_, err = h.Write(index)
if err != nil {
return nil, err
}
_, err = h.Write(id)
_, err = h.Write(timestamp)
if err != nil {
return nil, err
}
......@@ -120,9 +128,40 @@ func ValidStamp(batchStore Storer) func(chunk swarm.Chunk, stampBytes []byte) (s
}
return nil, err
}
if err = stamp.Valid(chunk.Address(), b.Owner); err != nil {
return nil, fmt.Errorf("chunk %s stamp invalid: %w", chunk.Address().String(), err)
if err = stamp.Valid(chunk.Address(), b.Owner, b.Depth, b.BucketDepth, b.Immutable); err != nil {
return nil, err
}
return chunk.WithStamp(stamp).WithBatch(b.Radius, b.Depth, b.BucketDepth, b.Immutable), nil
}
return chunk.WithStamp(stamp).WithBatch(b.Radius, b.Depth), nil
}
// Valid checks the validity of the postage stamp; in particular:
// - authenticity - check batch is valid on the blockchain
// - authorisation - the batch owner is the stamp signer
// the validity check is only meaningful in its association of a chunk
// this chunk address needs to be given as argument
// NOTE(review): immutable is currently unused here; the index bound is
// enforced regardless — confirm against the batch dilution semantics.
func (s *Stamp) Valid(chunkAddr swarm.Address, ownerAddr []byte, depth, bucketDepth uint8, immutable bool) error {
	// Run the cheap structural checks before the expensive ECDSA recovery:
	// the bucket part of the index must match the chunk's collision bucket,
	// and the within-bucket index must fit the bucket volume
	// 2^(depth-bucketDepth).
	bucket, index := bytesToIndex(s.index)
	if toBucket(bucketDepth, chunkAddr) != bucket {
		return ErrBucketMismatch
	}
	// depth < bucketDepth would underflow the uint8 subtraction; treat it as
	// an invalid index explicitly (the shift would saturate to the same
	// outcome, but this spells the intent out).
	if depth < bucketDepth || index >= 1<<(depth-bucketDepth) {
		return ErrInvalidIndex
	}
	toSign, err := toSignDigest(chunkAddr.Bytes(), s.batchID, s.index, s.timestamp)
	if err != nil {
		return err
	}
	signerPubkey, err := crypto.Recover(s.sig, toSign)
	if err != nil {
		return err
	}
	signerAddr, err := crypto.NewEthereumAddress(*signerPubkey)
	if err != nil {
		return err
	}
	if !bytes.Equal(signerAddr, ownerAddr) {
		return ErrOwnerMismatch
	}
	return nil
}
......@@ -6,47 +6,109 @@ package postage_test
import (
"bytes"
crand "crypto/rand"
"io"
"testing"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/postage/batchstore/mock"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
chunktesting "github.com/ethersphere/bee/pkg/storage/testing"
)
// TestStampMarshalling tests the idempotence of binary marshal/unmarshals for Stamps.
func TestStampMarshalling(t *testing.T) {
sExp := newStamp(t)
sExp := postagetesting.MustNewStamp()
buf, _ := sExp.MarshalBinary()
if len(buf) != postage.StampSize {
t.Fatalf("invalid length for serialised stamp. expected %d, got %d", postage.StampSize, len(buf))
}
s := postage.NewStamp(nil, nil)
s := postage.NewStamp(nil, nil, nil, nil)
if err := s.UnmarshalBinary(buf); err != nil {
t.Fatalf("unexpected error unmarshalling stamp: %v", err)
}
if !bytes.Equal(sExp.BatchID(), s.BatchID()) {
t.Fatalf("id mismatch, expected %x, got %x", sExp.BatchID(), s.BatchID())
compareStamps(t, sExp, s)
}
func compareStamps(t *testing.T, s1, s2 *postage.Stamp) {
if !bytes.Equal(s1.BatchID(), s2.BatchID()) {
t.Fatalf("id mismatch, expected %x, got %x", s1.BatchID(), s2.BatchID())
}
if !bytes.Equal(s1.Index(), s2.Index()) {
t.Fatalf("index mismatch, expected %x, got %x", s1.Index(), s2.Index())
}
if !bytes.Equal(s1.Timestamp(), s2.Timestamp()) {
t.Fatalf("timestamp mismatch, expected %x, got %x", s1.Index(), s2.Index())
}
if !bytes.Equal(sExp.Sig(), s.Sig()) {
t.Fatalf("sig mismatch, expected %x, got %x", sExp.Sig(), s.Sig())
if !bytes.Equal(s1.Sig(), s2.Sig()) {
t.Fatalf("sig mismatch, expected %x, got %x", s1.Sig(), s2.Sig())
}
}
// TestStampIndexMarshalling tests the idempotence of stamp index serialisation.
func TestStampIndexMarshalling(t *testing.T) {
	const (
		wantBucket uint32 = 11789
		wantIndex  uint32 = 199999
	)
	gotBucket, gotIndex := postage.BytesToIndex(postage.IndexToBytes(wantBucket, wantIndex))
	if gotBucket != wantBucket {
		t.Fatalf("bucket mismatch. want %d, got %d", wantBucket, gotBucket)
	}
	if gotIndex != wantIndex {
		t.Fatalf("index mismatch. want %d, got %d", wantIndex, gotIndex)
	}
}
func newStamp(t *testing.T) *postage.Stamp {
const idSize = 32
const signatureSize = 65
func TestValidStamp(t *testing.T) {
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
owner, err := crypto.NewEthereumAddress(privKey.PublicKey)
if err != nil {
t.Fatal(err)
}
b := postagetesting.MustNewBatch(postagetesting.WithOwner(owner))
bs := mock.New(mock.WithBatch(b))
signer := crypto.NewDefaultSigner(privKey)
issuer := postage.NewStampIssuer("label", "keyID", b.ID, b.Depth, b.BucketDepth)
stamper := postage.NewStamper(issuer, signer)
// this creates a chunk with a mocked stamp. ValidStamp will override this
// stamp on execution
ch := chunktesting.GenerateTestRandomChunk()
st, err := stamper.Stamp(ch.Address())
if err != nil {
t.Fatal(err)
}
stBytes, err := st.MarshalBinary()
if err != nil {
t.Fatal(err)
}
id := make([]byte, idSize)
if _, err := io.ReadFull(crand.Reader, id); err != nil {
panic(err)
// ensure the chunk doesnt have the batch details filled before we validate stamp
if ch.Depth() == b.Depth || ch.BucketDepth() == b.BucketDepth {
t.Fatal("expected chunk to not have correct depth and bucket depth at start")
}
sig := make([]byte, signatureSize)
if _, err := io.ReadFull(crand.Reader, sig); err != nil {
ch, err = postage.ValidStamp(bs)(ch, stBytes)
if err != nil {
t.Fatal(err)
}
return postage.NewStamp(id, sig)
compareStamps(t, st, ch.Stamp().(*postage.Stamp))
if ch.Depth() != b.Depth {
t.Fatalf("invalid batch depth added on chunk exp %d got %d", b.Depth, ch.Depth())
}
if ch.BucketDepth() != b.BucketDepth {
t.Fatalf("invalid bucket depth added on chunk exp %d got %d", b.BucketDepth, ch.BucketDepth())
}
if ch.Immutable() != b.Immutable {
t.Fatalf("invalid batch immutablility added on chunk exp %t got %t", b.Immutable, ch.Immutable())
}
}
......@@ -5,7 +5,9 @@
package postage
import (
"encoding/binary"
"errors"
"time"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -36,16 +38,24 @@ func NewStamper(st *StampIssuer, signer crypto.Signer) Stamper {
// Stamp takes chunk, see if the chunk can included in the batch and
// signs it with the owner of the batch of this Stamp issuer.
func (st *stamper) Stamp(addr swarm.Address) (*Stamp, error) {
toSign, err := toSignDigest(addr, st.issuer.batchID)
index, err := st.issuer.inc(addr)
if err != nil {
return nil, err
}
sig, err := st.signer.Sign(toSign)
ts := timestamp()
toSign, err := toSignDigest(addr.Bytes(), st.issuer.batchID, index, ts)
if err != nil {
return nil, err
}
if err := st.issuer.inc(addr); err != nil {
sig, err := st.signer.Sign(toSign)
if err != nil {
return nil, err
}
return NewStamp(st.issuer.batchID, sig), nil
return NewStamp(st.issuer.batchID, index, ts, sig), nil
}
func timestamp() []byte {
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, uint64(time.Now().UnixNano()))
return ts
}
......@@ -6,12 +6,12 @@ package postage_test
import (
crand "crypto/rand"
"errors"
"io"
"testing"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/postage"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -47,54 +47,64 @@ func TestStamperStamping(t *testing.T) {
st := newTestStampIssuer(t)
stamper := postage.NewStamper(st, signer)
chunkAddr, stamp := createStamp(t, stamper)
if err := stamp.Valid(chunkAddr, owner); err != nil {
t.Fatal(err)
if err := stamp.Valid(chunkAddr, owner, 12, 8, true); err != nil {
t.Fatalf("expected no error, got %v", err)
}
})
// invalid stamp, incorrect chunk address (it still returns postage.ErrOwnerMismatch)
t.Run("invalid stamp", func(t *testing.T) {
// tests that Stamps returns with postage.ErrBucketMismatch
t.Run("bucket mismatch", func(t *testing.T) {
st := newTestStampIssuer(t)
stamper := postage.NewStamper(st, signer)
chunkAddr, stamp := createStamp(t, stamper)
a := chunkAddr.Bytes()
a[0] ^= 0xff
if err := stamp.Valid(swarm.NewAddress(a), owner); err != postage.ErrOwnerMismatch {
t.Fatalf("expected ErrOwnerMismatch, got %v", err)
if err := stamp.Valid(swarm.NewAddress(a), owner, 12, 8, true); !errors.Is(err, postage.ErrBucketMismatch) {
t.Fatalf("expected ErrBucketMismatch, got %v", err)
}
})
// tests that Stamps returns with postage.ErrInvalidIndex
t.Run("invalid index", func(t *testing.T) {
st := newTestStampIssuer(t)
stamper := postage.NewStamper(st, signer)
// issue 1 stamp
chunkAddr, _ := createStamp(t, stamper)
// issue another 15
// collision depth is 8, committed batch depth is 12, bucket volume 2^4
for i := 0; i < 14; i++ {
_, err = stamper.Stamp(chunkAddr)
if err != nil {
t.Fatalf("error adding stamp at step %d: %v", i, err)
}
}
stamp, err := stamper.Stamp(chunkAddr)
if err != nil {
t.Fatalf("error adding last stamp: %v", err)
}
if err := stamp.Valid(chunkAddr, owner, 11, 8, true); !errors.Is(err, postage.ErrInvalidIndex) {
t.Fatalf("expected ErrInvalidIndex, got %v", err)
}
})
// tests that Stamps returns with postage.ErrBucketFull iff
// issuer has the corresponding collision bucket filled]
t.Run("bucket full", func(t *testing.T) {
b := postagetesting.MustNewBatch(
postagetesting.WithOwner(owner),
)
st := postage.NewStampIssuer("", "", b.ID, b.Depth, 8)
st := newTestStampIssuer(t)
st = postage.NewStampIssuer("", "", st.ID(), 12, 8)
stamper := postage.NewStamper(st, signer)
// issue 1 stamp
chunkAddr, _ := createStamp(t, stamper)
// issue another 255
// collision depth is 8, committed batch depth is 16, bucket volume 2^8
for i := 0; i < 255; i++ {
h := make([]byte, 32)
_, err = io.ReadFull(crand.Reader, h)
if err != nil {
t.Fatal(err)
}
// generate a chunks matching on the first 8 bits,
// i.e., fall into the same collision bucket
h[0] = chunkAddr.Bytes()[0]
// calling Inc we pretend a stamp was issued to the address
err = st.Inc(swarm.NewAddress(h))
// issue another 15
// collision depth is 8, committed batch depth is 12, bucket volume 2^4
for i := 0; i < 15; i++ {
_, err = stamper.Stamp(chunkAddr)
if err != nil {
t.Fatal(err)
t.Fatalf("error adding stamp at step %d: %v", i, err)
}
}
// the bucket should now be full, not allowing a stamp for the pivot chunk
_, err = stamper.Stamp(chunkAddr)
if err != postage.ErrBucketFull {
if _, err = stamper.Stamp(chunkAddr); !errors.Is(err, postage.ErrBucketFull) {
t.Fatalf("expected ErrBucketFull, got %v", err)
}
})
......@@ -105,7 +115,7 @@ func TestStamperStamping(t *testing.T) {
st := newTestStampIssuer(t)
stamper := postage.NewStamper(st, signer)
chunkAddr, stamp := createStamp(t, stamper)
if err := stamp.Valid(chunkAddr, owner); err != postage.ErrOwnerMismatch {
if err := stamp.Valid(chunkAddr, owner, 12, 8, true); !errors.Is(err, postage.ErrOwnerMismatch) {
t.Fatalf("expected ErrOwnerMismatch, got %v", err)
}
})
......
......@@ -22,6 +22,7 @@ type StampIssuer struct {
bucketDepth uint8 // Bucket depth: the depth of collision buckets uniformity.
mu sync.Mutex // Mutex for buckets.
buckets []uint32 // Collision buckets: counts per neighbourhoods (limited to 2^{batchdepth-bucketdepth}).
maxBucketCount uint32 // the count of the fullest bucket
}
// NewStampIssuer constructs a StampIssuer as an extension of a batch for local
......@@ -41,24 +42,45 @@ func NewStampIssuer(label, keyID string, batchID []byte, batchDepth, bucketDepth
// inc increments the count in the correct collision bucket for a newly stamped
// chunk with address addr.
func (st *StampIssuer) inc(addr swarm.Address) error {
func (st *StampIssuer) inc(addr swarm.Address) ([]byte, error) {
st.mu.Lock()
defer st.mu.Unlock()
b := toBucket(st.bucketDepth, addr)
if st.buckets[b] == 1<<(st.batchDepth-st.bucketDepth) {
return ErrBucketFull
bucketCount := st.buckets[b]
if bucketCount == 1<<(st.batchDepth-st.bucketDepth) {
return nil, ErrBucketFull
}
st.buckets[b]++
return nil
if st.buckets[b] > st.maxBucketCount {
st.maxBucketCount = st.buckets[b]
}
return indexToBytes(b, bucketCount), nil
}
// toBucket calculates the index of the collision bucket for a swarm address
// using depth as collision bucket depth
// bucket index := collision bucket depth number of bits as bigendian uint32
// toBucket maps a swarm address to its collision bucket index: the
// top `depth` bits of the address, read as a big-endian uint32.
func toBucket(depth uint8, addr swarm.Address) uint32 {
	prefix := binary.BigEndian.Uint32(addr.Bytes()[:4])
	shift := 32 - depth
	return prefix >> shift
}
// indexToBytes creates an uint64 index from
// - bucket index (neighbourhood index, uint32 <2^depth, bytes 2-4)
// - and the within-bucket index (uint32 <2^(batchdepth-bucketdepth), bytes 5-8)
// indexToBytes encodes a stamp index as an 8-byte big-endian buffer:
// the high 32 bits carry the bucket (neighbourhood) index and the low
// 32 bits carry the within-bucket index.
func indexToBytes(bucket, index uint32) []byte {
	buf := make([]byte, IndexSize)
	binary.BigEndian.PutUint64(buf, uint64(bucket)<<32|uint64(index))
	return buf
}
func bytesToIndex(buf []byte) (bucket, index uint32) {
index64 := binary.BigEndian.Uint64(buf)
bucket = uint32(index64 >> 32)
index = uint32(index64)
return bucket, index
}
// Label returns the label of the issuer.
func (st *StampIssuer) Label() string {
return st.label
......@@ -67,7 +89,7 @@ func (st *StampIssuer) Label() string {
// MarshalBinary gives the byte slice serialisation of a StampIssuer:
// = label[32]|keyID[32]|batchID[32]|batchDepth[1]|bucketDepth[1]|size_0[4]|size_1[4]|....
func (st *StampIssuer) MarshalBinary() ([]byte, error) {
buf := make([]byte, 32+32+32+1+1+(1<<(st.bucketDepth+2)))
buf := make([]byte, 32+32+32+1+1+4*(1<<st.bucketDepth))
label := []byte(st.label)
copy(buf[32-len(label):32], label)
keyID := []byte(st.keyID)
......@@ -115,15 +137,9 @@ func toString(buf []byte) string {
// an integer between 0 and 4294967295. Batch fullness can be
// calculated with: max_bucket_value / 2 ^ (batch_depth - bucket_depth)
func (st *StampIssuer) Utilization() uint32 {
	st.mu.Lock()
	defer st.mu.Unlock()
	// maxBucketCount is maintained incrementally by inc, so no scan over
	// all buckets is needed here
	return st.maxBucketCount
}
// ID returns the BatchID for this batch.
......
......@@ -11,7 +11,6 @@ import (
"testing"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestStampIssuerMarshalling tests the idempotence of binary marshal/unmarshal.
......@@ -38,17 +37,5 @@ func newTestStampIssuer(t *testing.T) *postage.StampIssuer {
if err != nil {
t.Fatal(err)
}
st := postage.NewStampIssuer("label", "keyID", id, 16, 8)
addr := make([]byte, 32)
for i := 0; i < 1<<8; i++ {
_, err := io.ReadFull(crand.Reader, addr)
if err != nil {
t.Fatal(err)
}
err = st.Inc(swarm.NewAddress(addr))
if err != nil {
t.Fatal(err)
}
}
return st
return postage.NewStampIssuer("label", "keyID", id, 12, 8)
}
......@@ -15,7 +15,10 @@ import (
"github.com/ethersphere/bee/pkg/postage"
)
// Default postage batch parameters used by the batch test factory.
const (
	defaultBucketDepth = 12
	defaultDepth       = 16
)
// BatchOption is an optional parameter for NewBatch
type BatchOption func(c *postage.Batch)
......@@ -53,7 +56,9 @@ func MustNewBatch(opts ...BatchOption) *postage.Batch {
ID: MustNewID(),
Value: NewBigInt(),
Start: rand.Uint64(), // skipcq: GSC-G404
BucketDepth: defaultBucketDepth,
Depth: defaultDepth,
Immutable: true,
}
for _, opt := range opts {
......
......@@ -27,5 +27,5 @@ func MustNewSignature() []byte {
// MustNewStamp will generate a postage stamp with random data. Panics on
// errors.
func MustNewStamp() *postage.Stamp {
	return postage.NewStamp(MustNewID(), MustNewID()[:8], MustNewID()[:8], MustNewSignature())
}
......@@ -801,7 +801,7 @@ func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices p
unwrap = func(swarm.Chunk) {}
}
validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
return ch.WithStamp(postage.NewStamp(nil, nil)), nil
return ch.WithStamp(postage.NewStamp(nil, nil, nil, nil)), nil
}
return pushsync.New(addr, recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil), storer, mtag
......
......@@ -233,7 +233,7 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.Callback) storage.Store
)
retrieve := retrieval.New(swarm.ZeroAddress, mockStorer, recorder, ps, logger, serverMockAccounting, pricerMock, nil)
validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
return ch.WithStamp(postage.NewStamp(nil, nil)), nil
return ch.WithStamp(postage.NewStamp(nil, nil, nil, nil)), nil
}
ns := netstore.New(storer, validStamp, recoveryFunc, retrieve, logger)
......
......@@ -311,13 +311,11 @@ func Example_store() {
ch := testing.GenerateTestRandomChunk()
err = s.Put(context.Background(), ch)
if err != nil {
fmt.Println("put chunk:", err)
return
}
got, err := s.Get(context.Background(), ch.Address())
if err != nil {
fmt.Println("get chunk:", err)
return
}
......
......@@ -46,9 +46,13 @@ type Item struct {
PinCounter uint64 // maintains the no of time a chunk is pinned
Tag uint32
BatchID []byte // postage batch ID
Sig []byte // postage stamp
Depth uint8 // postage batch depth
Index []byte // postage stamp within-batch: index
Timestamp []byte // postage stamp validity
Sig []byte // postage stamp signature
BucketDepth uint8 // postage batch bucket depth (for collision sets)
Depth uint8 // postage batch depth (for size)
Radius uint8 // postage batch reserve radius, po upto and excluding which chunks are unpinned
Immutable bool // whether postage batch can be diluted and drained, and indexes overwritten - nullable bool
}
// Merge is a helper method to construct a new
......@@ -76,11 +80,20 @@ func (i Item) Merge(i2 Item) Item {
if i.Tag == 0 {
i.Tag = i2.Tag
}
if len(i.BatchID) == 0 {
i.BatchID = i2.BatchID
}
if len(i.Index) == 0 {
i.Index = i2.Index
}
if len(i.Timestamp) == 0 {
i.Timestamp = i2.Timestamp
}
if len(i.Sig) == 0 {
i.Sig = i2.Sig
}
if len(i.BatchID) == 0 {
i.BatchID = i2.BatchID
if i.BucketDepth == 0 {
i.BucketDepth = i2.BucketDepth
}
if i.Depth == 0 {
i.Depth = i2.Depth
......@@ -88,6 +101,9 @@ func (i Item) Merge(i2 Item) Item {
if i.Radius == 0 {
i.Radius = i2.Radius
}
if !i.Immutable {
i.Immutable = i2.Immutable
}
return i
}
......
......@@ -69,6 +69,10 @@ func (m ModePut) String() string {
return "Upload"
case ModePutUploadPin:
return "UploadPin"
case ModePutRequestPin:
return "RequestPin"
case ModePutRequestCache:
return "RequestCache"
default:
return "Unknown"
}
......
......@@ -142,8 +142,12 @@ type Chunk interface {
Radius() uint8
// Depth returns the batch depth of the stamp - allowed batch size = 2^{depth}.
Depth() uint8
// BucketDepth returns the bucket depth of the batch of the stamp - always < depth.
BucketDepth() uint8
// Immutable returns whether the batch is immutable
Immutable() bool
// WithBatch attaches batch parameters to the chunk.
WithBatch(radius, depth uint8) Chunk
WithBatch(radius, depth, bucketDepth uint8, immutable bool) Chunk
// Equal checks if the chunk is equal to another.
Equal(Chunk) bool
}
......@@ -151,7 +155,9 @@ type Chunk interface {
// Stamp interface for postage.Stamp to avoid circular dependency
type Stamp interface {
	// BatchID returns the postage batch ID the stamp was issued from.
	BatchID() []byte
	// Index returns the stamp's index bytes within the batch.
	Index() []byte
	// Sig returns the stamp's signature bytes.
	Sig() []byte
	// Timestamp returns the stamp's timestamp bytes.
	Timestamp() []byte
	encoding.BinaryMarshaler
	encoding.BinaryUnmarshaler
}
......@@ -163,6 +169,8 @@ type chunk struct {
stamp Stamp
radius uint8
depth uint8
bucketDepth uint8
immutable bool
}
func NewChunk(addr Address, data []byte) Chunk {
......@@ -182,9 +190,11 @@ func (c *chunk) WithStamp(stamp Stamp) Chunk {
return c
}
func (c *chunk) WithBatch(radius, depth uint8) Chunk {
func (c *chunk) WithBatch(radius, depth, bucketDepth uint8, immutable bool) Chunk {
c.radius = radius
c.depth = depth
c.bucketDepth = bucketDepth
c.immutable = immutable
return c
}
......@@ -212,6 +222,14 @@ func (c *chunk) Depth() uint8 {
return c.depth
}
// BucketDepth returns the collision bucket depth of the chunk's postage
// batch, as previously set via WithBatch.
func (c *chunk) BucketDepth() uint8 {
	return c.bucketDepth
}
// Immutable reports whether the chunk's postage batch is immutable, as
// previously set via WithBatch.
func (c *chunk) Immutable() bool {
	return c.immutable
}
// String implements fmt.Stringer, rendering the chunk's address and the
// size of its data in bytes.
func (c *chunk) String() string {
	return fmt.Sprintf("Address: %v Chunksize: %v", c.addr.String(), len(c.sdata))
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment