Commit 6ffa8bcb authored by acud's avatar acud Committed by GitHub

node, postage: persist and load batches (#1632)

parent 45ffe355
...@@ -91,6 +91,7 @@ type Bee struct { ...@@ -91,6 +91,7 @@ type Bee struct {
transactionMonitorCloser io.Closer transactionMonitorCloser io.Closer
recoveryHandleCleanup func() recoveryHandleCleanup func()
listenerCloser io.Closer listenerCloser io.Closer
postageServiceCloser io.Closer
} }
type Options struct { type Options struct {
...@@ -311,7 +312,11 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -311,7 +312,11 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
return nil, fmt.Errorf("batchstore: %w", err) return nil, fmt.Errorf("batchstore: %w", err)
} }
validStamp := postage.ValidStamp(batchStore) validStamp := postage.ValidStamp(batchStore)
post := postage.NewService(stateStore, chainID) post, err := postage.NewService(stateStore, chainID)
if err != nil {
return nil, fmt.Errorf("postage service load: %w", err)
}
b.postageServiceCloser = post
var ( var (
postageContractService postagecontract.Interface postageContractService postagecontract.Interface
...@@ -714,10 +719,14 @@ func (b *Bee) Shutdown(ctx context.Context) error { ...@@ -714,10 +719,14 @@ func (b *Bee) Shutdown(ctx context.Context) error {
if b.listenerCloser != nil { if b.listenerCloser != nil {
if err := b.listenerCloser.Close(); err != nil { if err := b.listenerCloser.Close(); err != nil {
errs.add(fmt.Errorf("error listener: %w", err)) errs.add(fmt.Errorf("listener: %w", err))
} }
} }
if err := b.postageServiceCloser.Close(); err != nil {
errs.add(fmt.Errorf("postage service: %w", err))
}
if err := b.stateStoreCloser.Close(); err != nil { if err := b.stateStoreCloser.Close(); err != nil {
errs.add(fmt.Errorf("statestore: %w", err)) errs.add(fmt.Errorf("statestore: %w", err))
} }
......
...@@ -64,10 +64,6 @@ func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, error) { ...@@ -64,10 +64,6 @@ func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, error) {
return nil, errors.New("stampissuer not found") return nil, errors.New("stampissuer not found")
} }
func (m *mockPostage) Load() error { func (m *mockPostage) Close() error {
panic("not implemented") // TODO: Implement return nil
}
func (m *mockPostage) Save() error {
panic("not implemented") // TODO: Implement
} }
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"io"
"sync" "sync"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
...@@ -27,8 +28,7 @@ type Service interface { ...@@ -27,8 +28,7 @@ type Service interface {
Add(*StampIssuer) Add(*StampIssuer)
StampIssuers() []*StampIssuer StampIssuers() []*StampIssuer
GetStampIssuer([]byte) (*StampIssuer, error) GetStampIssuer([]byte) (*StampIssuer, error)
Load() error io.Closer
Save() error
} }
// service handles postage batches // service handles postage batches
...@@ -41,11 +41,28 @@ type service struct { ...@@ -41,11 +41,28 @@ type service struct {
} }
// NewService constructs a new Service. // NewService constructs a new Service.
func NewService(store storage.StateStorer, chainID int64) Service { func NewService(store storage.StateStorer, chainID int64) (Service, error) {
return &service{ s := &service{
store: store, store: store,
chainID: chainID, chainID: chainID,
} }
n := 0
if err := s.store.Iterate(s.key(), func(_, _ []byte) (stop bool, err error) {
n++
return false, nil
}); err != nil {
return nil, err
}
for i := 0; i < n; i++ {
st := &StampIssuer{}
err := s.store.Get(s.keyForIndex(i), st)
if err != nil {
return nil, err
}
s.Add(st)
}
return s, nil
} }
// Add adds a stamp issuer to the active issuers. // Add adds a stamp issuer to the active issuers.
...@@ -74,28 +91,8 @@ func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, error) { ...@@ -74,28 +91,8 @@ func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, error) {
return nil, ErrNotFound return nil, ErrNotFound
} }
// Load loads all active batches (stamp issuers) from the statestore. // Close saves all the active stamp issuers to statestore.
func (ps *service) Load() error { func (ps *service) Close() error {
n := 0
if err := ps.store.Iterate(ps.key(), func(key, _ []byte) (stop bool, err error) {
n++
return false, nil
}); err != nil {
return err
}
for i := 0; i < n; i++ {
st := &StampIssuer{}
err := ps.store.Get(ps.keyForIndex(i), st)
if err != nil {
return err
}
ps.Add(st)
}
return nil
}
// Save saves all the active stamp issuers to statestore.
func (ps *service) Save() error {
for i, st := range ps.issuers { for i, st := range ps.issuers {
if err := ps.store.Put(ps.keyForIndex(i), st); err != nil { if err := ps.store.Put(ps.keyForIndex(i), st); err != nil {
return err return err
......
...@@ -19,18 +19,21 @@ import ( ...@@ -19,18 +19,21 @@ import (
func TestSaveLoad(t *testing.T) { func TestSaveLoad(t *testing.T) {
store := storemock.NewStateStore() store := storemock.NewStateStore()
saved := func(id int64) postage.Service { saved := func(id int64) postage.Service {
ps := postage.NewService(store, id) ps, err := postage.NewService(store, id)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 16; i++ { for i := 0; i < 16; i++ {
ps.Add(newTestStampIssuer(t)) ps.Add(newTestStampIssuer(t))
} }
if err := ps.Save(); err != nil { if err := ps.Close(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
return ps return ps
} }
loaded := func(id int64) postage.Service { loaded := func(id int64) postage.Service {
ps := postage.NewService(store, id) ps, err := postage.NewService(store, id)
if err := ps.Load(); err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
return ps return ps
...@@ -48,7 +51,10 @@ func TestSaveLoad(t *testing.T) { ...@@ -48,7 +51,10 @@ func TestSaveLoad(t *testing.T) {
func TestGetStampIssuer(t *testing.T) { func TestGetStampIssuer(t *testing.T) {
store := storemock.NewStateStore() store := storemock.NewStateStore()
ps := postage.NewService(store, int64(0)) ps, err := postage.NewService(store, int64(0))
if err != nil {
t.Fatal(err)
}
ids := make([][]byte, 8) ids := make([][]byte, 8)
for i := range ids { for i := range ids {
id := make([]byte, 32) id := make([]byte, 32)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment