Commit 822df50b authored by Adrian Sutton's avatar Adrian Sutton Committed by GitHub

op-supervisor: Create logdb for each chain. (#11043)

parent 269b153c
...@@ -19,6 +19,9 @@ type Metricer interface { ...@@ -19,6 +19,9 @@ type Metricer interface {
CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
CacheGet(chainID *big.Int, label string, hit bool) CacheGet(chainID *big.Int, label string, hit bool)
RecordDBEntryCount(chainID *big.Int, count int64)
RecordDBSearchEntriesRead(chainID *big.Int, count int64)
Document() []opmetrics.DocumentedMetric Document() []opmetrics.DocumentedMetric
} }
...@@ -29,9 +32,12 @@ type Metrics struct { ...@@ -29,9 +32,12 @@ type Metrics struct {
opmetrics.RPCMetrics opmetrics.RPCMetrics
SizeVec *prometheus.GaugeVec CacheSizeVec *prometheus.GaugeVec
GetVec *prometheus.CounterVec CacheGetVec *prometheus.CounterVec
AddVec *prometheus.CounterVec CacheAddVec *prometheus.CounterVec
DBEntryCountVec *prometheus.GaugeVec
DBSearchEntriesReadVec *prometheus.HistogramVec
info prometheus.GaugeVec info prometheus.GaugeVec
up prometheus.Gauge up prometheus.Gauge
...@@ -71,32 +77,48 @@ func NewMetrics(procName string) *Metrics { ...@@ -71,32 +77,48 @@ func NewMetrics(procName string) *Metrics {
Help: "1 if the op-supervisor has finished starting up", Help: "1 if the op-supervisor has finished starting up",
}), }),
SizeVec: factory.NewGaugeVec(prometheus.GaugeOpts{ CacheSizeVec: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Name: "source_rpc_cache_size", Name: "source_rpc_cache_size",
Help: "source rpc cache cache size", Help: "Source rpc cache cache size",
}, []string{ }, []string{
"chain", "chain",
"type", "type",
}), }),
GetVec: factory.NewCounterVec(prometheus.CounterOpts{ CacheGetVec: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: "source_rpc_cache_get", Name: "source_rpc_cache_get",
Help: "source rpc cache lookups, hitting or not", Help: "Source rpc cache lookups, hitting or not",
}, []string{ }, []string{
"chain", "chain",
"type", "type",
"hit", "hit",
}), }),
AddVec: factory.NewCounterVec(prometheus.CounterOpts{ CacheAddVec: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: "source_rpc_cache_add", Name: "source_rpc_cache_add",
Help: "source rpc cache additions, evicting previous values or not", Help: "Source rpc cache additions, evicting previous values or not",
}, []string{ }, []string{
"chain", "chain",
"type", "type",
"evicted", "evicted",
}), }),
DBEntryCountVec: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "logdb_entries_current",
Help: "Current number of entries in the log database by chain ID",
}, []string{
"chain",
}),
DBSearchEntriesReadVec: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Name: "logdb_search_entries_read",
Help: "Entries read per search of the log database",
Buckets: []float64{1, 2, 5, 10, 100, 200, 256},
}, []string{
"chain",
}),
} }
} }
...@@ -120,20 +142,32 @@ func (m *Metrics) RecordUp() { ...@@ -120,20 +142,32 @@ func (m *Metrics) RecordUp() {
} }
func (m *Metrics) CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) { func (m *Metrics) CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) {
chain := chainID.String() chain := chainIDLabel(chainID)
m.SizeVec.WithLabelValues(chain, label).Set(float64(cacheSize)) m.CacheSizeVec.WithLabelValues(chain, label).Set(float64(cacheSize))
if evicted { if evicted {
m.AddVec.WithLabelValues(chain, label, "true").Inc() m.CacheAddVec.WithLabelValues(chain, label, "true").Inc()
} else { } else {
m.AddVec.WithLabelValues(chain, label, "false").Inc() m.CacheAddVec.WithLabelValues(chain, label, "false").Inc()
} }
} }
func (m *Metrics) CacheGet(chainID *big.Int, label string, hit bool) { func (m *Metrics) CacheGet(chainID *big.Int, label string, hit bool) {
chain := chainID.String() chain := chainIDLabel(chainID)
if hit { if hit {
m.GetVec.WithLabelValues(chain, label, "true").Inc() m.CacheGetVec.WithLabelValues(chain, label, "true").Inc()
} else { } else {
m.GetVec.WithLabelValues(chain, label, "false").Inc() m.CacheGetVec.WithLabelValues(chain, label, "false").Inc()
} }
} }
func (m *Metrics) RecordDBEntryCount(chainID *big.Int, count int64) {
m.DBEntryCountVec.WithLabelValues(chainIDLabel(chainID)).Set(float64(count))
}
func (m *Metrics) RecordDBSearchEntriesRead(chainID *big.Int, count int64) {
m.DBSearchEntriesReadVec.WithLabelValues(chainIDLabel(chainID)).Observe(float64(count))
}
func chainIDLabel(chainID *big.Int) string {
return chainID.Text(10)
}
...@@ -19,3 +19,6 @@ func (*noopMetrics) RecordUp() {} ...@@ -19,3 +19,6 @@ func (*noopMetrics) RecordUp() {}
func (m *noopMetrics) CacheAdd(_ *big.Int, _ string, _ int, _ bool) {} func (m *noopMetrics) CacheAdd(_ *big.Int, _ string, _ int, _ bool) {}
func (m *noopMetrics) CacheGet(_ *big.Int, _ string, _ bool) {} func (m *noopMetrics) CacheGet(_ *big.Int, _ string, _ bool) {}
func (m *noopMetrics) RecordDBEntryCount(_ *big.Int, _ int64) {}
func (m *noopMetrics) RecordDBSearchEntriesRead(_ *big.Int, _ int64) {}
...@@ -5,30 +5,28 @@ import ( ...@@ -5,30 +5,28 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math/big"
"sync/atomic" "sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
type Metrics interface {
source.Metrics
}
type SupervisorBackend struct { type SupervisorBackend struct {
started atomic.Bool started atomic.Bool
logger log.Logger logger log.Logger
chainMonitors []*source.ChainMonitor chainMonitors []*source.ChainMonitor
logDBs []*db.DB
// TODO(protocol-quest#287): collection of logdbs per chain
// TODO(protocol-quest#288): collection of logdb updating services per chain
} }
var _ frontend.Backend = (*SupervisorBackend)(nil) var _ frontend.Backend = (*SupervisorBackend)(nil)
...@@ -37,8 +35,23 @@ var _ io.Closer = (*SupervisorBackend)(nil) ...@@ -37,8 +35,23 @@ var _ io.Closer = (*SupervisorBackend)(nil)
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) { func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
chainMonitors := make([]*source.ChainMonitor, len(cfg.L2RPCs)) chainMonitors := make([]*source.ChainMonitor, len(cfg.L2RPCs))
logDBs := make([]*db.DB, len(cfg.L2RPCs))
for i, rpc := range cfg.L2RPCs { for i, rpc := range cfg.L2RPCs {
monitor, err := source.NewChainMonitor(ctx, logger, m, rpc) rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
if err != nil {
return nil, err
}
cm := newChainMetrics(chainID, m)
path, err := prepLogDBPath(chainID, cfg.Datadir)
if err != nil {
return nil, fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
}
logDB, err := db.NewFromFile(logger, cm, path)
if err != nil {
return nil, fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
}
logDBs[i] = logDB
monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err) return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
} }
...@@ -47,14 +60,26 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg ...@@ -47,14 +60,26 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
return &SupervisorBackend{ return &SupervisorBackend{
logger: logger, logger: logger,
chainMonitors: chainMonitors, chainMonitors: chainMonitors,
logDBs: logDBs,
}, nil }, nil
} }
func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client.RPC, *big.Int, error) {
ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
if err != nil {
return nil, nil, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
}
chainID, err := ethClient.ChainID(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
}
return client.NewBaseRPCClient(ethClient.Client()), chainID, nil
}
func (su *SupervisorBackend) Start(ctx context.Context) error { func (su *SupervisorBackend) Start(ctx context.Context) error {
if !su.started.CompareAndSwap(false, true) { if !su.started.CompareAndSwap(false, true) {
return errors.New("already started") return errors.New("already started")
} }
// TODO(protocol-quest#288): start logdb updating services of all chains
for _, monitor := range su.chainMonitors { for _, monitor := range su.chainMonitors {
if err := monitor.Start(); err != nil { if err := monitor.Start(); err != nil {
return fmt.Errorf("failed to start chain monitor: %w", err) return fmt.Errorf("failed to start chain monitor: %w", err)
...@@ -67,13 +92,17 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error { ...@@ -67,13 +92,17 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
if !su.started.CompareAndSwap(true, false) { if !su.started.CompareAndSwap(true, false) {
return errors.New("already stopped") return errors.New("already stopped")
} }
// TODO(protocol-quest#288): stop logdb updating services of all chains
var errs error var errs error
for _, monitor := range su.chainMonitors { for _, monitor := range su.chainMonitors {
if err := monitor.Stop(); err != nil { if err := monitor.Stop(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err)) errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
} }
} }
for _, logDB := range su.logDBs {
if err := logDB.Close(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to close logdb: %w", err))
}
}
return errs return errs
} }
......
package source package backend
import ( import (
"math/big" "math/big"
"github.com/ethereum-optimism/optimism/op-service/sources/caching" "github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
) )
type Metrics interface {
CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
CacheGet(chainID *big.Int, label string, hit bool)
RecordDBEntryCount(chainID *big.Int, count int64)
RecordDBSearchEntriesRead(chainID *big.Int, count int64)
}
// chainMetrics is an adapter between the metrics API expected by clients that assume there's only a single chain // chainMetrics is an adapter between the metrics API expected by clients that assume there's only a single chain
// and the actual metrics implementation which requires a chain ID to identify the source chain. // and the actual metrics implementation which requires a chain ID to identify the source chain.
type chainMetrics struct { type chainMetrics struct {
...@@ -28,4 +37,13 @@ func (c *chainMetrics) CacheGet(label string, hit bool) { ...@@ -28,4 +37,13 @@ func (c *chainMetrics) CacheGet(label string, hit bool) {
c.delegate.CacheGet(c.chainID, label, hit) c.delegate.CacheGet(c.chainID, label, hit)
} }
func (c *chainMetrics) RecordDBEntryCount(count int64) {
c.delegate.RecordDBEntryCount(c.chainID, count)
}
func (c *chainMetrics) RecordDBSearchEntriesRead(count int64) {
c.delegate.RecordDBSearchEntriesRead(c.chainID, count)
}
var _ caching.Metrics = (*chainMetrics)(nil) var _ caching.Metrics = (*chainMetrics)(nil)
var _ db.Metrics = (*chainMetrics)(nil)
...@@ -28,8 +28,8 @@ const ( ...@@ -28,8 +28,8 @@ const (
) )
type Metrics interface { type Metrics interface {
RecordEntryCount(count int64) RecordDBEntryCount(count int64)
RecordSearchEntriesRead(count int64) RecordDBSearchEntriesRead(count int64)
} }
type logContext struct { type logContext struct {
...@@ -166,7 +166,7 @@ func (db *DB) trimInvalidTrailingEntries() error { ...@@ -166,7 +166,7 @@ func (db *DB) trimInvalidTrailingEntries() error {
} }
func (db *DB) updateEntryCountMetric() { func (db *DB) updateEntryCountMetric() {
db.m.RecordEntryCount(db.lastEntryIdx() + 1) db.m.RecordDBEntryCount(db.lastEntryIdx() + 1)
} }
// ClosestBlockInfo returns the block number and hash of the highest recorded block at or before blockNum. // ClosestBlockInfo returns the block number and hash of the highest recorded block at or before blockNum.
...@@ -244,7 +244,7 @@ func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (TruncatedHash, *itera ...@@ -244,7 +244,7 @@ func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (TruncatedHash, *itera
} }
db.log.Trace("Starting search", "entry", entryIdx, "blockNum", i.current.blockNum, "logIdx", i.current.logIdx) db.log.Trace("Starting search", "entry", entryIdx, "blockNum", i.current.blockNum, "logIdx", i.current.logIdx)
defer func() { defer func() {
db.m.RecordSearchEntriesRead(i.entriesRead) db.m.RecordDBSearchEntriesRead(i.entriesRead)
}() }()
for { for {
evtBlockNum, evtLogIdx, evtHash, err := i.NextLog() evtBlockNum, evtLogIdx, evtHash, err := i.NextLog()
......
...@@ -881,11 +881,11 @@ type stubMetrics struct { ...@@ -881,11 +881,11 @@ type stubMetrics struct {
entriesReadForSearch int64 entriesReadForSearch int64
} }
func (s *stubMetrics) RecordEntryCount(count int64) { func (s *stubMetrics) RecordDBEntryCount(count int64) {
s.entryCount = count s.entryCount = count
} }
func (s *stubMetrics) RecordSearchEntriesRead(count int64) { func (s *stubMetrics) RecordDBSearchEntriesRead(count int64) {
s.entriesReadForSearch = count s.entriesReadForSearch = count
} }
......
package backend
import (
"fmt"
"math/big"
"os"
"path/filepath"
)
func prepLogDBPath(chainID *big.Int, datadir string) (string, error) {
dir, err := prepChainDir(chainID, datadir)
if err != nil {
return "", err
}
return filepath.Join(dir, "log.db"), nil
}
func prepChainDir(chainID *big.Int, datadir string) (string, error) {
dir := filepath.Join(datadir, chainID.Text(10))
if err := os.MkdirAll(dir, 0755); err != nil {
return "", fmt.Errorf("failed to create chain directory %v: %w", dir, err)
}
return dir, nil
}
package backend
import (
"math/big"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
)
func TestLogDBPath(t *testing.T) {
base := t.TempDir()
chainIDStr := "42984928492928428424243444"
chainID, ok := new(big.Int).SetString(chainIDStr, 10)
require.True(t, ok)
expected := filepath.Join(base, "subdir", chainIDStr, "log.db")
path, err := prepLogDBPath(chainID, filepath.Join(base, "subdir"))
require.NoError(t, err)
require.Equal(t, expected, path)
// Check it still works when directories exist
require.NoError(t, os.WriteFile(path, []byte("test"), 0o644))
path, err = prepLogDBPath(chainID, filepath.Join(base, "subdir"))
require.NoError(t, err)
require.Equal(t, expected, path)
}
...@@ -7,13 +7,11 @@ import ( ...@@ -7,13 +7,11 @@ import (
"time" "time"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching" "github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
) )
// TODO(optimism#11032) Make these configurable and a sensible default // TODO(optimism#11032) Make these configurable and a sensible default
...@@ -23,8 +21,7 @@ const trustRpc = false ...@@ -23,8 +21,7 @@ const trustRpc = false
const rpcKind = sources.RPCKindStandard const rpcKind = sources.RPCKindStandard
type Metrics interface { type Metrics interface {
CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) caching.Metrics
CacheGet(chainID *big.Int, label string, hit bool)
} }
// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform // ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
...@@ -34,19 +31,9 @@ type ChainMonitor struct { ...@@ -34,19 +31,9 @@ type ChainMonitor struct {
headMonitor *HeadMonitor headMonitor *HeadMonitor
} }
func NewChainMonitor(ctx context.Context, logger log.Logger, genericMetrics Metrics, rpc string) (*ChainMonitor, error) { func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID *big.Int, rpc string, client client.RPC) (*ChainMonitor, error) {
// First dial a simple client and get the chain ID so we have a simple identifier for the chain.
ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
if err != nil {
return nil, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
}
chainID, err := ethClient.ChainID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
}
logger = logger.New("chainID", chainID) logger = logger.New("chainID", chainID)
m := newChainMetrics(chainID, genericMetrics) cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind)
cl, err := newClient(ctx, logger, m, rpc, ethClient.Client(), pollInterval, trustRpc, rpcKind)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -85,8 +72,8 @@ func (n *loggingReceiptProcessor) ProcessLogs(_ context.Context, block eth.L1Blo ...@@ -85,8 +72,8 @@ func (n *loggingReceiptProcessor) ProcessLogs(_ context.Context, block eth.L1Blo
return nil return nil
} }
func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient *rpc.Client, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) { func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
c, err := client.NewRPCWithClient(ctx, logger, rpc, client.NewBaseRPCClient(rpcClient), pollRate) c, err := client.NewRPCWithClient(ctx, logger, rpc, rpcClient, pollRate)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create new RPC client: %w", err) return nil, fmt.Errorf("failed to create new RPC client: %w", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment