Commit 76aff41e authored by acud's avatar acud Committed by GitHub

feat: add resync and more pprof profiles (#2405)

parent 11cc37e4
...@@ -72,6 +72,9 @@ const ( ...@@ -72,6 +72,9 @@ const (
optionNameMainNet = "mainnet" optionNameMainNet = "mainnet"
optionNameRetrievalCaching = "cache-retrieval" optionNameRetrievalCaching = "cache-retrieval"
optionNameDevReserveCapacity = "dev-reserve-capacity" optionNameDevReserveCapacity = "dev-reserve-capacity"
optionNameResync = "resync"
optionNamePProfBlock = "pprof-profile"
optionNamePProfMutex = "pprof-mutex"
) )
func init() { func init() {
...@@ -254,6 +257,9 @@ func (c *command) setAllFlags(cmd *cobra.Command) { ...@@ -254,6 +257,9 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().Duration(optionWarmUpTime, time.Minute*20, "time to warmup the node before pull/push protocols can be kicked off.") cmd.Flags().Duration(optionWarmUpTime, time.Minute*20, "time to warmup the node before pull/push protocols can be kicked off.")
cmd.Flags().Bool(optionNameMainNet, false, "triggers connect to main net bootnodes.") cmd.Flags().Bool(optionNameMainNet, false, "triggers connect to main net bootnodes.")
cmd.Flags().Bool(optionNameRetrievalCaching, true, "enable forwarded content caching") cmd.Flags().Bool(optionNameRetrievalCaching, true, "enable forwarded content caching")
cmd.Flags().Bool(optionNameResync, false, "forces the node to resync postage contract data")
cmd.Flags().Bool(optionNamePProfBlock, false, "enable pprof block profile")
cmd.Flags().Bool(optionNamePProfMutex, false, "enable pprof mutex profile")
} }
func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) { func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) {
......
...@@ -199,6 +199,9 @@ inability to use, or your interaction with other nodes or the software.`) ...@@ -199,6 +199,9 @@ inability to use, or your interaction with other nodes or the software.`)
WarmupTime: c.config.GetDuration(optionWarmUpTime), WarmupTime: c.config.GetDuration(optionWarmUpTime),
ChainID: networkConfig.chainID, ChainID: networkConfig.chainID,
RetrievalCaching: c.config.GetBool(optionNameRetrievalCaching), RetrievalCaching: c.config.GetBool(optionNameRetrievalCaching),
Resync: c.config.GetBool(optionNameResync),
BlockProfile: c.config.GetBool(optionNamePProfBlock),
MutexProfile: c.config.GetBool(optionNamePProfMutex),
}) })
if err != nil { if err != nil {
return err return err
......
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sync" "sync"
"syscall" "syscall"
"time" "time"
...@@ -154,6 +155,9 @@ type Options struct { ...@@ -154,6 +155,9 @@ type Options struct {
DeployGasPrice string DeployGasPrice string
WarmupTime time.Duration WarmupTime time.Duration
ChainID int64 ChainID int64
Resync bool
BlockProfile bool
MutexProfile bool
} }
const ( const (
...@@ -238,6 +242,15 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -238,6 +242,15 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
if err != nil { if err != nil {
return nil, fmt.Errorf("eth address: %w", err) return nil, fmt.Errorf("eth address: %w", err)
} }
if o.MutexProfile {
_ = runtime.SetMutexProfileFraction(1)
}
if o.BlockProfile {
runtime.SetBlockProfileRate(1)
}
// set up basic debug api endpoints for debugging and /health endpoint // set up basic debug api endpoints for debugging and /health endpoint
debugAPIService = debugapi.New(*publicKey, pssPrivateKey.PublicKey, overlayEthAddress, logger, tracer, o.CORSAllowedOrigins, big.NewInt(int64(o.BlockTime)), transactionService) debugAPIService = debugapi.New(*publicKey, pssPrivateKey.PublicKey, overlayEthAddress, logger, tracer, o.CORSAllowedOrigins, big.NewInt(int64(o.BlockTime)), transactionService)
...@@ -435,7 +448,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -435,7 +448,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b}) eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b})
b.listenerCloser = eventListener b.listenerCloser = eventListener
batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256) batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -32,6 +32,7 @@ type batchService struct { ...@@ -32,6 +32,7 @@ type batchService struct {
batchListener postage.BatchCreationListener batchListener postage.BatchCreationListener
checksum hash.Hash // checksum hasher checksum hash.Hash // checksum hasher
resync bool
} }
type Interface interface { type Interface interface {
...@@ -47,6 +48,7 @@ func New( ...@@ -47,6 +48,7 @@ func New(
owner []byte, owner []byte,
batchListener postage.BatchCreationListener, batchListener postage.BatchCreationListener,
checksumFunc func() hash.Hash, checksumFunc func() hash.Hash,
resync bool,
) (Interface, error) { ) (Interface, error) {
if checksumFunc == nil { if checksumFunc == nil {
checksumFunc = sha3.New256 checksumFunc = sha3.New256
...@@ -56,25 +58,37 @@ func New( ...@@ -56,25 +58,37 @@ func New(
sum = checksumFunc() sum = checksumFunc()
) )
if err := stateStore.Get(checksumDBKey, &b); err != nil { dirty := false
if !errors.Is(err, storage.ErrNotFound) { err := stateStore.Get(dirtyDBKey, &dirty)
return nil, err if err != nil && !errors.Is(err, storage.ErrNotFound) {
} return nil, err
} else { }
s, err := hex.DecodeString(b)
if err != nil { if resync {
return nil, err if err := stateStore.Delete(checksumDBKey); err != nil {
}
n, err := sum.Write(s)
if err != nil {
return nil, err return nil, err
} }
if n != len(s) { } else if !dirty {
return nil, errors.New("batchstore checksum init") if err := stateStore.Get(checksumDBKey, &b); err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
} else {
s, err := hex.DecodeString(b)
if err != nil {
return nil, err
}
n, err := sum.Write(s)
if err != nil {
return nil, err
}
if n != len(s) {
return nil, errors.New("batchstore checksum init")
}
} }
} }
return &batchService{stateStore, storer, logger, listener, owner, batchListener, sum}, nil return &batchService{stateStore, storer, logger, listener, owner, batchListener, sum, resync}, nil
} }
// Create will create a new batch with the given ID, owner value and depth and // Create will create a new batch with the given ID, owner value and depth and
...@@ -195,15 +209,21 @@ func (svc *batchService) Start(startBlock uint64) (<-chan struct{}, error) { ...@@ -195,15 +209,21 @@ func (svc *batchService) Start(startBlock uint64) (<-chan struct{}, error) {
if err != nil && !errors.Is(err, storage.ErrNotFound) { if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, err return nil, err
} }
if dirty {
svc.logger.Warning("batch service: dirty shutdown detected, resetting batch store") if dirty || svc.resync {
if svc.resync {
svc.logger.Warning("batch service: resync requested, resetting batch store")
} else {
svc.logger.Warning("batch service: dirty shutdown detected, resetting batch store")
}
if err := svc.storer.Reset(); err != nil { if err := svc.storer.Reset(); err != nil {
return nil, err return nil, err
} }
if err := svc.stateStore.Delete(dirtyDBKey); err != nil { if err := svc.stateStore.Delete(dirtyDBKey); err != nil {
return nil, err return nil, err
} }
svc.logger.Warning("batch service: batch store reset. your node will now resync chain data") svc.logger.Warning("batch service: batch store has been reset. your node will now resync chain data. this might take a while...")
} }
cs := svc.storer.GetChainState() cs := svc.storer.GetChainState()
......
...@@ -333,7 +333,7 @@ func TestTransactionOk(t *testing.T) { ...@@ -333,7 +333,7 @@ func TestTransactionOk(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil) svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -356,7 +356,7 @@ func TestTransactionFail(t *testing.T) { ...@@ -356,7 +356,7 @@ func TestTransactionFail(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil) svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -373,7 +373,7 @@ func TestChecksum(t *testing.T) { ...@@ -373,7 +373,7 @@ func TestChecksum(t *testing.T) {
s := mocks.NewStateStore() s := mocks.NewStateStore()
store := mock.New() store := mock.New()
mockHash := &hs{} mockHash := &hs{}
svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }) svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -389,6 +389,49 @@ func TestChecksum(t *testing.T) { ...@@ -389,6 +389,49 @@ func TestChecksum(t *testing.T) {
} }
} }
// TestChecksumResync verifies how the batch-service constructor treats the
// persisted checksum depending on the resync flag: a normal restart replays
// the stored checksum into the hasher, while a resync restart discards it.
func TestChecksumResync(t *testing.T) {
	stateStore := mocks.NewStateStore()

	// startInstance constructs a throwaway batch service against a fresh
	// batch store and returns its counting hasher so the caller can inspect
	// how many writes the constructor performed.
	startInstance := func(resync bool) *hs {
		h := &hs{}
		if _, err := batchservice.New(stateStore, mock.New(), testLog, newMockListener(), nil, nil, func() hash.Hash { return h }, resync); err != nil {
			t.Fatal(err)
		}
		return h
	}

	batchStore := mock.New()
	hasher := &hs{}
	svc, err := batchservice.New(stateStore, batchStore, testLog, newMockListener(), nil, nil, func() hash.Hash { return hasher }, true)
	if err != nil {
		t.Fatal(err)
	}

	batch := postagetesting.MustNewBatch()
	putBatch(t, batchStore, batch)

	if err := svc.TopUp(batch.ID, big.NewInt(2000000000000), testTxHash); err != nil {
		t.Fatalf("top up: %v", err)
	}
	// one top-up is expected to write to the hasher twice.
	if calls := hasher.ctr; calls != 2 {
		t.Fatalf("expected %d calls got %d", 2, calls)
	}

	// a clean (non-resync) restart should feed the checksum persisted in the
	// state store back into the hasher exactly once.
	if calls := startInstance(false).ctr; calls != 1 {
		t.Fatalf("expected %d calls got %d", 1, calls)
	}

	// a resync restart drops the stored checksum, so the hasher must not be
	// written to at all.
	if calls := startInstance(true).ctr; calls != 0 {
		t.Fatalf("expected %d calls got %d", 0, calls)
	}
}
func newTestStoreAndServiceWithListener( func newTestStoreAndServiceWithListener(
t *testing.T, t *testing.T,
owner []byte, owner []byte,
...@@ -398,7 +441,7 @@ func newTestStoreAndServiceWithListener( ...@@ -398,7 +441,7 @@ func newTestStoreAndServiceWithListener(
t.Helper() t.Helper()
s := mocks.NewStateStore() s := mocks.NewStateStore()
store := mock.New(opts...) store := mock.New(opts...)
svc, err := batchservice.New(s, store, testLog, newMockListener(), owner, batchListener, nil) svc, err := batchservice.New(s, store, testLog, newMockListener(), owner, batchListener, nil, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment