Commit 2a59c43f authored by clabby's avatar clabby

Move logic into driver loop

parent 9309f9d4
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -371,54 +370,6 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { ...@@ -371,54 +370,6 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID()) eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads.Pop() eq.unsafePayloads.Pop()
} }
// Request the payload that builds upon the current unsafe head from the fallback RPC.
// This is a temporary alternative sync method- in the future, this will be done over the p2p network.
if eq.cfg.BackupL2UnsafeSyncRPC != "" {
eq.log.Info("requesting unsafe payload from backup RPC", "unsafe head", eq.unsafeHead.ID(), "first unsafe payload", first.ID(), "backup rpc", eq.cfg.BackupL2UnsafeSyncRPC)
// TODO: Create a client for the backup RPC and request the payload from the backup sync RPC via the `eth_getBlockByNumber` method.
// Once the payload has been received, verify its integrity and push it into the priority queue.
// TODO: Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
// Dial the backup unsafe sync RPC.
// TODO: Should this request block this thread (with a reasonable timeout) so that we can attempt to continue when the payload
// has been received and pushed into the priority queue? Or should it be made concurrently?
client, err := rpc.DialHTTP(eq.cfg.BackupL2UnsafeSyncRPC)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to dial backup unsafe sync RPC: %w", err))
}
// Fetch the next unsafe block from the backup unsafe sync RPC.
var block *types.Block
timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if err = client.CallContext(timeoutCtx, &block, "eth_getBlockByNumber", eq.unsafeHead.Number+1); err != nil {
return NewTemporaryError(fmt.Errorf("failed to get next unsafe block from backup unsafe sync RPC: %w", err))
}
// Convert the received block to a `eth.ExecutionPayload`.
payload, err := eth.BlockAsPayload(block)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to convert block to execution payload: %w", err))
}
// TODO: Validate the integrity of the payload.
if _, ok := payload.CheckBlockHash(); !ok {
return NewTemporaryError(fmt.Errorf("received invalid payload from backup unsafe sync RPC; invalid block hash"))
}
eq.log.Info("received unsafe payload from backup RPC", "payload", payload.ID(), "backup rpc", eq.cfg.BackupL2UnsafeSyncRPC)
// Add the received execution payload to the unsafe payload priority queue.
eq.AddUnsafePayload(payload)
eq.log.Info("inserted received unsafe payload into priority queue", "payload", payload.ID(), "backup rpc", eq.cfg.BackupL2UnsafeSyncRPC)
// TODO: Should we attempt to continue here, or wait for the next iteration of the state loop and still return EOF?
}
return io.EOF // time to go to next stage if we cannot process the first unsafe payload return io.EOF // time to go to next stage if we cannot process the first unsafe payload
} }
...@@ -711,3 +662,24 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -711,3 +662,24 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
eq.logSyncProgress("reset derivation work") eq.logSyncProgress("reset derivation work")
return io.EOF return io.EOF
} }
// GetUnsafeQueueGap retrieves the current size, start, and end of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, all values will be 0.
// Note: The range returned by this function is *inclusive*.
func (eq *EngineQueue) GetUnsafeQueueGap() (size uint64, start uint64, end uint64) {
first := eq.unsafePayloads.Peek()
// If the parent hash of the first unsafe payload does not match the current unsafe head, then there is a gap.
if first.ParentHash != eq.unsafeHead.Hash {
// The gap starts at the unsafe head + 1
start = eq.unsafeHead.Number + 1
// The gap ends at the parent block of the first unsafe payload in the priority queue.
end = first.ID().Number - 1
// The size of the gap is the difference between the exclusive end and inclusive start.
size = first.ID().Number - start
return size, start, end
} else {
return 0, 0, 0
}
}
...@@ -51,6 +51,7 @@ type EngineQueueStage interface { ...@@ -51,6 +51,7 @@ type EngineQueueStage interface {
Finalize(l1Origin eth.L1BlockRef) Finalize(l1Origin eth.L1BlockRef)
AddUnsafePayload(payload *eth.ExecutionPayload) AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap() (uint64, uint64, uint64)
Step(context.Context) error Step(context.Context) error
} }
...@@ -160,6 +161,12 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) { ...@@ -160,6 +161,12 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
dp.eng.AddUnsafePayload(payload) dp.eng.AddUnsafePayload(payload)
} }
// GetUnsafeQueueGap retrieves the current size, start, and end of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, all values will be 0.
func (dp *DerivationPipeline) GetUnsafeQueueGap() (uint64, uint64, uint64) {
return dp.eng.GetUnsafeQueueGap()
}
// Step tries to progress the buffer. // Step tries to progress the buffer.
// An EOF is returned if there pipeline is blocked by waiting for new L1 data. // An EOF is returned if there pipeline is blocked by waiting for new L1 data.
// If ctx errors no error is returned, but the step may exit early in a state that can still be continued. // If ctx errors no error is returned, but the step may exit early in a state that can still be continued.
......
...@@ -48,6 +48,7 @@ type DerivationPipeline interface { ...@@ -48,6 +48,7 @@ type DerivationPipeline interface {
Reset() Reset()
Step(ctx context.Context) error Step(ctx context.Context) error
AddUnsafePayload(payload *eth.ExecutionPayload) AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap() (uint64, uint64, uint64)
Finalize(ref eth.L1BlockRef) Finalize(ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef Finalized() eth.L2BlockRef
......
...@@ -11,7 +11,9 @@ import ( ...@@ -11,7 +11,9 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -195,6 +197,11 @@ func (s *Driver) eventLoop() { ...@@ -195,6 +197,11 @@ func (s *Driver) eventLoop() {
sequencerTimer.Reset(delay) sequencerTimer.Reset(delay)
} }
// Create a ticker to check if there is a gap in the engine queue every minute
// If there is, we send requests to the backup RPC to retrieve the missing payloads
// and add them to the unsafe queue.
altSyncTicker := time.NewTicker(60 * time.Second)
for { for {
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action. // If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
// This may adjust at any time based on fork-choice changes or previous errors. // This may adjust at any time based on fork-choice changes or previous errors.
...@@ -223,6 +230,61 @@ func (s *Driver) eventLoop() { ...@@ -223,6 +230,61 @@ func (s *Driver) eventLoop() {
} }
} }
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
// TODO: Should this be lower-priority in the switch case?
case <-altSyncTicker.C:
size, start, end := s.derivation.GetUnsafeQueueGap()
// If there is a gap in the queue and a backup sync RPC is configured, attempt to retrieve the missing payloads from the backup RPC
if size > 0 && s.config.BackupL2UnsafeSyncRPC != "" {
// Dial the backup unsafe sync RPC.
client, err := rpc.DialHTTP(s.config.BackupL2UnsafeSyncRPC)
if err != nil {
s.log.Warn("failed to dial backup unsafe sync RPC", "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
continue
}
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue.
// TODO: Should enforce a max gap size to prevent spamming the backup RPC or being rate limited.
for i := start; i <= end; i++ {
go func(blockNumber uint64) {
s.log.Info("requesting unsafe payload from backup RPC", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC)
// TODO: Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
// Fetch the next unsafe block from the backup unsafe sync RPC.
var block *types.Block
timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if err = client.CallContext(timeoutCtx, &block, "eth_getBlockByNumber", blockNumber); err != nil {
s.log.Warn("failed to retireve unsafe payload from backup RPC", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
return
}
// Convert the received block to a `eth.ExecutionPayload`.
payload, err := eth.BlockAsPayload(block)
if err != nil {
s.log.Warn("failed to convert block to execution payload", "block number", blockNumber, "backup rpc", s.config.BackupL2UnsafeSyncRPC, "err", err)
return
}
// TODO: Validate the integrity of the payload.
// Signature validation is not necessary here since the backup RPC is trusted. (?)
if _, ok := payload.CheckBlockHash(); !ok {
s.log.Warn("received invalid payload from backup RPC; invalid block hash", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC)
return
}
s.log.Info("received unsafe payload from backup RPC", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC)
// Send the retrieved payload to the `unsafeL2Payloads` channel.
s.unsafeL2Payloads <- payload
s.log.Info("inserted received unsafe payload into priority queue", "payload", payload.ID(), "backup rpc", s.config.BackupL2UnsafeSyncRPC)
}(i)
}
}
case payload := <-s.unsafeL2Payloads: case payload := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload") s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID()) s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment