Commit d6b0c35c authored by clabby's avatar clabby

WIP: `SyncClient` event loop

parent a08fc5e9
...@@ -280,13 +280,21 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error { ...@@ -280,13 +280,21 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
func (n *OpNode) Start(ctx context.Context) error { func (n *OpNode) Start(ctx context.Context) error {
n.log.Info("Starting execution engine driver") n.log.Info("Starting execution engine driver")
// start driving engine: sync blocks by deriving them from L1 and driving them into the engine // start driving engine: sync blocks by deriving them from L1 and driving them into the engine
err := n.l2Driver.Start() if err := n.l2Driver.Start(); err != nil {
if err != nil {
n.log.Error("Could not start a rollup node", "err", err) n.log.Error("Could not start a rollup node", "err", err)
return err return err
} }
// If the backup unsafe sync client is enabled, start its event loop
if n.l2Driver.L2SyncCl != nil {
if err := n.l2Driver.L2SyncCl.Start(n.l2Driver.UnsafeL2Payloads); err != nil {
n.log.Error("Could not start the backup sync client", "err", err)
return err
}
}
return nil return nil
} }
...@@ -399,6 +407,13 @@ func (n *OpNode) Close() error { ...@@ -399,6 +407,13 @@ func (n *OpNode) Close() error {
if err := n.l2Driver.Close(); err != nil { if err := n.l2Driver.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err)) result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err))
} }
// If the L2 sync client is present & running, close it.
if n.l2Driver.L2SyncCl != nil {
if err := n.l2Driver.L2SyncCl.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err))
}
}
} }
// close L2 engine RPC client // close L2 engine RPC client
......
...@@ -107,13 +107,13 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy ...@@ -107,13 +107,13 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy
snapshotLog: snapshotLog, snapshotLog: snapshotLog,
l1: l1, l1: l1,
l2: l2, l2: l2,
l2SyncCl: syncClient,
sequencer: sequencer, sequencer: sequencer,
network: network, network: network,
metrics: metrics, metrics: metrics,
l1HeadSig: make(chan eth.L1BlockRef, 10), l1HeadSig: make(chan eth.L1BlockRef, 10),
l1SafeSig: make(chan eth.L1BlockRef, 10), l1SafeSig: make(chan eth.L1BlockRef, 10),
l1FinalizedSig: make(chan eth.L1BlockRef, 10), l1FinalizedSig: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10), UnsafeL2Payloads: make(chan *eth.ExecutionPayload, 10),
L2SyncCl: syncClient,
} }
} }
...@@ -64,10 +64,13 @@ type Driver struct { ...@@ -64,10 +64,13 @@ type Driver struct {
l1SafeSig chan eth.L1BlockRef l1SafeSig chan eth.L1BlockRef
l1FinalizedSig chan eth.L1BlockRef l1FinalizedSig chan eth.L1BlockRef
// Backup unsafe sync client
L2SyncCl *sources.SyncClient
// L2 Signals: // L2 Signals:
unsafeL2Payloads chan *eth.ExecutionPayload
l2SyncCl *sources.SyncClient // Note: `UnsafeL2Payloads` is exposed so that the SyncClient can send payloads to the driver if it is enabled.
UnsafeL2Payloads chan *eth.ExecutionPayload
l1 L1Chain l1 L1Chain
l2 L2Chain l2 L2Chain
...@@ -134,7 +137,7 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPa ...@@ -134,7 +137,7 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPa
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case s.unsafeL2Payloads <- payload: case s.UnsafeL2Payloads <- payload:
return nil return nil
} }
} }
...@@ -236,7 +239,7 @@ func (s *Driver) eventLoop() { ...@@ -236,7 +239,7 @@ func (s *Driver) eventLoop() {
// Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch // Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch
// missing payloads from the backup RPC (if it is configured). // missing payloads from the backup RPC (if it is configured).
s.checkForGapInUnsafeQueue(ctx) s.checkForGapInUnsafeQueue(ctx)
case payload := <-s.unsafeL2Payloads: case payload := <-s.UnsafeL2Payloads:
s.snapshot("New unsafe payload") s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID()) s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
s.derivation.AddUnsafePayload(payload) s.derivation.AddUnsafePayload(payload)
...@@ -457,47 +460,19 @@ type hashAndErrorChannel struct { ...@@ -457,47 +460,19 @@ type hashAndErrorChannel struct {
} }
// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from the backup RPC. // checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from the backup RPC.
// WARNING: This function fails silently (aside from warning logs). // WARNING: The sync client's attempt to retrieve the missing payloads is not guaranteed to succeed, and it will fail silently (besides
// emitting warning logs) if the requests fail.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) { func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) {
start, end := s.derivation.GetUnsafeQueueGap() start, end := s.derivation.GetUnsafeQueueGap()
size := end - start size := end - start
// If there is a gap in the queue and a backup sync client is configured, attempt to retrieve the missing payloads from the backup RPC // If there is a gap in the queue and a backup sync client is configured, attempt to retrieve the missing payloads from the backup RPC
if size > 0 && s.l2SyncCl != nil { if size > 0 && s.L2SyncCl != nil {
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently. // Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue. // Concurrent requests are safe here due to the engine queue being a priority queue.
// TODO: Should enforce a max gap size to prevent spamming the backup RPC or being rate limited. // TODO: Should enforce a max gap size to prevent spamming the backup RPC or being rate limited.
for blockNumber := start; blockNumber < end; blockNumber++ { for blockNumber := start; blockNumber < end; blockNumber++ {
go s.fetchUnsafeBlockFromRpc(ctx, blockNumber) s.L2SyncCl.FetchUnsafeBlock <- blockNumber
} }
} }
} }
// fetchUnsafeBlockFromRpc attempts to fetch the unsafe execution payload for blockNumber from the
// backup unsafe sync RPC and, if its block hash checks out, forwards it to the driver's
// unsafeL2Payloads channel.
//
// The send to the channel is abandoned when ctx is cancelled, so a goroutine running this function
// cannot stay blocked forever once nobody is draining the channel anymore.
//
// WARNING: This function fails silently (aside from warning logs): a failed request, a payload with
// an invalid block hash, or a cancelled ctx all result in no payload being delivered.
func (s *Driver) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) {
	s.log.Info("requesting unsafe payload from backup RPC", "block number", blockNumber)

	// TODO: Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
	// the `eth_getBlockByNumber` method is more widely available.
	payload, err := s.l2SyncCl.PayloadByNumber(ctx, blockNumber)
	if err != nil {
		s.log.Warn("failed to convert block to execution payload", "block number", blockNumber, "err", err)
		return
	}

	// TODO: Validate the integrity of the payload.
	// Signature validation is not necessary here since the backup RPC is trusted.
	if _, ok := payload.CheckBlockHash(); !ok {
		s.log.Warn("received invalid payload from backup RPC; invalid block hash", "payload", payload.ID())
		return
	}

	s.log.Info("received unsafe payload from backup RPC", "payload", payload.ID())

	// Send the retrieved payload to the `unsafeL2Payloads` channel, giving up if ctx is cancelled
	// first (same pattern as OnUnsafeL2Payload).
	select {
	case <-ctx.Done():
		s.log.Warn("aborted sending payload from backup RPC to the driver", "payload", payload.ID(), "err", ctx.Err())
		return
	case s.unsafeL2Payloads <- payload:
	}

	s.log.Info("sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
}
package sources package sources
import ( import (
"context"
"errors"
"sync"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources/caching" "github.com/ethereum-optimism/optimism/op-node/sources/caching"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
// SyncClientInterface is the method set of the backup unsafe sync client.
// Because it contains an unexported method, only types declared in this
// package can implement it.
type SyncClientInterface interface {
// Start takes the channel that fetched payloads are sent to and starts the
// client's event loop.
Start(unsafeL2Payloads chan *eth.ExecutionPayload) error
// Close stops the client's event loop.
Close() error
// fetchUnsafeBlockFromRpc tries to retrieve the unsafe execution payload for
// blockNumber from the backup RPC. It returns nothing; failures are only logged.
fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64)
}
type SyncClient struct { type SyncClient struct {
*L2Client *L2Client
FetchUnsafeBlock chan uint64
done chan struct{}
unsafeL2Payloads chan *eth.ExecutionPayload
wg sync.WaitGroup
} }
var _ SyncClientInterface = (*SyncClient)(nil)
type SyncClientConfig struct { type SyncClientConfig struct {
L2ClientConfig L2ClientConfig
} }
...@@ -28,6 +45,72 @@ func NewSyncClient(client client.RPC, log log.Logger, metrics caching.Metrics, c ...@@ -28,6 +45,72 @@ func NewSyncClient(client client.RPC, log log.Logger, metrics caching.Metrics, c
} }
return &SyncClient{ return &SyncClient{
L2Client: l2Client, L2Client: l2Client,
FetchUnsafeBlock: make(chan uint64),
done: make(chan struct{}),
}, nil }, nil
} }
// Start launches the sync client's event loop in a background goroutine.
//
// unsafeL2Payloads is the channel that successfully fetched payloads are sent
// to; it must not be nil.
//
// If Start returns a non-nil error the event loop has NOT been started. If it
// returns nil the loop is running.
func (s *SyncClient) Start(unsafeL2Payloads chan *eth.ExecutionPayload) error {
	if unsafeL2Payloads == nil {
		return errors.New("unsafeL2Payloads channel must not be nil")
	}
	s.unsafeL2Payloads = unsafeL2Payloads

	// Add to the WaitGroup before the go statement so that the deferred
	// wg.Done in eventLoop always has a matching Add.
	s.wg.Add(1)
	go s.eventLoop()
	return nil
}
// Close signals the event loop to stop and waits for it to exit.
//
// The stop signal is delivered by closing the done channel instead of sending
// a value on it. A send on the unbuffered channel only completes while the
// event loop is actively receiving, so it would block forever if the loop was
// never started (Start not called, or Start returned an error) or if Close had
// already been called. Closing the channel never blocks, and a receive from a
// closed channel succeeds immediately, so the loop still observes the signal.
//
// Close always returns nil. Calling it more than once is a no-op after the
// first call. Note that once Close has been called, a later Start would spawn
// a loop that exits immediately.
func (s *SyncClient) Close() error {
	select {
	case <-s.done:
		// done is already closed by an earlier Close; closing it again would panic.
	default:
		close(s.done)
	}

	// Returns immediately when no event loop was ever started.
	s.wg.Wait()
	return nil
}
// eventLoop services the sync client until a stop signal arrives on done.
// Every block number received on FetchUnsafeBlock is fetched synchronously,
// one at a time, via fetchUnsafeBlockFromRpc. The WaitGroup is released when
// the loop returns.
func (s *SyncClient) eventLoop() {
	defer s.wg.Done()
	s.log.Info("starting sync client event loop")

	// Fetches are issued with a background context.
	ctx := context.Background()
	for {
		select {
		case num := <-s.FetchUnsafeBlock:
			s.fetchUnsafeBlockFromRpc(ctx, num)
		case <-s.done:
			return
		}
	}
}
// fetchUnsafeBlockFromRpc attempts to fetch the unsafe execution payload for blockNumber from the
// backup unsafe sync RPC and, if its block hash checks out, forwards it on the unsafeL2Payloads
// channel handed to Start.
//
// The send on the channel is abandoned when ctx is cancelled, so a caller that supplies a
// cancellable context is never stuck behind a receiver that has stopped draining the channel.
// With a context that is never cancelled the send blocks until the payload is received.
//
// WARNING: This function fails silently (aside from warning logs): a failed request, a payload with
// an invalid block hash, or a cancelled ctx all result in no payload being delivered.
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) {
	s.log.Info("requesting unsafe payload from backup RPC", "block number", blockNumber)

	// TODO: Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
	// the `eth_getBlockByNumber` method is more widely available.
	payload, err := s.PayloadByNumber(ctx, blockNumber)
	if err != nil {
		s.log.Warn("failed to convert block to execution payload", "block number", blockNumber, "err", err)
		return
	}

	// TODO: Validate the integrity of the payload. Is this required?
	// Signature validation is not necessary here since the backup RPC is trusted.
	if _, ok := payload.CheckBlockHash(); !ok {
		s.log.Warn("received invalid payload from backup RPC; invalid block hash", "payload", payload.ID())
		return
	}

	s.log.Info("received unsafe payload from backup RPC", "payload", payload.ID())

	// Send the retrieved payload to the `unsafeL2Payloads` channel, giving up if ctx is cancelled first.
	select {
	case <-ctx.Done():
		s.log.Warn("aborted sending payload from backup RPC to the driver", "payload", payload.ID(), "err", ctx.Err())
		return
	case s.unsafeL2Payloads <- payload:
	}

	s.log.Info("sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment