Commit d8a7d711 authored by clabby's avatar clabby

:broom: Squash

:broom: x2

Mark's nits pt. 1

Buffer `FetchUnsafeBlock` channel

Bump channel size

Don't rely on P2P to detect the gap; Test WIP

Resolve conflicts

Fix test

:broom:, fix `op-node` PayloadQueue test after dedup change

:broom:

Don't hardcode sequencer RPC port

:broom: `SystemConfig` hooks

Attempt CI fix

Force e2e setup to loop through nodes in alphabetical order

Temp: Add Kelvin's CI fix
parent d6b0c35c
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"math/big" "math/big"
"os" "os"
"path" "path"
"sort"
"strings" "strings"
"testing" "testing"
"time" "time"
...@@ -119,14 +120,6 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { ...@@ -119,14 +120,6 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
JWTFilePath: writeDefaultJWT(t), JWTFilePath: writeDefaultJWT(t),
JWTSecret: testingJWTSecret, JWTSecret: testingJWTSecret,
Nodes: map[string]*rollupNode.Config{ Nodes: map[string]*rollupNode.Config{
"verifier": {
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
},
"sequencer": { "sequencer": {
Driver: driver.Config{ Driver: driver.Config{
VerifierConfDepth: 0, VerifierConfDepth: 0,
...@@ -141,6 +134,14 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { ...@@ -141,6 +134,14 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
}, },
L1EpochPollInterval: time.Second * 4, L1EpochPollInterval: time.Second * 4,
}, },
"verifier": {
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
},
}, },
Loggers: map[string]log.Logger{ Loggers: map[string]log.Logger{
"verifier": testlog.Logger(t, log.LvlInfo).New("role", "verifier"), "verifier": testlog.Logger(t, log.LvlInfo).New("role", "verifier"),
...@@ -225,7 +226,43 @@ func (sys *System) Close() { ...@@ -225,7 +226,43 @@ func (sys *System) Close() {
sys.Mocknet.Close() sys.Mocknet.Close()
} }
func (cfg SystemConfig) Start() (*System, error) { type systemConfigHook func(sCfg *SystemConfig, s *System)
type SystemConfigOption struct {
key string
role string
action systemConfigHook
}
type SystemConfigOptions struct {
opts map[string]systemConfigHook
}
func NewSystemConfigOptions(_opts []SystemConfigOption) (SystemConfigOptions, error) {
opts := make(map[string]systemConfigHook)
for _, opt := range _opts {
if _, ok := opts[opt.key+":"+opt.role]; ok {
return SystemConfigOptions{}, fmt.Errorf("duplicate option for key %s and role %s", opt.key, opt.role)
}
opts[opt.key+":"+opt.role] = opt.action
}
return SystemConfigOptions{
opts: opts,
}, nil
}
func (s *SystemConfigOptions) Get(key, role string) (systemConfigHook, bool) {
v, ok := s.opts[key+":"+role]
return v, ok
}
func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
opts, err := NewSystemConfigOptions(_opts)
if err != nil {
return nil, err
}
sys := &System{ sys := &System{
cfg: cfg, cfg: cfg,
Nodes: make(map[string]*node.Node), Nodes: make(map[string]*node.Node),
...@@ -457,7 +494,17 @@ func (cfg SystemConfig) Start() (*System, error) { ...@@ -457,7 +494,17 @@ func (cfg SystemConfig) Start() (*System, error) {
snapLog.SetHandler(log.DiscardHandler()) snapLog.SetHandler(log.DiscardHandler())
// Rollup nodes // Rollup nodes
for name, nodeConfig := range cfg.Nodes {
// Ensure we are looping through the nodes in alphabetical order
ks := make([]string, 0, len(cfg.Nodes))
for k := range cfg.Nodes {
ks = append(ks, k)
}
// Sort strings in ascending alphabetical order
sort.Strings(ks)
for _, name := range ks {
nodeConfig := cfg.Nodes[name]
c := *nodeConfig // copy c := *nodeConfig // copy
c.Rollup = makeRollupConfig() c.Rollup = makeRollupConfig()
...@@ -482,6 +529,10 @@ func (cfg SystemConfig) Start() (*System, error) { ...@@ -482,6 +529,10 @@ func (cfg SystemConfig) Start() (*System, error) {
return nil, err return nil, err
} }
sys.RollupNodes[name] = node sys.RollupNodes[name] = node
if action, ok := opts.Get("afterRollupNodeStart", name); ok {
action(&cfg, sys)
}
} }
if cfg.P2PTopology != nil { if cfg.P2PTopology != nil {
......
...@@ -649,6 +649,90 @@ func TestSystemMockP2P(t *testing.T) { ...@@ -649,6 +649,90 @@ func TestSystemMockP2P(t *testing.T) {
require.Contains(t, received, receiptVerif.BlockHash) require.Contains(t, received, receiptVerif.BlockHash)
} }
// TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1.
//
// Test steps:
// 1. Spin up the nodes (P2P is disabled on the verifier)
// 2. Send a transaction to the sequencer.
// 3. Wait for the TX to be mined on the sequencer chain.
// 5. Wait for the verifier to detect a gap in the payload queue vs. the unsafe head
// 6. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain.
// 7. Wait for the verifier to sync the unsafe chain into the safe chain.
// 8. Verify that the TX is included in the verifier's safe chain.
func TestSystemMockAltSync(t *testing.T) {
parallel(t)
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
}
cfg := DefaultSystemConfig(t)
// slow down L1 blocks so we can see the L2 blocks arrive well before the L1 blocks do.
// Keep the seq window small so the L2 chain is started quick
cfg.DeployConfig.L1BlockTime = 10
var published, received []common.Hash
seqTracer, verifTracer := new(FnTracer), new(FnTracer)
seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published = append(published, payload.BlockHash)
}
verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
received = append(received, payload.BlockHash)
}
cfg.Nodes["sequencer"].Tracer = seqTracer
cfg.Nodes["verifier"].Tracer = verifTracer
sys, err := cfg.Start(SystemConfigOption{
key: "afterRollupNodeStart",
role: "sequencer",
action: func(sCfg *SystemConfig, system *System) {
rpc, _ := system.Nodes["sequencer"].Attach() // never errors
cfg.Nodes["verifier"].L2Sync = &rollupNode.L2SyncRPCConfig{
Rpc: client.NewBaseRPCClient(rpc),
}
},
})
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
l2Verif := sys.Clients["verifier"]
// Transactor Account
ethPrivKey := cfg.Secrets.Alice
// Submit a TX to L2 sequencer node
toAddr := common.Address{0xff, 0xff}
tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
ChainID: cfg.L2ChainIDBig(),
Nonce: 0,
To: &toAddr,
Value: big.NewInt(1_000_000_000),
GasTipCap: big.NewInt(10),
GasFeeCap: big.NewInt(200),
Gas: 21000,
})
err = l2Seq.SendTransaction(context.Background(), tx)
require.Nil(t, err, "Sending L2 tx to sequencer")
// Wait for tx to be mined on the L2 sequencer chain
receiptSeq, err := waitForTransaction(tx.Hash(), l2Seq, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on sequencer")
// Wait for alt RPC sync to pick up the blocks on the sequencer chain
receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 12*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier")
require.Equal(t, receiptSeq, receiptVerif)
// Verify that the tx was received via RPC sync (P2P is disabled)
require.Contains(t, received, receiptVerif.BlockHash)
// Verify that everything that was received was published
require.GreaterOrEqual(t, len(published), len(received))
require.ElementsMatch(t, received, published[:len(received)])
}
// TestSystemDenseTopology sets up a dense p2p topology with 3 verifier nodes and 1 sequencer node. // TestSystemDenseTopology sets up a dense p2p topology with 3 verifier nodes and 1 sequencer node.
func TestSystemDenseTopology(t *testing.T) { func TestSystemDenseTopology(t *testing.T) {
parallel(t) parallel(t)
......
...@@ -82,7 +82,7 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client ...@@ -82,7 +82,7 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client
// L2SyncEndpointConfig contains configuration for the fallback sync endpoint // L2SyncEndpointConfig contains configuration for the fallback sync endpoint
type L2SyncEndpointConfig struct { type L2SyncEndpointConfig struct {
// HTTP Address of the L2 RPC to use for backup sync // Address of the L2 RPC to use for backup sync
L2NodeAddr string L2NodeAddr string
} }
...@@ -105,6 +105,25 @@ func (cfg *L2SyncEndpointConfig) Check() error { ...@@ -105,6 +105,25 @@ func (cfg *L2SyncEndpointConfig) Check() error {
return nil return nil
} }
type L2SyncRPCConfig struct {
// RPC endpoint to use for syncing
Rpc client.RPC
}
var _ L2SyncEndpointSetup = (*L2SyncRPCConfig)(nil)
func (cfg *L2SyncRPCConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
return cfg.Rpc, nil
}
func (cfg *L2SyncRPCConfig) Check() error {
if cfg.Rpc == nil {
return errors.New("rpc cannot be nil")
}
return nil
}
type L1EndpointConfig struct { type L1EndpointConfig struct {
L1NodeAddr string // Address of L1 User JSON-RPC endpoint to use (eth namespace required) L1NodeAddr string // Address of L1 User JSON-RPC endpoint to use (eth namespace required)
......
...@@ -198,8 +198,11 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -198,8 +198,11 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
} }
var syncClient *sources.SyncClient var syncClient *sources.SyncClient
// If there is an RPC url present in the config, create a sync client // If the L2 sync config is present, use it to create a sync client
if err := cfg.L2Sync.Check(); err == nil { if cfg.L2Sync != nil {
if err := cfg.L2Sync.Check(); err != nil {
log.Info("L2 sync config is not present, skipping L2 sync client setup", "err", err)
} else {
rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log) rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log)
if err != nil { if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err) return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
...@@ -208,11 +211,12 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -208,11 +211,12 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
// The sync client's RPC is always trusted // The sync client's RPC is always trusted
config := sources.SyncClientDefaultConfig(&cfg.Rollup, true) config := sources.SyncClientDefaultConfig(&cfg.Rollup, true)
syncClient, err = sources.NewSyncClient(rpcSyncClient, n.log, n.metrics.L2SourceCache, config) syncClient, err = sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config)
if err != nil { if err != nil {
return fmt.Errorf("failed to create sync client: %w", err) return fmt.Errorf("failed to create sync client: %w", err)
} }
} }
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics) n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics)
...@@ -289,7 +293,7 @@ func (n *OpNode) Start(ctx context.Context) error { ...@@ -289,7 +293,7 @@ func (n *OpNode) Start(ctx context.Context) error {
// If the backup unsafe sync client is enabled, start its event loop // If the backup unsafe sync client is enabled, start its event loop
if n.l2Driver.L2SyncCl != nil { if n.l2Driver.L2SyncCl != nil {
if err := n.l2Driver.L2SyncCl.Start(n.l2Driver.UnsafeL2Payloads); err != nil { if err := n.l2Driver.L2SyncCl.Start(); err != nil {
n.log.Error("Could not start the backup sync client", "err", err) n.log.Error("Could not start the backup sync client", "err", err)
return err return err
} }
......
...@@ -133,6 +133,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M ...@@ -133,6 +133,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
unsafePayloads: PayloadsQueue{ unsafePayloads: PayloadsQueue{
MaxSize: maxUnsafePayloadsMemory, MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize, SizeFn: payloadMemSize,
blockNos: make(map[uint64]bool),
}, },
prev: prev, prev: prev,
l1Fetcher: l1Fetcher, l1Fetcher: l1Fetcher,
...@@ -663,18 +664,19 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -663,18 +664,19 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
return io.EOF return io.EOF
} }
// GetUnsafeQueueGap retrieves the current [start, end) range of the gap between the tip of the unsafe priority queue and the unsafe head. // GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the start and end will be 0. // If there is no gap, the difference between end and start will be 0.
func (eq *EngineQueue) GetUnsafeQueueGap() (start uint64, end uint64) { func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) {
// If the parent hash of the first unsafe payload does not match the current unsafe head, then there is a gap. // The start of the gap is always the unsafe head + 1
if first := eq.unsafePayloads.Peek(); first != nil && first.ParentHash != eq.unsafeHead.Hash {
// The gap starts at the unsafe head + 1
start = eq.unsafeHead.Number + 1 start = eq.unsafeHead.Number + 1
// The gap ends at the parent block of the first unsafe payload in the priority queue, but we return the exclusive bound.
end = first.ID().Number
return start, end // If the priority queue is empty, the end is the first block number at the top of the priority queue
// Otherwise, the end is the expected block number
if first := eq.unsafePayloads.Peek(); first != nil {
end = first.ID().Number
} else { } else {
return 0, 0 end = expectedNumber
} }
return start, end
} }
...@@ -77,6 +77,7 @@ type PayloadsQueue struct { ...@@ -77,6 +77,7 @@ type PayloadsQueue struct {
pq payloadsByNumber pq payloadsByNumber
currentSize uint64 currentSize uint64
MaxSize uint64 MaxSize uint64
blockNos map[uint64]bool
SizeFn func(p *eth.ExecutionPayload) uint64 SizeFn func(p *eth.ExecutionPayload) uint64
} }
...@@ -99,6 +100,9 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error { ...@@ -99,6 +100,9 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
if p == nil { if p == nil {
return errors.New("cannot add nil payload") return errors.New("cannot add nil payload")
} }
if upq.blockNos[p.ID().Number] {
return errors.New("cannot add duplicate payload")
}
size := upq.SizeFn(p) size := upq.SizeFn(p)
if size > upq.MaxSize { if size > upq.MaxSize {
return fmt.Errorf("cannot add payload %s, payload mem size %d is larger than max queue size %d", p.ID(), size, upq.MaxSize) return fmt.Errorf("cannot add payload %s, payload mem size %d is larger than max queue size %d", p.ID(), size, upq.MaxSize)
...@@ -111,6 +115,7 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error { ...@@ -111,6 +115,7 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
for upq.currentSize > upq.MaxSize { for upq.currentSize > upq.MaxSize {
upq.Pop() upq.Pop()
} }
upq.blockNos[p.ID().Number] = true
return nil return nil
} }
...@@ -132,5 +137,7 @@ func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload { ...@@ -132,5 +137,7 @@ func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload {
} }
ps := heap.Pop(&upq.pq).(payloadAndSize) // nosemgrep ps := heap.Pop(&upq.pq).(payloadAndSize) // nosemgrep
upq.currentSize -= ps.size upq.currentSize -= ps.size
// remove the key from the blockNos map
delete(upq.blockNos, ps.payload.ID().Number)
return ps.payload return ps.payload
} }
...@@ -77,6 +77,7 @@ func TestPayloadsQueue(t *testing.T) { ...@@ -77,6 +77,7 @@ func TestPayloadsQueue(t *testing.T) {
pq := PayloadsQueue{ pq := PayloadsQueue{
MaxSize: payloadMemFixedCost * 3, MaxSize: payloadMemFixedCost * 3,
SizeFn: payloadMemSize, SizeFn: payloadMemSize,
blockNos: make(map[uint64]bool),
} }
require.Equal(t, 0, pq.Len()) require.Equal(t, 0, pq.Len())
require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek())
...@@ -85,6 +86,7 @@ func TestPayloadsQueue(t *testing.T) { ...@@ -85,6 +86,7 @@ func TestPayloadsQueue(t *testing.T) {
a := &eth.ExecutionPayload{BlockNumber: 3} a := &eth.ExecutionPayload{BlockNumber: 3}
b := &eth.ExecutionPayload{BlockNumber: 4} b := &eth.ExecutionPayload{BlockNumber: 4}
c := &eth.ExecutionPayload{BlockNumber: 5} c := &eth.ExecutionPayload{BlockNumber: 5}
d := &eth.ExecutionPayload{BlockNumber: 6}
bAlt := &eth.ExecutionPayload{BlockNumber: 4} bAlt := &eth.ExecutionPayload{BlockNumber: 4}
require.NoError(t, pq.Push(b)) require.NoError(t, pq.Push(b))
require.Equal(t, pq.Len(), 1) require.Equal(t, pq.Len(), 1)
...@@ -105,28 +107,33 @@ func TestPayloadsQueue(t *testing.T) { ...@@ -105,28 +107,33 @@ func TestPayloadsQueue(t *testing.T) {
require.Equal(t, pq.Pop(), a) require.Equal(t, pq.Pop(), a)
require.Equal(t, pq.Len(), 2, "expecting to pop the lowest") require.Equal(t, pq.Len(), 2, "expecting to pop the lowest")
require.NoError(t, pq.Push(bAlt)) require.Equal(t, pq.Peek(), b, "expecting b to be lowest, compared to c")
require.Equal(t, pq.Len(), 3)
require.Equal(t, pq.Peek(), b, "expecting b to be lowest, compared to bAlt and c")
require.Equal(t, pq.Pop(), b) require.Equal(t, pq.Pop(), b)
require.Equal(t, pq.Len(), 2)
require.Equal(t, pq.MemSize(), 2*payloadMemFixedCost)
require.Equal(t, pq.Pop(), bAlt)
require.Equal(t, pq.Len(), 1) require.Equal(t, pq.Len(), 1)
require.Equal(t, pq.Peek(), c, "expecting c to only remain") require.Equal(t, pq.MemSize(), payloadMemFixedCost)
require.Equal(t, pq.Pop(), c)
require.Equal(t, pq.Len(), 0, "expecting no items to remain")
d := &eth.ExecutionPayload{BlockNumber: 5, Transactions: []eth.Data{make([]byte, payloadMemFixedCost*3+1)}} e := &eth.ExecutionPayload{BlockNumber: 5, Transactions: []eth.Data{make([]byte, payloadMemFixedCost*3+1)}}
require.Error(t, pq.Push(d), "cannot add payloads that are too large") require.Error(t, pq.Push(e), "cannot add payloads that are too large")
require.NoError(t, pq.Push(b)) require.NoError(t, pq.Push(b))
require.Equal(t, pq.Len(), 1, "expecting b")
require.Equal(t, pq.Peek(), b)
require.NoError(t, pq.Push(c))
require.Equal(t, pq.Len(), 2, "expecting b, c") require.Equal(t, pq.Len(), 2, "expecting b, c")
require.Equal(t, pq.Peek(), b) require.Equal(t, pq.Peek(), b)
require.NoError(t, pq.Push(a)) require.NoError(t, pq.Push(a))
require.Equal(t, pq.Len(), 3, "expecting a, b, c") require.Equal(t, pq.Len(), 3, "expecting a, b, c")
require.Equal(t, pq.Peek(), a) require.Equal(t, pq.Peek(), a)
require.NoError(t, pq.Push(bAlt))
require.Equal(t, pq.Len(), 3, "expecting b, bAlt, c") // No duplicates allowed
require.Error(t, pq.Push(bAlt))
require.NoError(t, pq.Push(d))
require.Equal(t, pq.Len(), 3)
require.Equal(t, pq.Peek(), b, "expecting b, c, d")
require.NotContainsf(t, pq.pq[:], a, "a should be dropped after 3 items already exist under max size constraint") require.NotContainsf(t, pq.pq[:], a, "a should be dropped after 3 items already exist under max size constraint")
} }
...@@ -51,7 +51,7 @@ type EngineQueueStage interface { ...@@ -51,7 +51,7 @@ type EngineQueueStage interface {
Finalize(l1Origin eth.L1BlockRef) Finalize(l1Origin eth.L1BlockRef)
AddUnsafePayload(payload *eth.ExecutionPayload) AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap() (uint64, uint64) GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64)
Step(context.Context) error Step(context.Context) error
} }
...@@ -161,10 +161,10 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) { ...@@ -161,10 +161,10 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
dp.eng.AddUnsafePayload(payload) dp.eng.AddUnsafePayload(payload)
} }
// GetUnsafeQueueGap retrieves the current [start, end) range of the gap between the tip of the unsafe priority queue and the unsafe head. // GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the start and end will be 0. // If there is no gap, the start and end will be 0.
func (dp *DerivationPipeline) GetUnsafeQueueGap() (uint64, uint64) { func (dp *DerivationPipeline) GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) {
return dp.eng.GetUnsafeQueueGap() return dp.eng.GetUnsafeQueueGap(expectedNumber)
} }
// Step tries to progress the buffer. // Step tries to progress the buffer.
......
...@@ -49,7 +49,7 @@ type DerivationPipeline interface { ...@@ -49,7 +49,7 @@ type DerivationPipeline interface {
Reset() Reset()
Step(ctx context.Context) error Step(ctx context.Context) error
AddUnsafePayload(payload *eth.ExecutionPayload) AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap() (uint64, uint64) GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64)
Finalize(ref eth.L1BlockRef) Finalize(ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef Finalized() eth.L2BlockRef
...@@ -113,7 +113,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy ...@@ -113,7 +113,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy
l1HeadSig: make(chan eth.L1BlockRef, 10), l1HeadSig: make(chan eth.L1BlockRef, 10),
l1SafeSig: make(chan eth.L1BlockRef, 10), l1SafeSig: make(chan eth.L1BlockRef, 10),
l1FinalizedSig: make(chan eth.L1BlockRef, 10), l1FinalizedSig: make(chan eth.L1BlockRef, 10),
UnsafeL2Payloads: make(chan *eth.ExecutionPayload, 10), unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10),
L2SyncCl: syncClient, L2SyncCl: syncClient,
} }
} }
...@@ -69,8 +69,7 @@ type Driver struct { ...@@ -69,8 +69,7 @@ type Driver struct {
// L2 Signals: // L2 Signals:
// Note: `UnsafeL2Payloads` is exposed so that the SyncClient can send payloads to the driver if it is enabled. unsafeL2Payloads chan *eth.ExecutionPayload
UnsafeL2Payloads chan *eth.ExecutionPayload
l1 L1Chain l1 L1Chain
l2 L2Chain l2 L2Chain
...@@ -137,7 +136,7 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPa ...@@ -137,7 +136,7 @@ func (s *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPa
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case s.UnsafeL2Payloads <- payload: case s.unsafeL2Payloads <- payload:
return nil return nil
} }
} }
...@@ -201,10 +200,11 @@ func (s *Driver) eventLoop() { ...@@ -201,10 +200,11 @@ func (s *Driver) eventLoop() {
sequencerTimer.Reset(delay) sequencerTimer.Reset(delay)
} }
// Create a ticker to check if there is a gap in the engine queue every minute // Create a ticker to check if there is a gap in the engine queue every 15 seconds
// If there is, we send requests to the backup RPC to retrieve the missing payloads // If there is, we send requests to the backup RPC to retrieve the missing payloads
// and add them to the unsafe queue. // and add them to the unsafe queue.
altSyncTicker := time.NewTicker(60 * time.Second) altSyncTicker := time.NewTicker(15 * time.Second)
defer altSyncTicker.Stop()
for { for {
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action. // If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
...@@ -234,12 +234,13 @@ func (s *Driver) eventLoop() { ...@@ -234,12 +234,13 @@ func (s *Driver) eventLoop() {
} }
} }
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
// TODO: Should this be lower-priority in the switch case?
case <-altSyncTicker.C: case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch // Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch
// missing payloads from the backup RPC (if it is configured). // missing payloads from the backup RPC (if it is configured).
if s.L2SyncCl != nil {
s.checkForGapInUnsafeQueue(ctx) s.checkForGapInUnsafeQueue(ctx)
case payload := <-s.UnsafeL2Payloads: }
case payload := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload") s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID()) s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
s.derivation.AddUnsafePayload(payload) s.derivation.AddUnsafePayload(payload)
...@@ -463,16 +464,31 @@ type hashAndErrorChannel struct { ...@@ -463,16 +464,31 @@ type hashAndErrorChannel struct {
// WARNING: The sync client's attempt to retrieve the missing payloads is not guaranteed to succeed, and it will fail silently (besides // WARNING: The sync client's attempt to retrieve the missing payloads is not guaranteed to succeed, and it will fail silently (besides
// emitting warning logs) if the requests fail. // emitting warning logs) if the requests fail.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) { func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) {
start, end := s.derivation.GetUnsafeQueueGap() // subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that
// difference by the block time to get the expected L2 block number at the current time. If the
// unsafe head does not have this block number, then there is a gap in the queue.
wallClock := uint64(time.Now().Unix())
genesisTimestamp := s.config.Genesis.L2Time
wallClockGenesisDiff := wallClock - genesisTimestamp
expectedL2Block := wallClockGenesisDiff / s.config.BlockTime
start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block)
size := end - start size := end - start
// If there is a gap in the queue and a backup sync client is configured, attempt to retrieve the missing payloads from the backup RPC // Check if there is a gap between the unsafe head and the expected L2 block number at the current time.
if size > 0 && s.L2SyncCl != nil { if size > 0 {
s.log.Warn("Gap in payload queue tip and expected unsafe chain detected", "start", start, "end", end, "size", size)
s.log.Info("Attempting to fetch missing payloads from backup RPC", "start", start, "end", end, "size", size)
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently. // Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue. // Concurrent requests are safe here due to the engine queue being a priority queue.
// TODO: Should enforce a max gap size to prevent spamming the backup RPC or being rate limited. for blockNumber := start; blockNumber <= end; blockNumber++ {
for blockNumber := start; blockNumber < end; blockNumber++ { select {
s.L2SyncCl.FetchUnsafeBlock <- blockNumber case s.L2SyncCl.FetchUnsafeBlock <- blockNumber:
// Do nothing- the block number was successfully sent into the channel
default:
return // If the channel is full, return and wait for the next iteration of the event loop
}
} }
} }
} }
...@@ -36,10 +36,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -36,10 +36,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, err return nil, err
} }
driverConfig, err := NewDriverConfig(ctx) driverConfig := NewDriverConfig(ctx)
if err != nil {
return nil, err
}
p2pSignerSetup, err := p2pcli.LoadSignerSetup(ctx) p2pSignerSetup, err := p2pcli.LoadSignerSetup(ctx)
if err != nil { if err != nil {
...@@ -51,10 +48,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -51,10 +48,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, fmt.Errorf("failed to load p2p config: %w", err) return nil, fmt.Errorf("failed to load p2p config: %w", err)
} }
l1Endpoint, err := NewL1EndpointConfig(ctx) l1Endpoint := NewL1EndpointConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load l1 endpoint info: %w", err)
}
l2Endpoint, err := NewL2EndpointConfig(ctx, log) l2Endpoint, err := NewL2EndpointConfig(ctx, log)
if err != nil { if err != nil {
...@@ -99,12 +93,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -99,12 +93,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return cfg, nil return cfg, nil
} }
func NewL1EndpointConfig(ctx *cli.Context) (*node.L1EndpointConfig, error) { func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig {
return &node.L1EndpointConfig{ return &node.L1EndpointConfig{
L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name), L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name), L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.GlobalString(flags.L1RPCProviderKind.Name))), L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.GlobalString(flags.L1RPCProviderKind.Name))),
}, nil }
} }
func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConfig, error) { func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConfig, error) {
...@@ -137,19 +131,21 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf ...@@ -137,19 +131,21 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf
}, nil }, nil
} }
// NewL2SyncEndpointConfig returns a pointer to a L2SyncEndpointConfig if the
// flag is set, otherwise nil.
func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig { func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig {
return &node.L2SyncEndpointConfig{ return &node.L2SyncEndpointConfig{
L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name), L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name),
} }
} }
func NewDriverConfig(ctx *cli.Context) (*driver.Config, error) { func NewDriverConfig(ctx *cli.Context) *driver.Config {
return &driver.Config{ return &driver.Config{
VerifierConfDepth: ctx.GlobalUint64(flags.VerifierL1Confs.Name), VerifierConfDepth: ctx.GlobalUint64(flags.VerifierL1Confs.Name),
SequencerConfDepth: ctx.GlobalUint64(flags.SequencerL1Confs.Name), SequencerConfDepth: ctx.GlobalUint64(flags.SequencerL1Confs.Name),
SequencerEnabled: ctx.GlobalBool(flags.SequencerEnabledFlag.Name), SequencerEnabled: ctx.GlobalBool(flags.SequencerEnabledFlag.Name),
SequencerStopped: ctx.GlobalBool(flags.SequencerStoppedFlag.Name), SequencerStopped: ctx.GlobalBool(flags.SequencerStoppedFlag.Name),
}, nil }
} }
func NewRollupConfig(ctx *cli.Context) (*rollup.Config, error) { func NewRollupConfig(ctx *cli.Context) (*rollup.Config, error) {
......
...@@ -10,10 +10,18 @@ import ( ...@@ -10,10 +10,18 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources/caching" "github.com/ethereum-optimism/optimism/op-node/sources/caching"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
) )
var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not be nil")
// RpcSyncPeer is a mock PeerID for the RPC sync client.
var RpcSyncPeer peer.ID = "ALT_RPC_SYNC"
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error
type SyncClientInterface interface { type SyncClientInterface interface {
Start(unsafeL2Payloads chan *eth.ExecutionPayload) error Start() error
Close() error Close() error
fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64)
} }
...@@ -22,7 +30,7 @@ type SyncClient struct { ...@@ -22,7 +30,7 @@ type SyncClient struct {
*L2Client *L2Client
FetchUnsafeBlock chan uint64 FetchUnsafeBlock chan uint64
done chan struct{} done chan struct{}
unsafeL2Payloads chan *eth.ExecutionPayload receivePayload receivePayload
wg sync.WaitGroup wg sync.WaitGroup
} }
...@@ -38,7 +46,7 @@ func SyncClientDefaultConfig(config *rollup.Config, trustRPC bool) *SyncClientCo ...@@ -38,7 +46,7 @@ func SyncClientDefaultConfig(config *rollup.Config, trustRPC bool) *SyncClientCo
} }
} }
func NewSyncClient(client client.RPC, log log.Logger, metrics caching.Metrics, config *SyncClientConfig) (*SyncClient, error) { func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, metrics caching.Metrics, config *SyncClientConfig) (*SyncClient, error) {
l2Client, err := NewL2Client(client, log, metrics, &config.L2ClientConfig) l2Client, err := NewL2Client(client, log, metrics, &config.L2ClientConfig)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -46,19 +54,15 @@ func NewSyncClient(client client.RPC, log log.Logger, metrics caching.Metrics, c ...@@ -46,19 +54,15 @@ func NewSyncClient(client client.RPC, log log.Logger, metrics caching.Metrics, c
return &SyncClient{ return &SyncClient{
L2Client: l2Client, L2Client: l2Client,
FetchUnsafeBlock: make(chan uint64), FetchUnsafeBlock: make(chan uint64, 128),
done: make(chan struct{}), done: make(chan struct{}),
receivePayload: receiver,
}, nil }, nil
} }
// Start starts up the state loop. // Start starts up the state loop.
// The loop will have been started if err is not nil. // The loop will have been started if err is not nil.
func (s *SyncClient) Start(unsafeL2Payloads chan *eth.ExecutionPayload) error { func (s *SyncClient) Start() error {
if unsafeL2Payloads == nil {
return errors.New("unsafeL2Payloads channel must not be nil")
}
s.unsafeL2Payloads = unsafeL2Payloads
s.wg.Add(1) s.wg.Add(1)
go s.eventLoop() go s.eventLoop()
return nil return nil
...@@ -74,7 +78,7 @@ func (s *SyncClient) Close() error { ...@@ -74,7 +78,7 @@ func (s *SyncClient) Close() error {
// eventLoop is the main event loop for the sync client. // eventLoop is the main event loop for the sync client.
func (s *SyncClient) eventLoop() { func (s *SyncClient) eventLoop() {
defer s.wg.Done() defer s.wg.Done()
s.log.Info("starting sync client event loop") s.log.Info("Starting sync client event loop")
for { for {
select { select {
...@@ -88,29 +92,31 @@ func (s *SyncClient) eventLoop() { ...@@ -88,29 +92,31 @@ func (s *SyncClient) eventLoop() {
// fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC. // fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC.
// WARNING: This function fails silently (aside from warning logs). // WARNING: This function fails silently (aside from warning logs).
//
// Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) { func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) {
s.log.Info("requesting unsafe payload from backup RPC", "block number", blockNumber) s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber)
// TODO: Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
payload, err := s.PayloadByNumber(ctx, blockNumber) payload, err := s.PayloadByNumber(ctx, blockNumber)
if err != nil { if err != nil {
s.log.Warn("failed to convert block to execution payload", "block number", blockNumber, "err", err) s.log.Warn("Failed to convert block to execution payload", "block number", blockNumber, "err", err)
return return
} }
// TODO: Validate the integrity of the payload. Is this required?
// Signature validation is not necessary here since the backup RPC is trusted. // Signature validation is not necessary here since the backup RPC is trusted.
if _, ok := payload.CheckBlockHash(); !ok { if _, ok := payload.CheckBlockHash(); !ok {
s.log.Warn("received invalid payload from backup RPC; invalid block hash", "payload", payload.ID()) s.log.Warn("Received invalid payload from backup RPC; invalid block hash", "payload", payload.ID())
return return
} }
s.log.Info("received unsafe payload from backup RPC", "payload", payload.ID()) s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID())
// Send the retrieved payload to the `unsafeL2Payloads` channel. // Send the retrieved payload to the `unsafeL2Payloads` channel.
s.unsafeL2Payloads <- payload if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil {
s.log.Warn("Failed to send payload into the driver's unsafeL2Payloads channel", "payload", payload.ID(), "err", err)
s.log.Info("sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID()) return
} else {
s.log.Info("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment