Commit 54d5ab5b authored by Joshua Gutow, committed by GitHub

op-node: Remove alt RPC sync (#8593)

The P2P alt-sync protocol has proven its worth, and we no longer need this
alternate sync mechanism. Removing it will simplify how we prepare for the
op-node changes required for snap sync.
parent 75a2cd9a
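For orientation, here is a minimal sketch of what wiring up a rollup node looks like after this change, reconstructed from the node.Config hunks later in this diff: the L2Sync field and its L2SyncEndpointConfig/PreparedL2SyncEndpoint helpers are gone, and the unsafe head is synced over the P2P req-resp protocol instead. Field names come from the hunks below; the endpoint variables are placeholders, so treat this as illustrative rather than exact.

	cfg := &node.Config{
		L1:     l1Endpoint,   // L1EndpointSetup
		L2:     l2Endpoint,   // L2EndpointSetup (engine RPC)
		Rollup: *rollupConfig,
		Driver: *driverConfig,
		// L2Sync is removed by this commit; P2P alt-sync now covers unsafe blocks.
	}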
@@ -563,11 +563,6 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
	for name, rollupCfg := range cfg.Nodes {
		configureL1(rollupCfg, sys.EthInstances["l1"])
		configureL2(rollupCfg, sys.EthInstances[name], cfg.JWTSecret)
-		rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{
-			Client:   nil,
-			TrustRPC: false,
-		}
	}
	// Geth Clients
...

@@ -604,79 +604,6 @@ func TestSystemMockP2P(t *testing.T) {
	require.Contains(t, received, receiptSeq.BlockHash)
}
-// TestSystemRPCAltSync sets up an L1 Geth node, a rollup node, and an L2 Geth node, and then confirms that
-// the nodes can sync L2 blocks before they are confirmed on L1.
-//
-// Test steps:
-// 1. Spin up the nodes (P2P is disabled on the verifier)
-// 2. Send a transaction to the sequencer.
-// 3. Wait for the TX to be mined on the sequencer chain.
-// 4. Wait for the verifier to detect a gap between the payload queue and the unsafe head.
-// 5. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain.
-// 6. Wait for the verifier to sync the unsafe chain into the safe chain.
-// 7. Verify that the TX is included in the verifier's safe chain.
-func TestSystemRPCAltSync(t *testing.T) {
-	InitParallel(t)
-
-	cfg := DefaultSystemConfig(t)
-	// the default is nil, but this may change in the future.
-	// This test must ensure the blocks are not synced via gossip, but instead via the alt RPC-based sync.
-	cfg.P2PTopology = nil
-	// Disable the batcher, so there will not be any L1 data to sync from
-	cfg.DisableBatcher = true
-
-	var published, received []string
-	seqTracer, verifTracer := new(FnTracer), new(FnTracer)
-	// The sequencer still publishes the blocks to the tracer, even if they do not reach the network due to disabled P2P
-	seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
-		published = append(published, payload.ID().String())
-	}
-	// Blocks are now received via the RPC-based alt-sync method
-	verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
-		received = append(received, payload.ID().String())
-	}
-	cfg.Nodes["sequencer"].Tracer = seqTracer
-	cfg.Nodes["verifier"].Tracer = verifTracer
-
-	sys, err := cfg.Start(t, SystemConfigOption{
-		key:  "afterRollupNodeStart",
-		role: "sequencer",
-		action: func(sCfg *SystemConfig, system *System) {
-			cfg.Nodes["verifier"].L2Sync = &rollupNode.PreparedL2SyncEndpoint{
-				Client: client.NewBaseRPCClient(system.RawClients["sequencer"]),
-			}
-		},
-	})
-	require.Nil(t, err, "Error starting up system")
-	defer sys.Close()
-
-	l2Seq := sys.Clients["sequencer"]
-	l2Verif := sys.Clients["verifier"]
-
-	// Transactor Account
-	ethPrivKey := cfg.Secrets.Alice
-
-	// Submit a TX to the L2 sequencer node
-	receiptSeq := SendL2Tx(t, cfg, l2Seq, ethPrivKey, func(opts *TxOpts) {
-		opts.ToAddr = &common.Address{0xff, 0xff}
-		opts.Value = big.NewInt(1_000_000_000)
-
-		// Wait for alt RPC sync to pick up the blocks on the sequencer chain
-		opts.VerifyOnClients(l2Verif)
-	})
-
-	// Sometimes we get duplicate blocks on the sequencer, which makes this test flaky
-	published = slices.Compact(published)
-	received = slices.Compact(received)
-
-	// Verify that the tx was received via RPC sync (P2P is disabled)
-	require.Contains(t, received, eth.BlockID{Hash: receiptSeq.BlockHash, Number: receiptSeq.BlockNumber.Uint64()}.String())
-
-	// Verify that everything that was received was published
-	require.GreaterOrEqual(t, len(published), len(received))
-	require.ElementsMatch(t, received, published[:len(received)])
-}
func TestSystemP2PAltSync(t *testing.T) {
	InitParallel(t)

@@ -760,7 +687,6 @@ func TestSystemP2PAltSync(t *testing.T) {
	// Configure the new rollup node that'll be syncing
	var syncedPayloads []string
	syncNodeCfg := &rollupNode.Config{
-		L2Sync:    &rollupNode.PreparedL2SyncEndpoint{Client: nil},
		Driver:    driver.Config{VerifierConfDepth: 0},
		Rollup:    *sys.RollupConfig,
		P2PSigner: nil,
...

@@ -225,18 +225,6 @@ var (
		EnvVars: prefixEnvVars("HEARTBEAT_URL"),
		Value:   "https://heartbeat.optimism.io",
	}
-	BackupL2UnsafeSyncRPC = &cli.StringFlag{
-		Name:    "l2.backup-unsafe-sync-rpc",
-		Usage:   "Set the backup L2 unsafe sync RPC endpoint.",
-		EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC"),
-	}
-	BackupL2UnsafeSyncRPCTrustRPC = &cli.StringFlag{
-		Name: "l2.backup-unsafe-sync-rpc.trustrpc",
-		Usage: "Like l1.trustrpc, configure if response data from the RPC needs to be verified, e.g. blockhash computation. " +
-			"This does not include checks if the blockhash is part of the canonical chain.",
-		EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
-	}
	RollupHalt = &cli.StringFlag{
		Name:  "rollup.halt",
		Usage: "Opt-in option to halt on incompatible protocol version requirements of the given level (major/minor/patch/none), as signaled onchain in L1",

@@ -281,6 +269,19 @@ var (
		EnvVars: prefixEnvVars("BETA_EXTRA_NETWORKS"),
		Hidden:  true, // hidden, this is deprecated, the flag is not used anymore.
	}
+	BackupL2UnsafeSyncRPC = &cli.StringFlag{
+		Name:    "l2.backup-unsafe-sync-rpc",
+		Usage:   "Set the backup L2 unsafe sync RPC endpoint.",
+		EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC"),
+		Hidden:  true,
+	}
+	BackupL2UnsafeSyncRPCTrustRPC = &cli.StringFlag{
+		Name: "l2.backup-unsafe-sync-rpc.trustrpc",
+		Usage: "Like l1.trustrpc, configure if response data from the RPC needs to be verified, e.g. blockhash computation. " +
+			"This does not include checks if the blockhash is part of the canonical chain.",
+		EnvVars: prefixEnvVars("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
+		Hidden:  true,
+	}
)

var requiredFlags = []cli.Flag{

@@ -320,8 +321,6 @@ var optionalFlags = []cli.Flag{
	HeartbeatEnabledFlag,
	HeartbeatMonikerFlag,
	HeartbeatURLFlag,
-	BackupL2UnsafeSyncRPC,
-	BackupL2UnsafeSyncRPCTrustRPC,
	RollupHalt,
	RollupLoadProtocolVersions,
	L1RethDBPath,

@@ -333,6 +332,8 @@ var DeprecatedFlags = []cli.Flag{
	L2EngineSyncEnabled,
	SkipSyncStartCheck,
	BetaExtraNetworks,
+	BackupL2UnsafeSyncRPC,
+	BackupL2UnsafeSyncRPCTrustRPC,
	// Deprecated P2P Flags are added at the init step
}
...
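A note on the flag change above: in urfave/cli v2, a flag with Hidden: true still parses normally but is omitted from --help output. That makes this a soft deprecation: existing l2.backup-unsafe-sync-rpc command lines keep working (as no-ops, since nothing consumes the flags anymore), while the options disappear from the documentation. A minimal sketch of the pattern, not code from this diff:

	// Soft deprecation: the flag remains accepted for compatibility,
	// but is hidden from help output and no longer read anywhere.
	deprecated := &cli.StringFlag{
		Name:   "l2.backup-unsafe-sync-rpc",
		Hidden: true,
	}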
@@ -21,13 +21,6 @@ type L2EndpointSetup interface {
	Check() error
}
-type L2SyncEndpointSetup interface {
-	// Setup an RPC client to another L2 node to sync L2 blocks from.
-	// It may return a nil client with nil error if RPC-based sync is not enabled.
-	Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (cl client.RPC, rpcCfg *sources.SyncClientConfig, err error)
-	Check() error
-}

type L1EndpointSetup interface {
	// Setup an RPC client to an L1 node to pull rollup input-data from.
	// The results of the RPC client may be trusted for faster processing, or strictly validated.
@@ -89,50 +82,6 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger, rollupC
	return p.Client, sources.EngineClientDefaultConfig(rollupCfg), nil
}
-// L2SyncEndpointConfig contains configuration for the fallback sync endpoint
-type L2SyncEndpointConfig struct {
-	// Address of the L2 RPC to use for backup sync, may be empty if RPC alt-sync is disabled.
-	L2NodeAddr string
-	TrustRPC   bool
-}
-
-var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil)
-
-// Setup creates an RPC client to sync from.
-// It will return nil without error if no sync method is configured.
-func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.SyncClientConfig, error) {
-	if cfg.L2NodeAddr == "" {
-		return nil, nil, nil
-	}
-	l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr)
-	if err != nil {
-		return nil, nil, err
-	}
-	return l2Node, sources.SyncClientDefaultConfig(rollupCfg, cfg.TrustRPC), nil
-}
-
-func (cfg *L2SyncEndpointConfig) Check() error {
-	// empty addr is valid, as it is optional.
-	return nil
-}
-
-type PreparedL2SyncEndpoint struct {
-	// RPC endpoint to use for syncing, may be nil if RPC alt-sync is disabled.
-	Client   client.RPC
-	TrustRPC bool
-}
-
-var _ L2SyncEndpointSetup = (*PreparedL2SyncEndpoint)(nil)
-
-func (cfg *PreparedL2SyncEndpoint) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.SyncClientConfig, error) {
-	return cfg.Client, sources.SyncClientDefaultConfig(rollupCfg, cfg.TrustRPC), nil
-}
-
-func (cfg *PreparedL2SyncEndpoint) Check() error {
-	return nil
-}
type L1EndpointConfig struct {
	L1NodeAddr string // Address of L1 User JSON-RPC endpoint to use (eth namespace required)
...
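Before this commit, the two L2SyncEndpointSetup implementations removed above were wired in from different places: op-node built an L2SyncEndpointConfig from the CLI flags, while the e2e tests injected a PreparedL2SyncEndpoint directly. A condensed sketch, assembled from the other hunks in this diff:

	// Production path (see the removed NewL2SyncEndpointConfig further below):
	cfg.L2Sync = &node.L2SyncEndpointConfig{
		L2NodeAddr: ctx.String(flags.BackupL2UnsafeSyncRPC.Name),
		TrustRPC:   ctx.Bool(flags.BackupL2UnsafeSyncRPCTrustRPC.Name),
	}

	// Test path (see the removed op-e2e setup further above):
	rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{Client: nil, TrustRPC: false}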
@@ -17,9 +17,8 @@ import (
)

type Config struct {
	L1 L1EndpointSetup
	L2 L2EndpointSetup
-	L2Sync L2SyncEndpointSetup

	Driver driver.Config

@@ -119,11 +118,11 @@ func (cfg *Config) LoadPersisted(log log.Logger) error {
// Check verifies that the given configuration makes sense
func (cfg *Config) Check() error {
-	if err := cfg.L2.Check(); err != nil {
+	if err := cfg.L1.Check(); err != nil {
		return fmt.Errorf("l2 endpoint config error: %w", err)
	}
-	if err := cfg.L2Sync.Check(); err != nil {
-		return fmt.Errorf("sync config error: %w", err)
+	if err := cfg.L2.Check(); err != nil {
+		return fmt.Errorf("l2 endpoint config error: %w", err)
	}
	if err := cfg.Rollup.Check(); err != nil {
		return fmt.Errorf("rollup config error: %w", err)
...
@@ -47,7 +47,6 @@ type OpNode struct {
	l1Source  *sources.L1Client     // L1 Client to fetch data from
	l2Driver  *driver.Driver        // L2 Engine to Sync
	l2Source  *sources.EngineClient // L2 Execution Engine RPC bindings
-	rpcSync   *sources.SyncClient   // Alt-sync RPC client, optional (may be nil)
	server    *rpcServer            // RPC server hosting the rollup-node API
	p2pNode   *p2p.NodeP2P          // P2P node functionality
	p2pSigner p2p.Signer            // p2p gossip application messages will be signed with this signer
@@ -121,9 +120,6 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
	if err := n.initRuntimeConfig(ctx, cfg); err != nil { // depends on L2, to signal initial runtime values to
		return fmt.Errorf("failed to init the runtime config: %w", err)
	}
-	if err := n.initRPCSync(ctx, cfg); err != nil {
-		return fmt.Errorf("failed to init RPC sync: %w", err)
-	}
	if err := n.initP2PSigner(ctx, cfg); err != nil {
		return fmt.Errorf("failed to init the P2P signer: %w", err)
	}
@@ -314,22 +310,6 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
	return nil
}

-func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error {
-	rpcSyncClient, rpcCfg, err := cfg.L2Sync.Setup(ctx, n.log, &cfg.Rollup)
-	if err != nil {
-		return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
-	}
-	if rpcSyncClient == nil { // if no RPC client is configured to sync from, then don't add the RPC sync client
-		return nil
-	}
-	syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, rpcCfg)
-	if err != nil {
-		return fmt.Errorf("failed to create sync client: %w", err)
-	}
-	n.rpcSync = syncClient
-	return nil
-}
-
func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
	server, err := newRPCServer(ctx, &cfg.RPC, &cfg.Rollup, n.l2Source.L2Client, n.l2Driver, n.log, n.appVersion, n.metrics)
	if err != nil {
@@ -432,22 +412,11 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
func (n *OpNode) Start(ctx context.Context) error {
	n.log.Info("Starting execution engine driver")

	// start driving engine: sync blocks by deriving them from L1 and driving them into the engine
	if err := n.l2Driver.Start(); err != nil {
		n.log.Error("Could not start a rollup node", "err", err)
		return err
	}

-	// If the backup unsafe sync client is enabled, start its event loop
-	if n.rpcSync != nil {
-		if err := n.rpcSync.Start(); err != nil {
-			n.log.Error("Could not start the backup sync client", "err", err)
-			return err
-		}
-		n.log.Info("Started L2-RPC sync service")
-	}
-
	log.Info("Rollup node started")
	return nil
}
@@ -527,9 +496,6 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e
}

func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
-	if n.rpcSync != nil {
-		return n.rpcSync.RequestL2Range(ctx, start, end)
-	}
	if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
		if unixTimeStale(start.Time, 12*time.Hour) {
			n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time)
@@ -601,13 +567,6 @@ func (n *OpNode) Stop(ctx context.Context) error {
		if err := n.l2Driver.Close(); err != nil {
			result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err))
		}
-		// If the L2 sync client is present & running, close it.
-		if n.rpcSync != nil {
-			if err := n.rpcSync.Close(); err != nil {
-				result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err))
-			}
-		}
	}
	// Wait for the runtime config loader to be done using the data sources before closing them
...
@@ -63,8 +63,6 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
		return nil, fmt.Errorf("failed to load l2 endpoints info: %w", err)
	}

-	l2SyncEndpoint := NewL2SyncEndpointConfig(ctx)
-
	syncConfig, err := NewSyncConfig(ctx, log)
	if err != nil {
		return nil, fmt.Errorf("failed to create the sync config: %w", err)

@@ -78,7 +76,6 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
	cfg := &node.Config{
		L1:     l1Endpoint,
		L2:     l2Endpoint,
-		L2Sync: l2SyncEndpoint,
		Rollup: *rollupConfig,
		Driver: *driverConfig,
		RPC: node.RPCConfig{

@@ -163,15 +160,6 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf
	}, nil
}
-// NewL2SyncEndpointConfig returns a L2SyncEndpointConfig built from the CLI
-// flags; the L2NodeAddr is empty (disabling RPC sync) if the flag is not set.
-func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig {
-	return &node.L2SyncEndpointConfig{
-		L2NodeAddr: ctx.String(flags.BackupL2UnsafeSyncRPC.Name),
-		TrustRPC:   ctx.Bool(flags.BackupL2UnsafeSyncRPCTrustRPC.Name),
-	}
-}
func NewConfigPersistence(ctx *cli.Context) node.ConfigPersistence {
	stateFile := ctx.String(flags.RPCAdminPersistence.Name)
	if stateFile == "" {
...
package sources

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-service/retry"
	"github.com/ethereum-optimism/optimism/op-service/sources/caching"

	"github.com/ethereum/go-ethereum/log"
	"github.com/libp2p/go-libp2p/core/peer"
)
var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not be nil")

// RpcSyncPeer is a mock PeerID for the RPC sync client.
var RpcSyncPeer peer.ID = "ALT_RPC_SYNC"

// receivePayload queues the received payload for processing.
// This may return an error if there's no capacity for the payload.
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error
type RPCSync interface {
	io.Closer
	// Start starts an additional worker syncing job
	Start() error
	// RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface.
	RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error
}
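// Editor's note, not part of the original file: SyncClient below is meant to
// satisfy RPCSync, but the file never asserts it. A compile-time guard like
// the following would catch any drift between the interface and the
// implementation:
//
//	var _ RPCSync = (*SyncClient)(nil)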
// SyncClient implements the driver AltSync interface, including support for fetching an open-ended chain of L2 blocks.
type SyncClient struct {
	*L2Client

	requests chan uint64

	resCtx    context.Context
	resCancel context.CancelFunc

	receivePayload receivePayload
	wg             sync.WaitGroup
}

type SyncClientConfig struct {
	L2ClientConfig
}
func SyncClientDefaultConfig(config *rollup.Config, trustRPC bool) *SyncClientConfig {
	return &SyncClientConfig{
		*L2ClientDefaultConfig(config, trustRPC),
	}
}

func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, metrics caching.Metrics, config *SyncClientConfig) (*SyncClient, error) {
	l2Client, err := NewL2Client(client, log, metrics, &config.L2ClientConfig)
	if err != nil {
		return nil, err
	}
	// This resource context is shared between all workers that may be started
	resCtx, resCancel := context.WithCancel(context.Background())
	return &SyncClient{
		L2Client:       l2Client,
		resCtx:         resCtx,
		resCancel:      resCancel,
		requests:       make(chan uint64, 128),
		receivePayload: receiver,
	}, nil
}
// Start starts the syncing background work. This may not be called after Close().
func (s *SyncClient) Start() error {
	// TODO(CLI-3635): we can start multiple event loop runners as workers, to parallelize the work
	s.wg.Add(1)
	go s.eventLoop()
	return nil
}

// Close sends a signal to close all concurrent syncing work.
func (s *SyncClient) Close() error {
	s.resCancel()
	s.wg.Wait()
	return nil
}
func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
	// Drain previous requests now that we have new information
	for len(s.requests) > 0 {
		select { // in case requests is being read at the same time, don't block on draining it.
		case <-s.requests:
		default:
			break
		}
	}

	endNum := end.Number
	if end == (eth.L2BlockRef{}) {
		n, err := s.rollupCfg.TargetBlockNumber(uint64(time.Now().Unix()))
		if err != nil {
			return err
		}
		if n <= start.Number {
			return nil
		}
		endNum = n
	}

	// TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method.
	s.log.Info("Scheduling to fetch trailing missing payloads from backup RPC", "start", start, "end", endNum, "size", endNum-start.Number-1)

	for i := start.Number + 1; i < endNum; i++ {
		select {
		case s.requests <- i:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
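// Usage sketch (editor's illustration, not part of the original file): a
// caller that detects a gap behind a newly received unsafe payload requests
// the missing span. Passing an empty `end` ref means "sync up to the expected
// head for the current wall-clock time", handled by the TargetBlockNumber
// branch above:
//
//	_ = syncClient.RequestL2Range(ctx, currentUnsafeHead, eth.L2BlockRef{})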
// eventLoop is the main event loop for the sync client.
func (s *SyncClient) eventLoop() {
	defer s.wg.Done()
	s.log.Info("Starting sync client event loop")

	backoffStrategy := &retry.ExponentialStrategy{
		Min:       1000 * time.Millisecond,
		Max:       20_000 * time.Millisecond,
		MaxJitter: 250 * time.Millisecond,
	}

	for {
		select {
		case <-s.resCtx.Done():
			s.log.Debug("Shutting down RPC sync worker")
			return
		case reqNum := <-s.requests:
			_, err := retry.Do(s.resCtx, 5, backoffStrategy, func() (interface{}, error) {
				// Limit the maximum time for fetching payloads
				ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10)
				defer cancel()
				// We are only fetching one block at a time here.
				return nil, s.fetchUnsafeBlockFromRpc(ctx, reqNum)
			})
			if err != nil {
				if err == s.resCtx.Err() {
					return
				}
				s.log.Error("failed syncing L2 block via RPC", "err", err, "num", reqNum)
				// Reschedule at end of queue
				select {
				case s.requests <- reqNum:
				default:
					// drop syncing job if we are too busy with sync jobs already.
				}
			}
		}
	}
}
// fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC.
// WARNING: This function fails silently (aside from warning logs).
//
// Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) error {
	s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber)

	payload, err := s.PayloadByNumber(ctx, blockNumber)
	if err != nil {
		return fmt.Errorf("failed to fetch payload by number (%d): %w", blockNumber, err)
	}
	// Note: the underlying RPC client used for syncing verifies the execution payload blockhash, if set to untrusted.
	s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID())

	// Send the retrieved payload to the `unsafeL2Payloads` channel.
	if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil {
		return fmt.Errorf("failed to send payload %s into the driver's unsafeL2Payloads channel: %w", payload.ID(), err)
	} else {
		s.log.Debug("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
		return nil
	}
}
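With this file deleted, the only remaining alt-sync path is the P2P one touched in the node.go hunk above. For reference, a sketch of the post-commit fallback logic, reconstructed from that hunk; the final no-op branch is an assumption about the surrounding code, which this diff truncates:

	func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
		if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
			if unixTimeStale(start.Time, 12*time.Hour) {
				n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time)
				return nil
			}
			return n.p2pNode.RequestL2Range(ctx, start, end)
		}
		// No sync method is available: drop the request.
		return nil
	}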