Commit b9f8f3ce authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge pull request #5180 from ethereum-optimism/p2p-alt-sync

op-node: P2P req-resp alt sync method support
parents 025e157a dee6046b
...@@ -16,6 +16,7 @@ require ( ...@@ -16,6 +16,7 @@ require (
github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8 github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8
github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/holiman/uint256 v1.2.0 github.com/holiman/uint256 v1.2.0
github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ds-leveldb v0.5.0
...@@ -86,7 +87,6 @@ require ( ...@@ -86,7 +87,6 @@ require (
github.com/graph-gophers/graphql-go v1.3.0 // indirect github.com/graph-gophers/graphql-go v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-bexpr v0.1.11 // indirect github.com/hashicorp/go-bexpr v0.1.11 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/huin/goupnp v1.1.0 // indirect github.com/huin/goupnp v1.1.0 // indirect
github.com/influxdata/influxdb v1.8.3 // indirect github.com/influxdata/influxdb v1.8.3 // indirect
......
...@@ -143,6 +143,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus { ...@@ -143,6 +143,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
UnsafeL2: s.L2Unsafe(), UnsafeL2: s.L2Unsafe(),
SafeL2: s.L2Safe(), SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(), FinalizedL2: s.L2Finalized(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
} }
} }
......
...@@ -206,6 +206,9 @@ type SystemConfig struct { ...@@ -206,6 +206,9 @@ type SystemConfig struct {
// Any node name not in the topology will not have p2p enabled. // Any node name not in the topology will not have p2p enabled.
P2PTopology map[string][]string P2PTopology map[string][]string
// Enables req-resp sync in the P2P nodes
P2PReqRespSync bool
// If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet. // If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet.
NonFinalizedProposals bool NonFinalizedProposals bool
...@@ -218,6 +221,8 @@ type System struct { ...@@ -218,6 +221,8 @@ type System struct {
RollupConfig *rollup.Config RollupConfig *rollup.Config
L2GenesisCfg *core.Genesis
// Connections to running nodes // Connections to running nodes
Nodes map[string]*node.Node Nodes map[string]*node.Node
Backends map[string]*geth_eth.Ethereum Backends map[string]*geth_eth.Ethereum
...@@ -329,6 +334,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -329,6 +334,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
sys.L2GenesisCfg = l2Genesis
for addr, amount := range cfg.Premine { for addr, amount := range cfg.Premine {
if existing, ok := l2Genesis.Alloc[addr]; ok { if existing, ok := l2Genesis.Alloc[addr]; ok {
l2Genesis.Alloc[addr] = core.GenesisAccount{ l2Genesis.Alloc[addr] = core.GenesisAccount{
...@@ -411,30 +417,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -411,30 +417,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
// Configure connections to L1 and L2 for rollup nodes. // Configure connections to L1 and L2 for rollup nodes.
// TODO: refactor testing to use in-process rpc connections instead of websockets. // TODO: refactor testing to use in-process rpc connections instead of websockets.
l1EndpointConfig := l1Node.WSEndpoint()
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
if useHTTP {
log.Info("using HTTP client")
l1EndpointConfig = l1Node.HTTPEndpoint()
}
for name, rollupCfg := range cfg.Nodes { for name, rollupCfg := range cfg.Nodes {
l2EndpointConfig := sys.Nodes[name].WSAuthEndpoint() configureL1(rollupCfg, l1Node)
if useHTTP { configureL2(rollupCfg, sys.Nodes[name], cfg.JWTSecret)
l2EndpointConfig = sys.Nodes[name].HTTPAuthEndpoint()
}
rollupCfg.L1 = &rollupNode.L1EndpointConfig{
L1NodeAddr: l1EndpointConfig,
L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
RateLimit: 0,
BatchSize: 20,
HttpPollInterval: time.Duration(cfg.DeployConfig.L1BlockTime) * time.Second / 10,
}
rollupCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: cfg.JWTSecret,
}
rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{ rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{
Client: nil, Client: nil,
TrustRPC: false, TrustRPC: false,
...@@ -489,6 +475,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -489,6 +475,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
HostP2P: h, HostP2P: h,
LocalNode: nil, LocalNode: nil,
UDPv5: nil, UDPv5: nil,
EnableReqRespSync: cfg.P2PReqRespSync,
} }
p2pNodes[name] = p p2pNodes[name] = p
return p, nil return p, nil
...@@ -632,6 +619,35 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -632,6 +619,35 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
return sys, nil return sys, nil
} }
func configureL1(rollupNodeCfg *rollupNode.Config, l1Node *node.Node) {
l1EndpointConfig := l1Node.WSEndpoint()
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
if useHTTP {
log.Info("using HTTP client")
l1EndpointConfig = l1Node.HTTPEndpoint()
}
rollupNodeCfg.L1 = &rollupNode.L1EndpointConfig{
L1NodeAddr: l1EndpointConfig,
L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
RateLimit: 0,
BatchSize: 20,
HttpPollInterval: time.Millisecond * 100,
}
}
func configureL2(rollupNodeCfg *rollupNode.Config, l2Node *node.Node, jwtSecret [32]byte) {
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
l2EndpointConfig := l2Node.WSAuthEndpoint()
if useHTTP {
l2EndpointConfig = l2Node.HTTPAuthEndpoint()
}
rollupNodeCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: jwtSecret,
}
}
func (cfg SystemConfig) L1ChainIDBig() *big.Int { func (cfg SystemConfig) L1ChainIDBig() *big.Int {
return new(big.Int).SetUint64(cfg.DeployConfig.L1ChainID) return new(big.Int).SetUint64(cfg.DeployConfig.L1ChainID)
} }
......
...@@ -28,6 +28,7 @@ import ( ...@@ -28,6 +28,7 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
rollupNode "github.com/ethereum-optimism/optimism/op-node/node" rollupNode "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -35,6 +36,7 @@ import ( ...@@ -35,6 +36,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/withdrawals" "github.com/ethereum-optimism/optimism/op-node/withdrawals"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
) )
var enableParallelTesting bool = true var enableParallelTesting bool = true
...@@ -737,6 +739,159 @@ func TestSystemRPCAltSync(t *testing.T) { ...@@ -737,6 +739,159 @@ func TestSystemRPCAltSync(t *testing.T) {
require.ElementsMatch(t, received, published[:len(received)]) require.ElementsMatch(t, received, published[:len(received)])
} }
func TestSystemP2PAltSync(t *testing.T) {
parallel(t)
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
}
cfg := DefaultSystemConfig(t)
// remove default verifier node
delete(cfg.Nodes, "verifier")
// Add more verifier nodes
cfg.Nodes["alice"] = &rollupNode.Config{
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
}
cfg.Nodes["bob"] = &rollupNode.Config{
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
}
cfg.Loggers["alice"] = testlog.Logger(t, log.LvlInfo).New("role", "alice")
cfg.Loggers["bob"] = testlog.Logger(t, log.LvlInfo).New("role", "bob")
// connect the nodes
cfg.P2PTopology = map[string][]string{
"sequencer": {"alice", "bob"},
"alice": {"sequencer", "bob"},
"bob": {"alice", "sequencer"},
}
// Enable the P2P req-resp based sync
cfg.P2PReqRespSync = true
// Disable batcher, so there will not be any L1 data to sync from
cfg.DisableBatcher = true
var published []string
seqTracer := new(FnTracer)
// The sequencer still publishes the blocks to the tracer, even if they do not reach the network due to disabled P2P
seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published = append(published, payload.ID().String())
}
// Blocks are now received via the RPC based alt-sync method
cfg.Nodes["sequencer"].Tracer = seqTracer
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
// Transactor Account
ethPrivKey := cfg.Secrets.Alice
// Submit a TX to L2 sequencer node
toAddr := common.Address{0xff, 0xff}
tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
ChainID: cfg.L2ChainIDBig(),
Nonce: 0,
To: &toAddr,
Value: big.NewInt(1_000_000_000),
GasTipCap: big.NewInt(10),
GasFeeCap: big.NewInt(200),
Gas: 21000,
})
err = l2Seq.SendTransaction(context.Background(), tx)
require.Nil(t, err, "Sending L2 tx to sequencer")
// Wait for tx to be mined on the L2 sequencer chain
receiptSeq, err := waitForTransaction(tx.Hash(), l2Seq, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on sequencer")
// Gossip is able to respond to IWANT messages for the duration of heartbeat_time * message_window = 0.5 * 12 = 6
// Wait till we pass that, and then we'll have missed some blocks that cannot be retrieved in any way from gossip
time.Sleep(time.Second * 10)
// set up our syncer node, connect it to alice/bob
cfg.Loggers["syncer"] = testlog.Logger(t, log.LvlInfo).New("role", "syncer")
snapLog := log.New()
snapLog.SetHandler(log.DiscardHandler())
// Create a peer, and hook up alice and bob
h, err := sys.Mocknet.GenPeer()
require.NoError(t, err)
_, err = sys.Mocknet.LinkPeers(sys.RollupNodes["alice"].P2P().Host().ID(), h.ID())
require.NoError(t, err)
_, err = sys.Mocknet.LinkPeers(sys.RollupNodes["bob"].P2P().Host().ID(), h.ID())
require.NoError(t, err)
// Configure the new rollup node that'll be syncing
var syncedPayloads []string
syncNodeCfg := &rollupNode.Config{
L2Sync: &rollupNode.PreparedL2SyncEndpoint{Client: nil},
Driver: driver.Config{VerifierConfDepth: 0},
Rollup: *sys.RollupConfig,
P2PSigner: nil,
RPC: rollupNode.RPCConfig{
ListenAddr: "127.0.0.1",
ListenPort: 0,
EnableAdmin: true,
},
P2P: &p2p.Prepared{HostP2P: h, EnableReqRespSync: true},
Metrics: rollupNode.MetricsConfig{Enabled: false}, // no metrics server
Pprof: oppprof.CLIConfig{},
L1EpochPollInterval: time.Second * 10,
Tracer: &FnTracer{
OnUnsafeL2PayloadFn: func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
syncedPayloads = append(syncedPayloads, payload.ID().String())
},
},
}
configureL1(syncNodeCfg, sys.Nodes["l1"])
syncerL2Engine, _, err := initL2Geth("syncer", big.NewInt(int64(cfg.DeployConfig.L2ChainID)), sys.L2GenesisCfg, cfg.JWTFilePath)
require.NoError(t, err)
require.NoError(t, syncerL2Engine.Start())
configureL2(syncNodeCfg, syncerL2Engine, cfg.JWTSecret)
syncerNode, err := rollupNode.New(context.Background(), syncNodeCfg, cfg.Loggers["syncer"], snapLog, "", metrics.NewMetrics(""))
require.NoError(t, err)
err = syncerNode.Start(context.Background())
require.NoError(t, err)
// connect alice and bob to our new syncer node
_, err = sys.Mocknet.ConnectPeers(sys.RollupNodes["alice"].P2P().Host().ID(), syncerNode.P2P().Host().ID())
require.NoError(t, err)
_, err = sys.Mocknet.ConnectPeers(sys.RollupNodes["bob"].P2P().Host().ID(), syncerNode.P2P().Host().ID())
require.NoError(t, err)
rpc, err := syncerL2Engine.Attach()
require.NoError(t, err)
l2Verif := ethclient.NewClient(rpc)
// It may take a while to sync, but eventually we should see the sequenced data show up
receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 100*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier")
require.Equal(t, receiptSeq, receiptVerif)
// Verify that the tx was received via P2P sync
require.Contains(t, syncedPayloads, eth.BlockID{Hash: receiptVerif.BlockHash, Number: receiptVerif.BlockNumber.Uint64()}.String())
// Verify that everything that was received was published
require.GreaterOrEqual(t, len(published), len(syncedPayloads))
require.ElementsMatch(t, syncedPayloads, published[:len(syncedPayloads)])
}
// TestSystemDenseTopology sets up a dense p2p topology with 3 verifier nodes and 1 sequencer node. // TestSystemDenseTopology sets up a dense p2p topology with 3 verifier nodes and 1 sequencer node.
func TestSystemDenseTopology(t *testing.T) { func TestSystemDenseTopology(t *testing.T) {
t.Skip("Skipping dense topology test to avoid flakiness. @refcell address in p2p scoring pr.") t.Skip("Skipping dense topology test to avoid flakiness. @refcell address in p2p scoring pr.")
......
...@@ -32,4 +32,7 @@ type SyncStatus struct { ...@@ -32,4 +32,7 @@ type SyncStatus struct {
// FinalizedL2 points to the L2 block that was derived fully from // FinalizedL2 points to the L2 block that was derived fully from
// finalized L1 information, thus irreversible. // finalized L1 information, thus irreversible.
FinalizedL2 L2BlockRef `json:"finalized_l2"` FinalizedL2 L2BlockRef `json:"finalized_l2"`
// UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block.
// It may be zeroed if there is no targeted block.
UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"`
} }
...@@ -276,6 +276,12 @@ var ( ...@@ -276,6 +276,12 @@ var (
Hidden: true, Hidden: true,
EnvVar: p2pEnv("GOSSIP_FLOOD_PUBLISH"), EnvVar: p2pEnv("GOSSIP_FLOOD_PUBLISH"),
} }
SyncReqRespFlag = cli.BoolFlag{
Name: "p2p.sync.req-resp",
Usage: "Enables experimental P2P req-resp alternative sync method, on both server and client side.",
Required: false,
EnvVar: p2pEnv("SYNC_REQ_RESP"),
}
) )
// None of these flags are strictly required. // None of these flags are strictly required.
...@@ -315,4 +321,5 @@ var p2pFlags = []cli.Flag{ ...@@ -315,4 +321,5 @@ var p2pFlags = []cli.Flag{
GossipMeshDhiFlag, GossipMeshDhiFlag,
GossipMeshDlazyFlag, GossipMeshDlazyFlag,
GossipFloodPublishFlag, GossipFloodPublishFlag,
SyncReqRespFlag,
} }
...@@ -66,6 +66,9 @@ type Metricer interface { ...@@ -66,6 +66,9 @@ type Metricer interface {
Document() []metrics.DocumentedMetric Document() []metrics.DocumentedMetric
// P2P Metrics // P2P Metrics
SetPeerScores(scores map[string]float64) SetPeerScores(scores map[string]float64)
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int)
} }
// Metrics tracks all the metrics for the op-node. // Metrics tracks all the metrics for the op-node.
...@@ -90,6 +93,12 @@ type Metrics struct { ...@@ -90,6 +93,12 @@ type Metrics struct {
SequencingErrors *EventMetrics SequencingErrors *EventMetrics
PublishingErrors *EventMetrics PublishingErrors *EventMetrics
P2PReqDurationSeconds *prometheus.HistogramVec
P2PReqTotal *prometheus.CounterVec
P2PPayloadByNumber *prometheus.GaugeVec
PayloadsQuarantineTotal prometheus.Gauge
SequencerInconsistentL1Origin *EventMetrics SequencerInconsistentL1Origin *EventMetrics
SequencerResets *EventMetrics SequencerResets *EventMetrics
...@@ -322,6 +331,44 @@ func NewMetrics(procName string) *Metrics { ...@@ -322,6 +331,44 @@ func NewMetrics(procName string) *Metrics {
"direction", "direction",
}), }),
P2PReqDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "req_duration_seconds",
Buckets: []float64{},
Help: "Duration of P2P requests",
}, []string{
"p2p_role", // "client" or "server"
"p2p_method",
"result_code",
}),
P2PReqTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "req_total",
Help: "Number of P2P requests",
}, []string{
"p2p_role", // "client" or "server"
"p2p_method",
"result_code",
}),
P2PPayloadByNumber: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "payload_by_number",
Help: "Payload by number requests",
}, []string{
"p2p_role", // "client" or "server"
}),
PayloadsQuarantineTotal: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "payloads_quarantine_total",
Help: "number of unverified execution payloads buffered in quarantine",
}),
SequencerBuildingDiffDurationSeconds: factory.NewHistogram(prometheus.HistogramOpts{ SequencerBuildingDiffDurationSeconds: factory.NewHistogram(prometheus.HistogramOpts{
Namespace: ns, Namespace: ns,
Name: "sequencer_building_diff_seconds", Name: "sequencer_building_diff_seconds",
...@@ -567,6 +614,27 @@ func (m *Metrics) Document() []metrics.DocumentedMetric { ...@@ -567,6 +614,27 @@ func (m *Metrics) Document() []metrics.DocumentedMetric {
return m.factory.Document() return m.factory.Document()
} }
func (m *Metrics) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) {
if resultCode > 4 { // summarize all high codes to reduce metrics overhead
resultCode = 5
}
code := strconv.FormatUint(uint64(resultCode), 10)
m.P2PReqTotal.WithLabelValues("client", "payload_by_number", code).Inc()
m.P2PReqDurationSeconds.WithLabelValues("client", "payload_by_number", code).Observe(float64(duration) / float64(time.Second))
m.P2PPayloadByNumber.WithLabelValues("client").Set(float64(num))
}
func (m *Metrics) ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) {
code := strconv.FormatUint(uint64(resultCode), 10)
m.P2PReqTotal.WithLabelValues("server", "payload_by_number", code).Inc()
m.P2PReqDurationSeconds.WithLabelValues("server", "payload_by_number", code).Observe(float64(duration) / float64(time.Second))
m.P2PPayloadByNumber.WithLabelValues("server").Set(float64(num))
}
func (m *Metrics) PayloadsQuarantineSize(n int) {
m.PayloadsQuarantineTotal.Set(float64(n))
}
type noopMetricer struct{} type noopMetricer struct{}
var NoopMetrics Metricer = new(noopMetricer) var NoopMetrics Metricer = new(noopMetricer)
...@@ -660,3 +728,12 @@ func (n *noopMetricer) RecordSequencerSealingTime(duration time.Duration) { ...@@ -660,3 +728,12 @@ func (n *noopMetricer) RecordSequencerSealingTime(duration time.Duration) {
func (n *noopMetricer) Document() []metrics.DocumentedMetric { func (n *noopMetricer) Document() []metrics.DocumentedMetric {
return nil return nil
} }
func (n *noopMetricer) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) {
}
func (n *noopMetricer) ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) {
}
func (n *noopMetricer) PayloadsQuarantineSize(int) {
}
...@@ -256,7 +256,7 @@ func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error { ...@@ -256,7 +256,7 @@ func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error { func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil { if cfg.P2P != nil {
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.runCfg, n.metrics) p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics)
if err != nil || p2pNode == nil { if err != nil || p2pNode == nil {
return err return err
} }
...@@ -373,11 +373,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e ...@@ -373,11 +373,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e
return nil return nil
} }
func (n *OpNode) RequestL2Range(ctx context.Context, start, end uint64) error { func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
if n.rpcSync != nil { if n.rpcSync != nil {
return n.rpcSync.RequestL2Range(ctx, start, end) return n.rpcSync.RequestL2Range(ctx, start, end)
} }
n.log.Debug("ignoring request to sync L2 range, no sync method available") if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
return n.p2pNode.RequestL2Range(ctx, start, end)
}
n.log.Debug("ignoring request to sync L2 range, no sync method available", "start", start, "end", end)
return nil return nil
} }
......
...@@ -166,6 +166,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus { ...@@ -166,6 +166,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
UnsafeL2: testutils.RandomL2BlockRef(rng), UnsafeL2: testutils.RandomL2BlockRef(rng),
SafeL2: testutils.RandomL2BlockRef(rng), SafeL2: testutils.RandomL2BlockRef(rng),
FinalizedL2: testutils.RandomL2BlockRef(rng), FinalizedL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
} }
} }
......
...@@ -73,6 +73,8 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) { ...@@ -73,6 +73,8 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
conf.ConnGater = p2p.DefaultConnGater conf.ConnGater = p2p.DefaultConnGater
conf.ConnMngr = p2p.DefaultConnManager conf.ConnMngr = p2p.DefaultConnManager
conf.EnableReqRespSync = ctx.GlobalBool(flags.SyncReqRespFlag.Name)
return conf, nil return conf, nil
} }
......
...@@ -40,6 +40,7 @@ type SetupP2P interface { ...@@ -40,6 +40,7 @@ type SetupP2P interface {
Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error) Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error)
TargetPeers() uint TargetPeers() uint
GossipSetupConfigurables GossipSetupConfigurables
ReqRespSyncEnabled() bool
} }
// Config sets up a p2p host and discv5 service from configuration. // Config sets up a p2p host and discv5 service from configuration.
...@@ -50,6 +51,9 @@ type Config struct { ...@@ -50,6 +51,9 @@ type Config struct {
DisableP2P bool DisableP2P bool
NoDiscovery bool NoDiscovery bool
// Enable P2P-based alt-syncing method (req-resp protocol, not gossip)
AltSync bool
// Pubsub Scoring Parameters // Pubsub Scoring Parameters
PeerScoring pubsub.PeerScoreParams PeerScoring pubsub.PeerScoreParams
TopicScoring pubsub.TopicScoreParams TopicScoring pubsub.TopicScoreParams
...@@ -104,6 +108,8 @@ type Config struct { ...@@ -104,6 +108,8 @@ type Config struct {
ConnGater func(conf *Config) (connmgr.ConnectionGater, error) ConnGater func(conf *Config) (connmgr.ConnectionGater, error)
ConnMngr func(conf *Config) (connmgr.ConnManager, error) ConnMngr func(conf *Config) (connmgr.ConnManager, error)
EnableReqRespSync bool
} }
//go:generate mockery --name ConnectionGater //go:generate mockery --name ConnectionGater
...@@ -166,6 +172,10 @@ func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams { ...@@ -166,6 +172,10 @@ func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams {
return &conf.TopicScoring return &conf.TopicScoring
} }
func (conf *Config) ReqRespSyncEnabled() bool {
return conf.EnableReqRespSync
}
const maxMeshParam = 1000 const maxMeshParam = 1000
func (conf *Config) Check() error { func (conf *Config) Check() error {
......
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum-optimism/optimism/op-node/testutils"
...@@ -125,7 +126,7 @@ func TestP2PFull(t *testing.T) { ...@@ -125,7 +126,7 @@ func TestP2PFull(t *testing.T) {
runCfgB := &testutils.MockRuntimeConfig{P2PSeqAddress: common.Address{0x42}} runCfgB := &testutils.MockRuntimeConfig{P2PSeqAddress: common.Address{0x42}}
logA := testlog.Logger(t, log.LvlError).New("host", "A") logA := testlog.Logger(t, log.LvlError).New("host", "A")
nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{}, runCfgA, nil) nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{}, nil, runCfgA, metrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
defer nodeA.Close() defer nodeA.Close()
...@@ -148,7 +149,7 @@ func TestP2PFull(t *testing.T) { ...@@ -148,7 +149,7 @@ func TestP2PFull(t *testing.T) {
logB := testlog.Logger(t, log.LvlError).New("host", "B") logB := testlog.Logger(t, log.LvlError).New("host", "B")
nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, runCfgB, nil) nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, nil, runCfgB, metrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
defer nodeB.Close() defer nodeB.Close()
hostB := nodeB.Host() hostB := nodeB.Host()
...@@ -277,7 +278,7 @@ func TestDiscovery(t *testing.T) { ...@@ -277,7 +278,7 @@ func TestDiscovery(t *testing.T) {
resourcesCtx, resourcesCancel := context.WithCancel(context.Background()) resourcesCtx, resourcesCancel := context.WithCancel(context.Background())
defer resourcesCancel() defer resourcesCancel()
nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}, runCfgA, nil) nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}, nil, runCfgA, metrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
defer nodeA.Close() defer nodeA.Close()
hostA := nodeA.Host() hostA := nodeA.Host()
...@@ -292,7 +293,7 @@ func TestDiscovery(t *testing.T) { ...@@ -292,7 +293,7 @@ func TestDiscovery(t *testing.T) {
confB.DiscoveryDB = discDBC confB.DiscoveryDB = discDBC
// Start B // Start B
nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}, runCfgB, nil) nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}, nil, runCfgB, metrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
defer nodeB.Close() defer nodeB.Close()
hostB := nodeB.Host() hostB := nodeB.Host()
...@@ -307,7 +308,7 @@ func TestDiscovery(t *testing.T) { ...@@ -307,7 +308,7 @@ func TestDiscovery(t *testing.T) {
}}) }})
// Start C // Start C
nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}, runCfgC, nil) nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}, nil, runCfgC, metrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
defer nodeC.Close() defer nodeC.Close()
hostC := nodeC.Host() hostC := nodeC.Host()
......
...@@ -11,8 +11,10 @@ import ( ...@@ -11,8 +11,10 @@ import (
"github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
p2pmetrics "github.com/libp2p/go-libp2p/core/metrics" p2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -32,16 +34,18 @@ type NodeP2P struct { ...@@ -32,16 +34,18 @@ type NodeP2P struct {
dv5Udp *discover.UDPv5 // p2p discovery service dv5Udp *discover.UDPv5 // p2p discovery service
gs *pubsub.PubSub // p2p gossip router gs *pubsub.PubSub // p2p gossip router
gsOut GossipOut // p2p gossip application interface for publishing gsOut GossipOut // p2p gossip application interface for publishing
syncCl *SyncClient
syncSrv *ReqRespServer
} }
// NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil. // NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil.
// If metrics are configured, a bandwidth monitor will be spawned in a goroutine. // If metrics are configured, a bandwidth monitor will be spawned in a goroutine.
func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, runCfg GossipRuntimeConfig, metrics metrics.Metricer) (*NodeP2P, error) { func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) (*NodeP2P, error) {
if setup == nil { if setup == nil {
return nil, errors.New("p2p node cannot be created without setup") return nil, errors.New("p2p node cannot be created without setup")
} }
var n NodeP2P var n NodeP2P
if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, runCfg, metrics); err != nil { if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, l2Chain, runCfg, metrics); err != nil {
closeErr := n.Close() closeErr := n.Close()
if closeErr != nil { if closeErr != nil {
log.Error("failed to close p2p after starting with err", "closeErr", closeErr, "err", err) log.Error("failed to close p2p after starting with err", "closeErr", closeErr, "err", err)
...@@ -54,7 +58,7 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log. ...@@ -54,7 +58,7 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
return &n, nil return &n, nil
} }
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error { func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
bwc := p2pmetrics.NewBandwidthCounter() bwc := p2pmetrics.NewBandwidthCounter()
var err error var err error
...@@ -73,6 +77,29 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -73,6 +77,29 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.gater = extra.ConnectionGater() n.gater = extra.ConnectionGater()
n.connMgr = extra.ConnectionManager() n.connMgr = extra.ConnectionManager()
} }
// Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() {
n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.AddPeer(conn.RemotePeer())
},
DisconnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.RemovePeer(conn.RemotePeer())
},
})
n.syncCl.Start()
// the host may already be connected to peers, add them all to the sync client
for _, peerID := range n.host.Network().Peers() {
n.syncCl.AddPeer(peerID)
}
if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy
n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
// register the sync protocol with libp2p host
payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
}
}
// notify of any new connections/streams/etc. // notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics)) n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
...@@ -104,6 +131,17 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -104,6 +131,17 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
return nil return nil
} }
func (n *NodeP2P) AltSyncEnabled() bool {
return n.syncCl != nil
}
func (n *NodeP2P) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
if !n.AltSyncEnabled() {
return fmt.Errorf("cannot request range %s - %s, req-resp sync is not enabled", start, end)
}
return n.syncCl.RequestL2Range(ctx, start, end)
}
func (n *NodeP2P) Host() host.Host { func (n *NodeP2P) Host() host.Host {
return n.host return n.host
} }
...@@ -146,6 +184,11 @@ func (n *NodeP2P) Close() error { ...@@ -146,6 +184,11 @@ func (n *NodeP2P) Close() error {
if err := n.host.Close(); err != nil { if err := n.host.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p host cleanly: %w", err)) result = multierror.Append(result, fmt.Errorf("failed to close p2p host cleanly: %w", err))
} }
if n.syncCl != nil {
if err := n.syncCl.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p sync client cleanly: %w", err))
}
}
} }
return result.ErrorOrNil() return result.ErrorOrNil()
} }
......
...@@ -22,6 +22,8 @@ type Prepared struct { ...@@ -22,6 +22,8 @@ type Prepared struct {
HostP2P host.Host HostP2P host.Host
LocalNode *enode.LocalNode LocalNode *enode.LocalNode
UDPv5 *discover.UDPv5 UDPv5 *discover.UDPv5
EnableReqRespSync bool
} }
var _ SetupP2P = (*Prepared)(nil) var _ SetupP2P = (*Prepared)(nil)
...@@ -83,3 +85,7 @@ func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams { ...@@ -83,3 +85,7 @@ func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams {
func (p *Prepared) Disabled() bool { func (p *Prepared) Disabled() bool {
return false return false
} }
func (p *Prepared) ReqRespSyncEnabled() bool {
return p.EnableReqRespSync
}
This diff is collapsed.
package p2p
import (
"context"
"math"
"math/big"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
type mockPayloadFn func(n uint64) (*eth.ExecutionPayload, error)
func (fn mockPayloadFn) PayloadByNumber(_ context.Context, number uint64) (*eth.ExecutionPayload, error) {
return fn(number)
}
var _ L2Chain = mockPayloadFn(nil)
func setupSyncTestData(length uint64) (*rollup.Config, map[uint64]*eth.ExecutionPayload, func(i uint64) eth.L2BlockRef) {
// minimal rollup config to build mock blocks & verify their time.
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: eth.BlockID{Hash: common.Hash{0xaa}},
L2: eth.BlockID{Hash: common.Hash{0xbb}},
L2Time: 9000,
},
BlockTime: 2,
L2ChainID: big.NewInt(1234),
}
// create some simple fake test blocks
payloads := make(map[uint64]*eth.ExecutionPayload)
payloads[0] = &eth.ExecutionPayload{
Timestamp: eth.Uint64Quantity(cfg.Genesis.L2Time),
}
payloads[0].BlockHash, _ = payloads[0].CheckBlockHash()
for i := uint64(1); i <= length; i++ {
payload := &eth.ExecutionPayload{
ParentHash: payloads[i-1].BlockHash,
BlockNumber: eth.Uint64Quantity(i),
Timestamp: eth.Uint64Quantity(cfg.Genesis.L2Time + i*cfg.BlockTime),
}
payload.BlockHash, _ = payload.CheckBlockHash()
payloads[i] = payload
}
l2Ref := func(i uint64) eth.L2BlockRef {
return eth.L2BlockRef{
Hash: payloads[i].BlockHash,
Number: uint64(payloads[i].BlockNumber),
ParentHash: payloads[i].ParentHash,
Time: uint64(payloads[i].Timestamp),
}
}
return cfg, payloads, l2Ref
}
func TestSinglePeerSync(t *testing.T) {
t.Parallel() // Takes a while, but can run in parallel
log := testlog.Logger(t, log.LvlError)
cfg, payloads, l2Ref := setupSyncTestData(25)
// Serving payloads: just load them from the map, if they exist
servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) {
p, ok := payloads[n]
if !ok {
return nil, ethereum.NotFound
}
return p, nil
})
// collect received payloads in a buffered channel, so we can verify we get everything
received := make(chan *eth.ExecutionPayload, 100)
receivePayload := receivePayloadFn(func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error {
received <- payload
return nil
})
// Setup 2 minimal test hosts to attach the sync protocol to
mnet, err := mocknet.FullMeshConnected(2)
require.NoError(t, err, "failed to setup mocknet")
defer mnet.Close()
hosts := mnet.Hosts()
hostA, hostB := hosts[0], hosts[1]
require.Equal(t, hostA.Network().Connectedness(hostB.ID()), network.Connected)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup host A as the server
srv := NewReqRespServer(cfg, servePayload, metrics.NoopMetrics)
payloadByNumber := MakeStreamHandler(ctx, log.New("role", "server"), srv.HandleSyncRequest)
hostA.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
// Setup host B as the client
cl := NewSyncClient(log.New("role", "client"), cfg, hostB.NewStream, receivePayload, metrics.NoopMetrics)
// Setup host B (client) to sync from its peer Host A (server)
cl.AddPeer(hostA.ID())
cl.Start()
defer cl.Close()
// request to start syncing between 10 and 20
require.NoError(t, cl.RequestL2Range(ctx, l2Ref(10), l2Ref(20)))
// and wait for the sync results to come in (in reverse order)
receiveCtx, receiveCancel := context.WithTimeout(ctx, time.Second*5)
defer receiveCancel()
for i := uint64(19); i > 10; i-- {
select {
case p := <-received:
require.Equal(t, uint64(p.BlockNumber), i, "expecting payloads in order")
exp, ok := payloads[uint64(p.BlockNumber)]
require.True(t, ok, "expecting known payload")
require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
case <-receiveCtx.Done():
t.Fatal("did not receive all expected payloads within expected time")
}
}
}
func TestMultiPeerSync(t *testing.T) {
t.Parallel() // Takes a while, but can run in parallel
log := testlog.Logger(t, log.LvlError)
cfg, payloads, l2Ref := setupSyncTestData(100)
setupPeer := func(ctx context.Context, h host.Host) (*SyncClient, chan *eth.ExecutionPayload) {
// Serving payloads: just load them from the map, if they exist
servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) {
p, ok := payloads[n]
if !ok {
return nil, ethereum.NotFound
}
return p, nil
})
// collect received payloads in a buffered channel, so we can verify we get everything
received := make(chan *eth.ExecutionPayload, 100)
receivePayload := receivePayloadFn(func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error {
received <- payload
return nil
})
// Setup as server
srv := NewReqRespServer(cfg, servePayload, metrics.NoopMetrics)
payloadByNumber := MakeStreamHandler(ctx, log.New("serve", "payloads_by_number"), srv.HandleSyncRequest)
h.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
cl := NewSyncClient(log.New("role", "client"), cfg, h.NewStream, receivePayload, metrics.NoopMetrics)
return cl, received
}
// Setup 3 minimal test hosts to attach the sync protocol to
mnet, err := mocknet.FullMeshConnected(3)
require.NoError(t, err, "failed to setup mocknet")
defer mnet.Close()
hosts := mnet.Hosts()
hostA, hostB, hostC := hosts[0], hosts[1], hosts[2]
require.Equal(t, hostA.Network().Connectedness(hostB.ID()), network.Connected)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clA, recvA := setupPeer(ctx, hostA)
clB, recvB := setupPeer(ctx, hostB)
clC, _ := setupPeer(ctx, hostC)
// Make them all sync from each other
clA.AddPeer(hostB.ID())
clA.AddPeer(hostC.ID())
clA.Start()
defer clA.Close()
clB.AddPeer(hostA.ID())
clB.AddPeer(hostC.ID())
clB.Start()
defer clB.Close()
clC.AddPeer(hostA.ID())
clC.AddPeer(hostB.ID())
clC.Start()
defer clC.Close()
// request to start syncing between 10 and 100
require.NoError(t, clA.RequestL2Range(ctx, l2Ref(10), l2Ref(90)))
// With such large range to request we are going to hit the rate-limits of B and C,
// but that means we'll balance the work between the peers.
// wait for the results to come in, based on the expected rate limit, divided by 2 (because we have 2 servers), with a buffer of 2 seconds
receiveCtx, receiveCancel := context.WithTimeout(ctx, time.Second*time.Duration(math.Ceil(float64((89-10)/peerServerBlocksRateLimit)))/2+time.Second*2)
defer receiveCancel()
for i := uint64(89); i > 10; i-- {
select {
case p := <-recvA:
exp, ok := payloads[uint64(p.BlockNumber)]
require.True(t, ok, "expecting known payload")
require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
case <-receiveCtx.Done():
t.Fatal("did not receive all expected payloads within expected time")
}
}
// now see if B can sync a range, and fill the gap with a re-request
bl25 := payloads[25] // temporarily remove it from the available payloads. This will create a gap
delete(payloads, uint64(25))
require.NoError(t, clB.RequestL2Range(ctx, l2Ref(20), l2Ref(30)))
for i := uint64(29); i > 25; i-- {
select {
case p := <-recvB:
exp, ok := payloads[uint64(p.BlockNumber)]
require.True(t, ok, "expecting known payload")
require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
case <-receiveCtx.Done():
t.Fatal("did not receive all expected payloads within expected time")
}
}
// the request for 25 should fail. See:
// server: WARN peer requested unknown block by number num=25
// client: WARN failed p2p sync request num=25 err="peer failed to serve request with code 1"
require.Zero(t, len(recvB), "there is a gap, should not see other payloads yet")
// Add back the block
payloads[25] = bl25
// And request a range again, 25 is there now, and 21-24 should follow quickly (some may already have been fetched and wait in quarantine)
require.NoError(t, clB.RequestL2Range(ctx, l2Ref(20), l2Ref(26)))
receiveCtx, receiveCancel = context.WithTimeout(ctx, time.Second*10)
defer receiveCancel()
for i := uint64(25); i > 20; i-- {
select {
case p := <-recvB:
exp, ok := payloads[uint64(p.BlockNumber)]
require.True(t, ok, "expecting known payload")
require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
case <-receiveCtx.Done():
t.Fatal("did not receive all expected payloads within expected time")
}
}
}
...@@ -678,22 +678,15 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -678,22 +678,15 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
return io.EOF return io.EOF
} }
// GetUnsafeQueueGap retrieves the current [start, end) range (incl. start, excl. end) // UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
// of the gap between the tip of the unsafe priority queue and the unsafe head. func (eq *EngineQueue) UnsafeL2SyncTarget() eth.L2BlockRef {
// If there is no gap, the difference between end and start will be 0.
func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) {
// The start of the gap is always the unsafe head + 1
start = eq.unsafeHead.Number + 1
// If the priority queue is empty, the end is the first block number at the top of the priority queue
// Otherwise, the end is the expected block number
if first := eq.unsafePayloads.Peek(); first != nil { if first := eq.unsafePayloads.Peek(); first != nil {
// Don't include the payload we already have in the sync range ref, err := PayloadToBlockRef(first, &eq.cfg.Genesis)
end = first.ID().Number if err != nil {
return eth.L2BlockRef{}
}
return ref
} else { } else {
// Include the expected payload in the sync range return eth.L2BlockRef{}
end = expectedNumber + 1
} }
return start, end
} }
...@@ -51,7 +51,7 @@ type EngineQueueStage interface { ...@@ -51,7 +51,7 @@ type EngineQueueStage interface {
Finalize(l1Origin eth.L1BlockRef) Finalize(l1Origin eth.L1BlockRef)
AddUnsafePayload(payload *eth.ExecutionPayload) AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) UnsafeL2SyncTarget() eth.L2BlockRef
Step(context.Context) error Step(context.Context) error
} }
...@@ -167,10 +167,9 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) { ...@@ -167,10 +167,9 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
dp.eng.AddUnsafePayload(payload) dp.eng.AddUnsafePayload(payload)
} }
// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head. // UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
// If there is no gap, the start and end will be 0. func (dp *DerivationPipeline) UnsafeL2SyncTarget() eth.L2BlockRef {
func (dp *DerivationPipeline) GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) { return dp.eng.UnsafeL2SyncTarget()
return dp.eng.GetUnsafeQueueGap(expectedNumber)
} }
// Step tries to progress the buffer. // Step tries to progress the buffer.
......
...@@ -48,7 +48,7 @@ type DerivationPipeline interface { ...@@ -48,7 +48,7 @@ type DerivationPipeline interface {
Reset() Reset()
Step(ctx context.Context) error Step(ctx context.Context) error
AddUnsafePayload(payload *eth.ExecutionPayload) AddUnsafePayload(payload *eth.ExecutionPayload)
GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) UnsafeL2SyncTarget() eth.L2BlockRef
Finalize(ref eth.L1BlockRef) Finalize(ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef Finalized() eth.L2BlockRef
...@@ -84,12 +84,20 @@ type Network interface { ...@@ -84,12 +84,20 @@ type Network interface {
type AltSync interface { type AltSync interface {
// RequestL2Range informs the sync source that the given range of L2 blocks is missing, // RequestL2Range informs the sync source that the given range of L2 blocks is missing,
// and should be retrieved from any available alternative syncing source. // and should be retrieved from any available alternative syncing source.
// The start of the range is inclusive, the end is exclusive. // The start and end of the range are exclusive:
// the start is the head we already have, the end is the first thing we have queued up.
// It's the task of the alt-sync mechanism to use this hint to fetch the right payloads.
// Note that the end and start may not be consistent: in this case the sync method should fetch older history
//
// If the end value is zeroed, then the sync-method may determine the end free of choice,
// e.g. sync till the chain head meets the wallclock time. This functionality is optional:
// a fixed target to sync towards may be determined by picking up payloads through P2P gossip or other sources.
//
// The sync results should be returned back to the driver via the OnUnsafeL2Payload(ctx, payload) method. // The sync results should be returned back to the driver via the OnUnsafeL2Payload(ctx, payload) method.
// The latest requested range should always take priority over previous requests. // The latest requested range should always take priority over previous requests.
// There may be overlaps in requested ranges. // There may be overlaps in requested ranges.
// An error may be returned if the scheduling fails immediately, e.g. a context timeout. // An error may be returned if the scheduling fails immediately, e.g. a context timeout.
RequestL2Range(ctx context.Context, start, end uint64) error RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error
} }
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
......
...@@ -422,6 +422,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus { ...@@ -422,6 +422,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus {
UnsafeL2: s.derivation.UnsafeL2Head(), UnsafeL2: s.derivation.UnsafeL2Head(),
SafeL2: s.derivation.SafeL2Head(), SafeL2: s.derivation.SafeL2Head(),
FinalizedL2: s.derivation.Finalized(), FinalizedL2: s.derivation.Finalized(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
} }
} }
...@@ -489,24 +490,14 @@ type hashAndErrorChannel struct { ...@@ -489,24 +490,14 @@ type hashAndErrorChannel struct {
// WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved. // WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
// Results are received through OnUnsafeL2Payload. // Results are received through OnUnsafeL2Payload.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error { func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
// subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that start := s.derivation.UnsafeL2Head()
// difference by the block time to get the expected L2 block number at the current time. If the end := s.derivation.UnsafeL2SyncTarget()
// unsafe head does not have this block number, then there is a gap in the queue. // Check if we have missing blocks between the start and end. Request them if we do.
wallClock := uint64(time.Now().Unix()) if end == (eth.L2BlockRef{}) {
genesisTimestamp := s.config.Genesis.L2Time s.log.Debug("requesting sync with open-end range", "start", start)
if wallClock < genesisTimestamp { return s.altSync.RequestL2Range(ctx, start, eth.L2BlockRef{})
s.log.Debug("nothing to sync, did not reach genesis L2 time yet", "genesis", genesisTimestamp) } else if end.Number > start.Number+1 {
return nil s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end.Number-start.Number)
}
wallClockGenesisDiff := wallClock - genesisTimestamp
// Note: round down, we should not request blocks into the future.
blocksSinceGenesis := wallClockGenesisDiff / s.config.BlockTime
expectedL2Block := s.config.Genesis.L2.Number + blocksSinceGenesis
start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block)
// Check if there is a gap between the unsafe head and the expected L2 block number at the current time.
if end > start {
s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end-start)
return s.altSync.RequestL2Range(ctx, start, end) return s.altSync.RequestL2Range(ctx, start, end)
} }
return nil return nil
......
...@@ -116,6 +116,20 @@ func (cfg *Config) ValidateL2Config(ctx context.Context, client L2Client) error ...@@ -116,6 +116,20 @@ func (cfg *Config) ValidateL2Config(ctx context.Context, client L2Client) error
return nil return nil
} }
func (cfg *Config) TargetBlockNumber(timestamp uint64) (num uint64, err error) {
// subtract genesis time from timestamp to get the time elapsed since genesis, and then divide that
// difference by the block time to get the expected L2 block number at the current time. If the
// unsafe head does not have this block number, then there is a gap in the queue.
genesisTimestamp := cfg.Genesis.L2Time
if timestamp < genesisTimestamp {
return 0, fmt.Errorf("did not reach genesis time (%d) yet", genesisTimestamp)
}
wallClockGenesisDiff := timestamp - genesisTimestamp
// Note: round down, we should not request blocks into the future.
blocksSinceGenesis := wallClockGenesisDiff / cfg.BlockTime
return cfg.Genesis.L2.Number + blocksSinceGenesis, nil
}
type L1Client interface { type L1Client interface {
ChainID(context.Context) (*big.Int, error) ChainID(context.Context) (*big.Int, error)
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
......
...@@ -32,9 +32,10 @@ type RPCSync interface { ...@@ -32,9 +32,10 @@ type RPCSync interface {
// Start starts an additional worker syncing job // Start starts an additional worker syncing job
Start() error Start() error
// RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface. // RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface.
RequestL2Range(ctx context.Context, start, end uint64) error RequestL2Range(ctx context.Context, start uint64, end eth.L2BlockRef) error
} }
// SyncClient implements the driver AltSync interface, including support for fetching an open-ended chain of L2 blocks.
type SyncClient struct { type SyncClient struct {
*L2Client *L2Client
...@@ -88,7 +89,7 @@ func (s *SyncClient) Close() error { ...@@ -88,7 +89,7 @@ func (s *SyncClient) Close() error {
return nil return nil
} }
func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) error { func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
// Drain previous requests now that we have new information // Drain previous requests now that we have new information
for len(s.requests) > 0 { for len(s.requests) > 0 {
select { // in case requests is being read at the same time, don't block on draining it. select { // in case requests is being read at the same time, don't block on draining it.
...@@ -98,11 +99,23 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) erro ...@@ -98,11 +99,23 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) erro
} }
} }
endNum := end.Number
if end == (eth.L2BlockRef{}) {
n, err := s.rollupCfg.TargetBlockNumber(uint64(time.Now().Unix()))
if err != nil {
return err
}
if n <= start.Number {
return nil
}
endNum = n
}
// TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method. // TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method.
s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start) s.log.Info("Scheduling to fetch trailing missing payloads from backup RPC", "start", start, "end", endNum, "size", endNum-start.Number-1)
for i := start; i < end; i++ { for i := start.Number + 1; i < endNum; i++ {
select { select {
case s.requests <- i: case s.requests <- i:
case <-ctx.Done(): case <-ctx.Done():
......
...@@ -57,6 +57,8 @@ and are adopted by several other blockchains, most notably the [L1 consensus lay ...@@ -57,6 +57,8 @@ and are adopted by several other blockchains, most notably the [L1 consensus lay
- [Block validation](#block-validation) - [Block validation](#block-validation)
- [Block processing](#block-processing) - [Block processing](#block-processing)
- [Block topic scoring parameters](#block-topic-scoring-parameters) - [Block topic scoring parameters](#block-topic-scoring-parameters)
- [Req-Resp](#req-resp)
- [`payload_by_number`](#payload_by_number)
<!-- END doctoc generated TOC please keep comment here to allow auto update --> <!-- END doctoc generated TOC please keep comment here to allow auto update -->
...@@ -305,12 +307,97 @@ A node may apply the block to their local engine ahead of L1 availability, if it ...@@ -305,12 +307,97 @@ A node may apply the block to their local engine ahead of L1 availability, if it
TODO: GossipSub per-topic scoring to fine-tune incentives for ideal propagation delay and bandwidth usage. TODO: GossipSub per-topic scoring to fine-tune incentives for ideal propagation delay and bandwidth usage.
## Req-Resp
The op-node implements a similar request-response encoding for its sync protocols as the L1 ethereum Beacon-Chain.
See [L1 P2P-interface req-resp specification][eth2-p2p-reqresp] and [Altair P2P update][eth2-p2p-altair-reqresp].
However, the protocol is simplified, to avoid several issues seen in L1:
- Error strings in responses, if there is any alternative response,
should not need to be compressed or have an artificial global length limit.
- Payload lengths should be fixed-length: byte-by-byte uvarint reading from the underlying stream is undesired.
- `<context-bytes>` are relaxed to encode a `uint32`, rather than a beacon-chain `ForkDigest`.
- Payload-encoding may change per hardfork, so is not part of the protocol-ID.
- Usage of response-chunks is specific to the req-resp method: most basic req-resp does not need chunked responses.
- Compression is encouraged to be part of the payload-encoding, specific to the req-resp method, where necessary:
pings and such do not need streaming frame compression etc.
And the protocol ID format follows the same scheme as L1,
except the trailing encoding schema part, which is now message-specific:
```text
/ProtocolPrefix/MessageName/SchemaVersion/
```
The req-resp protocols served by the op-node all have `/ProtocolPrefix` set to `/opstack/req`.
Individual methods may include the chain ID as part of the `/MessageName` segment,
so it's immediately clear which chain the method applies to, if the communication is chain-specific.
Other methods may include chain-information in the request and/or response data,
such as the `ForkDigest` `<context-bytes>` in L1 beacon chain req-resp protocols.
Each segment starts with a `/`, and may contain multiple `/`, and the final protocol ID is suffixed with a `/`.
### `payload_by_number`
This is an optional chain syncing method, to request/serve execution payloads by number.
This serves as a method to fill gaps upon missed gossip, and sync short to medium ranges of unsafe L2 blocks.
Protocol ID: `/opstack/req/payload_by_number/<chain-id>/0/`
- `/MessageName` is `/block_by_number/<chain-id>` where `<chain-id>` is set to the op-node L2 chain ID.
- `/SchemaVersion` is `/0`
Request format: `<num>`: a little-endian `uint64` - the block number to request.
Response format: `<response> = <res><version><payload>`
- `<res>` is a byte code describing the result.
- `0` on success, `<version><payload>` should follow.
- `1` if valid request, but unavailable payload.
- `2` if invalid request
- `3+` if other error
- The `>= 128` range is reserved for future use.
- `<version>` is a little-endian `uint32`, identifying the type of `ExecutionPayload` (fork-specific)
- `<payload>` is an encoded block, read till stream EOF.
The input of `<response>` should be limited, as well as any generated decompressed output,
to avoid unexpected resource usage or zip-bomb type attacks.
A 10 MB limit is recommended, to ensure all blocks may be synced.
Implementations may opt for a different limit, since this sync method is optional.
`<version>` list:
- `0`: SSZ-encoded `ExecutionPayload`, with Snappy framing compression,
matching the `ExecutionPayload` SSZ definition of the L1 Merge, L2 Bedrock and L2 Regolith versions.
- Other versions may be listed here with future network upgrades, such as the L1 Shanghai upgrade.
The request is by block-number, enabling parallel fetching of a chain across many peers.
A `res = 0` response should be verified to:
- Have a block-number matching the requested block number.
- Have a consistent `blockhash` w.r.t. the other block contents.
- Build towards a known canonical block.
- This can be verified by checking if the parent-hash of a previous trusted canonical block matches
that of the verified hash of the retrieved block.
- For unsafe blocks this may be relaxed to verification against the parent-hash of any previously trusted block:
- The gossip validation process limits the amount of blocks that may be trusted to sync towards.
- The unsafe blocks should be queued for processing, the latest received L2 unsafe blocks should always
override any previous chain, until the final L2 chain can be reproduced from L1 data.
A `res > 0` response code should not be accepted. The result code is helpful for debugging,
but the client should regard any error like any any other unanswered request, as the responding peer cannot be trusted.
---- ----
[libp2p]: https://libp2p.io/ [libp2p]: https://libp2p.io/
[discv5]: https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md [discv5]: https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md
[discv5-random-nodes]: https://pkg.go.dev/github.com/ethereum/go-ethereum@v1.10.12/p2p/discover#UDPv5.RandomNodes [discv5-random-nodes]: https://pkg.go.dev/github.com/ethereum/go-ethereum@v1.10.12/p2p/discover#UDPv5.RandomNodes
[eth2-p2p]: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md [eth2-p2p]: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md
[eth2-p2p-reqresp]: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-reqresp-domain
[eth2-p2p-altair-reqresp]: https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/p2p-interface.md#the-reqresp-domain
[libp2p-noise]: https://github.com/libp2p/specs/tree/master/noise [libp2p-noise]: https://github.com/libp2p/specs/tree/master/noise
[multistream-select]: https://github.com/multiformats/multistream-select/ [multistream-select]: https://github.com/multiformats/multistream-select/
[mplex]: https://github.com/libp2p/specs/tree/master/mplex [mplex]: https://github.com/libp2p/specs/tree/master/mplex
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment