Commit d4321859 authored by Adrian Sutton's avatar Adrian Sutton Committed by GitHub

op-program: Record the kv format used. (#11900)

Automatically use the correct format if it has been recorded.

Change the default format to directory. Compatibility with op-challenger is preserved because it now uses the automatic format detection, defaulting to file if not specified (e.g for kona-host).
parent 8b61225d
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/vm" "github.com/ethereum-optimism/optimism/op-challenger/game/fault/trace/vm"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types" "github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore" "github.com/ethereum-optimism/optimism/op-program/host/kvstore"
kvtypes "github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum-optimism/optimism/op-service/ioutil" "github.com/ethereum-optimism/optimism/op-service/ioutil"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -44,8 +45,8 @@ func NewTraceProvider(logger log.Logger, m vm.Metricer, cfg vm.Config, vmCfg vm. ...@@ -44,8 +45,8 @@ func NewTraceProvider(logger log.Logger, m vm.Metricer, cfg vm.Config, vmCfg vm.
prestate: asteriscPrestate, prestate: asteriscPrestate,
generator: vm.NewExecutor(logger, m, cfg, vmCfg, asteriscPrestate, localInputs), generator: vm.NewExecutor(logger, m, cfg, vmCfg, asteriscPrestate, localInputs),
gameDepth: gameDepth, gameDepth: gameDepth,
preimageLoader: utils.NewPreimageLoader(func() utils.PreimageSource { preimageLoader: utils.NewPreimageLoader(func() (utils.PreimageSource, error) {
return kvstore.NewFileKV(vm.PreimageDir(dir)) return kvstore.NewDiskKV(logger, vm.PreimageDir(dir), kvtypes.DataFormatFile)
}), }),
PrestateProvider: prestateProvider, PrestateProvider: prestateProvider,
stateConverter: NewStateConverter(), stateConverter: NewStateConverter(),
...@@ -169,8 +170,8 @@ func NewTraceProviderForTest(logger log.Logger, m vm.Metricer, cfg *config.Confi ...@@ -169,8 +170,8 @@ func NewTraceProviderForTest(logger log.Logger, m vm.Metricer, cfg *config.Confi
prestate: cfg.AsteriscAbsolutePreState, prestate: cfg.AsteriscAbsolutePreState,
generator: vm.NewExecutor(logger, m, cfg.Asterisc, vm.NewOpProgramServerExecutor(), cfg.AsteriscAbsolutePreState, localInputs), generator: vm.NewExecutor(logger, m, cfg.Asterisc, vm.NewOpProgramServerExecutor(), cfg.AsteriscAbsolutePreState, localInputs),
gameDepth: gameDepth, gameDepth: gameDepth,
preimageLoader: utils.NewPreimageLoader(func() utils.PreimageSource { preimageLoader: utils.NewPreimageLoader(func() (utils.PreimageSource, error) {
return kvstore.NewFileKV(vm.PreimageDir(dir)) return kvstore.NewDiskKV(logger, vm.PreimageDir(dir), kvtypes.DataFormatFile)
}), }),
stateConverter: NewStateConverter(), stateConverter: NewStateConverter(),
cfg: cfg.Asterisc, cfg: cfg.Asterisc,
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
kvtypes "github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -45,8 +46,8 @@ func NewTraceProvider(logger log.Logger, m vm.Metricer, cfg vm.Config, vmCfg vm. ...@@ -45,8 +46,8 @@ func NewTraceProvider(logger log.Logger, m vm.Metricer, cfg vm.Config, vmCfg vm.
prestate: prestate, prestate: prestate,
generator: vm.NewExecutor(logger, m, cfg, vmCfg, prestate, localInputs), generator: vm.NewExecutor(logger, m, cfg, vmCfg, prestate, localInputs),
gameDepth: gameDepth, gameDepth: gameDepth,
preimageLoader: utils.NewPreimageLoader(func() utils.PreimageSource { preimageLoader: utils.NewPreimageLoader(func() (utils.PreimageSource, error) {
return kvstore.NewFileKV(vm.PreimageDir(dir)) return kvstore.NewDiskKV(logger, vm.PreimageDir(dir), kvtypes.DataFormatFile)
}), }),
PrestateProvider: prestateProvider, PrestateProvider: prestateProvider,
stateConverter: &StateConverter{}, stateConverter: &StateConverter{},
...@@ -168,8 +169,8 @@ func NewTraceProviderForTest(logger log.Logger, m vm.Metricer, cfg *config.Confi ...@@ -168,8 +169,8 @@ func NewTraceProviderForTest(logger log.Logger, m vm.Metricer, cfg *config.Confi
prestate: cfg.CannonAbsolutePreState, prestate: cfg.CannonAbsolutePreState,
generator: vm.NewExecutor(logger, m, cfg.Cannon, vm.NewOpProgramServerExecutor(), cfg.CannonAbsolutePreState, localInputs), generator: vm.NewExecutor(logger, m, cfg.Cannon, vm.NewOpProgramServerExecutor(), cfg.CannonAbsolutePreState, localInputs),
gameDepth: gameDepth, gameDepth: gameDepth,
preimageLoader: utils.NewPreimageLoader(func() utils.PreimageSource { preimageLoader: utils.NewPreimageLoader(func() (utils.PreimageSource, error) {
return kvstore.NewFileKV(vm.PreimageDir(dir)) return kvstore.NewDiskKV(logger, vm.PreimageDir(dir), kvtypes.DataFormatFile)
}), }),
stateConverter: NewStateConverter(), stateConverter: NewStateConverter(),
cfg: cfg.Cannon, cfg: cfg.Cannon,
......
...@@ -31,7 +31,7 @@ type PreimageSource interface { ...@@ -31,7 +31,7 @@ type PreimageSource interface {
Close() error Close() error
} }
type PreimageSourceCreator func() PreimageSource type PreimageSourceCreator func() (PreimageSource, error)
type PreimageLoader struct { type PreimageLoader struct {
makeSource PreimageSourceCreator makeSource PreimageSourceCreator
...@@ -61,7 +61,10 @@ func (l *PreimageLoader) loadBlobPreimage(proof *ProofData) (*types.PreimageOrac ...@@ -61,7 +61,10 @@ func (l *PreimageLoader) loadBlobPreimage(proof *ProofData) (*types.PreimageOrac
// The key for a blob field element is a keccak hash of commitment++fieldElementIndex. // The key for a blob field element is a keccak hash of commitment++fieldElementIndex.
// First retrieve the preimage of the key as a keccak hash so we have the commitment and required field element // First retrieve the preimage of the key as a keccak hash so we have the commitment and required field element
inputsKey := preimage.Keccak256Key(proof.OracleKey).PreimageKey() inputsKey := preimage.Keccak256Key(proof.OracleKey).PreimageKey()
source := l.makeSource() source, err := l.makeSource()
if err != nil {
return nil, fmt.Errorf("failed to open preimage store: %w", err)
}
defer source.Close() defer source.Close()
inputs, err := source.Get(inputsKey) inputs, err := source.Get(inputsKey)
if err != nil { if err != nil {
...@@ -111,7 +114,10 @@ func (l *PreimageLoader) loadBlobPreimage(proof *ProofData) (*types.PreimageOrac ...@@ -111,7 +114,10 @@ func (l *PreimageLoader) loadBlobPreimage(proof *ProofData) (*types.PreimageOrac
func (l *PreimageLoader) loadPrecompilePreimage(proof *ProofData) (*types.PreimageOracleData, error) { func (l *PreimageLoader) loadPrecompilePreimage(proof *ProofData) (*types.PreimageOracleData, error) {
inputKey := preimage.Keccak256Key(proof.OracleKey).PreimageKey() inputKey := preimage.Keccak256Key(proof.OracleKey).PreimageKey()
source := l.makeSource() source, err := l.makeSource()
if err != nil {
return nil, fmt.Errorf("failed to open preimage store: %w", err)
}
defer source.Close() defer source.Close()
input, err := source.Get(inputKey) input, err := source.Get(inputKey)
if err != nil { if err != nil {
......
...@@ -21,8 +21,8 @@ import ( ...@@ -21,8 +21,8 @@ import (
func TestPreimageLoader_NoPreimage(t *testing.T) { func TestPreimageLoader_NoPreimage(t *testing.T) {
kv := kvstore.NewMemKV() kv := kvstore.NewMemKV()
loader := NewPreimageLoader(func() PreimageSource { loader := NewPreimageLoader(func() (PreimageSource, error) {
return kv return kv, nil
}) })
actual, err := loader.LoadPreimage(&ProofData{}) actual, err := loader.LoadPreimage(&ProofData{})
require.NoError(t, err) require.NoError(t, err)
...@@ -31,8 +31,8 @@ func TestPreimageLoader_NoPreimage(t *testing.T) { ...@@ -31,8 +31,8 @@ func TestPreimageLoader_NoPreimage(t *testing.T) {
func TestPreimageLoader_LocalPreimage(t *testing.T) { func TestPreimageLoader_LocalPreimage(t *testing.T) {
kv := kvstore.NewMemKV() kv := kvstore.NewMemKV()
loader := NewPreimageLoader(func() PreimageSource { loader := NewPreimageLoader(func() (PreimageSource, error) {
return kv return kv, nil
}) })
proof := &ProofData{ proof := &ProofData{
OracleKey: common.Hash{byte(preimage.LocalKeyType), 0xaa, 0xbb}.Bytes(), OracleKey: common.Hash{byte(preimage.LocalKeyType), 0xaa, 0xbb}.Bytes(),
...@@ -55,8 +55,8 @@ func TestPreimageLoader_SimpleTypes(t *testing.T) { ...@@ -55,8 +55,8 @@ func TestPreimageLoader_SimpleTypes(t *testing.T) {
keyType := keyType keyType := keyType
t.Run(fmt.Sprintf("type-%v", keyType), func(t *testing.T) { t.Run(fmt.Sprintf("type-%v", keyType), func(t *testing.T) {
kv := kvstore.NewMemKV() kv := kvstore.NewMemKV()
loader := NewPreimageLoader(func() PreimageSource { loader := NewPreimageLoader(func() (PreimageSource, error) {
return kv return kv, nil
}) })
proof := &ProofData{ proof := &ProofData{
OracleKey: common.Hash{byte(keyType), 0xaa, 0xbb}.Bytes(), OracleKey: common.Hash{byte(keyType), 0xaa, 0xbb}.Bytes(),
...@@ -99,8 +99,8 @@ func TestPreimageLoader_BlobPreimage(t *testing.T) { ...@@ -99,8 +99,8 @@ func TestPreimageLoader_BlobPreimage(t *testing.T) {
t.Run("NoKeyPreimage", func(t *testing.T) { t.Run("NoKeyPreimage", func(t *testing.T) {
kv := kvstore.NewMemKV() kv := kvstore.NewMemKV()
loader := NewPreimageLoader(func() PreimageSource { loader := NewPreimageLoader(func() (PreimageSource, error) {
return kv return kv, nil
}) })
proof := &ProofData{ proof := &ProofData{
OracleKey: common.Hash{byte(preimage.BlobKeyType), 0xaf}.Bytes(), OracleKey: common.Hash{byte(preimage.BlobKeyType), 0xaf}.Bytes(),
...@@ -113,8 +113,8 @@ func TestPreimageLoader_BlobPreimage(t *testing.T) { ...@@ -113,8 +113,8 @@ func TestPreimageLoader_BlobPreimage(t *testing.T) {
t.Run("InvalidKeyPreimage", func(t *testing.T) { t.Run("InvalidKeyPreimage", func(t *testing.T) {
kv := kvstore.NewMemKV() kv := kvstore.NewMemKV()
loader := NewPreimageLoader(func() PreimageSource { loader := NewPreimageLoader(func() (PreimageSource, error) {
return kv return kv, nil
}) })
proof := &ProofData{ proof := &ProofData{
OracleKey: common.Hash{byte(preimage.BlobKeyType), 0xad}.Bytes(), OracleKey: common.Hash{byte(preimage.BlobKeyType), 0xad}.Bytes(),
...@@ -128,8 +128,8 @@ func TestPreimageLoader_BlobPreimage(t *testing.T) { ...@@ -128,8 +128,8 @@ func TestPreimageLoader_BlobPreimage(t *testing.T) {
t.Run("MissingBlobs", func(t *testing.T) { t.Run("MissingBlobs", func(t *testing.T) {
kv := kvstore.NewMemKV() kv := kvstore.NewMemKV()
loader := NewPreimageLoader(func() PreimageSource { loader := NewPreimageLoader(func() (PreimageSource, error) {
return kv return kv, nil
}) })
proof := &ProofData{ proof := &ProofData{
OracleKey: common.Hash{byte(preimage.BlobKeyType), 0xae}.Bytes(), OracleKey: common.Hash{byte(preimage.BlobKeyType), 0xae}.Bytes(),
...@@ -143,8 +143,8 @@ func TestPreimageLoader_BlobPreimage(t *testing.T) { ...@@ -143,8 +143,8 @@ func TestPreimageLoader_BlobPreimage(t *testing.T) {
t.Run("Valid", func(t *testing.T) { t.Run("Valid", func(t *testing.T) {
kv := kvstore.NewMemKV() kv := kvstore.NewMemKV()
loader := NewPreimageLoader(func() PreimageSource { loader := NewPreimageLoader(func() (PreimageSource, error) {
return kv return kv, nil
}) })
storeBlob(t, kv, gokzg4844.KZGCommitment(commitment), gokzg4844.Blob(blob)) storeBlob(t, kv, gokzg4844.KZGCommitment(commitment), gokzg4844.Blob(blob))
actual, err := loader.LoadPreimage(proof) actual, err := loader.LoadPreimage(proof)
...@@ -178,16 +178,16 @@ func TestPreimageLoader_PrecompilePreimage(t *testing.T) { ...@@ -178,16 +178,16 @@ func TestPreimageLoader_PrecompilePreimage(t *testing.T) {
t.Run("NoInputPreimage", func(t *testing.T) { t.Run("NoInputPreimage", func(t *testing.T) {
kv := kvstore.NewMemKV() kv := kvstore.NewMemKV()
loader := NewPreimageLoader(func() PreimageSource { loader := NewPreimageLoader(func() (PreimageSource, error) {
return kv return kv, nil
}) })
_, err := loader.LoadPreimage(proof) _, err := loader.LoadPreimage(proof)
require.ErrorIs(t, err, kvstore.ErrNotFound) require.ErrorIs(t, err, kvstore.ErrNotFound)
}) })
t.Run("Valid", func(t *testing.T) { t.Run("Valid", func(t *testing.T) {
kv := kvstore.NewMemKV() kv := kvstore.NewMemKV()
loader := NewPreimageLoader(func() PreimageSource { loader := NewPreimageLoader(func() (PreimageSource, error) {
return kv return kv, nil
}) })
require.NoError(t, kv.Put(preimage.Keccak256Key(proof.OracleKey).PreimageKey(), input)) require.NoError(t, kv.Put(preimage.Keccak256Key(proof.OracleKey).PreimageKey(), input))
actual, err := loader.LoadPreimage(proof) actual, err := loader.LoadPreimage(proof)
......
...@@ -138,7 +138,7 @@ func NewConfig( ...@@ -138,7 +138,7 @@ func NewConfig(
L2ClaimBlockNumber: l2ClaimBlockNum, L2ClaimBlockNumber: l2ClaimBlockNum,
L1RPCKind: sources.RPCKindStandard, L1RPCKind: sources.RPCKindStandard,
IsCustomChainConfig: isCustomConfig, IsCustomChainConfig: isCustomConfig,
DataFormat: types.DataFormatFile, DataFormat: types.DataFormatDirectory,
} }
} }
......
...@@ -40,7 +40,7 @@ var ( ...@@ -40,7 +40,7 @@ var (
Name: "data.format", Name: "data.format",
Usage: fmt.Sprintf("Format to use for preimage data storage. Available formats: %s", openum.EnumString(types.SupportedDataFormats)), Usage: fmt.Sprintf("Format to use for preimage data storage. Available formats: %s", openum.EnumString(types.SupportedDataFormats)),
EnvVars: prefixEnvVars("DATA_FORMAT"), EnvVars: prefixEnvVars("DATA_FORMAT"),
Value: string(types.DataFormatFile), Value: string(types.DataFormatDirectory),
} }
L2NodeAddr = &cli.StringFlag{ L2NodeAddr = &cli.StringFlag{
Name: "l2", Name: "l2",
......
...@@ -16,7 +16,6 @@ import ( ...@@ -16,7 +16,6 @@ import (
"github.com/ethereum-optimism/optimism/op-program/host/flags" "github.com/ethereum-optimism/optimism/op-program/host/flags"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore" "github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher" "github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
"github.com/ethereum-optimism/optimism/op-program/host/types"
opservice "github.com/ethereum-optimism/optimism/op-service" opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt" "github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
...@@ -175,20 +174,14 @@ func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config, ...@@ -175,20 +174,14 @@ func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config,
logger.Info("Using in-memory storage") logger.Info("Using in-memory storage")
kv = kvstore.NewMemKV() kv = kvstore.NewMemKV()
} else { } else {
logger.Info("Creating disk storage", "datadir", cfg.DataDir, "format", cfg.DataFormat)
if err := os.MkdirAll(cfg.DataDir, 0755); err != nil { if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
return fmt.Errorf("creating datadir: %w", err) return fmt.Errorf("creating datadir: %w", err)
} }
switch cfg.DataFormat { store, err := kvstore.NewDiskKV(logger, cfg.DataDir, cfg.DataFormat)
case types.DataFormatFile: if err != nil {
kv = kvstore.NewFileKV(cfg.DataDir) return fmt.Errorf("creating kvstore: %w", err)
case types.DataFormatDirectory:
kv = kvstore.NewDirectoryKV(cfg.DataDir)
case types.DataFormatPebble:
kv = kvstore.NewPebbleKV(cfg.DataDir)
default:
return fmt.Errorf("invalid data format: %s", cfg.DataFormat)
} }
kv = store
} }
var ( var (
......
...@@ -12,30 +12,30 @@ import ( ...@@ -12,30 +12,30 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
) )
// DirectoryKV is a disk-backed key-value store, every key-value pair is a hex-encoded .txt file, with the value as content. // directoryKV is a disk-backed key-value store, every key-value pair is a hex-encoded .txt file, with the value as content.
// DirectoryKV is safe for concurrent use with a single DirectoryKV instance. // directoryKV is safe for concurrent use with a single directoryKV instance.
// DirectoryKV is safe for concurrent use between different DirectoryKV instances of the same disk directory as long as the // directoryKV is safe for concurrent use between different directoryKV instances of the same disk directory as long as the
// file system supports atomic renames. // file system supports atomic renames.
type DirectoryKV struct { type directoryKV struct {
sync.RWMutex sync.RWMutex
path string path string
} }
// NewDirectoryKV creates a DirectoryKV that puts/gets pre-images as files in the given directory path. // newDirectoryKV creates a directoryKV that puts/gets pre-images as files in the given directory path.
// The path must exist, or subsequent Put/Get calls will error when it does not. // The path must exist, or subsequent Put/Get calls will error when it does not.
func NewDirectoryKV(path string) *DirectoryKV { func newDirectoryKV(path string) *directoryKV {
return &DirectoryKV{path: path} return &directoryKV{path: path}
} }
// pathKey returns the file path for the given key. // pathKey returns the file path for the given key.
// This is composed of the first characters of the non-0x-prefixed hex key as a directory, and the rest as the file name. // This is composed of the first characters of the non-0x-prefixed hex key as a directory, and the rest as the file name.
func (d *DirectoryKV) pathKey(k common.Hash) string { func (d *directoryKV) pathKey(k common.Hash) string {
key := k.String() key := k.String()
dir, name := key[2:6], key[6:] dir, name := key[2:6], key[6:]
return path.Join(d.path, dir, name+".txt") return path.Join(d.path, dir, name+".txt")
} }
func (d *DirectoryKV) Put(k common.Hash, v []byte) error { func (d *directoryKV) Put(k common.Hash, v []byte) error {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
f, err := openTempFile(d.path, k.String()+".txt.*") f, err := openTempFile(d.path, k.String()+".txt.*")
...@@ -61,7 +61,7 @@ func (d *DirectoryKV) Put(k common.Hash, v []byte) error { ...@@ -61,7 +61,7 @@ func (d *DirectoryKV) Put(k common.Hash, v []byte) error {
return nil return nil
} }
func (d *DirectoryKV) Get(k common.Hash) ([]byte, error) { func (d *directoryKV) Get(k common.Hash) ([]byte, error) {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
f, err := os.OpenFile(d.pathKey(k), os.O_RDONLY, filePermission) f, err := os.OpenFile(d.pathKey(k), os.O_RDONLY, filePermission)
...@@ -79,8 +79,8 @@ func (d *DirectoryKV) Get(k common.Hash) ([]byte, error) { ...@@ -79,8 +79,8 @@ func (d *DirectoryKV) Get(k common.Hash) ([]byte, error) {
return hex.DecodeString(string(dat)) return hex.DecodeString(string(dat))
} }
func (d *DirectoryKV) Close() error { func (d *directoryKV) Close() error {
return nil return nil
} }
var _ KV = (*DirectoryKV)(nil) var _ KV = (*directoryKV)(nil)
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
func TestDirectoryKV(t *testing.T) { func TestDirectoryKV(t *testing.T) {
tmp := t.TempDir() // automatically removed by testing cleanup tmp := t.TempDir() // automatically removed by testing cleanup
kv := NewDirectoryKV(tmp) kv := newDirectoryKV(tmp)
t.Cleanup(func() { // Can't use defer because kvTest runs tests in parallel. t.Cleanup(func() { // Can't use defer because kvTest runs tests in parallel.
require.NoError(t, kv.Close()) require.NoError(t, kv.Close())
}) })
...@@ -20,7 +20,7 @@ func TestDirectoryKV(t *testing.T) { ...@@ -20,7 +20,7 @@ func TestDirectoryKV(t *testing.T) {
func TestDirectoryKV_CreateMissingDirectory(t *testing.T) { func TestDirectoryKV_CreateMissingDirectory(t *testing.T) {
tmp := t.TempDir() tmp := t.TempDir()
dir := filepath.Join(tmp, "data") dir := filepath.Join(tmp, "data")
kv := NewDirectoryKV(dir) kv := newDirectoryKV(dir)
defer kv.Close() defer kv.Close()
val := []byte{1, 2, 3, 4} val := []byte{1, 2, 3, 4}
key := crypto.Keccak256Hash(val) key := crypto.Keccak256Hash(val)
......
...@@ -15,26 +15,26 @@ import ( ...@@ -15,26 +15,26 @@ import (
// read/write mode for user/group/other, not executable. // read/write mode for user/group/other, not executable.
const filePermission = 0666 const filePermission = 0666
// FileKV is a disk-backed key-value store, every key-value pair is a hex-encoded .txt file, with the value as content. // fileKV is a disk-backed key-value store, every key-value pair is a hex-encoded .txt file, with the value as content.
// FileKV is safe for concurrent use with a single FileKV instance. // fileKV is safe for concurrent use with a single fileKV instance.
// FileKV is safe for concurrent use between different FileKV instances of the same disk directory as long as the // fileKV is safe for concurrent use between different fileKV instances of the same disk directory as long as the
// file system supports atomic renames. // file system supports atomic renames.
type FileKV struct { type fileKV struct {
sync.RWMutex sync.RWMutex
path string path string
} }
// NewFileKV creates a FileKV that puts/gets pre-images as files in the given directory path. // newFileKV creates a fileKV that puts/gets pre-images as files in the given directory path.
// The path must exist, or subsequent Put/Get calls will error when it does not. // The path must exist, or subsequent Put/Get calls will error when it does not.
func NewFileKV(path string) *FileKV { func newFileKV(path string) *fileKV {
return &FileKV{path: path} return &fileKV{path: path}
} }
func (d *FileKV) pathKey(k common.Hash) string { func (d *fileKV) pathKey(k common.Hash) string {
return path.Join(d.path, k.String()+".txt") return path.Join(d.path, k.String()+".txt")
} }
func (d *FileKV) Put(k common.Hash, v []byte) error { func (d *fileKV) Put(k common.Hash, v []byte) error {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
f, err := openTempFile(d.path, k.String()+".txt.*") f, err := openTempFile(d.path, k.String()+".txt.*")
...@@ -72,7 +72,7 @@ func openTempFile(dir string, nameTemplate string) (*os.File, error) { ...@@ -72,7 +72,7 @@ func openTempFile(dir string, nameTemplate string) (*os.File, error) {
return f, nil return f, nil
} }
func (d *FileKV) Get(k common.Hash) ([]byte, error) { func (d *fileKV) Get(k common.Hash) ([]byte, error) {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
f, err := os.OpenFile(d.pathKey(k), os.O_RDONLY, filePermission) f, err := os.OpenFile(d.pathKey(k), os.O_RDONLY, filePermission)
...@@ -90,8 +90,8 @@ func (d *FileKV) Get(k common.Hash) ([]byte, error) { ...@@ -90,8 +90,8 @@ func (d *FileKV) Get(k common.Hash) ([]byte, error) {
return hex.DecodeString(string(dat)) return hex.DecodeString(string(dat))
} }
func (d *FileKV) Close() error { func (d *fileKV) Close() error {
return nil return nil
} }
var _ KV = (*FileKV)(nil) var _ KV = (*fileKV)(nil)
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
func TestFileKV(t *testing.T) { func TestFileKV(t *testing.T) {
tmp := t.TempDir() // automatically removed by testing cleanup tmp := t.TempDir() // automatically removed by testing cleanup
kv := NewFileKV(tmp) kv := newFileKV(tmp)
t.Cleanup(func() { // Can't use defer because kvTest runs tests in parallel. t.Cleanup(func() { // Can't use defer because kvTest runs tests in parallel.
require.NoError(t, kv.Close()) require.NoError(t, kv.Close())
}) })
...@@ -20,7 +20,7 @@ func TestFileKV(t *testing.T) { ...@@ -20,7 +20,7 @@ func TestFileKV(t *testing.T) {
func TestFileKV_CreateMissingDirectory(t *testing.T) { func TestFileKV_CreateMissingDirectory(t *testing.T) {
tmp := t.TempDir() tmp := t.TempDir()
dir := filepath.Join(tmp, "data") dir := filepath.Join(tmp, "data")
kv := NewFileKV(dir) kv := newFileKV(dir)
defer kv.Close() defer kv.Close()
val := []byte{1, 2, 3, 4} val := []byte{1, 2, 3, 4}
key := crypto.Keccak256Hash(val) key := crypto.Keccak256Hash(val)
......
package kvstore
import (
"errors"
"fmt"
"os"
"path/filepath"
"slices"
"github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum/go-ethereum/log"
)
const formatFilename = "kvformat"
var (
ErrFormatUnavailable = errors.New("format unavailable")
ErrUnsupportedFormat = errors.New("unsupported format")
)
func recordKVFormat(dir string, format types.DataFormat) error {
return os.WriteFile(filepath.Join(dir, formatFilename), []byte(format), 0o644)
}
func readKVFormat(dir string) (types.DataFormat, error) {
data, err := os.ReadFile(filepath.Join(dir, formatFilename))
if errors.Is(err, os.ErrNotExist) {
return "", ErrFormatUnavailable
} else if err != nil {
return "", fmt.Errorf("failed to read kv format: %w", err)
}
format := types.DataFormat(data)
if !slices.Contains(types.SupportedDataFormats, format) {
return "", fmt.Errorf("%w: %s", ErrUnsupportedFormat, format)
}
return format, nil
}
// NewDiskKV creates a new KV implementation. If the specified directly contains an existing KV store
// that has the format recorded, the recorded format is used ensuring compatibility with the existing data.
// If the directory does not contain existing data or doesn't have the format recorded, defaultFormat is used
// which may result in the existing data being unused.
// If the existing data records a format that is not supported, an error is returned.
// The format is automatically recorded if it wasn't previously stored.
func NewDiskKV(logger log.Logger, dir string, defaultFormat types.DataFormat) (KV, error) {
format, err := readKVFormat(dir)
if errors.Is(err, ErrFormatUnavailable) {
format = defaultFormat
logger.Info("Creating disk storage", "datadir", dir, "format", format)
if err := recordKVFormat(dir, format); err != nil {
return nil, fmt.Errorf("failed to record new kv store format: %w", err)
}
} else if err != nil {
return nil, err
} else {
logger.Info("Using existing disk storage", "datadir", dir, "format", format)
}
switch format {
case types.DataFormatFile:
return newFileKV(dir), nil
case types.DataFormatDirectory:
return newDirectoryKV(dir), nil
case types.DataFormatPebble:
return newPebbleKV(dir), nil
default:
return nil, fmt.Errorf("invalid data format: %s", format)
}
}
package kvstore
import (
"fmt"
"testing"
"github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestRecordAndReadKVFormat(t *testing.T) {
for _, format := range types.SupportedDataFormats {
format := format
t.Run(string(format), func(t *testing.T) {
dir := t.TempDir()
require.NoError(t, recordKVFormat(dir, format))
actual, err := readKVFormat(dir)
require.NoError(t, err)
require.Equal(t, format, actual)
})
}
t.Run("Unsupported", func(t *testing.T) {
dir := t.TempDir()
require.NoError(t, recordKVFormat(dir, "nope"))
_, err := readKVFormat(dir)
require.ErrorIs(t, err, ErrUnsupportedFormat)
})
t.Run("NotRecorded", func(t *testing.T) {
dir := t.TempDir()
_, err := readKVFormat(dir)
require.ErrorIs(t, err, ErrFormatUnavailable)
})
}
func TestNewDiskKV(t *testing.T) {
for _, existingFormat := range types.SupportedDataFormats {
existingFormat := existingFormat
for _, specifiedFormat := range types.SupportedDataFormats {
specifiedFormat := specifiedFormat
t.Run(fmt.Sprintf("%v->%v", existingFormat, specifiedFormat), func(t *testing.T) {
dir := t.TempDir()
logger := testlog.Logger(t, log.LevelError)
hash := common.Hash{0xaa}
value := []byte{1, 2, 3, 4, 5, 6}
kv1, err := NewDiskKV(logger, dir, existingFormat)
require.NoError(t, err)
require.NoError(t, kv1.Put(hash, value))
require.NoError(t, kv1.Close())
// Should use existing format
kv2, err := NewDiskKV(logger, dir, specifiedFormat)
require.NoError(t, err)
actual, err := kv2.Get(hash)
require.NoError(t, err)
require.Equal(t, value, actual)
require.NoError(t, kv2.Close())
})
}
}
}
...@@ -10,16 +10,16 @@ import ( ...@@ -10,16 +10,16 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
) )
// PebbleKV is a disk-backed key-value store, with PebbleDB as the underlying DBMS. // pebbleKV is a disk-backed key-value store, with PebbleDB as the underlying DBMS.
// PebbleKV is safe for concurrent use with a single PebbleKV instance. // pebbleKV is safe for concurrent use with a single pebbleKV instance.
type PebbleKV struct { type pebbleKV struct {
sync.RWMutex sync.RWMutex
db *pebble.DB db *pebble.DB
} }
// NewPebbleKV creates a PebbleKV that puts/gets pre-images as files in the given directory path. // newPebbleKV creates a pebbleKV that puts/gets pre-images as files in the given directory path.
// The path must exist, or subsequent Put/Get calls will error when it does not. // The path must exist, or subsequent Put/Get calls will error when it does not.
func NewPebbleKV(path string) *PebbleKV { func newPebbleKV(path string) *pebbleKV {
opts := &pebble.Options{ opts := &pebble.Options{
Cache: pebble.NewCache(int64(32 * 1024 * 1024)), Cache: pebble.NewCache(int64(32 * 1024 * 1024)),
MaxConcurrentCompactions: runtime.NumCPU, MaxConcurrentCompactions: runtime.NumCPU,
...@@ -32,16 +32,16 @@ func NewPebbleKV(path string) *PebbleKV { ...@@ -32,16 +32,16 @@ func NewPebbleKV(path string) *PebbleKV {
panic(fmt.Errorf("failed to open pebbledb at %s: %w", path, err)) panic(fmt.Errorf("failed to open pebbledb at %s: %w", path, err))
} }
return &PebbleKV{db: db} return &pebbleKV{db: db}
} }
func (d *PebbleKV) Put(k common.Hash, v []byte) error { func (d *pebbleKV) Put(k common.Hash, v []byte) error {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
return d.db.Set(k.Bytes(), v, pebble.NoSync) return d.db.Set(k.Bytes(), v, pebble.NoSync)
} }
func (d *PebbleKV) Get(k common.Hash) ([]byte, error) { func (d *pebbleKV) Get(k common.Hash) ([]byte, error) {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
...@@ -58,11 +58,11 @@ func (d *PebbleKV) Get(k common.Hash) ([]byte, error) { ...@@ -58,11 +58,11 @@ func (d *PebbleKV) Get(k common.Hash) ([]byte, error) {
return ret, nil return ret, nil
} }
func (d *PebbleKV) Close() error { func (d *pebbleKV) Close() error {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
return d.db.Close() return d.db.Close()
} }
var _ KV = (*PebbleKV)(nil) var _ KV = (*pebbleKV)(nil)
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
func TestPebbleKV(t *testing.T) { func TestPebbleKV(t *testing.T) {
tmp := t.TempDir() // automatically removed by testing cleanup tmp := t.TempDir() // automatically removed by testing cleanup
kv := NewPebbleKV(tmp) kv := newPebbleKV(tmp)
t.Cleanup(func() { // Can't use defer because kvTest runs tests in parallel. t.Cleanup(func() { // Can't use defer because kvTest runs tests in parallel.
require.NoError(t, kv.Close()) require.NoError(t, kv.Close())
}) })
...@@ -20,7 +20,7 @@ func TestPebbleKV(t *testing.T) { ...@@ -20,7 +20,7 @@ func TestPebbleKV(t *testing.T) {
func TestPebbleKV_CreateMissingDirectory(t *testing.T) { func TestPebbleKV_CreateMissingDirectory(t *testing.T) {
tmp := t.TempDir() tmp := t.TempDir()
dir := filepath.Join(tmp, "data") dir := filepath.Join(tmp, "data")
kv := NewPebbleKV(dir) kv := newPebbleKV(dir)
defer kv.Close() defer kv.Close()
val := []byte{1, 2, 3, 4} val := []byte{1, 2, 3, 4}
key := crypto.Keccak256Hash(val) key := crypto.Keccak256Hash(val)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment