Commit 97420333 authored by acud, committed by GitHub

shed, localstore, cmd: expose leveldb configuration parameters to cli (#1454)

parent b22fda83
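
Overview: the commit threads four LevelDB tuning knobs (open files limit, block cache capacity, write buffer size, and a seek-compaction toggle) from new CLI flags through node.Options and localstore.Options into a new shed.Options struct, which is finally mapped onto goleveldb's opt.Options. The following is a minimal sketch of the lowest-level call shape introduced here, assuming the usual pkg/shed import path; the path and values are purely illustrative, not the defaults added in this commit.

	package main

	import (
		"log"

		"github.com/ethersphere/bee/pkg/shed" // import path assumed from the bee module layout
	)

	func main() {
		// Illustrative values only; see the flag defaults in the hunks below.
		opts := &shed.Options{
			OpenFilesLimit:         200,
			BlockCacheCapacity:     32 * 1024 * 1024, // bytes
			WriteBufferSize:        32 * 1024 * 1024, // bytes
			DisableSeeksCompaction: false,
		}

		// NewDB maps these onto goleveldb's opt.Options.
		// Passing nil instead of opts selects the package defaults.
		db, err := shed.NewDB("/tmp/shed-example", opts) // path is hypothetical
		if err != nil {
			log.Fatal(err)
		}
		_ = db
	}
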
...
@@ -19,6 +19,10 @@ import (
 const (
 	optionNameDataDir    = "data-dir"
 	optionNameDBCapacity = "db-capacity"
+	optionNameDBOpenFilesLimit         = "db-open-files-limit"
+	optionNameDBBlockCacheCapacity     = "db-block-cache-capacity"
+	optionNameDBWriteBufferSize        = "db-write-buffer-size"
+	optionNameDBDisableSeeksCompaction = "db-disable-seeks-compaction"
 	optionNamePassword     = "password"
 	optionNamePasswordFile = "password-file"
 	optionNameAPIAddr      = "api-addr"
...
@@ -180,6 +184,10 @@ func (c *command) setHomeDir() (err error) {
 func (c *command) setAllFlags(cmd *cobra.Command) {
 	cmd.Flags().String(optionNameDataDir, filepath.Join(c.homeDir, ".bee"), "data directory")
 	cmd.Flags().Uint64(optionNameDBCapacity, 5000000, fmt.Sprintf("db capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
+	cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
+	cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 32*1024*1024, "size of block cache of the database in bytes")
+	cmd.Flags().Uint64(optionNameDBWriteBufferSize, 32*1024*1024, "size of the database write buffer in bytes")
+	cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, false, "disables db compactions triggered by seeks")
 	cmd.Flags().String(optionNamePassword, "", "password for decrypting keys")
 	cmd.Flags().String(optionNamePasswordFile, "", "path to a file that contains password for decrypting keys")
 	cmd.Flags().String(optionNameAPIAddr, ":1633", "HTTP API listen address")
...
@@ -121,33 +121,37 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
 	}
 
 	b, err := node.NewBee(c.config.GetString(optionNameP2PAddr), signerConfig.address, *signerConfig.publicKey, signerConfig.signer, c.config.GetUint64(optionNameNetworkID), logger, signerConfig.libp2pPrivateKey, signerConfig.pssPrivateKey, node.Options{
 		DataDir:                  c.config.GetString(optionNameDataDir),
 		DBCapacity:               c.config.GetUint64(optionNameDBCapacity),
+		DBOpenFilesLimit:         c.config.GetUint64(optionNameDBOpenFilesLimit),
+		DBBlockCacheCapacity:     c.config.GetUint64(optionNameDBBlockCacheCapacity),
+		DBWriteBufferSize:        c.config.GetUint64(optionNameDBWriteBufferSize),
+		DBDisableSeeksCompaction: c.config.GetBool(optionNameDBDisableSeeksCompaction),
 		APIAddr:                  c.config.GetString(optionNameAPIAddr),
 		DebugAPIAddr:             debugAPIAddr,
 		Addr:                     c.config.GetString(optionNameP2PAddr),
 		NATAddr:                  c.config.GetString(optionNameNATAddr),
 		EnableWS:                 c.config.GetBool(optionNameP2PWSEnable),
 		EnableQUIC:               c.config.GetBool(optionNameP2PQUICEnable),
 		WelcomeMessage:           c.config.GetString(optionWelcomeMessage),
 		Bootnodes:                c.config.GetStringSlice(optionNameBootnodes),
 		CORSAllowedOrigins:       c.config.GetStringSlice(optionCORSAllowedOrigins),
 		Standalone:               c.config.GetBool(optionNameStandalone),
 		TracingEnabled:           c.config.GetBool(optionNameTracingEnabled),
 		TracingEndpoint:          c.config.GetString(optionNameTracingEndpoint),
 		TracingServiceName:       c.config.GetString(optionNameTracingServiceName),
 		Logger:                   logger,
 		GlobalPinningEnabled:     c.config.GetBool(optionNameGlobalPinningEnabled),
 		PaymentThreshold:         c.config.GetString(optionNamePaymentThreshold),
 		PaymentTolerance:         c.config.GetString(optionNamePaymentTolerance),
 		PaymentEarly:             c.config.GetString(optionNamePaymentEarly),
 		ResolverConnectionCfgs:   resolverCfgs,
 		GatewayMode:              c.config.GetBool(optionNameGatewayMode),
 		BootnodeMode:             c.config.GetBool(optionNameBootnodeMode),
 		SwapEndpoint:             c.config.GetString(optionNameSwapEndpoint),
 		SwapFactoryAddress:       c.config.GetString(optionNameSwapFactoryAddress),
 		SwapInitialDeposit:       c.config.GetString(optionNameSwapInitialDeposit),
 		SwapEnable:               c.config.GetBool(optionNameSwapEnable),
 	})
 	if err != nil {
 		return err
...
@@ -147,6 +147,19 @@ type Options struct {
 	// Capacity is a limit that triggers garbage collection when
 	// number of items in gcIndex equals or exceeds it.
 	Capacity uint64
+	// OpenFilesLimit defines the upper bound of open files that the
+	// localstore should maintain at any point in time. It is
+	// passed on to the shed constructor.
+	OpenFilesLimit uint64
+	// BlockCacheCapacity defines the block cache capacity and is passed
+	// on to shed.
+	BlockCacheCapacity uint64
+	// WriteBufferSize defines the size of the write buffer and is passed on to shed.
+	WriteBufferSize uint64
+	// DisableSeeksCompaction toggles the seek-driven compactions feature on leveldb
+	// and is passed on to shed.
+	DisableSeeksCompaction bool
 	// MetricsPrefix defines a prefix for metrics names.
 	MetricsPrefix string
 	Tags          *tags.Tags
...
@@ -193,7 +206,14 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
 		db.updateGCSem = make(chan struct{}, maxParallelUpdateGC)
 	}
 
-	db.shed, err = shed.NewDB(path)
+	shedOpts := &shed.Options{
+		OpenFilesLimit:         o.OpenFilesLimit,
+		BlockCacheCapacity:     o.BlockCacheCapacity,
+		WriteBufferSize:        o.WriteBufferSize,
+		DisableSeeksCompaction: o.DisableSeeksCompaction,
+	}
+	db.shed, err = shed.NewDB(path, shedOpts)
 	if err != nil {
 		return nil, err
 	}
...
@@ -82,33 +82,37 @@ type Bee struct {
 }
 
 type Options struct {
 	DataDir                  string
 	DBCapacity               uint64
+	DBOpenFilesLimit         uint64
+	DBWriteBufferSize        uint64
+	DBBlockCacheCapacity     uint64
+	DBDisableSeeksCompaction bool
 	APIAddr                  string
 	DebugAPIAddr             string
 	Addr                     string
 	NATAddr                  string
 	EnableWS                 bool
 	EnableQUIC               bool
 	WelcomeMessage           string
 	Bootnodes                []string
 	CORSAllowedOrigins       []string
 	Logger                   logging.Logger
 	Standalone               bool
 	TracingEnabled           bool
 	TracingEndpoint          string
 	TracingServiceName       string
 	GlobalPinningEnabled     bool
 	PaymentThreshold         string
 	PaymentTolerance         string
 	PaymentEarly             string
 	ResolverConnectionCfgs   []multiresolver.ConnectionConfig
 	GatewayMode              bool
 	BootnodeMode             bool
 	SwapEndpoint             string
 	SwapFactoryAddress       string
 	SwapInitialDeposit       string
 	SwapEnable               bool
 }
 
 func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (b *Bee, err error) {
...
@@ -381,7 +385,11 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
 		path = filepath.Join(o.DataDir, "localstore")
 	}
 	lo := &localstore.Options{
-		Capacity: o.DBCapacity,
+		Capacity:               o.DBCapacity,
+		OpenFilesLimit:         o.DBOpenFilesLimit,
+		BlockCacheCapacity:     o.DBBlockCacheCapacity,
+		WriteBufferSize:        o.DBWriteBufferSize,
+		DisableSeeksCompaction: o.DBDisableSeeksCompaction,
 	}
 	storer, err := localstore.New(path, swarmAddress.Bytes(), lo, logger)
 	if err != nil {
...
@@ -32,11 +32,19 @@ import (
 )
 
 var (
-	openFileLimit      = 128 // The limit for LevelDB OpenFilesCacheCapacity.
-	blockCacheCapacity = 32 * 1024 * 1024
-	writeBuffer        = 32 * 1024 * 1024
+	defaultOpenFilesLimit         = uint64(256)
+	defaultBlockCacheCapacity     = uint64(32 * 1024 * 1024)
+	defaultWriteBufferSize        = uint64(32 * 1024 * 1024)
+	defaultDisableSeeksCompaction = false
 )
 
+type Options struct {
+	BlockCacheCapacity     uint64
+	WriteBufferSize        uint64
+	OpenFilesLimit         uint64
+	DisableSeeksCompaction bool
+}
+
 // DB provides abstractions over LevelDB in order to
 // implement complex structures using fields and ordered indexes.
 // It provides a schema functionality to store fields and indexes
...
@@ -50,16 +58,24 @@ type DB struct {
 // NewDB constructs a new DB and validates the schema
 // if it exists in database on the given path.
 // metricsPrefix is used for metrics collection for the given DB.
-func NewDB(path string) (db *DB, err error) {
+func NewDB(path string, o *Options) (db *DB, err error) {
+	if o == nil {
+		o = &Options{
+			OpenFilesLimit:         defaultOpenFilesLimit,
+			BlockCacheCapacity:     defaultBlockCacheCapacity,
+			WriteBufferSize:        defaultWriteBufferSize,
+			DisableSeeksCompaction: defaultDisableSeeksCompaction,
+		}
+	}
 	var ldb *leveldb.DB
 	if path == "" {
 		ldb, err = leveldb.Open(storage.NewMemStorage(), nil)
 	} else {
 		ldb, err = leveldb.OpenFile(path, &opt.Options{
-			OpenFilesCacheCapacity: openFileLimit,
-			BlockCacheCapacity:     blockCacheCapacity,
-			DisableSeeksCompaction: true,
-			WriteBuffer:            writeBuffer,
+			OpenFilesCacheCapacity: int(o.OpenFilesLimit),
+			BlockCacheCapacity:     int(o.BlockCacheCapacity),
+			WriteBuffer:            int(o.WriteBufferSize),
+			DisableSeeksCompaction: o.DisableSeeksCompaction,
 		})
 	}
...
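
Note two default changes in the hunk above: the open files limit moves from 128 to 256 (the CLI flag defaults to 200; the package default only applies when NewDB receives nil), and seek-triggered compactions, previously hard-coded off, are now enabled unless the new option disables them. Callers that do not tune anything simply pass nil and get those defaults, as the test and statestore hunks below do. A minimal illustrative fragment:

	// nil options fall back to the package defaults declared above:
	// 256 open files, 32 MiB block cache, 32 MiB write buffer,
	// and seek-driven compactions left enabled.
	db, err := shed.NewDB("", nil) // empty path -> in-memory storage
	if err != nil {
		// handle error
	}
	_ = db
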
@@ -54,7 +54,7 @@ func TestDB_persistence(t *testing.T) {
 	}
 	defer os.RemoveAll(dir)
 
-	db, err := NewDB(dir)
+	db, err := NewDB(dir, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
...
@@ -72,7 +72,7 @@ func TestDB_persistence(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	db2, err := NewDB(dir)
+	db2, err := NewDB(dir, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
...
@@ -94,7 +94,7 @@ func TestDB_persistence(t *testing.T) {
 // be called to remove the data.
 func newTestDB(t *testing.T) *DB {
 	t.Helper()
-	db, err := NewDB("")
+	db, err := NewDB("", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
...
@@ -51,7 +51,7 @@ type Store struct {
 // and possible conflicts with schema from existing database is checked
 // automatically.
 func New(path string) (s *Store, err error) {
-	db, err := shed.NewDB(path)
+	db, err := shed.NewDB(path, nil)
 	if err != nil {
 		return nil, err
 	}
...