Commit 2c3801cd authored by protolambda's avatar protolambda

op-node: cancel p2p gossip blocks event handler and subscription before closing topic

parent ab8a7abe
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"encoding/binary" "encoding/binary"
"errors"
"fmt" "fmt"
"sync" "sync"
"time" "time"
...@@ -379,10 +380,22 @@ type GossipOut interface { ...@@ -379,10 +380,22 @@ type GossipOut interface {
} }
type publisher struct { type publisher struct {
log log.Logger log log.Logger
cfg *rollup.Config cfg *rollup.Config
// p2pCtx is used for downstream message-handling resources,
// these can be cancelled without blocking.
p2pCtx context.Context
p2pCancel context.CancelFunc
// blocks topic, main handle on block gossip
blocksTopic *pubsub.Topic blocksTopic *pubsub.Topic
runCfg GossipRuntimeConfig // block events handler, to be cancelled before closing the blocks topic.
blocksEvents *pubsub.TopicEventHandler
// block subscriptions, to be cancelled before closing blocks topic.
blocksSub *pubsub.Subscription
runCfg GossipRuntimeConfig
} }
var _ GossipOut = (*publisher)(nil) var _ GossipOut = (*publisher)(nil)
...@@ -419,10 +432,13 @@ func (p *publisher) PublishL2Payload(ctx context.Context, payload *eth.Execution ...@@ -419,10 +432,13 @@ func (p *publisher) PublishL2Payload(ctx context.Context, payload *eth.Execution
} }
func (p *publisher) Close() error { func (p *publisher) Close() error {
p.p2pCancel()
p.blocksEvents.Cancel()
p.blocksSub.Cancel()
return p.blocksTopic.Close() return p.blocksTopic.Close()
} }
func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) { func JoinGossip(self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg))) val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg)))
blocksTopicName := blocksTopicV1(cfg) blocksTopicName := blocksTopicV1(cfg)
err := ps.RegisterTopicValidator(blocksTopicName, err := ps.RegisterTopicValidator(blocksTopicName,
...@@ -440,17 +456,29 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log ...@@ -440,17 +456,29 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create blocks gossip topic handler: %w", err) return nil, fmt.Errorf("failed to create blocks gossip topic handler: %w", err)
} }
p2pCtx, p2pCancel := context.WithCancel(context.Background())
go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents) go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)
subscription, err := blocksTopic.Subscribe() subscription, err := blocksTopic.Subscribe()
if err != nil { if err != nil {
p2pCancel()
err = errors.Join(err, blocksTopic.Close())
return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err) return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err)
} }
subscriber := MakeSubscriber(log, BlocksHandler(gossipIn.OnUnsafeL2Payload)) subscriber := MakeSubscriber(log, BlocksHandler(gossipIn.OnUnsafeL2Payload))
go subscriber(p2pCtx, subscription) go subscriber(p2pCtx, subscription)
return &publisher{log: log, cfg: cfg, blocksTopic: blocksTopic, runCfg: runCfg}, nil return &publisher{
log: log,
cfg: cfg,
blocksTopic: blocksTopic,
blocksEvents: blocksTopicEvents,
blocksSub: subscription,
p2pCtx: p2pCtx,
p2pCancel: p2pCancel,
runCfg: runCfg,
}, nil
} }
type TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription) type TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription)
......
...@@ -135,7 +135,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -135,7 +135,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if err != nil { if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err) return fmt.Errorf("failed to start gossipsub router: %w", err)
} }
n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn) n.gsOut, err = JoinGossip(n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
if err != nil { if err != nil {
return fmt.Errorf("failed to join blocks gossip topic: %w", err) return fmt.Errorf("failed to join blocks gossip topic: %w", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment