Commit 411de197 authored by vicotor

add metadb and update flags for op-node

parent 0b7e45e6
@@ -4,7 +4,6 @@ import (
	"context"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/params"
	"github.com/exchain/go-exchain/exchain"
	nebulav1 "github.com/exchain/go-exchain/exchain/protocol/gen/go/nebula/v1"
	"github.com/exchain/go-exchain/op-node/p2p"
@@ -17,21 +16,6 @@ import (

type EngineAPI struct {
}

func (e *EngineAPI) SignalSuperchainV1(ctx context.Context, recommended, required params.ProtocolVersion) (params.ProtocolVersion, error) {
	//TODO implement me
	panic("implement me")
}

func (e *EngineAPI) InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) {
	//TODO implement me
	panic("implement me")
}

func (e *EngineAPI) GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error) {
	//TODO implement me
	panic("implement me")
}

func (e *EngineAPI) BlockRefByNumber(ctx context.Context, num uint64) (eth.BlockRef, error) {
	//TODO implement me
	panic("implement me")
@@ -62,10 +46,10 @@ func (e *EngineAPI) ProcessPayload(block *nebulav1.Block) error {
	panic("implement me")
}

func (e *EngineAPI) PayloadByHash(ctx context.Context, hash common.Hash) (*eth.ExecutionPayloadEnvelope, error) {
	//TODO implement me
	panic("implement me")
}
//func (e *EngineAPI) PayloadByHash(ctx context.Context, hash common.Hash) (*eth.ExecutionPayloadEnvelope, error) {
//	//TODO implement me
//	panic("implement me")
//}

func (e *EngineAPI) PayloadByNumber(ctx context.Context, u uint64) (*eth.ExecutionPayloadEnvelope, error) {
	//TODO implement me
......
@@ -3,6 +3,7 @@ package mockengine
import (
	"github.com/exchain/go-exchain/exchain"
	nebulav1 "github.com/exchain/go-exchain/exchain/protocol/gen/go/nebula/v1"
	"github.com/exchain/go-exchain/metadb"
)

type MockEngine struct {
@@ -23,6 +24,6 @@ func (m MockEngine) ProcessPayload(block *nebulav1.Block) error {
	panic("implement me")
}

func NewEngine() exchain.Engine {
func NewEngine(database metadb.Database) exchain.Engine {
	return &MockEngine{}
}
package metadb

import "io"

// IdealBatchSize defines the size threshold that a data batch should ideally
// not exceed in a single write.
const IdealBatchSize = 100 * 1024

type Encodable interface {
	Encode() ([]byte, error)
}

type Database interface {
	KeyValueReader
	KeyValueWriter
	Batcher
	io.Closer
}

type CacheKV interface {
	Get(key interface{}) (interface{}, bool)
	Set(key interface{}, value interface{}) error
	Delete(key interface{}) error
	Copy() CacheKV
}

// KeyValueReader wraps the Has and Get methods of a backing data store.
type KeyValueReader interface {
	// Has reports whether a key is present in the key-value data store.
	Has(key []byte) (bool, error)

	// Get retrieves the given key if it's present in the key-value data store.
	Get(key []byte) ([]byte, error)
}

// KeyValueWriter wraps the Put and Delete methods of a backing data store.
type KeyValueWriter interface {
	// Put inserts the given value into the key-value data store.
	Put(key []byte, value []byte) error

	// Delete removes the key from the key-value data store.
	Delete(key []byte) error
}

// KeyValueStater wraps the Stat method of a backing data store.
type KeyValueStater interface {
	// Stat returns a particular internal stat of the database.
	Stat(property string) (string, error)
}

// Compacter wraps the Compact method of a backing data store.
type Compacter interface {
	// Compact flattens the underlying data store for the given key range. In essence,
	// deleted and overwritten versions are discarded, and the data is rearranged to
	// reduce the cost of operations needed to access them.
	//
	// A nil start is treated as a key before all keys in the data store; a nil limit
	// is treated as a key after all keys in the data store. If both are nil, the
	// entire data store is compacted.
	Compact(start []byte, limit []byte) error
}

// Batch is a write-only database that commits changes to its host database
// when Write is called. A batch cannot be used concurrently.
type Batch interface {
	KeyValueWriter

	// ValueSize retrieves the amount of data queued up for writing.
	ValueSize() int

	// Write flushes any accumulated data to disk.
	Write() error

	// Reset resets the batch for reuse.
	Reset()

	// Replay replays the batch contents.
	Replay(w KeyValueWriter) error
}

// Batcher wraps the NewBatch methods of a backing data store.
type Batcher interface {
	// NewBatch creates a write-only database that buffers changes to its host db
	// until a final write is called.
	NewBatch() Batch

	// NewBatchWithSize creates a write-only database batch with a pre-allocated buffer.
	NewBatchWithSize(size int) Batch
}

// HookedBatch wraps an arbitrary batch where each operation may be hooked into
// to monitor from black box code.
type HookedBatch struct {
	Batch

	OnPut    func(key []byte, value []byte) // Callback if a key is inserted
	OnDelete func(key []byte)               // Callback if a key is deleted
}

// Put inserts the given value into the key-value data store.
func (b HookedBatch) Put(key []byte, value []byte) error {
	if b.OnPut != nil {
		b.OnPut(key, value)
	}
	return b.Batch.Put(key, value)
}

// Delete removes the key from the key-value data store.
func (b HookedBatch) Delete(key []byte) error {
	if b.OnDelete != nil {
		b.OnDelete(key)
	}
	return b.Batch.Delete(key)
}

// KeyValueStore contains all the methods required to allow handling different
// key-value data stores backing the high level database.
type KeyValueStore interface {
	KeyValueReader
	KeyValueWriter
	//KeyValueStater
	Batcher
	//Iteratee
	//Compacter
	//Snapshotter
	io.Closer
}
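As an aside, here is a minimal usage sketch of this contract (illustrative only, not part of the commit; the function name is hypothetical and `db` stands for any Database implementation): writes buffer in a Batch until Write flushes them, with IdealBatchSize as the suggested flush threshold.

func seedExample(db Database) error {
	b := db.NewBatch()
	if err := b.Put([]byte("k1"), []byte("v1")); err != nil {
		return err
	}
	if b.ValueSize() >= IdealBatchSize {
		// Flush early once the batch grows past the ideal size, then reuse it.
		if err := b.Write(); err != nil {
			return err
		}
		b.Reset()
	}
	if err := b.Write(); err != nil { // final flush
		return err
	}
	_, err := db.Get([]byte("k1")) // the write is now visible to readers
	return err
}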
package groupdb

import (
	"errors"
	"sync"

	"github.com/exchain/go-exchain/metadb"
)

var (
	_ metadb.Batch = &gBatch{}
)

type gBatch struct {
	gdb     *GroupDb
	batches []metadb.Batch
}

// instance routes a key to its shard batch using the last byte of the key.
func (g *gBatch) instance(k []byte) metadb.Batch {
	if len(k) == 0 {
		return g.batches[0]
	}
	idx := int(k[len(k)-1]) % len(g.batches)
	return g.batches[idx]
}

func (g *gBatch) Put(key []byte, value []byte) error {
	return g.instance(key).Put(key, value)
}

func (g *gBatch) Delete(key []byte) error {
	return g.instance(key).Delete(key)
}

func (g *gBatch) ValueSize() int {
	v := 0
	for _, b := range g.batches {
		v += b.ValueSize()
	}
	return v
}

// Write flushes every shard batch concurrently. A mutex guards the shared
// error variable so concurrent failures do not race; only one error (the
// last one recorded) is returned.
func (g *gBatch) Write() error {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		dberr error
	)
	wg.Add(len(g.batches))
	for _, b := range g.batches {
		go func(b metadb.Batch) {
			defer wg.Done()
			if err := b.Write(); err != nil {
				mu.Lock()
				dberr = err
				mu.Unlock()
			}
		}(b)
	}
	wg.Wait()
	return dberr
}

func (g *gBatch) Reset() {
	for _, b := range g.batches {
		b.Reset()
	}
}

func (g *gBatch) Replay(w metadb.KeyValueWriter) error {
	return errors.New("replay not supported")
}

func newGBatch(gdb *GroupDb) *gBatch {
	g := &gBatch{
		gdb:     gdb,
		batches: make([]metadb.Batch, gdb.dbcnt),
	}
	for i := 0; i < gdb.dbcnt; i++ {
		g.batches[i] = gdb.dbIns[i].NewBatch()
	}
	return g
}

func newGBatchWithSize(gdb *GroupDb, size int) *gBatch {
	g := &gBatch{
		gdb:     gdb,
		batches: make([]metadb.Batch, gdb.dbcnt),
	}
	for i := 0; i < gdb.dbcnt; i++ {
		g.batches[i] = gdb.dbIns[i].NewBatchWithSize(size)
	}
	return g
}
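A design note on the grouped batch: Write fans out to one goroutine per shard, so a multi-shard flush is not atomic. If one shard's Write fails, other shards may already have committed, and only a single error (whichever was recorded last) is surfaced. Callers needing all-or-nothing semantics would have to keep related keys within one shard or layer their own journaling on top.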
package groupdb

import "github.com/exchain/go-exchain/metadb"

type gIterator struct {
	gdb       *GroupDb
	iterators []metadb.Iterator
	curidx    int
}

var (
	_ metadb.Iterator = (*gIterator)(nil)
)

func newGIterator(g *GroupDb, prefix []byte, start []byte) *gIterator {
	gi := &gIterator{
		gdb:       g,
		curidx:    0,
		iterators: make([]metadb.Iterator, g.dbcnt),
	}
	for i := 0; i < len(gi.iterators); i++ {
		gi.iterators[i] = g.dbIns[i].NewIterator(prefix, start)
	}
	return gi
}

// Next advances the current shard iterator, falling through to the next
// shard once the current one is exhausted.
func (g *gIterator) Next() bool {
	iter := g.iterators[g.curidx]
	if !iter.Next() {
		if g.curidx == len(g.iterators)-1 {
			// Stay on the last shard so Key/Value/Error remain in range
			// after exhaustion instead of indexing past the slice.
			return false
		}
		g.curidx++
		return g.Next()
	}
	return true
}

func (g *gIterator) Error() error {
	return g.iterators[g.curidx].Error()
}

func (g *gIterator) Key() []byte {
	return g.iterators[g.curidx].Key()
}

func (g *gIterator) Value() []byte {
	return g.iterators[g.curidx].Value()
}

func (g *gIterator) Release() {
	for _, iter := range g.iterators {
		iter.Release()
	}
}
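Note that the merged stream walks the shards one after another, and keys are routed to shards by their last byte, so the combined iteration is sorted only within each shard, not globally binary-alphabetical the way a single backing store's iterator would be. Consumers that need a total order must merge-sort across the shards themselves.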
package groupdb

import (
	"fmt"
	"path/filepath"

	"github.com/exchain/go-exchain/metadb"
)

var (
	_ metadb.Database = (*GroupDb)(nil)
)

const (
	dbGroupNum = 16
)

type GroupDb struct {
	dbcnt int
	dbIns []*levelDB // levelDB instances, one per shard
}

// instance routes a key to its shard using the last byte of the key.
func (g *GroupDb) instance(k []byte) *levelDB {
	if len(k) == 0 {
		return g.dbIns[0]
	}
	idx := int(k[len(k)-1]) % g.dbcnt
	return g.dbIns[idx]
}

func NewGroupDB(root string, dir string) *GroupDb {
	var err error
	gdb := &GroupDb{
		dbcnt: dbGroupNum,
		dbIns: make([]*levelDB, dbGroupNum),
	}
	for i := 0; i < len(gdb.dbIns); i++ {
		file := fmt.Sprintf("%s-%d", "db", i)
		full := filepath.Join(root, dir, file)
		gdb.dbIns[i], err = newLevelDB(full)
		if err != nil {
			panic(fmt.Sprintf("create groupdb failed: %s", err))
		}
	}
	return gdb
}

func (g *GroupDb) Close() error {
	for _, db := range g.dbIns {
		if err := db.Close(); err != nil {
			return err
		}
	}
	return nil
}

func (g *GroupDb) Delete(k []byte) error {
	return g.instance(k).Delete(k)
}

func (g *GroupDb) Has(k []byte) (bool, error) {
	return g.instance(k).Has(k)
}

func (g *GroupDb) Set(k []byte, value []byte) error {
	return g.instance(k).Set(k, value)
}

func (g *GroupDb) Put(k []byte, value []byte) error {
	return g.instance(k).Set(k, value)
}

func (g *GroupDb) Get(k []byte) ([]byte, error) {
	return g.instance(k).Get(k)
}

func (g *GroupDb) NewBatch() metadb.Batch {
	return newGBatch(g)
}

func (g *GroupDb) NewBatchWithSize(size int) metadb.Batch {
	return newGBatchWithSize(g, size)
}

func (g *GroupDb) NewIterator(prefix []byte, start []byte) metadb.Iterator {
	return newGIterator(g, prefix, start)
}
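To make the routing rule concrete, a small sketch (hypothetical helper, not part of the commit) mirroring GroupDb.instance: the shard index is the last key byte modulo the shard count, so keys differing only in their final byte usually land in different shards.

// shardIndex mirrors the routing used by GroupDb.instance and gBatch.instance.
func shardIndex(key []byte, shards int) int {
	if len(key) == 0 {
		return 0 // empty keys always map to shard 0
	}
	return int(key[len(key)-1]) % shards
}

// shardIndex([]byte("key10"), 16) == 0, since '0' is byte 48 and 48 % 16 == 0
// shardIndex([]byte("key11"), 16) == 1, since '1' is byte 49 and 49 % 16 == 1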
package groupdb

import (
	"fmt"
	"testing"
)

func TestGroupDb_NewIterator(t *testing.T) {
	type testtype struct {
		key   string
		value string
	}
	gdb := NewGroupDB("node", "groupdata")
	defer gdb.Close()

	testdata := make([]testtype, 1000)
	for i := 0; i < len(testdata); i++ {
		testdata[i] = testtype{
			key:   fmt.Sprintf("key%d", i),
			value: fmt.Sprintf("value%d", i),
		}
	}
	for i := 0; i < len(testdata); i++ {
		err := gdb.Set([]byte(testdata[i].key), []byte(testdata[i].value))
		if err != nil {
			t.Error(err)
		}
	}

	// Iterate the whole keyspace; a nil prefix and start cover every shard.
	// (The start argument is interpreted relative to the prefix, so passing
	// a start that repeats the prefix would seek past every stored key.)
	iter := gdb.NewIterator(nil, nil)
	if iter == nil {
		t.Fatal("NewIterator returned nil")
	}
	defer iter.Release()

	cnt := 0
	for iter.Next() {
		key := iter.Key()
		value := iter.Value()
		fmt.Printf("key:%s, value:%s\n", key, value)
		cnt += 1
	}
	if cnt != len(testdata) {
		t.Errorf("cnt:%d, len(testdata):%d", cnt, len(testdata))
	}
}
package groupdb

import (
	"sync"

	"github.com/exchain/go-exchain/metadb"
	log "github.com/sirupsen/logrus"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// levelDB is a persistent key-value store. Apart from basic data storage
// functionality it also supports batch writes and iterating over the keyspace
// in binary-alphabetical order.
type levelDB struct {
	fn       string      // filename for reporting
	db       *leveldb.DB // levelDB instance
	quitLock sync.Mutex  // Mutex protecting the quit channel access
	logger   *log.Entry  // Contextual logger tracking the database path
}

// newLevelDB opens (and, if corrupted, tries to recover) a levelDB database
// at the given path and wraps it with a contextual logger.
func newLevelDB(file string) (*levelDB, error) {
	logger := log.WithField("dbpath", file)

	// Open the database and recover any potential corruptions
	db, err := leveldb.OpenFile(file, &opt.Options{
		Filter:                 filter.NewBloomFilter(10),
		DisableSeeksCompaction: true,
		WriteBuffer:            4 * 1024 * 1024,
	})
	if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
		db, err = leveldb.RecoverFile(file, nil)
	}
	if err != nil {
		return nil, err
	}

	// Assemble the wrapper
	ldb := &levelDB{
		fn:     file,
		db:     db,
		logger: logger,
	}
	return ldb, nil
}

// Close flushes any pending data to disk and closes all io accesses to the
// underlying key-value store.
func (db *levelDB) Close() error {
	db.quitLock.Lock()
	defer db.quitLock.Unlock()
	return db.db.Close()
}

// Has reports whether a key is present in the key-value store.
func (db *levelDB) Has(k []byte) (bool, error) {
	return db.db.Has(k, nil)
}

// Get retrieves the given key if it's present in the key-value store.
func (db *levelDB) Get(k []byte) ([]byte, error) {
	dat, err := db.db.Get(k, nil)
	if err != nil {
		// Propagate the underlying error (e.g. leveldb.ErrNotFound) instead
		// of masking IO or corruption failures behind a generic message.
		return nil, err
	}
	return dat, nil
}

// Set inserts the given value into the key-value store.
func (db *levelDB) Set(k []byte, value []byte) error {
	return db.db.Put(k, value, nil)
}

// Delete removes the key from the key-value store.
func (db *levelDB) Delete(k []byte) error {
	return db.db.Delete(k, nil)
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (db *levelDB) NewBatch() metadb.Batch {
	return &batch{
		db: db.db,
		b:  new(leveldb.Batch),
	}
}

// NewBatchWithSize creates a write-only database batch with a pre-allocated buffer.
func (db *levelDB) NewBatchWithSize(size int) metadb.Batch {
	return &batch{
		db: db.db,
		b:  leveldb.MakeBatch(size),
	}
}

// NewIterator creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
func (db *levelDB) NewIterator(prefix []byte, start []byte) metadb.Iterator {
	return db.db.NewIterator(bytesPrefixRange(prefix, start), nil)
}

// batch is a write-only leveldb batch that commits changes to its host database
// when Write is called. A batch cannot be used concurrently.
type batch struct {
	db   *leveldb.DB
	b    *leveldb.Batch
	mux  sync.Mutex
	size int
}

func (b *batch) Replay(w metadb.KeyValueWriter) error {
	return errors.New("replay not supported")
}

// Set inserts the given value into the batch for later committing.
func (b *batch) Set(k []byte, value []byte) error {
	b.mux.Lock()
	defer b.mux.Unlock()
	b.b.Put(k, value)
	b.size += len(k) + len(value)
	return nil
}

// Delete inserts a key removal into the batch for later committing.
func (b *batch) Delete(k []byte) error {
	b.mux.Lock()
	defer b.mux.Unlock()
	b.b.Delete(k)
	b.size += len(k)
	return nil
}

// Put inserts the given value into the batch for later committing.
func (b *batch) Put(key, value []byte) error {
	b.mux.Lock()
	defer b.mux.Unlock()
	b.b.Put(key, value)
	b.size += len(key) + len(value)
	return nil
}

// ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int {
	return b.size
}

// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
	b.mux.Lock()
	defer b.mux.Unlock()
	return b.db.Write(b.b, nil)
}

// Reset resets the batch for reuse.
func (b *batch) Reset() {
	b.mux.Lock()
	defer b.mux.Unlock()
	b.b.Reset()
	b.size = 0
}

// bytesPrefixRange returns a key range that satisfies
//   - the given prefix, and
//   - the given seek position
func bytesPrefixRange(prefix, start []byte) *util.Range {
	r := util.BytesPrefix(prefix)
	r.Start = append(r.Start, start...)
	return r
}
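As with go-ethereum's ethdb convention, the start argument of NewIterator is interpreted relative to the prefix: bytesPrefixRange concatenates the two, so NewIterator([]byte("key"), []byte("1")) scans from "key1" up to the end of the "key" keyspace, and callers should not repeat the prefix inside start.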
package metadb

// Iterator iterates over a database's key/value pairs in ascending key order.
//
// When it encounters an error any seek will return false and will yield no key/
// value pairs. The error can be queried by calling the Error method. Calling
// Release is still necessary.
//
// An iterator must be released after use, but it is not necessary to read an
// iterator until exhaustion. An iterator is not safe for concurrent use, but it
// is safe to use multiple iterators concurrently.
type Iterator interface {
	// Next moves the iterator to the next key/value pair. It returns whether the
	// iterator is exhausted.
	Next() bool

	// Error returns any accumulated error. Exhausting all the key/value pairs
	// is not considered to be an error.
	Error() error

	// Key returns the key of the current key/value pair, or nil if done. The caller
	// should not modify the contents of the returned slice, and its contents may
	// change on the next call to Next.
	Key() []byte

	// Value returns the value of the current key/value pair, or nil if done. The
	// caller should not modify the contents of the returned slice, and its contents
	// may change on the next call to Next.
	Value() []byte

	// Release releases associated resources. Release should always succeed and can
	// be called multiple times without causing error.
	Release()
}

// Iteratee wraps the NewIterator method of a backing data store.
type Iteratee interface {
	// NewIterator creates a binary-alphabetical iterator over a subset
	// of database content with a particular key prefix, starting at a particular
	// initial key (or after, if it does not exist).
	//
	// Note: This method assumes that the prefix is NOT part of the start, so there's
	// no need for the caller to prepend the prefix to the start.
	NewIterator(prefix []byte, start []byte) Iterator
}
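A typical consumption pattern for this contract might look like the following sketch (illustrative; the function name is hypothetical and `db` is any Iteratee):

func dump(db Iteratee) error {
	it := db.NewIterator([]byte("some-prefix"), nil)
	defer it.Release() // always release, even on early return
	for it.Next() {
		_ = it.Key()   // only valid until the next call to Next
		_ = it.Value() // copy if the data must outlive the iteration
	}
	return it.Error() // exhaustion alone is not an error
}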
package memdb

import (
	"errors"
	"sort"
	"strings"
	"sync"

	"github.com/exchain/go-exchain/metadb"
)

var (
	// errMemorydbClosed is returned if a memory database was already closed at the
	// invocation of a data access operation.
	errMemorydbClosed = errors.New("database closed")

	// errMemorydbNotFound is returned if a key is requested that is not found in
	// the provided memory database.
	errMemorydbNotFound = errors.New("not found")

	// errSnapshotReleased is returned if callers want to retrieve data from a
	// released snapshot.
	errSnapshotReleased = errors.New("snapshot released")
)
// cacheDb is an ephemeral key-value store. Apart from basic data storage
// functionality it also supports batch writes and iterating over the keyspace in
// binary-alphabetical order.
type cacheDb struct {
	db   map[string][]byte
	lock sync.RWMutex
}

// New returns a wrapped map with all the required database interface methods
// implemented.
func New() *cacheDb {
	return &cacheDb{
		db: make(map[string][]byte),
	}
}

// NewWithCap returns a wrapped map pre-allocated to the provided capacity with
// all the required database interface methods implemented.
func NewWithCap(size int) *cacheDb {
	return &cacheDb{
		db: make(map[string][]byte, size),
	}
}

// Close deallocates the internal map and ensures any subsequent data access op
// fails with an error.
func (db *cacheDb) Close() error {
	db.lock.Lock()
	defer db.lock.Unlock()
	db.db = nil
	return nil
}

// Has reports whether a key is present in the key-value store.
func (db *cacheDb) Has(key []byte) (bool, error) {
	db.lock.RLock()
	defer db.lock.RUnlock()
	if db.db == nil {
		return false, errMemorydbClosed
	}
	_, ok := db.db[string(key)]
	return ok, nil
}

// Get retrieves the given key if it's present in the key-value store.
func (db *cacheDb) Get(key []byte) ([]byte, error) {
	db.lock.RLock()
	defer db.lock.RUnlock()
	if db.db == nil {
		return nil, errMemorydbClosed
	}
	if entry, ok := db.db[string(key)]; ok {
		return cloneBytes(entry), nil
	}
	return nil, errMemorydbNotFound
}

// Put inserts the given value into the key-value store.
func (db *cacheDb) Put(key []byte, value []byte) error {
	db.lock.Lock()
	defer db.lock.Unlock()
	if db.db == nil {
		return errMemorydbClosed
	}
	db.db[string(key)] = cloneBytes(value)
	return nil
}

// Delete removes the key from the key-value store.
func (db *cacheDb) Delete(key []byte) error {
	db.lock.Lock()
	defer db.lock.Unlock()
	if db.db == nil {
		return errMemorydbClosed
	}
	delete(db.db, string(key))
	return nil
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (db *cacheDb) NewBatch() metadb.Batch {
	return &batch{
		db: db,
	}
}

// NewBatchWithSize creates a write-only database batch with a pre-allocated buffer.
func (db *cacheDb) NewBatchWithSize(size int) metadb.Batch {
	return &batch{
		db: db,
	}
}

// NewIterator creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
func (db *cacheDb) NewIterator(prefix []byte, start []byte) metadb.Iterator {
	db.lock.RLock()
	defer db.lock.RUnlock()
	var (
		pr     = string(prefix)
		st     = string(append(prefix, start...))
		keys   = make([]string, 0, len(db.db))
		values = make([][]byte, 0, len(db.db))
	)
	// Collect the keys from the memory database corresponding to the given prefix
	// and start
	for key := range db.db {
		if !strings.HasPrefix(key, pr) {
			continue
		}
		if key >= st {
			keys = append(keys, key)
		}
	}
	// Sort the items and retrieve the associated values
	sort.Strings(keys)
	for _, key := range keys {
		values = append(values, db.db[key])
	}
	return &iterator{
		index:  -1,
		keys:   keys,
		values: values,
	}
}

// NewSnapshot creates a database snapshot based on the current state.
// The created snapshot will not be affected by any mutations that happen
// on the database afterwards.
func (db *cacheDb) NewSnapshot() (metadb.Snapshot, error) {
	return newSnapshot(db), nil
}

// Stat returns a particular internal stat of the database.
func (db *cacheDb) Stat(property string) (string, error) {
	return "", errors.New("unknown property")
}

// Compact is not supported on a memory database, but there's no need either as
// a memory database doesn't waste space anyway.
func (db *cacheDb) Compact(start []byte, limit []byte) error {
	return nil
}

// Len returns the number of entries currently present in the memory database.
//
// Note, this method is only used for testing (i.e. not public in general) and
// does not have explicit checks for closed-ness to allow simpler testing code.
func (db *cacheDb) Len() int {
	db.lock.RLock()
	defer db.lock.RUnlock()
	return len(db.db)
}
// keyvalue is a key-value tuple tagged with a deletion field to allow creating
// memory-database write batches.
type keyvalue struct {
	key    []byte
	value  []byte
	delete bool
}

// batch is a write-only memory batch that commits changes to its host
// database when Write is called. A batch cannot be used concurrently.
type batch struct {
	db     *cacheDb
	writes []keyvalue
	size   int
}

// Put inserts the given value into the batch for later committing.
func (b *batch) Put(key, value []byte) error {
	b.writes = append(b.writes, keyvalue{cloneBytes(key), cloneBytes(value), false})
	b.size += len(key) + len(value)
	return nil
}

// Delete inserts a key removal into the batch for later committing.
func (b *batch) Delete(key []byte) error {
	b.writes = append(b.writes, keyvalue{cloneBytes(key), nil, true})
	b.size += len(key)
	return nil
}

// ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int {
	return b.size
}

// Write flushes any accumulated data to the memory database.
func (b *batch) Write() error {
	b.db.lock.Lock()
	defer b.db.lock.Unlock()
	for _, keyvalue := range b.writes {
		if keyvalue.delete {
			delete(b.db.db, string(keyvalue.key))
			continue
		}
		b.db.db[string(keyvalue.key)] = keyvalue.value
	}
	return nil
}

// Reset resets the batch for reuse.
func (b *batch) Reset() {
	b.writes = b.writes[:0]
	b.size = 0
}

// Replay replays the batch contents.
func (b *batch) Replay(w metadb.KeyValueWriter) error {
	for _, keyvalue := range b.writes {
		if keyvalue.delete {
			if err := w.Delete(keyvalue.key); err != nil {
				return err
			}
			continue
		}
		if err := w.Put(keyvalue.key, keyvalue.value); err != nil {
			return err
		}
	}
	return nil
}
// iterator can walk over the (potentially partial) keyspace of a memory key
// value store. Internally it is a deep copy of the entire iterated state,
// sorted by keys.
type iterator struct {
	index  int
	keys   []string
	values [][]byte
}

// Next moves the iterator to the next key/value pair. It returns whether the
// iterator is exhausted.
func (it *iterator) Next() bool {
	// Short circuit if iterator is already exhausted in the forward direction.
	if it.index >= len(it.keys) {
		return false
	}
	it.index += 1
	return it.index < len(it.keys)
}

// Error returns any accumulated error. Exhausting all the key/value pairs
// is not considered to be an error. A memory iterator cannot encounter errors.
func (it *iterator) Error() error {
	return nil
}

// Key returns the key of the current key/value pair, or nil if done. The caller
// should not modify the contents of the returned slice, and its contents may
// change on the next call to Next.
func (it *iterator) Key() []byte {
	// Short circuit if iterator is not in a valid position
	if it.index < 0 || it.index >= len(it.keys) {
		return nil
	}
	return []byte(it.keys[it.index])
}

// Value returns the value of the current key/value pair, or nil if done. The
// caller should not modify the contents of the returned slice, and its contents
// may change on the next call to Next.
func (it *iterator) Value() []byte {
	// Short circuit if iterator is not in a valid position
	if it.index < 0 || it.index >= len(it.keys) {
		return nil
	}
	return it.values[it.index]
}

// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
func (it *iterator) Release() {
	it.index, it.keys, it.values = -1, nil, nil
}
// snapshot wraps a batch of key-value entries deep copied from the in-memory
// database for implementing the Snapshot interface.
type snapshot struct {
	db   map[string][]byte
	lock sync.RWMutex
}

// newSnapshot initializes the snapshot with the given database instance.
func newSnapshot(db *cacheDb) *snapshot {
	db.lock.RLock()
	defer db.lock.RUnlock()

	copied := make(map[string][]byte)
	for key, val := range db.db {
		copied[key] = cloneBytes(val)
	}
	return &snapshot{db: copied}
}

// Has reports whether a key is present in the snapshot backed by a key-value
// data store.
func (snap *snapshot) Has(key []byte) (bool, error) {
	snap.lock.RLock()
	defer snap.lock.RUnlock()
	if snap.db == nil {
		return false, errSnapshotReleased
	}
	_, ok := snap.db[string(key)]
	return ok, nil
}

// Get retrieves the given key if it's present in the snapshot backed by a
// key-value data store.
func (snap *snapshot) Get(key []byte) ([]byte, error) {
	snap.lock.RLock()
	defer snap.lock.RUnlock()
	if snap.db == nil {
		return nil, errSnapshotReleased
	}
	if entry, ok := snap.db[string(key)]; ok {
		return cloneBytes(entry), nil
	}
	return nil, errMemorydbNotFound
}

// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
func (snap *snapshot) Release() {
	snap.lock.Lock()
	defer snap.lock.Unlock()
	snap.db = nil
}

// cloneBytes returns a copy of the provided byte slice.
func cloneBytes(data []byte) []byte {
	m := make([]byte, len(data))
	copy(m, data)
	return m
}
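As a quick usage sketch (illustrative, not part of the commit; the function name is hypothetical), the in-memory store round-trips a batch, and Replay can mirror the same operations into any other KeyValueWriter:

func mirrorExample() error {
	src, dst := New(), New()
	b := src.NewBatch()
	if err := b.Put([]byte("a"), []byte("1")); err != nil {
		return err
	}
	if err := b.Delete([]byte("b")); err != nil {
		return err
	}
	if err := b.Write(); err != nil { // apply the queued ops to src
		return err
	}
	return b.Replay(dst) // re-apply the same ops to dst
}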
package memdb

import (
	"sync"

	"github.com/exchain/go-exchain/metadb"
)

// syncmapdb is a CacheKV backed by a sync.Map, safe for concurrent use.
type syncmapdb struct {
	db sync.Map
}

func (s *syncmapdb) Get(key interface{}) (interface{}, bool) {
	return s.db.Load(key)
}

func (s *syncmapdb) Set(key, value interface{}) error {
	s.db.Store(key, value)
	return nil
}

func (s *syncmapdb) Delete(key interface{}) error {
	s.db.Delete(key)
	return nil
}

// Copy returns a new CacheKV populated with the current contents.
func (s *syncmapdb) Copy() metadb.CacheKV {
	nd := NewMemDB()
	s.db.Range(func(k, v interface{}) bool {
		nd.Set(k, v)
		return true
	})
	return nd
}

func NewMemDB() metadb.CacheKV {
	return &syncmapdb{}
}
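One caveat worth noting: Copy duplicates only the map entries, not the values themselves. Values are stored as interface{} references, so a mutable value (say, a pointer or slice) remains shared between the original and the copy; callers who rely on Copy for isolation must store immutable values or clone them first.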
package rawdb

import (
	"fmt"
	"os"
	"path/filepath"
)

// RawDb stores each value as a plain file named after its key.
type RawDb struct {
	path string
}

func NewRawDb(root string, dir string) *RawDb {
	full := filepath.Join(root, dir)
	if !dirExists(full) {
		// MkdirAll also creates missing parent directories.
		if err := os.MkdirAll(full, 0755); err != nil {
			panic(fmt.Sprintf("create rawdb failed:%s:%s", full, err.Error()))
		}
	}
	return &RawDb{path: full}
}

func (r *RawDb) Get(key []byte) ([]byte, error) {
	d := filepath.Join(r.path, string(key))
	return os.ReadFile(d)
}

func (r *RawDb) Set(key []byte, value []byte) error {
	d := filepath.Join(r.path, string(key))
	return os.WriteFile(d, value, 0644)
}

func dirExists(path string) bool {
	_, err := os.Stat(path)
	return !os.IsNotExist(err)
}
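A quick sketch of RawDb in use (illustrative names only; every key becomes one file under the database directory):

func rawExample() error {
	db := NewRawDb("node", "raw") // values live as files under node/raw/
	if err := db.Set([]byte("genesis"), []byte(`{"chain":"exchain"}`)); err != nil {
		return err
	}
	data, err := db.Get([]byte("genesis")) // reads the file node/raw/genesis
	_ = data
	return err
}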
package metadb

type Snapshot interface {
	// Has reports whether a key is present in the snapshot backed by a key-value
	// data store.
	Has(key []byte) (bool, error)

	// Get retrieves the given key if it's present in the snapshot backed by a
	// key-value data store.
	Get(key []byte) ([]byte, error)

	// Release releases associated resources. Release should always succeed and can
	// be called multiple times without causing error.
	Release()
}

// Snapshotter wraps the NewSnapshot method of a backing data store.
type Snapshotter interface {
	// NewSnapshot creates a database snapshot based on the current state.
	// The created snapshot will not be affected by any mutations that happen
	// on the database afterwards.
	// Note: don't forget to release the snapshot once it's used up, otherwise
	// the stale data will never be cleaned up by the underlying compactor.
	NewSnapshot() (Snapshot, error)
}
(One file's diff is collapsed and not shown; it presumably defines this package's newLevelDB helper used below.)
package storagedb

import (
	"path/filepath"

	"github.com/exchain/go-exchain/metadb"
)

func NewStorageDB(root string, dir string) metadb.Database {
	file := filepath.Join(root, dir)
	return newLevelDB(file)
}
@@ -45,6 +45,13 @@ func prefixEnvVars(names ...string) []string {
var (
	/* Required Flags */
	DataDir = &cli.StringFlag{
		Name:     "data-dir",
		Usage:    "Path to the directory where the node will store its data",
		Value:    "node",
		EnvVars:  prefixEnvVars("DATA_DIR"),
		Category: RollupCategory,
	}
	L1NodeAddr = &cli.StringFlag{
		Name:  "l1",
		Usage: "Address of L1 User JSON-RPC endpoint to use (eth namespace required)",
@@ -380,6 +387,7 @@ var requiredFlags = []cli.Flag{
}

var optionalFlags = []cli.Flag{
	DataDir,
	BeaconAddr,
	BeaconHeader,
	BeaconFallbackAddrs,
......
@@ -17,10 +17,9 @@ import (
)

type l2Client interface {
	InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error)

	// GetProof returns a proof of the account, it may return a nil result without error if the address was not found.
	// Optionally keys of the account storage trie can be specified to include with corresponding values in the proof.
	GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error)
	//GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error)

	OutputV0AtBlock(ctx context.Context, blockHash common.Hash) (*eth.OutputV0, error)
}
......
@@ -47,6 +47,8 @@ type Config struct {
	ConfigPersistence ConfigPersistence

	NodeDataPath string

	// Path to store safe head database. Disabled when set to empty string
	SafeDBPath string
......
@@ -6,6 +6,9 @@ import (
	"fmt"
	"github.com/exchain/go-exchain/engine"
	"github.com/exchain/go-exchain/exchain"
	"github.com/exchain/go-exchain/exchain/mockengine"
	"github.com/exchain/go-exchain/metadb"
	"github.com/exchain/go-exchain/metadb/groupdb"
	"io"
	gosync "sync"
	"sync/atomic"
@@ -96,6 +99,10 @@ type OpNode struct {
	// cancels execution prematurely, e.g. to halt. This may be nil.
	cancel context.CancelCauseFunc
	halted atomic.Bool

	// database
	db        metadb.Database
	engineIns exchain.Engine
}

// The OpNode handles incoming gossip
@@ -388,14 +395,17 @@ func (n *OpNode) initL1BeaconAPI(ctx context.Context, cfg *Config) error {
func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
	var err error
	// todo: create l2Source by inner.
	n.l2Source = engine.NewEngineAPI(exchain.Database(nil))
	//n.l2Source, err = sources.NewEngineClient(n.log, n.metrics.L2SourceCache, sources.EngineClientDefaultConfig(&cfg.Rollup))
	//if err != nil {
	//	return fmt.Errorf("failed to create Engine client: %w", err)
	//}
	if err := cfg.Rollup.ValidateL2Config(ctx, n.l2Source, cfg.Sync.SyncMode == sync.ELSync); err != nil {
	n.db = groupdb.NewGroupDB(n.cfg.NodeDataPath, "engine")
	n.engineIns = mockengine.NewEngine(n.db)
	n.l2Source = engine.NewEngineAPI(n.db)
	if n.engineIns == nil {
		return errors.New("failed to create L2 engine")
	}
	if err = n.engineIns.Start(); err != nil {
		return fmt.Errorf("failed to start L2 engine: %w", err)
	}
	if err = cfg.Rollup.ValidateL2Config(ctx, n.l2Source, cfg.Sync.SyncMode == sync.ELSync); err != nil {
		return err
	}
......
@@ -4,9 +4,9 @@ import (
	"context"
	"errors"

	"github.com/ethereum/go-ethereum/eth/catalyst"
	"github.com/ethereum/go-ethereum/params"
	"github.com/exchain/go-exchain/op-node/rollup"
)

var errNodeHalt = errors.New("opted to halt, unprepared for protocol change")
@@ -19,16 +19,6 @@ func (n *OpNode) handleProtocolVersionsUpdate(ctx context.Context) error {
		return nil
	}
	local := rollup.OPStackSupport

	// forward to execution engine, and get back the protocol version that op-geth supports
	engineSupport, err := n.l2Source.SignalSuperchainV1(ctx, recommended, required)
	if err != nil {
		n.log.Warn("failed to notify engine of protocol version", "err", err)
		// engineSupport may still be available, or otherwise zero to signal as unknown
	} else {
		catalyst.LogProtocolVersionSupport(n.log.New("node", "op-node"), engineSupport, recommended, "recommended")
		catalyst.LogProtocolVersionSupport(n.log.New("node", "op-node"), engineSupport, required, "required")
	}
	n.metrics.ReportProtocolVersions(local, engineSupport, recommended, required)
	catalyst.LogProtocolVersionSupport(n.log.New("node", "engine"), local, recommended, "recommended")
	catalyst.LogProtocolVersionSupport(n.log.New("node", "engine"), local, required, "required")
......
@@ -52,7 +52,7 @@ type ForkTransformer interface {
}

type L2Source interface {
	PayloadByHash(context.Context, common.Hash) (*eth.ExecutionPayloadEnvelope, error)
	//PayloadByHash(context.Context, common.Hash) (*eth.ExecutionPayloadEnvelope, error)
	PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
	L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
	L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
......
@@ -95,6 +95,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
		L1EpochPollInterval:         ctx.Duration(flags.L1EpochPollIntervalFlag.Name),
		RuntimeConfigReloadInterval: ctx.Duration(flags.RuntimeConfigReloadIntervalFlag.Name),
		ConfigPersistence:           configPersistence,
		NodeDataPath:                ctx.String(flags.DataDir.Name),
		SafeDBPath:                  ctx.String(flags.SafeDBPath.Name),
		Sync:                        *syncConfig,
		RollupHalt:                  haltOption,
......