Commit 0d7859bb authored by protolambda

op-node: req-resp p2p sync with new feature-flag

Signed-off-by: protolambda <proto@protolambda.com>
parent 11697889
@@ -16,6 +16,7 @@ require (
github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/holiman/uint256 v1.2.0
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0
@@ -86,7 +87,6 @@ require (
github.com/graph-gophers/graphql-go v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-bexpr v0.1.11 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/huin/goupnp v1.1.0 // indirect
github.com/influxdata/influxdb v1.8.3 // indirect
...
@@ -143,6 +143,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
UnsafeL2: s.L2Unsafe(),
SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
}
}
...
@@ -193,6 +193,9 @@ type SystemConfig struct {
// Any node name not in the topology will not have p2p enabled.
P2PTopology map[string][]string
// Enables req-resp sync in the P2P nodes
P2PReqRespSync bool
// If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet.
NonFinalizedProposals bool
@@ -205,6 +208,8 @@ type System struct {
RollupConfig *rollup.Config
L2GenesisCfg *core.Genesis
// Connections to running nodes
Nodes map[string]*node.Node
Backends map[string]*geth_eth.Ethereum
@@ -316,6 +321,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
if err != nil {
return nil, err
}
sys.L2GenesisCfg = l2Genesis
for addr, amount := range cfg.Premine {
if existing, ok := l2Genesis.Alloc[addr]; ok {
l2Genesis.Alloc[addr] = core.GenesisAccount{
@@ -398,30 +404,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
// Configure connections to L1 and L2 for rollup nodes.
// TODO: refactor testing to use in-process rpc connections instead of websockets.
l1EndpointConfig := l1Node.WSEndpoint()
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
if useHTTP {
log.Info("using HTTP client")
l1EndpointConfig = l1Node.HTTPEndpoint()
}
for name, rollupCfg := range cfg.Nodes {
l2EndpointConfig := sys.Nodes[name].WSAuthEndpoint()
if useHTTP {
l2EndpointConfig = sys.Nodes[name].HTTPAuthEndpoint()
}
rollupCfg.L1 = &rollupNode.L1EndpointConfig{
L1NodeAddr: l1EndpointConfig,
L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
RateLimit: 0,
BatchSize: 20,
HttpPollInterval: time.Duration(cfg.DeployConfig.L1BlockTime) * time.Second / 10,
}
rollupCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: cfg.JWTSecret,
}
configureL1(rollupCfg, l1Node)
configureL2(rollupCfg, sys.Nodes[name], cfg.JWTSecret)
rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{
Client: nil,
TrustRPC: false,
@@ -476,6 +462,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
HostP2P: h,
LocalNode: nil,
UDPv5: nil,
EnableReqRespSync: cfg.P2PReqRespSync,
}
p2pNodes[name] = p
return p, nil
@@ -633,6 +620,35 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
return sys, nil
}
func configureL1(rollupNodeCfg *rollupNode.Config, l1Node *node.Node) {
l1EndpointConfig := l1Node.WSEndpoint()
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
if useHTTP {
log.Info("using HTTP client")
l1EndpointConfig = l1Node.HTTPEndpoint()
}
rollupNodeCfg.L1 = &rollupNode.L1EndpointConfig{
L1NodeAddr: l1EndpointConfig,
L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
RateLimit: 0,
BatchSize: 20,
HttpPollInterval: time.Millisecond * 100,
}
}
func configureL2(rollupNodeCfg *rollupNode.Config, l2Node *node.Node, jwtSecret [32]byte) {
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
l2EndpointConfig := l2Node.WSAuthEndpoint()
if useHTTP {
l2EndpointConfig = l2Node.HTTPAuthEndpoint()
}
rollupNodeCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: jwtSecret,
}
}
func (cfg SystemConfig) L1ChainIDBig() *big.Int {
return new(big.Int).SetUint64(cfg.DeployConfig.L1ChainID)
}
...
@@ -28,6 +28,7 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
rollupNode "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -35,6 +36,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/withdrawals"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
)
var enableParallelTesting bool = true
@@ -737,6 +739,159 @@ func TestSystemRPCAltSync(t *testing.T) {
require.ElementsMatch(t, received, published[:len(received)])
}
func TestSystemP2PAltSync(t *testing.T) {
parallel(t)
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
}
cfg := DefaultSystemConfig(t)
// remove default verifier node
delete(cfg.Nodes, "verifier")
// Add more verifier nodes
cfg.Nodes["alice"] = &rollupNode.Config{
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
}
cfg.Nodes["bob"] = &rollupNode.Config{
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
}
cfg.Loggers["alice"] = testlog.Logger(t, log.LvlInfo).New("role", "alice")
cfg.Loggers["bob"] = testlog.Logger(t, log.LvlInfo).New("role", "bob")
// connect the nodes
cfg.P2PTopology = map[string][]string{
"sequencer": {"alice", "bob"},
"alice": {"sequencer", "bob"},
"bob": {"alice", "sequencer"},
}
// Enable the P2P req-resp based sync
cfg.P2PReqRespSync = true
// Disable batcher, so there will not be any L1 data to sync from
cfg.DisableBatcher = true
var published []string
seqTracer := new(FnTracer)
// The sequencer still publishes the blocks to the tracer, regardless of whether they reach the other nodes via gossip
seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published = append(published, payload.ID().String())
}
// The syncer node set up below will receive these blocks via the P2P req-resp based alt-sync method
cfg.Nodes["sequencer"].Tracer = seqTracer
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
// Transactor Account
ethPrivKey := cfg.Secrets.Alice
// Submit a TX to L2 sequencer node
toAddr := common.Address{0xff, 0xff}
tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
ChainID: cfg.L2ChainIDBig(),
Nonce: 0,
To: &toAddr,
Value: big.NewInt(1_000_000_000),
GasTipCap: big.NewInt(10),
GasFeeCap: big.NewInt(200),
Gas: 21000,
})
err = l2Seq.SendTransaction(context.Background(), tx)
require.Nil(t, err, "Sending L2 tx to sequencer")
// Wait for tx to be mined on the L2 sequencer chain
receiptSeq, err := waitForTransaction(tx.Hash(), l2Seq, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on sequencer")
// Gossip is able to respond to IWANT messages for the duration of heartbeat_time * message_window = 0.5 * 12 = 6 seconds
// Wait till we pass that, and then we'll have missed some blocks that cannot be retrieved in any way from gossip
time.Sleep(time.Second * 10)
// set up our syncer node, connect it to alice/bob
cfg.Loggers["syncer"] = testlog.Logger(t, log.LvlInfo).New("role", "syncer")
snapLog := log.New()
snapLog.SetHandler(log.DiscardHandler())
// Create a peer, and hook up alice and bob
h, err := sys.Mocknet.GenPeer()
require.NoError(t, err)
_, err = sys.Mocknet.LinkPeers(sys.RollupNodes["alice"].P2P().Host().ID(), h.ID())
require.NoError(t, err)
_, err = sys.Mocknet.LinkPeers(sys.RollupNodes["bob"].P2P().Host().ID(), h.ID())
require.NoError(t, err)
// Configure the new rollup node that'll be syncing
var syncedPayloads []string
syncNodeCfg := &rollupNode.Config{
L2Sync: &rollupNode.PreparedL2SyncEndpoint{Client: nil},
Driver: driver.Config{VerifierConfDepth: 0},
Rollup: *sys.RollupConfig,
P2PSigner: nil,
RPC: rollupNode.RPCConfig{
ListenAddr: "127.0.0.1",
ListenPort: 0,
EnableAdmin: true,
},
P2P: &p2p.Prepared{HostP2P: h, EnableReqRespSync: true},
Metrics: rollupNode.MetricsConfig{Enabled: false}, // no metrics server
Pprof: oppprof.CLIConfig{},
L1EpochPollInterval: time.Second * 10,
Tracer: &FnTracer{
OnUnsafeL2PayloadFn: func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
syncedPayloads = append(syncedPayloads, payload.ID().String())
},
},
}
configureL1(syncNodeCfg, sys.Nodes["l1"])
syncerL2Engine, _, err := initL2Geth("syncer", big.NewInt(int64(cfg.DeployConfig.L2ChainID)), sys.L2GenesisCfg, cfg.JWTFilePath)
require.NoError(t, err)
require.NoError(t, syncerL2Engine.Start())
configureL2(syncNodeCfg, syncerL2Engine, cfg.JWTSecret)
syncerNode, err := rollupNode.New(context.Background(), syncNodeCfg, cfg.Loggers["syncer"], snapLog, "", metrics.NewMetrics(""))
require.NoError(t, err)
err = syncerNode.Start(context.Background())
require.NoError(t, err)
// connect alice and bob to our new syncer node
_, err = sys.Mocknet.ConnectPeers(sys.RollupNodes["alice"].P2P().Host().ID(), syncerNode.P2P().Host().ID())
require.NoError(t, err)
_, err = sys.Mocknet.ConnectPeers(sys.RollupNodes["bob"].P2P().Host().ID(), syncerNode.P2P().Host().ID())
require.NoError(t, err)
rpc, err := syncerL2Engine.Attach()
require.NoError(t, err)
l2Verif := ethclient.NewClient(rpc)
// It may take a while to sync, but eventually we should see the sequenced data show up
receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 100*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier")
require.Equal(t, receiptSeq, receiptVerif)
// Verify that the tx was received via P2P sync
require.Contains(t, syncedPayloads, eth.BlockID{Hash: receiptVerif.BlockHash, Number: receiptVerif.BlockNumber.Uint64()}.String())
// Verify that everything that was received was published
require.GreaterOrEqual(t, len(published), len(syncedPayloads))
require.ElementsMatch(t, syncedPayloads, published[:len(syncedPayloads)])
}
// TestSystemDenseTopology sets up a dense p2p topology with 3 verifier nodes and 1 sequencer node.
func TestSystemDenseTopology(t *testing.T) {
t.Skip("Skipping dense topology test to avoid flakiness. @refcell address in p2p scoring pr.")
...
@@ -32,4 +32,7 @@ type SyncStatus struct {
// FinalizedL2 points to the L2 block that was derived fully from
// finalized L1 information, thus irreversible.
FinalizedL2 L2BlockRef `json:"finalized_l2"`
// UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block.
// It may be zeroed if there is no targeted block.
UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"`
}
@@ -276,6 +276,12 @@ var (
Hidden: true,
EnvVar: p2pEnv("GOSSIP_FLOOD_PUBLISH"),
}
SyncReqRespFlag = cli.BoolFlag{
Name: "p2p.sync.req-resp",
Usage: "Enables experimental P2P req-resp alternative sync method, on both server and client side.",
Required: false,
EnvVar: p2pEnv("SYNC_REQ_RESP"),
}
)
// None of these flags are strictly required.
@@ -315,4 +321,5 @@ var p2pFlags = []cli.Flag{
GossipMeshDhiFlag,
GossipMeshDlazyFlag,
GossipFloodPublishFlag,
SyncReqRespFlag,
}
@@ -66,6 +66,9 @@ type Metricer interface {
Document() []metrics.DocumentedMetric
// P2P Metrics
SetPeerScores(scores map[string]float64)
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int)
}
// Metrics tracks all the metrics for the op-node.
@@ -90,6 +93,12 @@ type Metrics struct {
SequencingErrors *EventMetrics
PublishingErrors *EventMetrics
P2PReqDurationSeconds *prometheus.HistogramVec
P2PReqTotal *prometheus.CounterVec
P2PPayloadByNumber *prometheus.GaugeVec
PayloadsQuarantineTotal prometheus.Gauge
SequencerInconsistentL1Origin *EventMetrics
SequencerResets *EventMetrics
@@ -322,6 +331,44 @@ func NewMetrics(procName string) *Metrics {
"direction",
}),
P2PReqDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "req_duration_seconds",
Buckets: []float64{},
Help: "Duration of P2P requests",
}, []string{
"p2p_role", // "client" or "server"
"p2p_method",
"result_code",
}),
P2PReqTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "req_total",
Help: "Number of P2P requests",
}, []string{
"p2p_role", // "client" or "server"
"p2p_method",
"result_code",
}),
P2PPayloadByNumber: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "payload_by_number",
Help: "Payload by number requests",
}, []string{
"p2p_role", // "client" or "server"
}),
PayloadsQuarantineTotal: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "payloads_quarantine_total",
Help: "number of unverified execution payloads buffered in quarantine",
}),
SequencerBuildingDiffDurationSeconds: factory.NewHistogram(prometheus.HistogramOpts{
Namespace: ns,
Name: "sequencer_building_diff_seconds",
@@ -567,6 +614,27 @@ func (m *Metrics) Document() []metrics.DocumentedMetric {
return m.factory.Document()
}
func (m *Metrics) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) {
if resultCode > 4 { // summarize all high codes to reduce metrics overhead
resultCode = 5
}
code := strconv.FormatUint(uint64(resultCode), 10)
m.P2PReqTotal.WithLabelValues("client", "payload_by_number", code).Inc()
m.P2PReqDurationSeconds.WithLabelValues("client", "payload_by_number", code).Observe(float64(duration) / float64(time.Second))
m.P2PPayloadByNumber.WithLabelValues("client").Set(float64(num))
}
func (m *Metrics) ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) {
code := strconv.FormatUint(uint64(resultCode), 10)
m.P2PReqTotal.WithLabelValues("server", "payload_by_number", code).Inc()
m.P2PReqDurationSeconds.WithLabelValues("server", "payload_by_number", code).Observe(float64(duration) / float64(time.Second))
m.P2PPayloadByNumber.WithLabelValues("server").Set(float64(num))
}
func (m *Metrics) PayloadsQuarantineSize(n int) {
m.PayloadsQuarantineTotal.Set(float64(n))
}
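// For illustration (assuming the metrics namespace resolves to "op_node"; the exact
// prefix depends on how NewMetrics is configured): a successful client fetch of block
// 123 that took 250ms would roughly record:
//
//	op_node_p2p_req_total{p2p_role="client",p2p_method="payload_by_number",result_code="0"} +1
//	op_node_p2p_req_duration_seconds (same labels): observe 0.25
//	op_node_p2p_payload_by_number{p2p_role="client"}: set to 123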
type noopMetricer struct{}
var NoopMetrics Metricer = new(noopMetricer)
@@ -660,3 +728,12 @@ func (n *noopMetricer) RecordSequencerSealingTime(duration time.Duration) {
func (n *noopMetricer) Document() []metrics.DocumentedMetric {
return nil
}
func (n *noopMetricer) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) {
}
func (n *noopMetricer) ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) {
}
func (n *noopMetricer) PayloadsQuarantineSize(int) {
}
@@ -256,7 +256,7 @@ func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil {
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.runCfg, n.metrics)
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics)
if err != nil || p2pNode == nil {
return err
}
@@ -373,11 +373,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error {
return nil
}
func (n *OpNode) RequestL2Range(ctx context.Context, start, end uint64) error {
func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
if n.rpcSync != nil {
return n.rpcSync.RequestL2Range(ctx, start, end)
}
n.log.Debug("ignoring request to sync L2 range, no sync method available")
if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
return n.p2pNode.RequestL2Range(ctx, start, end)
}
n.log.Debug("ignoring request to sync L2 range, no sync method available", "start", start, "end", end)
return nil
}
...
@@ -166,6 +166,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
UnsafeL2: testutils.RandomL2BlockRef(rng),
SafeL2: testutils.RandomL2BlockRef(rng),
FinalizedL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
}
}
...
@@ -73,6 +73,8 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
conf.ConnGater = p2p.DefaultConnGater
conf.ConnMngr = p2p.DefaultConnManager
conf.EnableReqRespSync = ctx.GlobalBool(flags.SyncReqRespFlag.Name)
return conf, nil
}
...
@@ -40,6 +40,7 @@ type SetupP2P interface {
Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error)
TargetPeers() uint
GossipSetupConfigurables
ReqRespSyncEnabled() bool
}
// Config sets up a p2p host and discv5 service from configuration.
@@ -50,6 +51,9 @@ type Config struct {
DisableP2P bool
NoDiscovery bool
// Enable P2P-based alt-syncing method (req-resp protocol, not gossip)
AltSync bool
// Pubsub Scoring Parameters
PeerScoring pubsub.PeerScoreParams
TopicScoring pubsub.TopicScoreParams
@@ -104,6 +108,8 @@ type Config struct {
ConnGater func(conf *Config) (connmgr.ConnectionGater, error)
ConnMngr func(conf *Config) (connmgr.ConnManager, error)
EnableReqRespSync bool
}
//go:generate mockery --name ConnectionGater
@@ -166,6 +172,10 @@ func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams {
return &conf.TopicScoring
}
func (conf *Config) ReqRespSyncEnabled() bool {
return conf.EnableReqRespSync
}
const maxMeshParam = 1000
func (conf *Config) Check() error {
...
@@ -125,7 +125,7 @@ func TestP2PFull(t *testing.T) {
runCfgB := &testutils.MockRuntimeConfig{P2PSeqAddress: common.Address{0x42}}
logA := testlog.Logger(t, log.LvlError).New("host", "A")
nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{}, runCfgA, nil)
nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{}, nil, runCfgA, nil)
require.NoError(t, err)
defer nodeA.Close()
@@ -148,7 +148,7 @@ func TestP2PFull(t *testing.T) {
logB := testlog.Logger(t, log.LvlError).New("host", "B")
nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, runCfgB, nil)
nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, nil, runCfgB, nil)
require.NoError(t, err)
defer nodeB.Close()
hostB := nodeB.Host()
@@ -277,7 +277,7 @@ func TestDiscovery(t *testing.T) {
resourcesCtx, resourcesCancel := context.WithCancel(context.Background())
defer resourcesCancel()
nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}, runCfgA, nil)
nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}, nil, runCfgA, nil)
require.NoError(t, err)
defer nodeA.Close()
hostA := nodeA.Host()
@@ -292,7 +292,7 @@ func TestDiscovery(t *testing.T) {
confB.DiscoveryDB = discDBC
// Start B
nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}, runCfgB, nil)
nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}, nil, runCfgB, nil)
require.NoError(t, err)
defer nodeB.Close()
hostB := nodeB.Host()
@@ -307,7 +307,7 @@ func TestDiscovery(t *testing.T) {
}})
// Start C
nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}, runCfgC, nil)
nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}, nil, runCfgC, nil)
require.NoError(t, err)
defer nodeC.Close()
hostC := nodeC.Host()
...
@@ -11,8 +11,10 @@ import (
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/host"
p2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum/go-ethereum/log"
@@ -32,16 +34,18 @@ type NodeP2P struct {
dv5Udp *discover.UDPv5 // p2p discovery service
gs *pubsub.PubSub // p2p gossip router
gsOut GossipOut // p2p gossip application interface for publishing
syncCl *SyncClient
syncSrv *ReqRespServer
}
// NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil.
// If metrics are configured, a bandwidth monitor will be spawned in a goroutine.
func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, runCfg GossipRuntimeConfig, metrics metrics.Metricer) (*NodeP2P, error) {
func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) (*NodeP2P, error) {
if setup == nil {
return nil, errors.New("p2p node cannot be created without setup")
}
var n NodeP2P
if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, runCfg, metrics); err != nil {
if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, l2Chain, runCfg, metrics); err != nil {
closeErr := n.Close()
if closeErr != nil {
log.Error("failed to close p2p after starting with err", "closeErr", closeErr, "err", err)
@@ -54,7 +58,7 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
return &n, nil
}
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
bwc := p2pmetrics.NewBandwidthCounter()
var err error
@@ -73,6 +77,29 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.gater = extra.ConnectionGater()
n.connMgr = extra.ConnectionManager()
}
// Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() {
n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.AddPeer(conn.RemotePeer())
},
DisconnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.RemovePeer(conn.RemotePeer())
},
})
n.syncCl.Start()
// the host may already be connected to peers, add them all to the sync client
for _, peerID := range n.host.Network().Peers() {
n.syncCl.AddPeer(peerID)
}
if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy
n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
// register the sync protocol with libp2p host
payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
}
}
// notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
@@ -104,6 +131,17 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
return nil
}
func (n *NodeP2P) AltSyncEnabled() bool {
return n.syncCl != nil
}
func (n *NodeP2P) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
if !n.AltSyncEnabled() {
return fmt.Errorf("cannot request range %s - %s, req-resp sync is not enabled", start, end)
}
return n.syncCl.RequestL2Range(ctx, start, end)
}
func (n *NodeP2P) Host() host.Host {
return n.host
}
@@ -146,6 +184,8 @@ func (n *NodeP2P) Close() error {
if err := n.host.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p host cleanly: %w", err))
}
// TODO close sync loop
}
return result.ErrorOrNil()
}
...
@@ -22,6 +22,8 @@ type Prepared struct {
HostP2P host.Host
LocalNode *enode.LocalNode
UDPv5 *discover.UDPv5
EnableReqRespSync bool
}
var _ SetupP2P = (*Prepared)(nil)
@@ -83,3 +85,7 @@ func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams {
func (p *Prepared) Disabled() bool {
return false
}
func (p *Prepared) ReqRespSyncEnabled() bool {
return p.EnableReqRespSync
}
package p2p
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/golang/snappy"
"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"golang.org/x/time/rate"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
// StreamCtxFn provides a new context to use when handling stream requests
type StreamCtxFn func() context.Context
// Note: the mocknet used in testing does not support read/write stream timeouts; the timeouts are thus only applied if available.
// Rate-limits always apply, and ensure the request/response throughput is not too fast, rather than too slow.
const (
// timeout for opening a req-resp stream to another peer. This may involve some protocol negotiation.
streamTimeout = time.Second * 5
// timeout for writing the request as client. Can be as long as serverReadRequestTimeout
clientWriteRequestTimeout = time.Second * 10
// timeout for reading a response of a serving peer as client. Can be as long as serverWriteChunkTimeout
clientReadResponsetimeout = time.Second * 10
// timeout for reading the request content, deny the request if it cannot be fully read in time
serverReadRequestTimeout = time.Second * 10
// timeout for writing a single response message chunk
// (if a future response consists of multiple chunks, reset the writing timeout per chunk)
serverWriteChunkTimeout = time.Second * 10
// after the rate-limit reservation hits the max throttle delay, give up on serving a request and just close the stream
maxThrottleDelay = time.Second * 20
// Do not serve more than 20 requests per second
globalServerBlocksRateLimit rate.Limit = 20
// Allow up to 5 concurrent requests to be served, eating into our rate-limit
globalServerBlocksBurst = 5
// Do not serve more than 5 requests per second to the same peer, so we can serve other peers at the same time
peerServerBlocksRateLimit rate.Limit = 5
// Allow a peer to burst 3 requests, so it does not have to wait
peerServerBlocksBurst = 3
// If the client hits a request error, it counts as a lot of rate-limit tokens for syncing from that peer:
// we rather sync from other servers. We'll try again later,
// and eventually kick the peer based on degraded scoring if it's really not serving us well.
clientErrRateCost = 100
)
func PayloadByNumberProtocolID(l2ChainID *big.Int) protocol.ID {
return protocol.ID(fmt.Sprintf("/opstack/req/payload_by_number/%d/0", l2ChainID))
}
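// For illustration (example value, not part of this change): an L2 chain ID of 10 yields
// the protocol ID "/opstack/req/payload_by_number/10/0", where the trailing 0 is best read
// as the version number of this request-response protocol.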
type requestHandlerFn func(ctx context.Context, log log.Logger, stream network.Stream)
func MakeStreamHandler(resourcesCtx context.Context, log log.Logger, fn requestHandlerFn) network.StreamHandler {
return func(stream network.Stream) {
log := log.New("peer", stream.Conn().ID(), "remote", stream.Conn().RemoteMultiaddr())
defer func() {
if err := recover(); err != nil {
log.Error("p2p server request handling panic", "err", err, "protocol", stream.Protocol())
}
}()
defer stream.Close()
fn(resourcesCtx, log, stream)
}
}
type newStreamFn func(ctx context.Context, peerId peer.ID, protocolId ...protocol.ID) (network.Stream, error)
type receivePayloadFn func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error
type rangeRequest struct {
start uint64
end eth.L2BlockRef
}
type syncResult struct {
payload *eth.ExecutionPayload
peer peer.ID
}
type peerRequest struct {
num uint64
complete *atomic.Bool
}
type SyncClientMetrics interface {
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int)
}
// SyncClient implements a reverse chain sync with a minimal interface:
// signal the desired range, and receive blocks within this range back.
// Through parent-hash verification, received blocks are all ensured to be part of the canonical chain at one point,
// but it is up to the user to organize and process the results further.
//
// For the sync-client to retrieve any data, peers must be added with AddPeer(id), and removed upon disconnect with RemovePeer(id).
// The client is started with Start(), and may be started before or after changing any peers.
//
// ### Stages
//
// The sync mechanism is implemented as follows:
// - User sends range request: blocks on sync main loop (with ctx timeout)
// - Main loop processes range request (from high to low), dividing block requests by number between parallel peers.
// - The high part of the range has a known block-hash, and is marked as trusted.
// - Once there are no more peers available for buffering requests, we stop the range request processing.
// - Every request buffered for a peer is tracked as in-flight, by block number.
// - In-flight requests are not repeated
// - Requests for data that's already in the quarantine are not repeated
// - Data already in the quarantine that is trusted is attempted to be promoted.
//
// - Peers each have their own routine for processing requests.
// - They fetch the requested block by number, parse and validate it, and then send it back to the main loop
// - If a peer fails to fetch or process the block, or fails to send it back to the main loop within the timeout,
// then doRequest returns an error and marks the in-flight request as completed.
//
// - Main loop receives results synchronously with the range requests
// - The result is removed from in-flight tracker
// - The result is added to the quarantine
// - If we trust the hash, we try to promote the result.
//
// ### Concepts
//
// The main concepts are:
// - Quarantine: an LRU that stores the latest fetched block data, by hash as well as an extra index by number.
//
// - Quarantine eviction: upon regular LRU eviction, or explicit removal (when we learn data is not canonical),
// the sync result is removed from quarantine without being forwarded to the receiver.
// The peer that provided the data may be down-scored for providing un-utilized data if the data
// is not trusted during eviction.
//
// - Trusted data: data becomes trusted in one of 2 ways:
// - The hash / parent-hash of the sync target is marked as trusted.
// - The parent-hash of any promoted data is marked as trusted.
//
// - The trusted-data is maintained in LRU: we only care about the recent accessed blocks.
//
// - Result promotion: content from the quarantine is "promoted" when we find the blockhash is trusted.
// The data is removed from the quarantine, and forwarded to the receiver.
//
// ### Usage
//
// The user is expected to request the range of blocks between its existing chain head,
// and a trusted future block-hash as reference to sync towards.
// Upon receiving results from the sync-client, the user should adjust down its sync-target
// based on the received results, to avoid duplicating work when re-requesting an updated range.
// Range requests should still be repeated eventually however, as the sync client will give up on syncing a large range
// when it's too busy syncing.
//
// The rationale for this approach is that this sync mechanism is primarily intended
// for quickly filling gaps between an existing chain and a gossip chain, and not for very long block ranges.
// Syncing in the execution-layer (through snap-sync) is more appropriate for long ranges.
// If the user does sync a long range of blocks through this mechanism,
// it does end up traversing through the chain, but receives the blocks in reverse order.
// It is up to the user to persist the blocks for later processing, or drop & resync them if persistence is limited.
type SyncClient struct {
log log.Logger
cfg *rollup.Config
metrics SyncClientMetrics
newStreamFn newStreamFn
payloadByNumber protocol.ID
sync.Mutex
// syncing worker per peer
peers map[peer.ID]context.CancelFunc
// trusted blocks are, or have been, canonical at one point.
// Everything that's trusted is acceptable to pass to the sync receiver,
// but we target to just sync the blocks of the latest canonical view of the chain.
trusted *simplelru.LRU[common.Hash, struct{}]
// quarantine is a LRU of untrusted results: blocks that could not be verified yet
quarantine *simplelru.LRU[common.Hash, syncResult]
// quarantineByNum indexes the quarantine contents by number.
// No duplicates here, only the latest quarantine write is indexed.
// This map is cleared upon evictions of items from the quarantine LRU
quarantineByNum map[uint64]common.Hash
// inFlight requests are not repeated
inFlight map[uint64]*atomic.Bool
requests chan rangeRequest
peerRequests chan peerRequest
results chan syncResult
resCtx context.Context
resCancel context.CancelFunc
receivePayload receivePayloadFn
wg sync.WaitGroup
}
func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rcv receivePayloadFn, metrics SyncClientMetrics) *SyncClient {
ctx, cancel := context.WithCancel(context.Background())
c := &SyncClient{
log: log,
cfg: cfg,
metrics: metrics,
newStreamFn: newStream,
payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID),
peers: make(map[peer.ID]context.CancelFunc),
quarantineByNum: make(map[uint64]common.Hash),
inFlight: make(map[uint64]*atomic.Bool),
requests: make(chan rangeRequest), // blocking
peerRequests: make(chan peerRequest, 128),
results: make(chan syncResult, 128),
resCtx: ctx,
resCancel: cancel,
receivePayload: rcv,
}
// never errors with positive LRU cache size
// TODO(CLI-3733): if we had an LRU based on on total payloads size, instead of payload count,
// we can safely buffer more data in the happy case.
q, _ := simplelru.NewLRU[common.Hash, syncResult](100, c.onQuarantineEvict)
c.quarantine = q
trusted, _ := simplelru.NewLRU[common.Hash, struct{}](10000, nil)
c.trusted = trusted
return c
}
func (s *SyncClient) Start() {
s.wg.Add(1)
go s.mainLoop()
}
func (s *SyncClient) AddPeer(id peer.ID) {
s.Lock()
defer s.Unlock()
if _, ok := s.peers[id]; ok {
s.log.Warn("cannot register peer for sync duties, peer was already registered", "peer", id)
return
}
s.wg.Add(1)
// add new peer routine
ctx, cancel := context.WithCancel(s.resCtx)
s.peers[id] = cancel
go s.peerLoop(ctx, id)
}
func (s *SyncClient) RemovePeer(id peer.ID) {
s.Lock()
defer s.Unlock()
cancel, ok := s.peers[id]
if !ok {
s.log.Warn("cannot remove peer from sync duties, peer was not registered", "peer", id)
return
}
cancel() // once loop exits
delete(s.peers, id)
}
func (s *SyncClient) Close() error {
s.resCancel()
s.wg.Wait()
return nil
}
func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
if end == (eth.L2BlockRef{}) {
s.log.Debug("P2P sync client received range signal, but cannot sync open-ended chain: need sync target to verify blocks through parent-hashes", "start", start)
return nil
}
// synchronize requests with the main loop for state access
select {
case s.requests <- rangeRequest{start: start.Number, end: end}:
return nil
case <-ctx.Done():
return fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err())
}
}
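// A minimal usage sketch of the sync client (illustrative only: logger, host and block
// references here are assumed placeholders, not names from this change):
//
//	cl := NewSyncClient(logger, rollupCfg, host.NewStream, receivePayload, metrics)
//	cl.Start()
//	cl.AddPeer(peerID) // typically wired up to network Connected/Disconnected events
//	// Blocks start+1 .. end-1 are fetched and handed to receivePayload in reverse order;
//	// end must carry a trusted hash for the parent-hash verification to anchor on.
//	if err := cl.RequestL2Range(ctx, localUnsafeHead, trustedSyncTarget); err != nil {
//		// the client is busy; retry the range request later
//	}
//	defer cl.Close() // cancels the main loop and all peer loops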
const (
maxRequestScheduling = time.Second * 3
maxResultProcessing = time.Second * 3
)
func (s *SyncClient) mainLoop() {
defer s.wg.Done()
for {
select {
case req := <-s.requests:
ctx, cancel := context.WithTimeout(s.resCtx, maxRequestScheduling)
s.onRangeRequest(ctx, req)
cancel()
case res := <-s.results:
ctx, cancel := context.WithTimeout(s.resCtx, maxResultProcessing)
s.onResult(ctx, res)
cancel()
case <-s.resCtx.Done():
s.log.Info("stopped P2P req-resp L2 block sync client")
return
}
}
}
// onRangeRequest is exclusively called by the main loop, and has thus direct access to the request bookkeeping state.
// This function transforms requested block ranges into work for each peer.
func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
// add req head to trusted set of blocks
s.trusted.Add(req.end.Hash, struct{}{})
s.trusted.Add(req.end.ParentHash, struct{}{})
log := s.log.New("target", req.start, "end", req.end)
// clean up the completed in-flight requests
for k, v := range s.inFlight {
if v.Load() {
delete(s.inFlight, k)
}
}
// Now try to fetch lower numbers than current end, to traverse back towards the updated start.
for i := uint64(0); ; i++ {
num := req.end.Number - 1 - i
if num <= req.start {
return
}
// check if we have something in quarantine already
if h, ok := s.quarantineByNum[num]; ok {
if s.trusted.Contains(h) { // if we trust it, try to promote it.
s.tryPromote(h)
}
// Don't fetch things that we have a candidate for already.
// We'll evict it from quarantine by finding a conflict, or if we sync enough other blocks
continue
}
if _, ok := s.inFlight[num]; ok {
continue // request still in flight
}
pr := peerRequest{num: num, complete: new(atomic.Bool)}
log.Debug("Scheduling P2P block request", "num", num)
// schedule number
select {
case s.peerRequests <- pr:
s.inFlight[num] = pr.complete
case <-ctx.Done():
log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err())
return
default: // peers may all be busy processing requests already
log.Info("no peers ready to handle block requests for more P2P requests for L2 block history", "current", num)
return
}
}
}
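// For example (numbers assumed): a rangeRequest with start=100 and end referencing block
// 110 walks blocks 109 down to 101, skipping numbers that are already in-flight or
// quarantined. Block 110 itself is known from the request (its hash is the trusted
// target), and block 100 is the caller's existing head, so neither is fetched.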
func (s *SyncClient) onQuarantineEvict(key common.Hash, value syncResult) {
delete(s.quarantineByNum, uint64(value.payload.BlockNumber))
if s.metrics != nil {
s.metrics.PayloadsQuarantineSize(s.quarantine.Len())
}
if !s.trusted.Contains(key) {
s.log.Debug("evicting untrusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer)
// TODO(CLI-3732): downscore peer for having provided us a bad block that never turned out to be canonical
} else {
s.log.Debug("evicting trusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer)
}
}
func (s *SyncClient) tryPromote(h common.Hash) {
parentRes, ok := s.quarantine.Get(h)
if ok {
// Simply reschedule the result, to get it (and possibly its parents) out of quarantine without recursion
select {
case s.results <- parentRes:
default:
s.log.Debug("failed to signal block for promotion: sync client is too busy", "h", h)
}
} else {
s.log.Debug("cannot find block in quarantine, nothing to promote", "h", h)
}
}
func (s *SyncClient) promote(ctx context.Context, res syncResult) {
s.log.Debug("promoting p2p sync result", "payload", res.payload.ID(), "peer", res.peer)
if err := s.receivePayload(ctx, res.peer, res.payload); err != nil {
s.log.Warn("failed to promote payload, receiver error", "err", err)
return
}
s.trusted.Add(res.payload.BlockHash, struct{}{})
if s.quarantine.Remove(res.payload.BlockHash) {
s.log.Debug("promoted previously p2p-synced block from quarantine to main", "id", res.payload.ID())
} else {
s.log.Debug("promoted new p2p-synced block to main", "id", res.payload.ID())
}
// Mark parent block as trusted, so that we can promote it once we receive it / find it
s.trusted.Add(res.payload.ParentHash, struct{}{})
// Try to promote the parent block too, if any: previous unverifiable data may now be canonical
s.tryPromote(res.payload.ParentHash)
// In case we don't have the parent, and what we have in quarantine is wrong,
// clear what we buffered in favor of fetching something else.
if h, ok := s.quarantineByNum[uint64(res.payload.BlockNumber)-1]; ok {
s.quarantine.Remove(h)
}
}
// onResult is exclusively called by the main loop, and has thus direct access to the request bookkeeping state.
// This function verifies if the result is canonical, and either promotes the result or moves the result into quarantine.
func (s *SyncClient) onResult(ctx context.Context, res syncResult) {
s.log.Debug("processing p2p sync result", "payload", res.payload.ID(), "peer", res.peer)
// Clean up the in-flight request, we have a result now.
delete(s.inFlight, uint64(res.payload.BlockNumber))
// Always put it in quarantine first. If promotion fails because the receiver is too busy, this functions as cache.
s.quarantine.Add(res.payload.BlockHash, res)
s.quarantineByNum[uint64(res.payload.BlockNumber)] = res.payload.BlockHash
if s.metrics != nil {
s.metrics.PayloadsQuarantineSize(s.quarantine.Len())
}
// If we know this block is canonical, then promote it
if s.trusted.Contains(res.payload.BlockHash) {
s.promote(ctx, res)
}
}
// peerLoop for syncing from a single peer
func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
defer func() {
s.Lock()
delete(s.peers, id) // clean up
s.wg.Done()
s.Unlock()
s.log.Debug("stopped syncing loop of peer", "id", id)
}()
log := s.log.New("peer", id)
log.Info("Starting P2P sync client event loop")
var rl rate.Limiter
// Implement the same rate limits as the server does per-peer,
// so we don't be too aggressive to the server.
rl.SetLimit(peerServerBlocksRateLimit)
rl.SetBurst(peerServerBlocksBurst)
for {
// wait for peer to be available for more work
if err := rl.WaitN(ctx, 1); err != nil {
return
}
// once the peer is available, wait for a sync request.
select {
case pr := <-s.peerRequests:
// We already established the peer is available w.r.t. rate-limiting,
// and this is the only loop over this peer, so we can request now.
start := time.Now()
err := s.doRequest(ctx, id, pr.num)
if err != nil {
// mark as complete if there's an error: we are not sending any result and can complete immediately.
pr.complete.Store(true)
log.Warn("failed p2p sync request", "num", pr.num, "err", err)
// If we hit an error, then count it as many requests.
// We'd like to avoid making more requests for a while, to back off.
// Note: WaitN errors out immediately if the cost exceeds the limiter's burst,
// so widen the burst for the back-off wait, then restore it.
rl.SetBurst(clientErrRateCost)
if err := rl.WaitN(ctx, clientErrRateCost); err != nil {
return
}
rl.SetBurst(peerServerBlocksBurst)
} else {
log.Debug("completed p2p sync request", "num", pr.num)
}
took := time.Since(start)
// TODO(CLI-3732): update scores: depending on the speed of the result,
// increase the p2p-sync part of the peer score
// (don't allow the score to grow indefinitely only based on this factor though)
if s.metrics != nil {
resultCode := byte(0)
if err != nil {
if re, ok := err.(requestResultErr); ok {
resultCode = re.ResultCode()
} else {
resultCode = 1
}
}
s.metrics.ClientPayloadByNumberEvent(pr.num, resultCode, took)
}
case <-ctx.Done():
return
}
}
}
type requestResultErr byte
func (r requestResultErr) Error() string {
return fmt.Sprintf("peer failed to serve request with code %d", uint8(r))
}
func (r requestResultErr) ResultCode() byte {
return byte(r)
}
func (s *SyncClient) doRequest(ctx context.Context, id peer.ID, n uint64) error {
// open stream to peer
reqCtx, reqCancel := context.WithTimeout(ctx, streamTimeout)
str, err := s.newStreamFn(reqCtx, id, s.payloadByNumber)
reqCancel()
if err != nil {
return fmt.Errorf("failed to open stream: %w", err)
}
defer str.Close()
// set write timeout (if available)
_ = str.SetWriteDeadline(time.Now().Add(clientWriteRequestTimeout))
if err := binary.Write(str, binary.LittleEndian, n); err != nil {
return fmt.Errorf("failed to write request (%d): %w", n, err)
}
if err := str.CloseWrite(); err != nil {
return fmt.Errorf("failed to close writer side while making request: %w", err)
}
// set read timeout (if available)
_ = str.SetReadDeadline(time.Now().Add(clientReadResponsetimeout))
// Limit input, as well as output.
// Compression may otherwise continue to read ignored data for a small output,
// or output more data than desired (zip-bomb)
r := io.LimitReader(str, maxGossipSize)
var result [1]byte
if _, err := io.ReadFull(r, result[:]); err != nil {
return fmt.Errorf("failed to read result part of response: %w", err)
}
if res := result[0]; res != 0 {
return requestResultErr(res)
}
var versionData [4]byte
if _, err := io.ReadFull(r, versionData[:]); err != nil {
return fmt.Errorf("failed to read version part of response: %w", err)
}
version := binary.LittleEndian.Uint32(versionData[:])
if version != 0 {
return fmt.Errorf("unrecognized ExecutionPayload version: %d", version)
}
// payload is SSZ encoded with Snappy framed compression
r = snappy.NewReader(r)
r = io.LimitReader(r, maxGossipSize)
// We cannot stream straight into the SSZ decoder, since we need the scope of the SSZ payload.
// The server does not prepend it, nor would we trust a claimed length anyway, so we buffer the data we get.
data, err := io.ReadAll(r)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
var res eth.ExecutionPayload
if err := res.UnmarshalSSZ(uint32(len(data)), bytes.NewReader(data)); err != nil {
return fmt.Errorf("failed to decode response: %w", err)
}
if err := str.CloseRead(); err != nil {
return fmt.Errorf("failed to close reading side")
}
if err := verifyBlock(&res, n); err != nil {
return fmt.Errorf("received execution payload is invalid: %w", err)
}
select {
case s.results <- syncResult{payload: &res, peer: id}:
case <-ctx.Done():
return fmt.Errorf("failed to process response, sync client is too busy: %w", err)
}
return nil
}
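// For reference, a sketch of the response encoding that doRequest parses above. The
// server-side write path is outside this excerpt, so the code below is an assumed
// illustration of the wire format, not the authoritative implementation:
//
//	_, _ = stream.Write([]byte{0})                           // result code: 0 = success
//	_ = binary.Write(stream, binary.LittleEndian, uint32(0)) // ExecutionPayload version
//	w := snappy.NewBufferedWriter(stream)                    // snappy framed compression
//	_, _ = payload.MarshalSSZ(w)                             // SSZ-encoded payload
//	_ = w.Close()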
func verifyBlock(payload *eth.ExecutionPayload, expectedNum uint64) error {
// verify L2 block
if expectedNum != uint64(payload.BlockNumber) {
return fmt.Errorf("received execution payload for block %d, but expected block %d", payload.BlockNumber, expectedNum)
}
actual, ok := payload.CheckBlockHash()
if !ok { // payload itself contains bad block hash
return fmt.Errorf("received execution payload for block %d with bad block hash %s, expected %s", expectedNum, payload.BlockHash, actual)
}
return nil
}
// peerStat maintains rate-limiting data of a peer that requests blocks from us.
type peerStat struct {
// Requests tokenizes each request to sync
Requests *rate.Limiter
}
type L2Chain interface {
PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error)
}
type ReqRespServerMetrics interface {
ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
}
type ReqRespServer struct {
cfg *rollup.Config
l2 L2Chain
metrics ReqRespServerMetrics
peerRateLimits *simplelru.LRU[peer.ID, *peerStat]
peerStatsLock sync.Mutex
globalRequestsRL *rate.Limiter
}
func NewReqRespServer(cfg *rollup.Config, l2 L2Chain, metrics ReqRespServerMetrics) *ReqRespServer {
// We should never allow over 1000 different peers to churn through quickly,
// so it's fine to prune rate-limit details past this.
peerRateLimits, _ := simplelru.NewLRU[peer.ID, *peerStat](1000, nil)
// 20 sync requests per second, with a burst of 5, shared between all peers
globalRequestsRL := rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst)
return &ReqRespServer{
cfg: cfg,
l2: l2,
metrics: metrics,
peerRateLimits: peerRateLimits,
globalRequestsRL: globalRequestsRL,
}
}
// HandleSyncRequest is a stream handler function to register the L2 unsafe payloads alt-sync protocol.
// See MakeStreamHandler to transform this into a LibP2P handler function.
//
// Note that the same peer may open parallel streams.
//
// The caller must Close the stream.
func (srv *ReqRespServer) HandleSyncRequest(ctx context.Context, log log.Logger, stream network.Stream) {
start := time.Now()
// We wait as long as necessary; we throttle the peer instead of disconnecting,
// unless the delay reaches a threshold that is unreasonable to wait for.
ctx, cancel := context.WithTimeout(ctx, maxThrottleDelay)
// req may stay 0 if we fail to decode the request
req, err := srv.handleSyncRequest(ctx, stream)
cancel()
resultCode := byte(0)
if err != nil {
log.Warn("failed to serve p2p sync request", "req", req, "err", err)
if errors.Is(err, ethereum.NotFound) {
resultCode = 1
} else if errors.Is(err, invalidRequestErr) {
resultCode = 2
} else {
resultCode = 3
}
// try to write error code, so the other peer can understand the reason for failure.
_, _ = stream.Write([]byte{resultCode})
} else {
log.Debug("successfully served sync response", "req", req)
}
if srv.metrics != nil {
srv.metrics.ServerPayloadByNumberEvent(req, resultCode, time.Since(start))
}
}
var invalidRequestErr = errors.New("invalid request")
func (srv *ReqRespServer) handleSyncRequest(ctx context.Context, stream network.Stream) (uint64, error) {
peerId := stream.Conn().RemotePeer()
// take a token from the global rate-limiter,
// to make sure there's not too much concurrent server work between different peers.
if err := srv.globalRequestsRL.Wait(ctx); err != nil {
return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
}
// find rate limiting data of peer, or add otherwise
srv.peerStatsLock.Lock()
ps, _ := srv.peerRateLimits.Get(peerId)
if ps == nil {
ps = &peerStat{
Requests: rate.NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst),
}
srv.peerRateLimits.Add(peerId, ps)
ps.Requests.Reserve() // count the hit, but make it delay the next request rather than immediately waiting
} else {
// Only wait if it's an existing peer, otherwise the instant rate-limit Wait call always errors.
// If the requester thinks we're taking too long, then it's their problem and they can disconnect.
// We'll disconnect ourselves only when failing to read/write,
// if the work is invalid (range validation), or when individual sub tasks timeout.
if err := ps.Requests.Wait(ctx); err != nil {
return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
}
}
srv.peerStatsLock.Unlock()
// Set read deadline, if available
_ = stream.SetReadDeadline(time.Now().Add(serverReadRequestTimeout))
// Read the request
var req uint64
if err := binary.Read(stream, binary.LittleEndian, &req); err != nil {
return 0, fmt.Errorf("failed to read requested block number: %w", err)
}
if err := stream.CloseRead(); err != nil {
return req, fmt.Errorf("failed to close reading-side of a P2P sync request call: %w", err)
}
// Check the request is within the expected range of blocks
if req < srv.cfg.Genesis.L2.Number {
return req, fmt.Errorf("cannot serve request for L2 block %d before genesis %d: %w", req, srv.cfg.Genesis.L2.Number, invalidRequestErr)
}
max, err := srv.cfg.TargetBlockNumber(uint64(time.Now().Unix()))
if err != nil {
return req, fmt.Errorf("cannot determine max target block number to verify request: %w", invalidRequestErr)
}
if req > max {
return req, fmt.Errorf("cannot serve request for L2 block %d after max expected block (%v): %w", req, max, invalidRequestErr)
}
payload, err := srv.l2.PayloadByNumber(ctx, req)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
return req, fmt.Errorf("peer requested unknown block by number: %w", err)
} else {
return req, fmt.Errorf("failed to retrieve payload to serve to peer: %w", err)
}
}
// We set write deadline, if available, to safely write without blocking on a throttling peer connection
_ = stream.SetWriteDeadline(time.Now().Add(serverWriteChunkTimeout))
// response header: byte 0 is the result code (0 = success),
// bytes 1:5 are the little-endian uint32 version (0 for now)
var tmp [5]byte
if _, err := stream.Write(tmp[:]); err != nil {
return req, fmt.Errorf("failed to write response header data: %w", err)
}
w := snappy.NewBufferedWriter(stream)
if _, err := payload.MarshalSSZ(w); err != nil {
return req, fmt.Errorf("failed to write payload to sync response: %w", err)
}
if err := w.Close(); err != nil {
return req, fmt.Errorf("failed to finishing writing payload to sync response: %w", err)
}
return req, nil
}
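A note on the Reserve-vs-Wait pattern above: for a newly seen peer the server calls Reserve() so the first request is served immediately and the cost is charged as a delay on that peer's next request, while known peers go through Wait(). A minimal, self-contained sketch of these golang.org/x/time/rate semantics (the limit values here are illustrative, not the op-node's configured ones):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 1 token per 100ms, bucket size 1 (illustrative values).
	rl := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)

	start := time.Now()
	rl.Reserve() // consumes the available token without blocking
	// The next Wait blocks until the bucket refills (~100ms here).
	if err := rl.Wait(context.Background()); err != nil {
		fmt.Println("rate limit wait failed:", err)
		return
	}
	fmt.Println("second token acquired after", time.Since(start).Round(10*time.Millisecond))
}
```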
package p2p
import (
"context"
"math"
"math/big"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
type mockPayloadFn func(n uint64) (*eth.ExecutionPayload, error)
func (fn mockPayloadFn) PayloadByNumber(_ context.Context, number uint64) (*eth.ExecutionPayload, error) {
return fn(number)
}
var _ L2Chain = mockPayloadFn(nil)
func setupSyncTestData(length uint64) (*rollup.Config, map[uint64]*eth.ExecutionPayload, func(i uint64) eth.L2BlockRef) {
// minimal rollup config to build mock blocks & verify their time.
cfg := &rollup.Config{
Genesis: rollup.Genesis{
L1: eth.BlockID{Hash: common.Hash{0xaa}},
L2: eth.BlockID{Hash: common.Hash{0xbb}},
L2Time: 9000,
},
BlockTime: 2,
L2ChainID: big.NewInt(1234),
}
// create some simple fake test blocks
payloads := make(map[uint64]*eth.ExecutionPayload)
payloads[0] = &eth.ExecutionPayload{
Timestamp: eth.Uint64Quantity(cfg.Genesis.L2Time),
}
payloads[0].BlockHash, _ = payloads[0].CheckBlockHash()
for i := uint64(1); i <= length; i++ {
payload := &eth.ExecutionPayload{
ParentHash: payloads[i-1].BlockHash,
BlockNumber: eth.Uint64Quantity(i),
Timestamp: eth.Uint64Quantity(cfg.Genesis.L2Time + i*cfg.BlockTime),
}
payload.BlockHash, _ = payload.CheckBlockHash()
payloads[i] = payload
}
l2Ref := func(i uint64) eth.L2BlockRef {
return eth.L2BlockRef{
Hash: payloads[i].BlockHash,
Number: uint64(payloads[i].BlockNumber),
ParentHash: payloads[i].ParentHash,
Time: uint64(payloads[i].Timestamp),
}
}
return cfg, payloads, l2Ref
}
func TestSinglePeerSync(t *testing.T) {
t.Parallel() // Takes a while, but can run in parallel
log := testlog.Logger(t, log.LvlError)
cfg, payloads, l2Ref := setupSyncTestData(25)
// Serving payloads: just load them from the map, if they exist
servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) {
p, ok := payloads[n]
if !ok {
return nil, ethereum.NotFound
}
return p, nil
})
// collect received payloads in a buffered channel, so we can verify we get everything
received := make(chan *eth.ExecutionPayload, 100)
receivePayload := receivePayloadFn(func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error {
received <- payload
return nil
})
// Setup 2 minimal test hosts to attach the sync protocol to
mnet, err := mocknet.FullMeshConnected(2)
require.NoError(t, err, "failed to setup mocknet")
defer mnet.Close()
hosts := mnet.Hosts()
hostA, hostB := hosts[0], hosts[1]
require.Equal(t, hostA.Network().Connectedness(hostB.ID()), network.Connected)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup host A as the server
srv := NewReqRespServer(cfg, servePayload, metrics.NoopMetrics)
payloadByNumber := MakeStreamHandler(ctx, log.New("role", "server"), srv.HandleSyncRequest)
hostA.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
// Setup host B as the client
cl := NewSyncClient(log.New("role", "client"), cfg, hostB.NewStream, receivePayload, metrics.NoopMetrics)
// Setup host B (client) to sync from its peer Host A (server)
cl.AddPeer(hostA.ID())
cl.Start()
defer cl.Close()
// request syncing of the range between blocks 10 and 20 (both exclusive), i.e. blocks 11-19
require.NoError(t, cl.RequestL2Range(ctx, l2Ref(10), l2Ref(20)))
// and wait for the sync results to come in (in reverse order)
receiveCtx, receiveCancel := context.WithTimeout(ctx, time.Second*5)
defer receiveCancel()
for i := uint64(19); i > 10; i-- {
select {
case p := <-received:
require.Equal(t, uint64(p.BlockNumber), i, "expecting payloads in order")
exp, ok := payloads[uint64(p.BlockNumber)]
require.True(t, ok, "expecting known payload")
require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
case <-receiveCtx.Done():
t.Fatal("did not receive all expected payloads within expected time")
}
}
}
func TestMultiPeerSync(t *testing.T) {
t.Parallel() // Takes a while, but can run in parallel
log := testlog.Logger(t, log.LvlError)
cfg, payloads, l2Ref := setupSyncTestData(100)
setupPeer := func(ctx context.Context, h host.Host) (*SyncClient, chan *eth.ExecutionPayload) {
// Serving payloads: just load them from the map, if they exist
servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) {
p, ok := payloads[n]
if !ok {
return nil, ethereum.NotFound
}
return p, nil
})
// collect received payloads in a buffered channel, so we can verify we get everything
received := make(chan *eth.ExecutionPayload, 100)
receivePayload := receivePayloadFn(func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error {
received <- payload
return nil
})
// Setup as server
srv := NewReqRespServer(cfg, servePayload, metrics.NoopMetrics)
payloadByNumber := MakeStreamHandler(ctx, log.New("serve", "payloads_by_number"), srv.HandleSyncRequest)
h.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
cl := NewSyncClient(log.New("role", "client"), cfg, h.NewStream, receivePayload, metrics.NoopMetrics)
return cl, received
}
// Setup 3 minimal test hosts to attach the sync protocol to
mnet, err := mocknet.FullMeshConnected(3)
require.NoError(t, err, "failed to setup mocknet")
defer mnet.Close()
hosts := mnet.Hosts()
hostA, hostB, hostC := hosts[0], hosts[1], hosts[2]
require.Equal(t, hostA.Network().Connectedness(hostB.ID()), network.Connected)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clA, recvA := setupPeer(ctx, hostA)
clB, recvB := setupPeer(ctx, hostB)
clC, _ := setupPeer(ctx, hostC)
// Make them all sync from each other
clA.AddPeer(hostB.ID())
clA.AddPeer(hostC.ID())
clA.Start()
defer clA.Close()
clB.AddPeer(hostA.ID())
clB.AddPeer(hostC.ID())
clB.Start()
defer clB.Close()
clC.AddPeer(hostA.ID())
clC.AddPeer(hostB.ID())
clC.Start()
defer clC.Close()
// request syncing of the range between blocks 10 and 90 (both exclusive)
require.NoError(t, clA.RequestL2Range(ctx, l2Ref(10), l2Ref(90)))
// With such a large range to request we are going to hit the rate-limits of B and C,
// but that means we'll balance the work between the peers.
// wait for the results to come in, based on the expected rate limit, divided by 2 (because we have 2 servers), with a buffer of 2 seconds
receiveCtx, receiveCancel := context.WithTimeout(ctx, time.Second*time.Duration(math.Ceil(float64((89-10)/peerServerBlocksRateLimit)))/2+time.Second*2)
defer receiveCancel()
for i := uint64(89); i > 10; i-- {
select {
case p := <-recvA:
exp, ok := payloads[uint64(p.BlockNumber)]
require.True(t, ok, "expecting known payload")
require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
case <-receiveCtx.Done():
t.Fatal("did not receive all expected payloads within expected time")
}
}
// now see if B can sync a range, and fill the gap with a re-request
bl25 := payloads[25] // temporarily remove it from the available payloads. This will create a gap
delete(payloads, uint64(25))
require.NoError(t, clB.RequestL2Range(ctx, l2Ref(20), l2Ref(30)))
for i := uint64(29); i > 25; i-- {
select {
case p := <-recvB:
exp, ok := payloads[uint64(p.BlockNumber)]
require.True(t, ok, "expecting known payload")
require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
case <-receiveCtx.Done():
t.Fatal("did not receive all expected payloads within expected time")
}
}
// the request for 25 should fail. See:
// server: WARN peer requested unknown block by number num=25
// client: WARN failed p2p sync request num=25 err="peer failed to serve request with code 1"
require.Zero(t, len(recvB), "there is a gap, should not see other payloads yet")
// Add back the block
payloads[25] = bl25
// And request a range again, 25 is there now, and 21-24 should follow quickly (some may already have been fetched and are waiting in quarantine)
require.NoError(t, clB.RequestL2Range(ctx, l2Ref(20), l2Ref(26)))
receiveCtx, receiveCancel = context.WithTimeout(ctx, time.Second*10)
defer receiveCancel()
for i := uint64(25); i > 20; i-- {
select {
case p := <-recvB:
exp, ok := payloads[uint64(p.BlockNumber)]
require.True(t, ok, "expecting known payload")
require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload")
case <-receiveCtx.Done():
t.Fatal("did not receive all expected payloads within expected time")
}
}
}
...
@@ -678,22 +678,15 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
 return io.EOF
 }

-// GetUnsafeQueueGap retrieves the current [start, end) range (incl. start, excl. end)
-// of the gap between the tip of the unsafe priority queue and the unsafe head.
-// If there is no gap, the difference between end and start will be 0.
-func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) {
-// The start of the gap is always the unsafe head + 1
-start = eq.unsafeHead.Number + 1
-// If the priority queue is empty, the end is the first block number at the top of the priority queue
-// Otherwise, the end is the expected block number
+// UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
+func (eq *EngineQueue) UnsafeL2SyncTarget() eth.L2BlockRef {
 if first := eq.unsafePayloads.Peek(); first != nil {
-// Don't include the payload we already have in the sync range
-end = first.ID().Number
+ref, err := PayloadToBlockRef(first, &eq.cfg.Genesis)
+if err != nil {
+return eth.L2BlockRef{}
+}
+return ref
 } else {
-// Include the expected payload in the sync range
-end = expectedNumber + 1
+return eth.L2BlockRef{}
 }
-return start, end
 }
...
@@ -51,7 +51,7 @@ type EngineQueueStage interface {
 Finalize(l1Origin eth.L1BlockRef)
 AddUnsafePayload(payload *eth.ExecutionPayload)
-GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64)
+UnsafeL2SyncTarget() eth.L2BlockRef
 Step(context.Context) error
 }
...
@@ -167,10 +167,9 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
 dp.eng.AddUnsafePayload(payload)
 }

-// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head.
-// If there is no gap, the start and end will be 0.
-func (dp *DerivationPipeline) GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) {
-return dp.eng.GetUnsafeQueueGap(expectedNumber)
+// UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
+func (dp *DerivationPipeline) UnsafeL2SyncTarget() eth.L2BlockRef {
+return dp.eng.UnsafeL2SyncTarget()
 }

 // Step tries to progress the buffer.
...
...
@@ -48,7 +48,7 @@ type DerivationPipeline interface {
 Reset()
 Step(ctx context.Context) error
 AddUnsafePayload(payload *eth.ExecutionPayload)
-GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64)
+UnsafeL2SyncTarget() eth.L2BlockRef
 Finalize(ref eth.L1BlockRef)
 FinalizedL1() eth.L1BlockRef
 Finalized() eth.L2BlockRef
...
@@ -84,12 +84,20 @@ type Network interface {
 type AltSync interface {
 // RequestL2Range informs the sync source that the given range of L2 blocks is missing,
 // and should be retrieved from any available alternative syncing source.
-// The start of the range is inclusive, the end is exclusive.
+// The start and end of the range are exclusive:
+// the start is the head we already have, the end is the first thing we have queued up.
+// It's the task of the alt-sync mechanism to use this hint to fetch the right payloads.
+// Note that the end and start may not be consistent: in this case the sync method should fetch older history.
+//
+// If the end value is zeroed, then the sync-method may determine the end freely,
+// e.g. sync till the chain head meets the wallclock time. This functionality is optional:
+// a fixed target to sync towards may be determined by picking up payloads through P2P gossip or other sources.
+//
 // The sync results should be returned back to the driver via the OnUnsafeL2Payload(ctx, payload) method.
 // The latest requested range should always take priority over previous requests.
 // There may be overlaps in requested ranges.
 // An error may be returned if the scheduling fails immediately, e.g. a context timeout.
-RequestL2Range(ctx context.Context, start, end uint64) error
+RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error
 }

 // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
...
...
@@ -422,6 +422,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus {
 UnsafeL2: s.derivation.UnsafeL2Head(),
 SafeL2: s.derivation.SafeL2Head(),
 FinalizedL2: s.derivation.Finalized(),
+UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
 }
 }
...
@@ -489,24 +490,14 @@ type hashAndErrorChannel struct {
 // WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
 // Results are received through OnUnsafeL2Payload.
 func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
-// subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that
-// difference by the block time to get the expected L2 block number at the current time. If the
-// unsafe head does not have this block number, then there is a gap in the queue.
-wallClock := uint64(time.Now().Unix())
-genesisTimestamp := s.config.Genesis.L2Time
-if wallClock < genesisTimestamp {
-s.log.Debug("nothing to sync, did not reach genesis L2 time yet", "genesis", genesisTimestamp)
-return nil
-}
-wallClockGenesisDiff := wallClock - genesisTimestamp
-// Note: round down, we should not request blocks into the future.
-blocksSinceGenesis := wallClockGenesisDiff / s.config.BlockTime
-expectedL2Block := s.config.Genesis.L2.Number + blocksSinceGenesis
-start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block)
-// Check if there is a gap between the unsafe head and the expected L2 block number at the current time.
-if end > start {
-s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end-start)
+start := s.derivation.UnsafeL2Head()
+end := s.derivation.UnsafeL2SyncTarget()
+// Check if we have missing blocks between the start and end. Request them if we do.
+if end == (eth.L2BlockRef{}) {
+s.log.Debug("requesting sync with open-end range", "start", start)
+return s.altSync.RequestL2Range(ctx, start, eth.L2BlockRef{})
+} else if end.Number > start.Number+1 {
+s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end.Number-start.Number)
 return s.altSync.RequestL2Range(ctx, start, end)
 }
 return nil
...
...
@@ -116,6 +116,20 @@ func (cfg *Config) ValidateL2Config(ctx context.Context, client L2Client) error
 return nil
 }

+func (cfg *Config) TargetBlockNumber(timestamp uint64) (num uint64, err error) {
+// subtract the genesis time from the timestamp to get the time elapsed since genesis,
+// then divide that difference by the block time to get the expected L2 block number at the given time.
+genesisTimestamp := cfg.Genesis.L2Time
+if timestamp < genesisTimestamp {
+return 0, fmt.Errorf("did not reach genesis time (%d) yet", genesisTimestamp)
+}
+wallClockGenesisDiff := timestamp - genesisTimestamp
+// Note: round down, we should not request blocks into the future.
+blocksSinceGenesis := wallClockGenesisDiff / cfg.BlockTime
+return cfg.Genesis.L2.Number + blocksSinceGenesis, nil
+}
+
 type L1Client interface {
 ChainID(context.Context) (*big.Int, error)
 L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
...
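To make TargetBlockNumber concrete (all numbers invented for this example): with Genesis.L2Time = 9000, Genesis.L2.Number = 0 and BlockTime = 2, a timestamp of 9103 yields (9103 - 9000) / 2 = 51 blocks since genesis, rounding down, so the target block number is 51:

```go
package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
)

func main() {
	cfg := &rollup.Config{
		Genesis:   rollup.Genesis{L2Time: 9000}, // L2.Number left at zero
		BlockTime: 2,
	}
	n, err := cfg.TargetBlockNumber(9103)
	fmt.Println(n, err) // 51 <nil>: integer division rounds down
}
```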
...
@@ -32,9 +32,10 @@ type RPCSync interface {
 // Start starts an additional worker syncing job
 Start() error
 // RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface.
-RequestL2Range(ctx context.Context, start, end uint64) error
+RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error
 }

+// SyncClient implements the driver AltSync interface, including support for fetching an open-ended chain of L2 blocks.
 type SyncClient struct {
 *L2Client
...
@@ -88,7 +89,7 @@ func (s *SyncClient) Close() error {
 return nil
 }

-func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) error {
+func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
 // Drain previous requests now that we have new information
 for len(s.requests) > 0 {
 select { // in case requests is being read at the same time, don't block on draining it.
...
@@ -98,11 +99,23 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) erro
 }
 }

+endNum := end.Number
+if end == (eth.L2BlockRef{}) {
+n, err := s.rollupCfg.TargetBlockNumber(uint64(time.Now().Unix()))
+if err != nil {
+return err
+}
+if n <= start.Number {
+return nil
+}
+endNum = n
+}
+
 // TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method.
-s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start)
-for i := start; i < end; i++ {
+s.log.Info("Scheduling to fetch trailing missing payloads from backup RPC", "start", start, "end", endNum, "size", endNum-start.Number-1)
+for i := start.Number + 1; i < endNum; i++ {
 select {
 case s.requests <- i:
 case <-ctx.Done():
...
...
@@ -57,6 +57,8 @@ and are adopted by several other blockchains, most notably the [L1 consensus lay
- [Block validation](#block-validation)
- [Block processing](#block-processing)
- [Block topic scoring parameters](#block-topic-scoring-parameters)
- [Req-Resp](#req-resp)
- [`payload_by_number`](#payload_by_number)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

@@ -305,12 +307,97 @@ A node may apply the block to their local engine ahead of L1 availability, if it
TODO: GossipSub per-topic scoring to fine-tune incentives for ideal propagation delay and bandwidth usage.
## Req-Resp
For its sync protocols, the op-node implements a request-response encoding similar to that of the L1 Ethereum Beacon-Chain.
See [L1 P2P-interface req-resp specification][eth2-p2p-reqresp] and [Altair P2P update][eth2-p2p-altair-reqresp].
However, the protocol is simplified, to avoid several issues seen in L1:
- Error strings in responses, if there is any alternative response,
should not need to be compressed or have an artificial global length limit.
- Payload-length prefixes should be fixed-length: byte-by-byte uvarint reading from the underlying stream is undesired.
- `<context-bytes>` are relaxed to encode a `uint32`, rather than a beacon-chain `ForkDigest`.
- Payload-encoding may change per hardfork, so is not part of the protocol-ID.
- Usage of response-chunks is specific to the req-resp method: most basic req-resp does not need chunked responses.
- Compression is encouraged to be part of the payload-encoding, specific to the req-resp method, where necessary:
pings and such do not need streaming frame compression etc.
And the protocol ID format follows the same scheme as L1,
except for the trailing encoding-schema part, which is now message-specific:
```text
/ProtocolPrefix/MessageName/SchemaVersion/
```
The req-resp protocols served by the op-node all have `/ProtocolPrefix` set to `/opstack/req`.
Individual methods may include the chain ID as part of the `/MessageName` segment,
so it's immediately clear which chain the method applies to, if the communication is chain-specific.
Other methods may include chain-information in the request and/or response data,
such as the `ForkDigest` `<context-bytes>` in L1 beacon chain req-resp protocols.
Each segment starts with a `/`, and may contain multiple `/`, and the final protocol ID is suffixed with a `/`.
### `payload_by_number`
This is an optional chain syncing method, to request/serve execution payloads by number.
This serves as a method to fill gaps upon missed gossip, and to sync short-to-medium ranges of unsafe L2 blocks.
Protocol ID: `/opstack/req/payload_by_number/<chain-id>/0/`
- `/MessageName` is `/payload_by_number/<chain-id>` where `<chain-id>` is set to the op-node L2 chain ID.
- `/SchemaVersion` is `/0`
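For example, with an (illustrative) L2 chain ID of `10`, the full protocol ID for version `0` of this method reads:

```text
/opstack/req/payload_by_number/10/0/
```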
Request format: `<num>`: a little-endian `uint64` - the block number to request.
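As a non-normative sketch of the request encoding (the helper name below is ours, not part of the op-node API):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeSyncRequest encodes a payload_by_number request:
// a single little-endian uint64 block number, 8 bytes total.
func encodeSyncRequest(blockNum uint64) []byte {
	var req [8]byte
	binary.LittleEndian.PutUint64(req[:], blockNum)
	return req[:]
}

func main() {
	fmt.Printf("%x\n", encodeSyncRequest(1234)) // d204000000000000
}
```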
Response format: `<response> = <res><version><payload>`
- `<res>` is a byte code describing the result.
- `0` on success, `<version><payload>` should follow.
- `1` if the request is valid, but the payload is unavailable.
- `2` if the request is invalid.
- `3+` for other errors.
- The `>= 128` range is reserved for future use.
- `<version>` is a little-endian `uint32`, identifying the type of `ExecutionPayload` (fork-specific)
- `<payload>` is an encoded block, read till stream EOF.
The input of `<response>` should be limited, as well as any generated decompressed output,
to avoid unexpected resource usage or zip-bomb type attacks.
A 10 MB limit is recommended, to ensure all blocks may be synced.
Implementations may opt for a different limit, since this sync method is optional.
`<version>` list:
- `0`: SSZ-encoded `ExecutionPayload`, with Snappy framing compression,
matching the `ExecutionPayload` SSZ definition of the L1 Merge, L2 Bedrock and L2 Regolith versions.
- Other versions may be listed here with future network upgrades, such as the L1 Shanghai upgrade.
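As an illustrative sketch of reading the `<response>` envelope, mirroring the client implementation in this commit (the `github.com/golang/snappy` import and the `maxResponseSize` constant are assumptions for this example):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"

	"github.com/golang/snappy"
)

const maxResponseSize = 10 << 20 // the recommended 10 MB limit

// readSyncResponse reads <res><version><payload> and returns the
// decompressed payload bytes, ready for SSZ decoding.
func readSyncResponse(str io.Reader) ([]byte, error) {
	r := io.LimitReader(str, maxResponseSize)
	var head [5]byte // byte 0: result code; bytes 1:5: little-endian uint32 version
	if _, err := io.ReadFull(r, head[:]); err != nil {
		return nil, fmt.Errorf("failed to read response header: %w", err)
	}
	if head[0] != 0 {
		return nil, fmt.Errorf("request failed with result code %d", head[0])
	}
	if v := binary.LittleEndian.Uint32(head[1:]); v != 0 {
		return nil, fmt.Errorf("unrecognized payload version: %d", v)
	}
	// The payload is SSZ with snappy framing compression;
	// limit the decompressed output too, to avoid zip-bombs.
	return io.ReadAll(io.LimitReader(snappy.NewReader(r), maxResponseSize))
}
```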
The request is by block-number, enabling parallel fetching of a chain across many peers.
A `res = 0` response should be verified to:
- Have a block-number matching the requested block number.
- Have a consistent `blockhash` w.r.t. the other block contents.
- Build towards a known canonical block.
- This can be verified by checking that the parent-hash of a previously trusted canonical block
matches the verified hash of the retrieved block.
- For unsafe blocks this may be relaxed to verification against the parent-hash of any previously trusted block:
- The gossip validation process limits the amount of blocks that may be trusted to sync towards.
- The unsafe blocks should be queued for processing, the latest received L2 unsafe blocks should always
override any previous chain, until the final L2 chain can be reproduced from L1 data.
A `res > 0` response code should not be accepted. The result code is helpful for debugging,
but the client should treat any error response like any other unanswered request, as the responding peer cannot be trusted.
----

[libp2p]: https://libp2p.io/
[discv5]: https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md
[discv5-random-nodes]: https://pkg.go.dev/github.com/ethereum/go-ethereum@v1.10.12/p2p/discover#UDPv5.RandomNodes
[eth2-p2p]: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md
[eth2-p2p-reqresp]: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-reqresp-domain
[eth2-p2p-altair-reqresp]: https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/p2p-interface.md#the-reqresp-domain
[libp2p-noise]: https://github.com/libp2p/specs/tree/master/noise
[multistream-select]: https://github.com/multiformats/multistream-select/
[mplex]: https://github.com/libp2p/specs/tree/master/mplex
...