Commit 590fc814 authored by Tyler Smith's avatar Tyler Smith Committed by GitHub

feat,interop: add db syncing between nodes (#13357)

* feature: Allow adding arbitrary HTTP handlers to RPC HTTP Server.

* feature: Add sync package to handle streaming db files between nodes.

* tweak: Add DB sync server handler to op-supervisor server.

* feature: Use db sync if it's configured and there's no existing db.

* tweak: Use retry lib, which also adds max retries.

* rename: SyncRPC -> SyncEndpoint.

* tweak: Make the initial chain set part of the Server constructor.

* cleanup: Remove unused functions.

* rename: client -> syncClient to avoid package collision.

* fix: Load DepSet and use for sync server.

* tweak: Use client.BasicHTTPClient for sync client.

* fix: Don't double concat the path.

* tests,fix: Set sync resume to true.

* tweak: Add error context.

* tweak: Make http body close lint-friendly.

* tweak: Change retry policy to backoff from 1s to 30s, up to 10 minutes.

* tests,fix: Use require.ErrorIs to handle wrapped error.
parent 2a903ea4
...@@ -40,6 +40,7 @@ type Server struct { ...@@ -40,6 +40,7 @@ type Server struct {
tls *ServerTLSConfig tls *ServerTLSConfig
middlewares []Middleware middlewares []Middleware
rpcServer *rpc.Server rpcServer *rpc.Server
handlers map[string]http.Handler
} }
type ServerTLSConfig struct { type ServerTLSConfig struct {
...@@ -147,6 +148,7 @@ func NewServer(host string, port int, appVersion string, opts ...ServerOption) * ...@@ -147,6 +148,7 @@ func NewServer(host string, port int, appVersion string, opts ...ServerOption) *
}, },
log: log.Root(), log: log.Root(),
rpcServer: rpc.NewServer(), rpcServer: rpc.NewServer(),
handlers: make(map[string]http.Handler),
} }
for _, opt := range opts { for _, opt := range opts {
opt(bs) opt(bs)
...@@ -172,6 +174,14 @@ func (b *Server) AddAPI(api rpc.API) { ...@@ -172,6 +174,14 @@ func (b *Server) AddAPI(api rpc.API) {
b.apis = append(b.apis, api) b.apis = append(b.apis, api)
} }
// AddHandler adds a custom http.Handler to the server, mapped to an absolute path
func (b *Server) AddHandler(path string, handler http.Handler) {
if !strings.HasPrefix(path, "/") {
path = "/" + path
}
b.handlers[path] = handler
}
func (b *Server) Start() error { func (b *Server) Start() error {
// Register all APIs to the RPC server. // Register all APIs to the RPC server.
for _, api := range b.apis { for _, api := range b.apis {
...@@ -208,6 +218,9 @@ func (b *Server) Start() error { ...@@ -208,6 +218,9 @@ func (b *Server) Start() error {
handler = opmetrics.NewHTTPRecordingMiddleware(b.httpRecorder, handler) handler = opmetrics.NewHTTPRecordingMiddleware(b.httpRecorder, handler)
handler = oplog.NewLoggingMiddleware(b.log, handler) handler = oplog.NewLoggingMiddleware(b.log, handler)
// Add custom handlers
handler = b.newUserHandlersMiddleware(handler)
b.httpServer.Handler = handler b.httpServer.Handler = handler
listener, err := net.Listen("tcp", b.endpoint) listener, err := net.Listen("tcp", b.endpoint)
...@@ -277,6 +290,18 @@ func (b *Server) newWsMiddleWare(next http.Handler) http.Handler { ...@@ -277,6 +290,18 @@ func (b *Server) newWsMiddleWare(next http.Handler) http.Handler {
}) })
} }
func (b *Server) newUserHandlersMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for path, handler := range b.handlers {
if strings.HasPrefix(r.URL.Path, path) {
handler.ServeHTTP(w, r)
return
}
}
next.ServeHTTP(w, r)
})
}
func (b *Server) Stop() error { func (b *Server) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
......
...@@ -40,6 +40,7 @@ type Config struct { ...@@ -40,6 +40,7 @@ type Config struct {
SyncSources syncnode.SyncNodeCollection SyncSources syncnode.SyncNodeCollection
Datadir string Datadir string
DatadirSyncEndpoint string
} }
func (c *Config) Check() error { func (c *Config) Check() error {
......
...@@ -45,6 +45,11 @@ var ( ...@@ -45,6 +45,11 @@ var (
Usage: "Directory to store data generated as part of responding to games", Usage: "Directory to store data generated as part of responding to games",
EnvVars: prefixEnvVars("DATADIR"), EnvVars: prefixEnvVars("DATADIR"),
} }
DataDirSyncEndpointFlag = &cli.PathFlag{
Name: "datadir.sync-endpoint",
Usage: "op-supervisor endpoint to sync databases from",
EnvVars: prefixEnvVars("DATADIR_SYNC_ENDPOINT"),
}
DependencySetFlag = &cli.PathFlag{ DependencySetFlag = &cli.PathFlag{
Name: "dependency-set", Name: "dependency-set",
Usage: "Dependency-set configuration, point at JSON file.", Usage: "Dependency-set configuration, point at JSON file.",
...@@ -69,6 +74,7 @@ var requiredFlags = []cli.Flag{ ...@@ -69,6 +74,7 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
MockRunFlag, MockRunFlag,
DataDirSyncEndpointFlag,
} }
func init() { func init() {
...@@ -105,6 +111,7 @@ func ConfigFromCLI(ctx *cli.Context, version string) *config.Config { ...@@ -105,6 +111,7 @@ func ConfigFromCLI(ctx *cli.Context, version string) *config.Config {
L1RPC: ctx.String(L1RPCFlag.Name), L1RPC: ctx.String(L1RPCFlag.Name),
SyncSources: syncSourceSetups(ctx), SyncSources: syncSourceSetups(ctx),
Datadir: ctx.Path(DataDirFlag.Name), Datadir: ctx.Path(DataDirFlag.Name),
DatadirSyncEndpoint: ctx.Path(DataDirSyncEndpointFlag.Name),
} }
} }
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/sync"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
...@@ -72,6 +73,19 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg ...@@ -72,6 +73,19 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
return nil, fmt.Errorf("failed to load dependency set: %w", err) return nil, fmt.Errorf("failed to load dependency set: %w", err)
} }
// Sync the databases from the remote server if configured
// We only attempt to sync a database if it doesn't exist; we don't update existing databases
if cfg.DatadirSyncEndpoint != "" {
syncCfg := sync.Config{DataDir: cfg.Datadir, Logger: logger}
syncClient, err := sync.NewClient(syncCfg, cfg.DatadirSyncEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to create db sync client: %w", err)
}
if err := syncClient.SyncAll(ctx, depSet.Chains(), false); err != nil {
return nil, fmt.Errorf("failed to sync databases: %w", err)
}
}
// create initial per-chain resources // create initial per-chain resources
chainsDBs := db.NewChainsDB(logger, depSet) chainsDBs := db.NewChainsDB(logger, depSet)
......
package sync
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"time"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
var (
maxRetries = 25
retryStrategy = &retry.ExponentialStrategy{
Min: 1 * time.Second,
Max: 30 * time.Second,
MaxJitter: 250 * time.Millisecond,
}
)
// Client handles downloading files from a sync server.
type Client struct {
config Config
baseURL string
httpClient *client.BasicHTTPClient
}
// NewClient creates a new Client with the given config and server URL.
func NewClient(config Config, serverURL string) (*Client, error) {
// Verify root directory exists and is actually a directory
root, err := filepath.Abs(config.DataDir)
if err != nil {
return nil, fmt.Errorf("invalid root directory: %w", err)
}
rootInfo, err := os.Stat(root)
if err != nil {
return nil, fmt.Errorf("cannot access root directory: %w", err)
}
if !rootInfo.IsDir() {
return nil, fmt.Errorf("root path is not a directory: %s", root)
}
// Create the HTTP client
httpClient := client.NewBasicHTTPClient(serverURL, config.Logger)
return &Client{
config: config,
baseURL: serverURL,
httpClient: httpClient,
}, nil
}
// SyncAll syncs all known databases for the given chains.
func (c *Client) SyncAll(ctx context.Context, chains []types.ChainID, resume bool) error {
for _, chain := range chains {
for fileAlias := range Databases {
if err := c.SyncDatabase(ctx, chain, fileAlias, resume); err != nil {
return fmt.Errorf("failed to sync %s for chain %s: %w", fileAlias, chain, err)
}
}
}
return nil
}
// SyncDatabase downloads the named file from the server.
// If the local file exists, it will attempt to resume the download if resume is true.
func (c *Client) SyncDatabase(ctx context.Context, chainID types.ChainID, database Database, resume bool) error {
// Validate file alias
filePath, exists := Databases[database]
if !exists {
return fmt.Errorf("unknown file alias: %s", database)
}
// Ensure the chain directory exists
chainDir := filepath.Join(c.config.DataDir, chainID.String())
if err := os.MkdirAll(chainDir, 0755); err != nil {
return fmt.Errorf("failed to create chain directory: %w", err)
}
// Ensure the database file exists and get initial size
filePath = filepath.Join(chainDir, filePath)
var initialSize int64
if stat, err := os.Stat(filePath); err == nil {
initialSize = stat.Size()
}
// If we have data already and don't want to resume then stop now
if initialSize > 0 && !resume {
return nil
}
// Attempt to sync the file and retry until successful
err := retry.Do0(ctx, maxRetries, retryStrategy, func() error {
err := c.attemptSync(ctx, chainID, database, filePath, initialSize)
if err != nil {
c.logError("sync attempt failed", err, database)
return err
}
return nil
})
if err != nil {
return fmt.Errorf("failed to sync file: %w", err)
}
return nil
}
// attemptSync makes a single attempt to sync the file
func (c *Client) attemptSync(ctx context.Context, chainID types.ChainID, database Database, absPath string, initialSize int64) error {
// First do a HEAD request to get the file size
path := c.buildURLPath(chainID, database)
resp, err := c.httpClient.Get(ctx, path, nil, http.Header{"X-HTTP-Method-Override": []string{"HEAD"}})
if err != nil {
return fmt.Errorf("HEAD request failed: %w", err)
}
if err := resp.Body.Close(); err != nil {
return fmt.Errorf("HEAD request body failed to close: %w", err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("HEAD request failed with status %d", resp.StatusCode)
}
totalSize, err := parseContentLength(resp.Header)
if err != nil {
return fmt.Errorf("invalid Content-Length: %w", err)
}
// If we already have the whole file, we're done
if initialSize == totalSize {
return nil
}
// Create the GET request
headers := make(http.Header)
if initialSize > 0 {
headers.Set("Range", fmt.Sprintf("bytes=%d-", initialSize))
}
resp, err = c.httpClient.Get(ctx, path, nil, headers)
if err != nil {
return fmt.Errorf("GET request failed: %w", err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
c.logError("failed to close response body", err, database)
}
}()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
return fmt.Errorf("GET request failed with status %d", resp.StatusCode)
}
// Open the output file in the appropriate mode
flag := os.O_CREATE | os.O_WRONLY
if resp.StatusCode == http.StatusPartialContent {
flag |= os.O_APPEND
}
f, err := os.OpenFile(absPath, flag, 0644)
if err != nil {
return fmt.Errorf("failed to open output file: %w", err)
}
defer func(f *os.File) {
if err := f.Close(); err != nil {
c.logError("failed to close output file", err, database)
}
}(f)
// Copy the data to disk
_, err = io.Copy(f, resp.Body)
if err != nil {
return fmt.Errorf("failed to copy data: %s", database)
}
return nil
}
// buildURLPath creates the URL path for a given database download request
func (c *Client) buildURLPath(chainID types.ChainID, database Database) string {
return fmt.Sprintf("dbsync/%s/%s", chainID.String(), database)
}
// parseContentLength parses the Content-Length header
func parseContentLength(h http.Header) (int64, error) {
v := h.Get("Content-Length")
if v == "" {
return 0, fmt.Errorf("missing Content-Length header")
}
return strconv.ParseInt(v, 10, 64)
}
// logError logs an error if a logger is configured
func (c *Client) logError(msg string, err error, database Database) {
if c.config.Logger != nil {
c.config.Logger.Error(msg,
"error", err,
"database", database,
)
}
}
package sync
import (
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// Server handles sync requests
type Server struct {
config Config
validChains map[types.ChainID]struct{}
}
// NewServer creates a new Server with the given config.
func NewServer(config Config, chains []types.ChainID) (*Server, error) {
// Convert root to absolute path for security
root, err := filepath.Abs(config.DataDir)
if err != nil {
return nil, fmt.Errorf("invalid root directory: %w", err)
}
// Verify root directory exists and is actually a directory
rootInfo, err := os.Stat(root)
if err != nil {
return nil, fmt.Errorf("cannot access root directory: %w", err)
}
if !rootInfo.IsDir() {
return nil, fmt.Errorf("root path is not a directory: %s", root)
}
// Build map of valid chains for efficient lookup
validChains := make(map[types.ChainID]struct{}, len(chains))
for _, chain := range chains {
validChains[chain] = struct{}{}
}
return &Server{
config: config,
validChains: validChains,
}, nil
}
func parsePath(path string) (types.ChainID, string, error) {
var (
chainID types.ChainID
fileAlias string
)
// Trim leading and trailing slashes and split into segments
segments := strings.Split(strings.Trim(path, "/"), "/")
if len(segments) < 2 {
return chainID, fileAlias, fmt.Errorf("invalid path: %s", path)
}
chainIDStr := segments[len(segments)-2]
fileAlias = segments[len(segments)-1]
if err := chainID.UnmarshalText([]byte(chainIDStr)); err != nil {
return chainID, fileAlias, fmt.Errorf("invalid chainID: %w", err)
}
return chainID, fileAlias, nil
}
// ServeHTTP implements http.Handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Parse and validate the path
chainID, dbName, err := parsePath(r.URL.Path)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if _, ok := s.validChains[chainID]; !ok {
http.Error(w, "unsupported chainID", http.StatusNotFound)
return
}
// Get the path to the file based on the database name
db := Database(dbName)
fileName, exists := Databases[db]
if !exists {
http.Error(w, "file not found", http.StatusNotFound)
return
}
filePath := filepath.Join(s.config.DataDir, chainID.String(), fileName)
// Open the file for reading
file, err := os.Open(filePath)
if err != nil {
s.logError("error opening file", err, dbName)
http.Error(w, "file not found", http.StatusNotFound)
return
}
defer func(file *os.File) {
if file.Close() != nil {
s.logError("error closing file", err, dbName)
}
}(file)
// Get file info and set the headers
fileInfo, err := file.Stat()
if err != nil {
s.logError("error stating file", err, dbName)
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
w.Header().Set("Last-Modified", fileInfo.ModTime().UTC().Format(http.TimeFormat))
// Handle HEAD requests by returning and GET requests by streaming the file
switch r.Method {
case http.MethodHead:
return
case http.MethodGet:
// Stream the file contents, including handling range requests
http.ServeContent(w, r, dbName, fileInfo.ModTime(), file)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
// logError logs an error iff a logger is configured.
func (s *Server) logError(msg string, err error, fileName string) {
if s.config.Logger != nil {
s.config.Logger.Error(msg, "error", err, "file", fileName)
}
}
package sync
import (
"github.com/ethereum/go-ethereum/log"
)
const (
DBLocalSafe Database = "local_safe"
DBCrossSafe Database = "cross_safe"
)
// Databases maps a database alias to its actual name on disk
var Databases = map[Database]string{
DBLocalSafe: "local_safe.db",
DBCrossSafe: "cross_safe.db",
}
type Database string
func (d Database) String() string {
return string(d)
}
func (d Database) File() string {
return Databases[d]
}
// Config contains all configuration for the Server or Client.
type Config struct {
DataDir string
Logger log.Logger
}
package sync
import (
"context"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func TestSyncBasic(t *testing.T) {
serverRoot, clientRoot := setupTest(t)
chainID := types.ChainID{1}
// Create a test file on the server
serverFile := filepath.Join(serverRoot, chainID.String(), DBLocalSafe.File())
createTestFile(t, serverFile, 1024)
// Setup server
serverCfg := Config{
DataDir: serverRoot,
}
server, err := NewServer(serverCfg, []types.ChainID{chainID})
require.NoError(t, err)
ts := httptest.NewServer(server)
defer ts.Close()
// Setup client
clientCfg := Config{
DataDir: clientRoot,
}
client, err := NewClient(clientCfg, ts.URL)
require.NoError(t, err)
// Perform sync
err = client.SyncDatabase(context.Background(), chainID, DBLocalSafe, false)
require.NoError(t, err)
compareFiles(t, serverFile, filepath.Join(clientRoot, chainID.String(), DBLocalSafe.File()))
}
func TestSyncResume(t *testing.T) {
serverRoot, clientRoot := setupTest(t)
chainID := types.ChainID{1} // Use chain ID 1 for testing
// Create a test file on the server and partial file on the client
serverFile := filepath.Join(serverRoot, chainID.String(), DBLocalSafe.File())
createTestFile(t, serverFile, 2*1024) // 2KB file
clientFile := filepath.Join(clientRoot, chainID.String(), DBLocalSafe.File())
createTestFile(t, clientFile, 1024) // 1KB partial file
// Setup server and client
serverCfg := Config{
DataDir: serverRoot,
}
server, err := NewServer(serverCfg, []types.ChainID{chainID})
require.NoError(t, err)
ts := httptest.NewServer(server)
defer ts.Close()
clientCfg := Config{
DataDir: clientRoot,
}
client, err := NewClient(clientCfg, ts.URL)
require.NoError(t, err)
// Perform sync
err = client.SyncDatabase(context.Background(), chainID, DBLocalSafe, true)
require.NoError(t, err)
compareFiles(t, serverFile, clientFile)
}
func TestSyncRetry(t *testing.T) {
serverRoot, clientRoot := setupTest(t)
chainID := types.ChainID{1}
// Create a test file
serverFile := filepath.Join(serverRoot, chainID.String(), DBLocalSafe.File())
createTestFile(t, serverFile, 1024)
// Setup server with flaky handler that fails twice before succeeding
serverCfg := Config{
DataDir: serverRoot,
}
server, err := NewServer(serverCfg, []types.ChainID{chainID})
require.NoError(t, err)
failureCount := 0
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if failureCount < 2 {
failureCount++
w.WriteHeader(http.StatusInternalServerError)
return
}
server.ServeHTTP(w, r)
})
ts := httptest.NewServer(handler)
defer ts.Close()
clientCfg := Config{
DataDir: clientRoot,
}
client, err := NewClient(clientCfg, ts.URL)
require.NoError(t, err)
// Perform sync
err = client.SyncDatabase(context.Background(), chainID, DBLocalSafe, false)
require.NoError(t, err)
require.Equal(t, 2, failureCount, "expected exactly 2 failures")
compareFiles(t, serverFile, filepath.Join(clientRoot, chainID.String(), DBLocalSafe.File()))
}
func TestSyncErrors(t *testing.T) {
serverRoot, clientRoot := setupTest(t)
chainID := types.ChainID{1}
serverCfg := Config{
DataDir: serverRoot,
}
server, err := NewServer(serverCfg, []types.ChainID{chainID})
require.NoError(t, err)
ts := httptest.NewServer(server)
defer ts.Close()
clientCfg := Config{
DataDir: clientRoot,
}
client, err := NewClient(clientCfg, ts.URL)
require.NoError(t, err)
t.Run("NonexistentFile", func(t *testing.T) {
err := client.SyncDatabase(context.Background(), chainID, "nonexistent", false)
require.Error(t, err)
})
t.Run("CancelledContext", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately
err := client.SyncDatabase(ctx, chainID, DBLocalSafe, false)
require.ErrorIs(t, err, context.Canceled)
})
}
// setupTest creates test directories and test files
func setupTest(t *testing.T) (serverRoot, clientRoot string) {
return t.TempDir(), t.TempDir()
}
// createTestFile creates a file with given size and content
func createTestFile(t *testing.T, path string, size int64) {
t.Helper()
err := os.MkdirAll(filepath.Dir(path), 0755)
require.NoError(t, err)
f, err := os.Create(path)
require.NoError(t, err)
defer f.Close()
// Create deterministic content for testing; ASCII A-Za-z0-9
data := make([]byte, 1024)
for i := range data {
data[i] = byte((i % 62) + 65)
}
// Write the test data over and over until the desired size is reached
var written int64
for written < size {
toWrite := size - written
if toWrite > int64(len(data)) {
toWrite = int64(len(data))
}
n, err := f.Write(data[:toWrite])
require.NoError(t, err)
written += int64(n)
}
}
// compareFiles verifies two files are identical
func compareFiles(t *testing.T, path1, path2 string) {
file1, err := os.Open(path1)
require.NoError(t, err)
content1, err := io.ReadAll(file1)
require.NoError(t, err)
require.NoError(t, file1.Close())
file2, err := os.Open(path2)
require.NoError(t, err)
content2, err := io.ReadAll(file2)
require.NoError(t, err)
require.NoError(t, file2.Close())
require.Equal(t, content1, content2)
}
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics" "github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/sync"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
) )
...@@ -64,6 +65,9 @@ func (su *SupervisorService) initFromCLIConfig(ctx context.Context, cfg *config. ...@@ -64,6 +65,9 @@ func (su *SupervisorService) initFromCLIConfig(ctx context.Context, cfg *config.
if err := su.initRPCServer(cfg); err != nil { if err := su.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err) return fmt.Errorf("failed to start RPC server: %w", err)
} }
if err := su.initDBSync(ctx, cfg); err != nil {
return fmt.Errorf("failed to start DB sync server: %w", err)
}
return nil return nil
} }
...@@ -132,7 +136,7 @@ func (su *SupervisorService) initRPCServer(cfg *config.Config) error { ...@@ -132,7 +136,7 @@ func (su *SupervisorService) initRPCServer(cfg *config.Config) error {
cfg.RPC.ListenPort, cfg.RPC.ListenPort,
cfg.Version, cfg.Version,
oprpc.WithLogger(su.log), oprpc.WithLogger(su.log),
//oprpc.WithHTTPRecorder(su.metrics), // TODO(protocol-quest#286) hook up metrics to RPC server // oprpc.WithHTTPRecorder(su.metrics), // TODO(protocol-quest#286) hook up metrics to RPC server
) )
if cfg.RPC.EnableAdmin { if cfg.RPC.EnableAdmin {
su.log.Info("Admin RPC enabled") su.log.Info("Admin RPC enabled")
...@@ -152,10 +156,28 @@ func (su *SupervisorService) initRPCServer(cfg *config.Config) error { ...@@ -152,10 +156,28 @@ func (su *SupervisorService) initRPCServer(cfg *config.Config) error {
Service: &frontend.UpdatesFrontend{Supervisor: su.backend}, Service: &frontend.UpdatesFrontend{Supervisor: su.backend},
Authenticated: false, Authenticated: false,
}) })
su.rpcServer = server su.rpcServer = server
return nil return nil
} }
func (su *SupervisorService) initDBSync(ctx context.Context, cfg *config.Config) error {
syncCfg := sync.Config{
DataDir: cfg.Datadir,
Logger: su.log,
}
depSet, err := cfg.DependencySetSource.LoadDependencySet(ctx)
if err != nil {
return fmt.Errorf("failed to load dependency set: %w", err)
}
handler, err := sync.NewServer(syncCfg, depSet.Chains())
if err != nil {
return fmt.Errorf("failed to create db sync handler: %w", err)
}
su.rpcServer.AddHandler("/dbsync", handler)
return nil
}
func (su *SupervisorService) Start(ctx context.Context) error { func (su *SupervisorService) Start(ctx context.Context) error {
su.log.Info("Starting JSON-RPC server") su.log.Info("Starting JSON-RPC server")
if err := su.rpcServer.Start(); err != nil { if err := su.rpcServer.Start(); err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment