Commit 8ab4d3d8 authored by clabby's avatar clabby Committed by GitHub

feat(op-program): Use `PebbleDB` for `DiskKV` (#11705)

* feat(op-program): Use `PebbleDB` for `DiskKV`

* close db

* fix `testFaultProofProgramScenario` tests

* switch to snappy compression

https://github.com/cockroachdb/pebble/issues/3434

* fix tempdir

* update compat release

* defer k/v until preimage server and hinter have both exited
parent c198a893
...@@ -86,6 +86,7 @@ func applySpanBatchActivation(active bool, dp *genesis.DeployConfig) { ...@@ -86,6 +86,7 @@ func applySpanBatchActivation(active bool, dp *genesis.DeployConfig) {
// - update the state root via a tx // - update the state root via a tx
// - run program // - run program
func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActivated bool) { func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActivated bool) {
t.Helper()
InitParallel(t) InitParallel(t)
ctx := context.Background() ctx := context.Background()
...@@ -186,6 +187,7 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActi ...@@ -186,6 +187,7 @@ func testVerifyL2OutputRootEmptyBlock(t *testing.T, detached bool, spanBatchActi
} }
func testVerifyL2OutputRoot(t *testing.T, detached bool, spanBatchActivated bool) { func testVerifyL2OutputRoot(t *testing.T, detached bool, spanBatchActivated bool) {
t.Helper()
InitParallel(t) InitParallel(t)
ctx := context.Background() ctx := context.Background()
...@@ -278,6 +280,7 @@ type FaultProofProgramTestScenario struct { ...@@ -278,6 +280,7 @@ type FaultProofProgramTestScenario struct {
// testFaultProofProgramScenario runs the fault proof program in several contexts, given a test scenario. // testFaultProofProgramScenario runs the fault proof program in several contexts, given a test scenario.
func testFaultProofProgramScenario(t *testing.T, ctx context.Context, sys *System, s *FaultProofProgramTestScenario) { func testFaultProofProgramScenario(t *testing.T, ctx context.Context, sys *System, s *FaultProofProgramTestScenario) {
preimageDir := t.TempDir() preimageDir := t.TempDir()
fppConfig := oppconf.NewConfig(sys.RollupConfig, sys.L2GenesisCfg.Config, s.L1Head, s.L2Head, s.L2OutputRoot, common.Hash(s.L2Claim), s.L2ClaimBlockNumber) fppConfig := oppconf.NewConfig(sys.RollupConfig, sys.L2GenesisCfg.Config, s.L1Head, s.L2Head, s.L2OutputRoot, common.Hash(s.L2Claim), s.L2ClaimBlockNumber)
fppConfig.L1URL = sys.NodeEndpoint("l1").RPC() fppConfig.L1URL = sys.NodeEndpoint("l1").RPC()
fppConfig.L2URL = sys.NodeEndpoint("sequencer").RPC() fppConfig.L2URL = sys.NodeEndpoint("sequencer").RPC()
......
...@@ -122,6 +122,10 @@ func FaultProofProgram(ctx context.Context, logger log.Logger, cfg *config.Confi ...@@ -122,6 +122,10 @@ func FaultProofProgram(ctx context.Context, logger log.Logger, cfg *config.Confi
func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config, preimageChannel preimage.FileChannel, hintChannel preimage.FileChannel) error { func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config, preimageChannel preimage.FileChannel, hintChannel preimage.FileChannel) error {
var serverDone chan error var serverDone chan error
var hinterDone chan error var hinterDone chan error
logger.Info("Starting preimage server")
var kv kvstore.KV
// Close the preimage/hint channels, and then kv store once the server and hinter have exited.
defer func() { defer func() {
preimageChannel.Close() preimageChannel.Close()
hintChannel.Close() hintChannel.Close()
...@@ -133,9 +137,12 @@ func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config, ...@@ -133,9 +137,12 @@ func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config,
// Wait for hinter to complete // Wait for hinter to complete
<-hinterDone <-hinterDone
} }
if kv != nil {
kv.Close()
}
}() }()
logger.Info("Starting preimage server")
var kv kvstore.KV
if cfg.DataDir == "" { if cfg.DataDir == "" {
logger.Info("Using in-memory storage") logger.Info("Using in-memory storage")
kv = kvstore.NewMemKV() kv = kvstore.NewMemKV()
......
package kvstore package kvstore
import ( import (
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"io" "runtime"
"os"
"path"
"sync" "sync"
"github.com/cockroachdb/pebble"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
) )
// read/write mode for user/group/other, not executable. // DiskKV is a disk-backed key-value store, with PebbleDB as the underlying DBMS.
const diskPermission = 0666
// DiskKV is a disk-backed key-value store, every key-value pair is a hex-encoded .txt file, with the value as content.
// DiskKV is safe for concurrent use with a single DiskKV instance. // DiskKV is safe for concurrent use with a single DiskKV instance.
// DiskKV is safe for concurrent use between different DiskKV instances of the same disk directory as long as the
// file system supports atomic renames.
type DiskKV struct { type DiskKV struct {
sync.RWMutex sync.RWMutex
path string db *pebble.DB
} }
// NewDiskKV creates a DiskKV that puts/gets pre-images as files in the given directory path. // NewDiskKV creates a DiskKV that puts/gets pre-images as files in the given directory path.
// The path must exist, or subsequent Put/Get calls will error when it does not. // The path must exist, or subsequent Put/Get calls will error when it does not.
func NewDiskKV(path string) *DiskKV { func NewDiskKV(path string) *DiskKV {
return &DiskKV{path: path} opts := &pebble.Options{
} Cache: pebble.NewCache(int64(32 * 1024 * 1024)),
MaxConcurrentCompactions: runtime.NumCPU,
Levels: []pebble.LevelOptions{
{Compression: pebble.SnappyCompression},
},
}
db, err := pebble.Open(path, opts)
if err != nil {
panic(fmt.Errorf("failed to open pebbledb at %s: %w", path, err))
}
func (d *DiskKV) pathKey(k common.Hash) string { return &DiskKV{db: db}
return path.Join(d.path, k.String()+".txt")
} }
func (d *DiskKV) Put(k common.Hash, v []byte) error { func (d *DiskKV) Put(k common.Hash, v []byte) error {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
f, err := openTempFile(d.path, k.String()+".txt.*") return d.db.Set(k.Bytes(), v, pebble.NoSync)
if err != nil {
return fmt.Errorf("failed to open temp file for pre-image %s: %w", k, err)
}
defer os.Remove(f.Name()) // Clean up the temp file if it doesn't actually get moved into place
if _, err := f.Write([]byte(hex.EncodeToString(v))); err != nil {
_ = f.Close()
return fmt.Errorf("failed to write pre-image %s to disk: %w", k, err)
}
if err := f.Close(); err != nil {
return fmt.Errorf("failed to close temp pre-image %s file: %w", k, err)
}
targetFile := d.pathKey(k)
if err := os.Rename(f.Name(), targetFile); err != nil {
return fmt.Errorf("failed to move temp dir %v to final destination %v: %w", f.Name(), targetFile, err)
}
return nil
}
func openTempFile(dir string, nameTemplate string) (*os.File, error) {
f, err := os.CreateTemp(dir, nameTemplate)
// Directory has been deleted out from underneath us. Recreate it.
if errors.Is(err, os.ErrNotExist) {
if mkdirErr := os.MkdirAll(dir, 0777); mkdirErr != nil {
return nil, errors.Join(fmt.Errorf("failed to create directory %v: %w", dir, mkdirErr), err)
}
f, err = os.CreateTemp(dir, nameTemplate)
}
if err != nil {
return nil, err
}
return f, nil
} }
func (d *DiskKV) Get(k common.Hash) ([]byte, error) { func (d *DiskKV) Get(k common.Hash) ([]byte, error) {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
f, err := os.OpenFile(d.pathKey(k), os.O_RDONLY, diskPermission)
dat, closer, err := d.db.Get(k.Bytes())
if err != nil { if err != nil {
if errors.Is(err, os.ErrNotExist) { if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound return nil, ErrNotFound
} }
return nil, fmt.Errorf("failed to open pre-image file %s: %w", k, err) return nil, err
}
defer f.Close() // fine to ignore closing error here
dat, err := io.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("failed to read pre-image from file %s: %w", k, err)
} }
return hex.DecodeString(string(dat)) ret := make([]byte, len(dat))
copy(ret, dat)
closer.Close()
return ret, nil
}
func (d *DiskKV) Close() error {
d.Lock()
defer d.Unlock()
return d.db.Close()
} }
var _ KV = (*DiskKV)(nil) var _ KV = (*DiskKV)(nil)
...@@ -19,4 +19,7 @@ type KV interface { ...@@ -19,4 +19,7 @@ type KV interface {
// It returns ErrNotFound when the pre-image cannot be found. // It returns ErrNotFound when the pre-image cannot be found.
// KV store implementations may return additional errors specific to the KV storage. // KV store implementations may return additional errors specific to the KV storage.
Get(k common.Hash) ([]byte, error) Get(k common.Hash) ([]byte, error)
// Closes the KV store.
Close() error
} }
...@@ -37,3 +37,7 @@ func (m *MemKV) Get(k common.Hash) ([]byte, error) { ...@@ -37,3 +37,7 @@ func (m *MemKV) Get(k common.Hash) ([]byte, error) {
} }
return slices.Clone(v), nil return slices.Clone(v), nil
} }
func (m *MemKV) Close() error {
return nil
}
...@@ -5,7 +5,7 @@ SCRIPTS_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) ...@@ -5,7 +5,7 @@ SCRIPTS_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
COMPAT_DIR="${SCRIPTS_DIR}/../temp/compat" COMPAT_DIR="${SCRIPTS_DIR}/../temp/compat"
TESTNAME="${1?Must specify compat file to run}" TESTNAME="${1?Must specify compat file to run}"
BASEURL="${2:-https://github.com/ethereum-optimism/chain-test-data/releases/download/2024-08-02}" BASEURL="${2:-https://github.com/ethereum-optimism/chain-test-data/releases/download/2024-09-01}"
URL="${BASEURL}/${TESTNAME}.tar.bz" URL="${BASEURL}/${TESTNAME}.tar.bz"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment