Commit 5c971c0f authored by clabby's avatar clabby

Pull logic out of state loop; Update `GetUnsafeQueueGap` to return an exclusive (`[a, b)`) range

parent 2a59c43f
......@@ -663,23 +663,20 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
return io.EOF
}
// GetUnsafeQueueGap retrieves the current size, start, and end of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, all values will be 0.
// Note: The range returned by this function is *inclusive*.
func (eq *EngineQueue) GetUnsafeQueueGap() (size uint64, start uint64, end uint64) {
// GetUnsafeQueueGap retrieves the current [start, end) range of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the start and end will be 0.
func (eq *EngineQueue) GetUnsafeQueueGap() (start uint64, end uint64) {
first := eq.unsafePayloads.Peek()
// If the parent hash of the first unsafe payload does not match the current unsafe head, then there is a gap.
if first.ParentHash != eq.unsafeHead.Hash {
// The gap starts at the unsafe head + 1
start = eq.unsafeHead.Number + 1
// The gap ends at the parent block of the first unsafe payload in the priority queue.
end = first.ID().Number - 1
// The size of the gap is the difference between the exclusive end and inclusive start.
size = first.ID().Number - start
// The gap ends at the parent block of the first unsafe payload in the priority queue, but we return the exclusive bound.
end = first.ID().Number
return size, start, end
return start, end
} else {
return 0, 0, 0
return 0, 0
}
}
......@@ -51,7 +51,7 @@ type EngineQueueStage interface {
Finalize(l1Origin eth.L1BlockRef)
AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap() (uint64, uint64, uint64)
GetUnsafeQueueGap() (uint64, uint64)
Step(context.Context) error
}
......@@ -161,9 +161,9 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
dp.eng.AddUnsafePayload(payload)
}
// GetUnsafeQueueGap retrieves the current size, start, and end of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, all values will be 0.
func (dp *DerivationPipeline) GetUnsafeQueueGap() (uint64, uint64, uint64) {
// GetUnsafeQueueGap retrieves the current [start, end) range of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the start and end will be 0.
func (dp *DerivationPipeline) GetUnsafeQueueGap() (uint64, uint64) {
return dp.eng.GetUnsafeQueueGap()
}
......
......@@ -48,7 +48,7 @@ type DerivationPipeline interface {
Reset()
Step(ctx context.Context) error
AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap() (uint64, uint64, uint64)
GetUnsafeQueueGap() (uint64, uint64)
Finalize(ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef
......
......@@ -232,59 +232,9 @@ func (s *Driver) eventLoop() {
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
// TODO: Should this be lower-priority in the switch case?
case <-altSyncTicker.C:
size, start, end := s.derivation.GetUnsafeQueueGap()
// If there is a gap in the queue and a backup sync RPC is configured, attempt to retrieve the missing payloads from the backup RPC
if size > 0 && s.config.BackupL2UnsafeSyncRPC != "" {
// Dial the backup unsafe sync RPC.
client, err := rpc.DialHTTP(s.config.BackupL2UnsafeSyncRPC)
if err != nil {
s.log.Warn("failed to dial backup unsafe sync RPC", "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
continue
}
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue.
// TODO: Should enforce a max gap size to prevent spamming the backup RPC or being rate limited.
for i := start; i <= end; i++ {
go func(blockNumber uint64) {
s.log.Info("requesting unsafe payload from backup RPC", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC)
// TODO: Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
// Fetch the next unsafe block from the backup unsafe sync RPC.
var block *types.Block
timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if err = client.CallContext(timeoutCtx, &block, "eth_getBlockByNumber", blockNumber); err != nil {
s.log.Warn("failed to retireve unsafe payload from backup RPC", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
return
}
// Convert the received block to a `eth.ExecutionPayload`.
payload, err := eth.BlockAsPayload(block)
if err != nil {
s.log.Warn("failed to convert block to execution payload", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
return
}
// TODO: Validate the integrity of the payload.
// Signature validation is not necessary here since the backup RPC is trusted. (?)
if _, ok := payload.CheckBlockHash(); !ok {
s.log.Warn("received invalid payload from backup RPC; invalid block hash", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC)
return
}
s.log.Info("received unsafe payload from backup RPC", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC)
// Send the retrieved payload to the `unsafeL2Payloads` channel.
s.unsafeL2Payloads <- payload
s.log.Info("inserted received unsafe payload into priority queue", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC)
}(i)
}
}
// Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch
// missing payloads from the backup RPC (if it is configured).
s.checkForGapInUnsafeQueue(ctx)
case payload := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
......@@ -504,3 +454,65 @@ type hashAndErrorChannel struct {
hash common.Hash
err chan error
}
// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from the backup RPC.
// WARNING: This function fails silently (aside from warning logs).
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) {
start, end := s.derivation.GetUnsafeQueueGap()
size := end - start
// If there is a gap in the queue and a backup sync RPC is configured, attempt to retrieve the missing payloads from the backup RPC
if size > 0 && s.config.BackupL2UnsafeSyncRPC != "" {
// Dial the backup unsafe sync RPC.
client, err := rpc.DialHTTP(s.config.BackupL2UnsafeSyncRPC)
if err != nil {
s.log.Warn("failed to dial backup unsafe sync RPC", "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
}
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue.
// TODO: Should enforce a max gap size to prevent spamming the backup RPC or being rate limited.
for blockNumber := start; blockNumber < end; blockNumber++ {
go s.fetchUnsafeBlockFromRpc(ctx, blockNumber, client)
}
}
}
// fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC.
// WARNING: This function fails silently (aside from warning logs).
func (s *Driver) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64, client *rpc.Client) {
s.log.Info("requesting unsafe payload from backup RPC", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC)
// TODO: Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
// Fetch the next unsafe block from the backup unsafe sync RPC.
var block *types.Block
timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if err := client.CallContext(timeoutCtx, &block, "eth_getBlockByNumber", blockNumber); err != nil {
s.log.Warn("failed to retrieve unsafe payload from backup RPC", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
return
}
// Convert the received block to a `eth.ExecutionPayload`.
payload, err := eth.BlockAsPayload(block)
if err != nil {
s.log.Warn("failed to convert block to execution payload", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
return
}
// TODO: Validate the integrity of the payload.
// Signature validation is not necessary here since the backup RPC is trusted. (?)
if _, ok := payload.CheckBlockHash(); !ok {
s.log.Warn("received invalid payload from backup RPC; invalid block hash", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC)
return
}
s.log.Info("received unsafe payload from backup RPC", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC)
// Send the retrieved payload to the `unsafeL2Payloads` channel.
s.unsafeL2Payloads <- payload
s.log.Info("inserted received unsafe payload into priority queue", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment