Commit de92c9f8 authored by Adrian Sutton's avatar Adrian Sutton

op-program: Use atomic write pattern for KV store

Ensures that pre-image files are always completely written before being moved into place.
parent 28be4fa2
...@@ -17,8 +17,8 @@ const diskPermission = 0666 ...@@ -17,8 +17,8 @@ const diskPermission = 0666
// DiskKV is a disk-backed key-value store, every key-value pair is a hex-encoded .txt file, with the value as content. // DiskKV is a disk-backed key-value store, every key-value pair is a hex-encoded .txt file, with the value as content.
// DiskKV is safe for concurrent use with a single DiskKV instance. // DiskKV is safe for concurrent use with a single DiskKV instance.
// DiskKV is not safe for concurrent use between different DiskKV instances of the same disk directory: // DiskKV is safe for concurrent use between different DiskKV instances of the same disk directory as long as the
// a Put needs to be completed before another DiskKV Get retrieves the values. // file system supports atomic renames.
type DiskKV struct { type DiskKV struct {
sync.RWMutex sync.RWMutex
path string path string
...@@ -37,19 +37,22 @@ func (d *DiskKV) pathKey(k common.Hash) string { ...@@ -37,19 +37,22 @@ func (d *DiskKV) pathKey(k common.Hash) string {
func (d *DiskKV) Put(k common.Hash, v []byte) error { func (d *DiskKV) Put(k common.Hash, v []byte) error {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
f, err := os.OpenFile(d.pathKey(k), os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_TRUNC, diskPermission) f, err := os.CreateTemp(d.path, k.String()+".txt.*")
if err != nil { if err != nil {
if errors.Is(err, os.ErrExist) { return fmt.Errorf("failed to open temp file for pre-image %s: %w", k, err)
return ErrAlreadyExists
}
return fmt.Errorf("failed to open new pre-image file %s: %w", k, err)
} }
defer os.Remove(f.Name()) // Clean up the temp file if it doesn't actually get moved into place
if _, err := f.Write([]byte(hex.EncodeToString(v))); err != nil { if _, err := f.Write([]byte(hex.EncodeToString(v))); err != nil {
_ = f.Close() _ = f.Close()
return fmt.Errorf("failed to write pre-image %s to disk: %w", k, err) return fmt.Errorf("failed to write pre-image %s to disk: %w", k, err)
} }
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
return fmt.Errorf("failed to close pre-image %s file: %w", k, err) return fmt.Errorf("failed to close temp pre-image %s file: %w", k, err)
}
targetFile := d.pathKey(k)
if err := os.Rename(f.Name(), targetFile); err != nil {
return fmt.Errorf("failed to move temp dir %v to final destination %v: %w", f.Name(), targetFile, err)
} }
return nil return nil
} }
......
...@@ -9,13 +9,9 @@ import ( ...@@ -9,13 +9,9 @@ import (
// ErrNotFound is returned when a pre-image cannot be found in the KV store. // ErrNotFound is returned when a pre-image cannot be found in the KV store.
var ErrNotFound = errors.New("not found") var ErrNotFound = errors.New("not found")
// ErrAlreadyExists is returned when a pre-image already exists in the KV store.
var ErrAlreadyExists = errors.New("already exists")
// KV is a Key-Value store interface for pre-image data. // KV is a Key-Value store interface for pre-image data.
type KV interface { type KV interface {
// Put puts the pre-image value v in the key-value store with key k. // Put puts the pre-image value v in the key-value store with key k.
// It returns ErrAlreadyExists when the key already exists.
// KV store implementations may return additional errors specific to the KV storage. // KV store implementations may return additional errors specific to the KV storage.
Put(k common.Hash, v []byte) error Put(k common.Hash, v []byte) error
......
...@@ -45,9 +45,9 @@ func kvTest(t *testing.T, kv KV) { ...@@ -45,9 +45,9 @@ func kvTest(t *testing.T, kv KV) {
require.Equal(t, []byte{4, 2}, dat, "pre-image must match") require.Equal(t, []byte{4, 2}, dat, "pre-image must match")
}) })
t.Run("not overwriting pre-image", func(t *testing.T) { t.Run("allowing multiple writes for same pre-image", func(t *testing.T) {
t.Parallel() t.Parallel()
require.NoError(t, kv.Put(common.Hash{0xdd}, []byte{4, 2})) require.NoError(t, kv.Put(common.Hash{0xdd}, []byte{4, 2}))
require.ErrorIs(t, kv.Put(common.Hash{0xdd}, []byte{4, 2}), ErrAlreadyExists) require.NoError(t, kv.Put(common.Hash{0xdd}, []byte{4, 2}))
}) })
} }
...@@ -23,9 +23,6 @@ func NewMemKV() *MemKV { ...@@ -23,9 +23,6 @@ func NewMemKV() *MemKV {
func (m *MemKV) Put(k common.Hash, v []byte) error { func (m *MemKV) Put(k common.Hash, v []byte) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
if _, ok := m.m[k]; ok {
return ErrAlreadyExists
}
m.m[k] = v m.m[k] = v
return nil return nil
} }
......
...@@ -155,10 +155,7 @@ func (p *Prefetcher) storeTrieNodes(values []hexutil.Bytes) error { ...@@ -155,10 +155,7 @@ func (p *Prefetcher) storeTrieNodes(values []hexutil.Bytes) error {
_, nodes := mpt.WriteTrie(values) _, nodes := mpt.WriteTrie(values)
for _, node := range nodes { for _, node := range nodes {
key := preimage.Keccak256Key(crypto.Keccak256Hash(node)).PreimageKey() key := preimage.Keccak256Key(crypto.Keccak256Hash(node)).PreimageKey()
if err := p.kvStore.Put(key, node); errors.Is(err, kvstore.ErrAlreadyExists) { if err := p.kvStore.Put(key, node); err != nil {
// It's not uncommon for different tries to contain common nodes (esp for receipts)
continue
} else if err != nil {
return fmt.Errorf("failed to store node: %w", err) return fmt.Errorf("failed to store node: %w", err)
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment