Commit a08fc5e9 authored by clabby's avatar clabby

WIP: Add `SyncClient` wrapper

parent 9df505ef
...@@ -19,6 +19,11 @@ type L2EndpointSetup interface { ...@@ -19,6 +19,11 @@ type L2EndpointSetup interface {
Check() error Check() error
} }
type L2SyncEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error)
Check() error
}
type L1EndpointSetup interface { type L1EndpointSetup interface {
// Setup a RPC client to a L1 node to pull rollup input-data from. // Setup a RPC client to a L1 node to pull rollup input-data from.
// The results of the RPC client may be trusted for faster processing, or strictly validated. // The results of the RPC client may be trusted for faster processing, or strictly validated.
...@@ -75,6 +80,31 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client ...@@ -75,6 +80,31 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client
return p.Client, nil return p.Client, nil
} }
// L2SyncEndpointConfig contains configuration for the fallback sync endpoint
type L2SyncEndpointConfig struct {
// HTTP Address of the L2 RPC to use for backup sync
L2NodeAddr string
}
var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil)
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr)
if err != nil {
return nil, err
}
return l2Node, nil
}
func (cfg *L2SyncEndpointConfig) Check() error {
if cfg.L2NodeAddr == "" {
return errors.New("empty L2 Node Address")
}
return nil
}
type L1EndpointConfig struct { type L1EndpointConfig struct {
L1NodeAddr string // Address of L1 User JSON-RPC endpoint to use (eth namespace required) L1NodeAddr string // Address of L1 User JSON-RPC endpoint to use (eth namespace required)
......
...@@ -13,8 +13,9 @@ import ( ...@@ -13,8 +13,9 @@ import (
) )
type Config struct { type Config struct {
L1 L1EndpointSetup L1 L1EndpointSetup
L2 L2EndpointSetup L2 L2EndpointSetup
L2Sync L2SyncEndpointSetup
Driver driver.Config Driver driver.Config
......
...@@ -197,7 +197,24 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -197,7 +197,24 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err return err
} }
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n.log, snapshotLog, n.metrics) var syncClient *sources.SyncClient
// If there is an RPC url present in the config, create a sync client
if err := cfg.L2Sync.Check(); err == nil {
rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
}
// The sync client's RPC is always trusted
config := sources.SyncClientDefaultConfig(&cfg.Rollup, true)
syncClient, err = sources.NewSyncClient(rpcSyncClient, n.log, n.metrics.L2SourceCache, config)
if err != nil {
return fmt.Errorf("failed to create sync client: %w", err)
}
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics)
return nil return nil
} }
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
) )
type Metrics interface { type Metrics interface {
...@@ -81,7 +82,7 @@ type Network interface { ...@@ -81,7 +82,7 @@ type Network interface {
} }
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, syncClient *sources.SyncClient, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
l1State := NewL1State(log, metrics) l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
...@@ -106,6 +107,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne ...@@ -106,6 +107,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne
snapshotLog: snapshotLog, snapshotLog: snapshotLog,
l1: l1, l1: l1,
l2: l2, l2: l2,
l2SyncCl: syncClient,
sequencer: sequencer, sequencer: sequencer,
network: network, network: network,
metrics: metrics, metrics: metrics,
......
...@@ -11,13 +11,12 @@ import ( ...@@ -11,13 +11,12 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-service/backoff" "github.com/ethereum-optimism/optimism/op-service/backoff"
) )
...@@ -68,6 +67,8 @@ type Driver struct { ...@@ -68,6 +67,8 @@ type Driver struct {
// L2 Signals: // L2 Signals:
unsafeL2Payloads chan *eth.ExecutionPayload unsafeL2Payloads chan *eth.ExecutionPayload
l2SyncCl *sources.SyncClient
l1 L1Chain l1 L1Chain
l2 L2Chain l2 L2Chain
sequencer SequencerIface sequencer SequencerIface
...@@ -461,58 +462,42 @@ func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) { ...@@ -461,58 +462,42 @@ func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) {
start, end := s.derivation.GetUnsafeQueueGap() start, end := s.derivation.GetUnsafeQueueGap()
size := end - start size := end - start
// If there is a gap in the queue and a backup sync RPC is configured, attempt to retrieve the missing payloads from the backup RPC // If there is a gap in the queue and a backup sync client is configured, attempt to retrieve the missing payloads from the backup RPC
if size > 0 && s.config.BackupL2UnsafeSyncRPC != "" { if size > 0 && s.l2SyncCl != nil {
// Dial the backup unsafe sync RPC.
client, err := rpc.DialHTTP(s.config.BackupL2UnsafeSyncRPC)
if err != nil {
s.log.Warn("failed to dial backup unsafe sync RPC", "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
}
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently. // Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue. // Concurrent requests are safe here due to the engine queue being a priority queue.
// TODO: Should enforce a max gap size to prevent spamming the backup RPC or being rate limited. // TODO: Should enforce a max gap size to prevent spamming the backup RPC or being rate limited.
for blockNumber := start; blockNumber < end; blockNumber++ { for blockNumber := start; blockNumber < end; blockNumber++ {
go s.fetchUnsafeBlockFromRpc(ctx, blockNumber, client) go s.fetchUnsafeBlockFromRpc(ctx, blockNumber)
} }
} }
} }
// fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC. // fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC.
// WARNING: This function fails silently (aside from warning logs). // WARNING: This function fails silently (aside from warning logs).
func (s *Driver) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64, client *rpc.Client) { func (s *Driver) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) {
s.log.Info("requesting unsafe payload from backup RPC", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC) s.log.Info("requesting unsafe payload from backup RPC", "block number", blockNumber)
// TODO: Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now, // TODO: Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available. // the `eth_getBlockByNumber` method is more widely available.
// Fetch the next unsafe block from the backup unsafe sync RPC. payload, err := s.l2SyncCl.PayloadByNumber(ctx, blockNumber)
var block *types.Block
timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if err := client.CallContext(timeoutCtx, &block, "eth_getBlockByNumber", blockNumber); err != nil {
s.log.Warn("failed to retrieve unsafe payload from backup RPC", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
return
}
// Convert the received block to a `eth.ExecutionPayload`.
payload, err := eth.BlockAsPayload(block)
if err != nil { if err != nil {
s.log.Warn("failed to convert block to execution payload", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err) s.log.Warn("failed to convert block to execution payload", "block number", blockNumber, "err", err)
return return
} }
// TODO: Validate the integrity of the payload. // TODO: Validate the integrity of the payload.
// Signature validation is not necessary here since the backup RPC is trusted. (?) // Signature validation is not necessary here since the backup RPC is trusted.
if _, ok := payload.CheckBlockHash(); !ok { if _, ok := payload.CheckBlockHash(); !ok {
s.log.Warn("received invalid payload from backup RPC; invalid block hash", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC) s.log.Warn("received invalid payload from backup RPC; invalid block hash", "payload", payload.ID())
return return
} }
s.log.Info("received unsafe payload from backup RPC", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC) s.log.Info("received unsafe payload from backup RPC", "payload", payload.ID())
// Send the retrieved payload to the `unsafeL2Payloads` channel. // Send the retrieved payload to the `unsafeL2Payloads` channel.
s.unsafeL2Payloads <- payload s.unsafeL2Payloads <- payload
s.log.Info("inserted received unsafe payload into priority queue", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC) s.log.Info("sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
} }
...@@ -36,11 +36,6 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -36,11 +36,6 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, err return nil, err
} }
// If the backup sync RPC flag is set, then use it over the value present in the config file.
if backupSyncRPC := ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name); backupSyncRPC != "" {
rollupConfig.BackupL2UnsafeSyncRPC = backupSyncRPC
}
driverConfig, err := NewDriverConfig(ctx) driverConfig, err := NewDriverConfig(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -66,9 +61,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -66,9 +61,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, fmt.Errorf("failed to load l2 endpoints info: %w", err) return nil, fmt.Errorf("failed to load l2 endpoints info: %w", err)
} }
l2SyncEndpoint := NewL2SyncEndpointConfig(ctx)
cfg := &node.Config{ cfg := &node.Config{
L1: l1Endpoint, L1: l1Endpoint,
L2: l2Endpoint, L2: l2Endpoint,
L2Sync: l2SyncEndpoint,
Rollup: *rollupConfig, Rollup: *rollupConfig,
Driver: *driverConfig, Driver: *driverConfig,
RPC: node.RPCConfig{ RPC: node.RPCConfig{
...@@ -139,6 +137,12 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf ...@@ -139,6 +137,12 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf
}, nil }, nil
} }
func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig {
return &node.L2SyncEndpointConfig{
L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name),
}
}
func NewDriverConfig(ctx *cli.Context) (*driver.Config, error) { func NewDriverConfig(ctx *cli.Context) (*driver.Config, error) {
return &driver.Config{ return &driver.Config{
VerifierConfDepth: ctx.GlobalUint64(flags.VerifierL1Confs.Name), VerifierConfDepth: ctx.GlobalUint64(flags.VerifierL1Confs.Name),
......
package sources
import (
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources/caching"
"github.com/ethereum/go-ethereum/log"
)
type SyncClient struct {
*L2Client
}
type SyncClientConfig struct {
L2ClientConfig
}
func SyncClientDefaultConfig(config *rollup.Config, trustRPC bool) *SyncClientConfig {
return &SyncClientConfig{
*L2ClientDefaultConfig(config, trustRPC),
}
}
func NewSyncClient(client client.RPC, log log.Logger, metrics caching.Metrics, config *SyncClientConfig) (*SyncClient, error) {
l2Client, err := NewL2Client(client, log, metrics, &config.L2ClientConfig)
if err != nil {
return nil, err
}
return &SyncClient{
L2Client: l2Client,
}, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment