gossip.go 24.3 KB
Newer Older
1 2 3 4 5 6 7
package p2p

import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/binary"
8
	"errors"
9 10 11 12 13
	"fmt"
	"sync"
	"time"

	"github.com/golang/snappy"
14
	lru "github.com/hashicorp/golang-lru/v2"
15 16
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	pb "github.com/libp2p/go-libp2p-pubsub/pb"
17 18
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
19 20 21 22

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/log"
23

24
	"github.com/ethereum-optimism/optimism/op-node/rollup"
25
	"github.com/ethereum-optimism/optimism/op-service/eth"
26
)
27

28 29 30 31 32 33 34 35 36 37
const (
	// maxGossipSize limits the total size of gossip RPC containers as well as decompressed individual messages.
	maxGossipSize = 10 * (1 << 20)
	// minGossipSize is used to make sure that there is at least some data to validate the signature against.
	minGossipSize          = 66
	maxOutboundQueue       = 256
	maxValidateQueue       = 256
	globalValidateThrottle = 512
	gossipHeartbeat        = 500 * time.Millisecond
	// seenMessagesTTL limits the duration that message IDs are remembered for gossip deduplication purposes
38 39
	// 130 * gossipHeartbeat
	seenMessagesTTL  = 130 * gossipHeartbeat
40 41 42 43
	DefaultMeshD     = 8  // topic stable mesh target count
	DefaultMeshDlo   = 6  // topic stable mesh low watermark
	DefaultMeshDhi   = 12 // topic stable mesh high watermark
	DefaultMeshDlazy = 6  // gossip target
44
	// peerScoreInspectFrequency is the frequency at which peer scores are inspected
45
	peerScoreInspectFrequency = 15 * time.Second
46
)
47 48 49 50 51 52 53

// Message domains, the msg id function uncompresses to keep data monomorphic,
// but invalid compressed data will need a unique different id.

var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}

54
type GossipSetupConfigurables interface {
55
	PeerScoringParams() *ScoringParams
56 57
	// ConfigureGossip creates configuration options to apply to the GossipSub setup
	ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option
58 59
}

60 61 62 63
type GossipRuntimeConfig interface {
	P2PSequencerAddress() common.Address
}

Andreas Bigger's avatar
Andreas Bigger committed
64
//go:generate mockery --name GossipMetricer
Matthew Slipper's avatar
Matthew Slipper committed
65 66 67 68
type GossipMetricer interface {
	RecordGossipEvent(evType int32)
}

69 70 71 72
func blocksTopicV1(cfg *rollup.Config) string {
	return fmt.Sprintf("/optimism/%s/0/blocks", cfg.L2ChainID.String())
}

Danyal Prout's avatar
Danyal Prout committed
73 74 75 76
func blocksTopicV2(cfg *rollup.Config) string {
	return fmt.Sprintf("/optimism/%s/1/blocks", cfg.L2ChainID.String())
}

77 78 79 80
func blocksTopicV3(cfg *rollup.Config) string {
	return fmt.Sprintf("/optimism/%s/2/blocks", cfg.L2ChainID.String())
}

81 82 83
// BuildSubscriptionFilter builds a simple subscription filter,
// to help protect against peers spamming useless subscriptions.
func BuildSubscriptionFilter(cfg *rollup.Config) pubsub.SubscriptionFilter {
84
	return pubsub.NewAllowlistSubscriptionFilter(blocksTopicV1(cfg), blocksTopicV2(cfg), blocksTopicV3(cfg)) // add more topics here in the future, if any.
85 86 87 88
}

var msgBufPool = sync.Pool{New: func() any {
	// note: the topic validator concurrency is limited, so pool won't blow up, even with large pre-allocation.
89
	x := make([]byte, 0, maxGossipSize)
90 91 92 93 94 95 96 97 98 99 100 101 102
	return &x
}}

// BuildMsgIdFn builds a generic message ID function for gossipsub that can handle compressed payloads,
// mirroring the eth2 p2p gossip spec.
func BuildMsgIdFn(cfg *rollup.Config) pubsub.MsgIdFunction {
	return func(pmsg *pb.Message) string {
		valid := false
		var data []byte
		// If it's a valid compressed snappy data, then hash the uncompressed contents.
		// The validator can throw away the message later when recognized as invalid,
		// and the unique hash helps detect duplicates.
		dLen, err := snappy.DecodedLen(pmsg.Data)
103
		if err == nil && dLen <= maxGossipSize {
104 105
			res := msgBufPool.Get().(*[]byte)
			defer msgBufPool.Put(res)
106 107 108 109 110
			if data, err = snappy.Decode((*res)[:cap(*res)], pmsg.Data); err == nil {
				if cap(data) > cap(*res) {
					// if we ended up growing the slice capacity, fine, keep the larger one.
					*res = data[:cap(data)]
				}
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
				valid = true
			}
		}
		if data == nil {
			data = pmsg.Data
		}
		h := sha256.New()
		if valid {
			h.Write(MessageDomainValidSnappy[:])
		} else {
			h.Write(MessageDomainInvalidSnappy[:])
		}
		// The chain ID is part of the gossip topic, making the msg id unique
		topic := pmsg.GetTopic()
		var topicLen [8]byte
		binary.LittleEndian.PutUint64(topicLen[:], uint64(len(topic)))
		h.Write(topicLen[:])
		h.Write([]byte(topic))
		h.Write(data)
		// the message ID is shortened to save space, a lot of these may be gossiped.
		return string(h.Sum(nil)[:20])
	}
}

135 136 137 138
func (p *Config) ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option {
	params := BuildGlobalGossipParams(rollupCfg)

	// override with CLI changes
139 140 141 142 143 144 145
	params.D = p.MeshD
	params.Dlo = p.MeshDLo
	params.Dhi = p.MeshDHi
	params.Dlazy = p.MeshDLazy

	// in the future we may add more advanced options like scoring and PX / direct-mesh / episub
	return []pubsub.Option{
146
		pubsub.WithGossipSubParams(params),
147 148 149 150
		pubsub.WithFloodPublish(p.FloodPublish),
	}
}

151 152
func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
	params := pubsub.DefaultGossipSubParams()
153 154 155 156
	params.D = DefaultMeshD                    // topic stable mesh target count
	params.Dlo = DefaultMeshDlo                // topic stable mesh low watermark
	params.Dhi = DefaultMeshDhi                // topic stable mesh high watermark
	params.Dlazy = DefaultMeshDlazy            // gossip target
157 158 159 160
	params.HeartbeatInterval = gossipHeartbeat // interval of heartbeat
	params.FanoutTTL = 24 * time.Second        // ttl for fanout maps for topics we are not subscribed to but have published to
	params.HistoryLength = 12                  // number of windows to retain full messages in cache for IWANT responses
	params.HistoryGossip = 3                   // number of windows to gossip about
161 162 163 164

	return params
}

165 166
// NewGossipSub configures a new pubsub instance with the specified parameters.
// PubSub uses a GossipSubRouter as it's router under the hood.
167
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossipConf GossipSetupConfigurables, scorer Scorer, m GossipMetricer, log log.Logger) (*pubsub.PubSub, error) {
168 169 170 171
	denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second)
	if err != nil {
		return nil, err
	}
172
	gossipOpts := []pubsub.Option{
173 174 175 176 177 178 179 180
		pubsub.WithMaxMessageSize(maxGossipSize),
		pubsub.WithMessageIdFn(BuildMsgIdFn(cfg)),
		pubsub.WithNoAuthor(),
		pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
		pubsub.WithSubscriptionFilter(BuildSubscriptionFilter(cfg)),
		pubsub.WithValidateQueueSize(maxValidateQueue),
		pubsub.WithPeerOutboundQueueSize(maxOutboundQueue),
		pubsub.WithValidateThrottle(globalValidateThrottle),
181
		pubsub.WithSeenMessagesTTL(seenMessagesTTL),
182 183
		pubsub.WithPeerExchange(false),
		pubsub.WithBlacklist(denyList),
184
		pubsub.WithEventTracer(&gossipTracer{m: m}),
185
	}
186
	gossipOpts = append(gossipOpts, ConfigurePeerScoring(gossipConf, scorer, log)...)
187
	gossipOpts = append(gossipOpts, gossipConf.ConfigureGossip(cfg)...)
188
	return pubsub.NewGossipSub(p2pCtx, h, gossipOpts...)
189 190
}

191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
func validationResultString(v pubsub.ValidationResult) string {
	switch v {
	case pubsub.ValidationAccept:
		return "ACCEPT"
	case pubsub.ValidationIgnore:
		return "IGNORE"
	case pubsub.ValidationReject:
		return "REJECT"
	default:
		return fmt.Sprintf("UNKNOWN_%d", v)
	}
}

func logValidationResult(self peer.ID, msg string, log log.Logger, fn pubsub.ValidatorEx) pubsub.ValidatorEx {
	return func(ctx context.Context, id peer.ID, message *pubsub.Message) pubsub.ValidationResult {
		res := fn(ctx, id, message)
207
		var src any
208 209 210 211 212 213 214 215 216
		src = id
		if id == self {
			src = "self"
		}
		log.Debug(msg, "result", validationResultString(res), "from", src)
		return res
	}
}

217 218 219 220 221 222 223 224 225 226 227 228
func guardGossipValidator(log log.Logger, fn pubsub.ValidatorEx) pubsub.ValidatorEx {
	return func(ctx context.Context, id peer.ID, message *pubsub.Message) (result pubsub.ValidationResult) {
		defer func() {
			if err := recover(); err != nil {
				log.Error("gossip validation panic", "err", err, "peer", id)
				result = pubsub.ValidationReject
			}
		}()
		return fn(ctx, id, message)
	}
}

229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
type seenBlocks struct {
	sync.Mutex
	blockHashes []common.Hash
}

// hasSeen checks if the hash has been marked as seen, and how many have been seen.
func (sb *seenBlocks) hasSeen(h common.Hash) (count int, hasSeen bool) {
	sb.Lock()
	defer sb.Unlock()
	for _, prev := range sb.blockHashes {
		if prev == h {
			return len(sb.blockHashes), true
		}
	}
	return len(sb.blockHashes), false
}

// markSeen marks the block hash as seen
func (sb *seenBlocks) markSeen(h common.Hash) {
	sb.Lock()
	defer sb.Unlock()
	sb.blockHashes = append(sb.blockHashes, h)
}

Danyal Prout's avatar
Danyal Prout committed
253
func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, blockVersion eth.BlockVersion) pubsub.ValidatorEx {
254 255 256

	// Seen block hashes per block height
	// uint64 -> *seenBlocks
257
	blockHeightLRU, err := lru.New[uint64, *seenBlocks](1000)
258
	if err != nil {
259
		panic(fmt.Errorf("failed to set up block height LRU cache: %w", err))
260 261 262 263 264 265 266 267 268 269 270 271 272
	}

	return func(ctx context.Context, id peer.ID, message *pubsub.Message) pubsub.ValidationResult {
		// [REJECT] if the compression is not valid
		outLen, err := snappy.DecodedLen(message.Data)
		if err != nil {
			log.Warn("invalid snappy compression length data", "err", err, "peer", id)
			return pubsub.ValidationReject
		}
		if outLen > maxGossipSize {
			log.Warn("possible snappy zip bomb, decoded length is too large", "decoded_length", outLen, "peer", id)
			return pubsub.ValidationReject
		}
273 274 275 276
		if outLen < minGossipSize {
			log.Warn("rejecting undersized gossip payload")
			return pubsub.ValidationReject
		}
277 278 279

		res := msgBufPool.Get().(*[]byte)
		defer msgBufPool.Put(res)
280
		data, err := snappy.Decode((*res)[:cap(*res)], message.Data)
281 282 283 284
		if err != nil {
			log.Warn("invalid snappy compression", "err", err, "peer", id)
			return pubsub.ValidationReject
		}
285 286 287 288
		// if we ended up growing the slice capacity, fine, keep the larger one.
		if cap(data) > cap(*res) {
			*res = data[:cap(data)]
		}
289 290 291 292

		// message starts with compact-encoding secp256k1 encoded signature
		signatureBytes, payloadBytes := data[:65], data[65:]

293
		// [REJECT] if the signature by the sequencer is not valid
294 295 296
		result := verifyBlockSignature(log, cfg, runCfg, id, signatureBytes, payloadBytes)
		if result != pubsub.ValidationAccept {
			return result
297 298
		}

299 300
		var envelope eth.ExecutionPayloadEnvelope

301
		// [REJECT] if the block encoding is not valid
302 303 304 305 306 307 308 309 310 311 312 313
		if blockVersion == eth.BlockV3 {
			if err := envelope.UnmarshalSSZ(uint32(len(payloadBytes)), bytes.NewReader(payloadBytes)); err != nil {
				log.Warn("invalid envelope payload", "err", err, "peer", id)
				return pubsub.ValidationReject
			}
		} else {
			var payload eth.ExecutionPayload
			if err := payload.UnmarshalSSZ(blockVersion, uint32(len(payloadBytes)), bytes.NewReader(payloadBytes)); err != nil {
				log.Warn("invalid execution payload", "err", err, "peer", id)
				return pubsub.ValidationReject
			}
			envelope = eth.ExecutionPayloadEnvelope{ExecutionPayload: &payload}
314 315
		}

316 317
		payload := envelope.ExecutionPayload

318 319 320
		// rounding down to seconds is fine here.
		now := uint64(time.Now().Unix())

321 322
		// [REJECT] if the `payload.timestamp` is older than 60 seconds in the past
		if uint64(payload.Timestamp) < now-60 {
323 324 325 326 327 328 329 330 331 332 333
			log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp))
			return pubsub.ValidationReject
		}

		// [REJECT] if the `payload.timestamp` is more than 5 seconds into the future
		if uint64(payload.Timestamp) > now+5 {
			log.Warn("payload is too new", "timestamp", uint64(payload.Timestamp))
			return pubsub.ValidationReject
		}

		// [REJECT] if the `block_hash` in the `payload` is not valid
334
		if actual, ok := envelope.CheckBlockHash(); !ok {
335 336 337 338
			log.Warn("payload has bad block hash", "bad_hash", payload.BlockHash.String(), "actual", actual.String())
			return pubsub.ValidationReject
		}

Danyal Prout's avatar
Danyal Prout committed
339
		// [REJECT] if a V1 Block has withdrawals
340
		if !blockVersion.HasWithdrawals() && payload.Withdrawals != nil {
Danyal Prout's avatar
Danyal Prout committed
341 342 343 344
			log.Warn("payload is on v1 topic, but has withdrawals", "bad_hash", payload.BlockHash.String())
			return pubsub.ValidationReject
		}

345
		// [REJECT] if a >= V2 Block does not have withdrawals
346 347
		if blockVersion.HasWithdrawals() && payload.Withdrawals == nil {
			log.Warn("payload is on v2/v3 topic, but does not have withdrawals", "bad_hash", payload.BlockHash.String())
Danyal Prout's avatar
Danyal Prout committed
348 349 350
			return pubsub.ValidationReject
		}

351
		// [REJECT] if a >= V2 Block has non-empty withdrawals
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
		if blockVersion.HasWithdrawals() && len(*payload.Withdrawals) != 0 {
			log.Warn("payload is on v2/v3 topic, but has non-empty withdrawals", "bad_hash", payload.BlockHash.String(), "withdrawal_count", len(*payload.Withdrawals))
			return pubsub.ValidationReject
		}

		// [REJECT] if the block is on a topic <= V2 and has a blob gas value set
		if !blockVersion.HasBlobProperties() && payload.BlobGasUsed != nil {
			log.Warn("payload is on v1/v2 topic, but has blob gas used", "bad_hash", payload.BlockHash.String())
			return pubsub.ValidationReject
		}

		// [REJECT] if the block is on a topic <= V2 and has an excess blob gas value set
		if !blockVersion.HasBlobProperties() && payload.ExcessBlobGas != nil {
			log.Warn("payload is on v1/v2 topic, but has excess blob gas", "bad_hash", payload.BlockHash.String())
			return pubsub.ValidationReject
		}

		if blockVersion.HasBlobProperties() {
			// [REJECT] if the block is on a topic >= V3 and has a blob gas used value that is not zero
371
			if payload.BlobGasUsed == nil || *payload.BlobGasUsed != 0 {
372 373 374 375 376
				log.Warn("payload is on v3 topic, but has non-zero blob gas used", "bad_hash", payload.BlockHash.String(), "blob_gas_used", payload.BlobGasUsed)
				return pubsub.ValidationReject
			}

			// [REJECT] if the block is on a topic >= V3 and has an excess blob gas value that is not zero
377
			if payload.ExcessBlobGas == nil || *payload.ExcessBlobGas != 0 {
378 379 380 381 382 383 384 385
				log.Warn("payload is on v3 topic, but has non-zero excess blob gas", "bad_hash", payload.BlockHash.String(), "excess_blob_gas", payload.ExcessBlobGas)
				return pubsub.ValidationReject
			}
		}

		// [REJECT] if the block is on a topic >= V3 and the parent beacon block root is nil
		if blockVersion.HasParentBeaconBlockRoot() && envelope.ParentBeaconBlockRoot == nil {
			log.Warn("payload is on v3 topic, but has nil parent beacon block root", "bad_hash", payload.BlockHash.String())
386 387 388
			return pubsub.ValidationReject
		}

389 390 391 392 393 394
		seen, ok := blockHeightLRU.Get(uint64(payload.BlockNumber))
		if !ok {
			seen = new(seenBlocks)
			blockHeightLRU.Add(uint64(payload.BlockNumber), seen)
		}

395
		if count, hasSeen := seen.hasSeen(payload.BlockHash); count > 5 {
396 397 398 399 400 401 402 403 404 405 406
			// [REJECT] if more than 5 blocks have been seen with the same block height
			log.Warn("seen too many different blocks at same height", "height", payload.BlockNumber)
			return pubsub.ValidationReject
		} else if hasSeen {
			// [IGNORE] if the block has already been seen
			log.Warn("validated already seen message again")
			return pubsub.ValidationIgnore
		}

		// mark it as seen. (note: with concurrent validation more than 5 blocks may be marked as seen still,
		// but validator concurrency is limited anyway)
407
		seen.markSeen(payload.BlockHash)
408 409

		// remember the decoded payload for later usage in topic subscriber.
410
		message.ValidatorData = &envelope
411 412 413 414
		return pubsub.ValidationAccept
	}
}

415
func verifyBlockSignature(log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, id peer.ID, signatureBytes []byte, payloadBytes []byte) pubsub.ValidationResult {
416
	signingHash, err := BlockSigningHash(cfg, payloadBytes)
417
	if err != nil {
418
		log.Warn("failed to compute block signing hash", "err", err, "peer", id)
419 420 421 422 423
		return pubsub.ValidationReject
	}

	pub, err := crypto.SigToPub(signingHash[:], signatureBytes)
	if err != nil {
424
		log.Warn("invalid block signature", "err", err, "peer", id)
425 426 427 428 429 430 431 432 433 434
		return pubsub.ValidationReject
	}
	addr := crypto.PubkeyToAddress(*pub)

	// In the future we may load & validate block metadata before checking the signature.
	// And then check the signer based on the metadata, to support e.g. multiple p2p signers at the same time.
	// For now we only have one signer at a time and thus check the address directly.
	// This means we may drop old payloads upon key rotation,
	// but this can be recovered from like any other missed unsafe payload.
	if expected := runCfg.P2PSequencerAddress(); expected == (common.Address{}) {
435
		log.Warn("no configured p2p sequencer address, ignoring gossiped block", "peer", id, "addr", addr)
436 437
		return pubsub.ValidationIgnore
	} else if addr != expected {
438
		log.Warn("unexpected block author", "err", err, "peer", id, "addr", addr, "expected", expected)
439 440 441 442 443
		return pubsub.ValidationReject
	}
	return pubsub.ValidationAccept
}

444
type GossipIn interface {
445
	OnUnsafeL2Payload(ctx context.Context, from peer.ID, msg *eth.ExecutionPayloadEnvelope) error
446 447 448
}

type GossipTopicInfo interface {
Danyal Prout's avatar
Danyal Prout committed
449 450 451
	AllBlockTopicsPeers() []peer.ID
	BlocksTopicV1Peers() []peer.ID
	BlocksTopicV2Peers() []peer.ID
452
	BlocksTopicV3Peers() []peer.ID
453 454 455 456
}

type GossipOut interface {
	GossipTopicInfo
457
	PublishL2Payload(ctx context.Context, msg *eth.ExecutionPayloadEnvelope, signer Signer) error
458 459 460
	Close() error
}

Danyal Prout's avatar
Danyal Prout committed
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475
type blockTopic struct {
	// blocks topic, main handle on block gossip
	topic *pubsub.Topic
	// block events handler, to be cancelled before closing the blocks topic.
	events *pubsub.TopicEventHandler
	// block subscriptions, to be cancelled before closing blocks topic.
	sub *pubsub.Subscription
}

func (bt *blockTopic) Close() error {
	bt.events.Cancel()
	bt.sub.Cancel()
	return bt.topic.Close()
}

476
type publisher struct {
477 478 479
	log log.Logger
	cfg *rollup.Config

480 481 482
	// p2pCancel cancels the downstream gossip event-handling functions, independent of the sources.
	// A closed gossip event source (event handler or subscription) does not stop any open event iteration,
	// thus we have to stop it ourselves this way.
483 484
	p2pCancel context.CancelFunc

Danyal Prout's avatar
Danyal Prout committed
485 486
	blocksV1 *blockTopic
	blocksV2 *blockTopic
487
	blocksV3 *blockTopic
488 489

	runCfg GossipRuntimeConfig
490 491 492 493
}

var _ GossipOut = (*publisher)(nil)

Danyal Prout's avatar
Danyal Prout committed
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
func combinePeers(allPeers ...[]peer.ID) []peer.ID {
	var seen = make(map[peer.ID]bool)
	var res []peer.ID
	for _, peers := range allPeers {
		for _, p := range peers {
			if _, ok := seen[p]; ok {
				continue
			}
			res = append(res, p)
			seen[p] = true
		}
	}
	return res
}

func (p *publisher) AllBlockTopicsPeers() []peer.ID {
510
	return combinePeers(p.BlocksTopicV1Peers(), p.BlocksTopicV2Peers(), p.BlocksTopicV3Peers())
Danyal Prout's avatar
Danyal Prout committed
511 512 513 514 515 516 517 518
}

func (p *publisher) BlocksTopicV1Peers() []peer.ID {
	return p.blocksV1.topic.ListPeers()
}

func (p *publisher) BlocksTopicV2Peers() []peer.ID {
	return p.blocksV2.topic.ListPeers()
519 520
}

521 522 523 524 525
func (p *publisher) BlocksTopicV3Peers() []peer.ID {
	return p.blocksV3.topic.ListPeers()
}

func (p *publisher) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, signer Signer) error {
526 527 528 529 530 531 532 533
	res := msgBufPool.Get().(*[]byte)
	buf := bytes.NewBuffer((*res)[:0])
	defer func() {
		*res = buf.Bytes()
		defer msgBufPool.Put(res)
	}()

	buf.Write(make([]byte, 65))
534 535 536 537 538 539 540 541 542

	if envelope.ParentBeaconBlockRoot != nil {
		if _, err := envelope.MarshalSSZ(buf); err != nil {
			return fmt.Errorf("failed to encoded execution payload envelope to publish: %w", err)
		}
	} else {
		if _, err := envelope.ExecutionPayload.MarshalSSZ(buf); err != nil {
			return fmt.Errorf("failed to encoded execution payload to publish: %w", err)
		}
543
	}
544

545 546 547 548
	data := buf.Bytes()
	payloadData := data[65:]
	sig, err := signer.Sign(ctx, SigningDomainBlocksV1, p.cfg.L2ChainID, payloadData)
	if err != nil {
549
		return fmt.Errorf("failed to sign execution payload with signer: %w", err)
550 551 552 553 554 555 556
	}
	copy(data[:65], sig[:])

	// compress the full message
	// This also copies the data, freeing up the original buffer to go back into the pool
	out := snappy.Encode(nil, data)

557 558 559
	if p.cfg.IsEcotone(uint64(envelope.ExecutionPayload.Timestamp)) {
		return p.blocksV3.topic.Publish(ctx, out)
	} else if p.cfg.IsCanyon(uint64(envelope.ExecutionPayload.Timestamp)) {
Danyal Prout's avatar
Danyal Prout committed
560 561 562 563
		return p.blocksV2.topic.Publish(ctx, out)
	} else {
		return p.blocksV1.topic.Publish(ctx, out)
	}
564 565 566
}

func (p *publisher) Close() error {
567
	p.p2pCancel()
Danyal Prout's avatar
Danyal Prout committed
568 569 570
	e1 := p.blocksV1.Close()
	e2 := p.blocksV2.Close()
	return errors.Join(e1, e2)
571 572
}

573
func JoinGossip(self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
Danyal Prout's avatar
Danyal Prout committed
574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591
	p2pCtx, p2pCancel := context.WithCancel(context.Background())

	v1Logger := log.New("topic", "blocksV1")
	blocksV1Validator := guardGossipValidator(log, logValidationResult(self, "validated blockv1", v1Logger, BuildBlocksValidator(v1Logger, cfg, runCfg, eth.BlockV1)))
	blocksV1, err := newBlockTopic(p2pCtx, blocksTopicV1(cfg), ps, v1Logger, gossipIn, blocksV1Validator)
	if err != nil {
		p2pCancel()
		return nil, fmt.Errorf("failed to setup blocks v1 p2p: %w", err)
	}

	v2Logger := log.New("topic", "blocksV2")
	blocksV2Validator := guardGossipValidator(log, logValidationResult(self, "validated blockv2", v2Logger, BuildBlocksValidator(v2Logger, cfg, runCfg, eth.BlockV2)))
	blocksV2, err := newBlockTopic(p2pCtx, blocksTopicV2(cfg), ps, v2Logger, gossipIn, blocksV2Validator)
	if err != nil {
		p2pCancel()
		return nil, fmt.Errorf("failed to setup blocks v2 p2p: %w", err)
	}

592 593 594 595 596 597 598 599
	v3Logger := log.New("topic", "blocksV3")
	blocksV3Validator := guardGossipValidator(log, logValidationResult(self, "validated blockv3", v3Logger, BuildBlocksValidator(v3Logger, cfg, runCfg, eth.BlockV3)))
	blocksV3, err := newBlockTopic(p2pCtx, blocksTopicV3(cfg), ps, v3Logger, gossipIn, blocksV3Validator)
	if err != nil {
		p2pCancel()
		return nil, fmt.Errorf("failed to setup blocks v3 p2p: %w", err)
	}

Danyal Prout's avatar
Danyal Prout committed
600 601 602 603 604 605
	return &publisher{
		log:       log,
		cfg:       cfg,
		p2pCancel: p2pCancel,
		blocksV1:  blocksV1,
		blocksV2:  blocksV2,
606
		blocksV3:  blocksV3,
Danyal Prout's avatar
Danyal Prout committed
607 608 609 610 611 612 613
		runCfg:    runCfg,
	}, nil
}

func newBlockTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log log.Logger, gossipIn GossipIn, validator pubsub.ValidatorEx) (*blockTopic, error) {
	err := ps.RegisterTopicValidator(topicId,
		validator,
614 615
		pubsub.WithValidatorTimeout(3*time.Second),
		pubsub.WithValidatorConcurrency(4))
Danyal Prout's avatar
Danyal Prout committed
616

617
	if err != nil {
Danyal Prout's avatar
Danyal Prout committed
618
		return nil, fmt.Errorf("failed to register gossip topic: %w", err)
619
	}
Danyal Prout's avatar
Danyal Prout committed
620 621

	blocksTopic, err := ps.Join(topicId)
622
	if err != nil {
Danyal Prout's avatar
Danyal Prout committed
623
		return nil, fmt.Errorf("failed to join gossip topic: %w", err)
624
	}
Danyal Prout's avatar
Danyal Prout committed
625

626 627
	blocksTopicEvents, err := blocksTopic.EventHandler()
	if err != nil {
628
		return nil, fmt.Errorf("failed to create blocks gossip topic handler: %w", err)
629
	}
Danyal Prout's avatar
Danyal Prout committed
630 631

	go LogTopicEvents(ctx, log, blocksTopicEvents)
632 633 634

	subscription, err := blocksTopic.Subscribe()
	if err != nil {
635
		err = errors.Join(err, blocksTopic.Close())
636
		return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err)
637 638 639
	}

	subscriber := MakeSubscriber(log, BlocksHandler(gossipIn.OnUnsafeL2Payload))
Danyal Prout's avatar
Danyal Prout committed
640
	go subscriber(ctx, subscription)
641

Danyal Prout's avatar
Danyal Prout committed
642 643 644 645
	return &blockTopic{
		topic:  blocksTopic,
		events: blocksTopicEvents,
		sub:    subscription,
646
	}, nil
647 648 649
}

type TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription)
650
type MessageHandler func(ctx context.Context, from peer.ID, msg any) error
651

652
func BlocksHandler(onBlock func(ctx context.Context, from peer.ID, msg *eth.ExecutionPayloadEnvelope) error) MessageHandler {
653
	return func(ctx context.Context, from peer.ID, msg any) error {
654
		payload, ok := msg.(*eth.ExecutionPayloadEnvelope)
655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697
		if !ok {
			return fmt.Errorf("expected topic validator to parse and validate data into execution payload, but got %T", msg)
		}
		return onBlock(ctx, from, payload)
	}
}

func MakeSubscriber(log log.Logger, msgHandler MessageHandler) TopicSubscriber {
	return func(ctx context.Context, sub *pubsub.Subscription) {
		topicLog := log.New("topic", sub.Topic())
		for {
			msg, err := sub.Next(ctx)
			if err != nil { // ctx was closed, or subscription was closed
				topicLog.Debug("stopped subscriber")
				return
			}
			if msg.ValidatorData == nil {
				topicLog.Error("gossip message with no data", "from", msg.ReceivedFrom)
				continue
			}
			if err := msgHandler(ctx, msg.ReceivedFrom, msg.ValidatorData); err != nil {
				topicLog.Error("failed to process gossip message", "err", err)
			}
		}
	}
}

func LogTopicEvents(ctx context.Context, log log.Logger, evHandler *pubsub.TopicEventHandler) {
	for {
		ev, err := evHandler.NextPeerEvent(ctx)
		if err != nil {
			return // ctx closed
		}
		switch ev.Type {
		case pubsub.PeerJoin:
			log.Debug("peer joined topic", "peer", ev.Peer)
		case pubsub.PeerLeave:
			log.Debug("peer left topic", "peer", ev.Peer)
		default:
			log.Warn("unrecognized topic event", "ev", ev)
		}
	}
}
698 699

type gossipTracer struct {
700
	m GossipMetricer
701 702 703
}

func (g *gossipTracer) Trace(evt *pb.TraceEvent) {
704 705
	if g.m != nil {
		g.m.RecordGossipEvent(int32(*evt.Type))
Matthew Slipper's avatar
Matthew Slipper committed
706
	}
707
}