asyncgossiper.go 5.33 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
package async

import (
	"context"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/eth"
)

type AsyncGossiper interface {
	Gossip(payload *eth.ExecutionPayloadEnvelope)
	Get() *eth.ExecutionPayloadEnvelope
	Clear()
	Stop()
	Start()
}

// SimpleAsyncGossiper is a component that stores and gossips a single payload at a time
// it uses a separate goroutine to handle gossiping the payload asynchronously
// the payload can be accessed by the Get function to be reused when the payload was gossiped but not inserted
// exposed functions are synchronous, and block until the async routine is able to start handling the request
type SimpleAsyncGossiper struct {
	running atomic.Bool
	// channel to add new payloads to gossip
	set chan *eth.ExecutionPayloadEnvelope
	// channel to request getting the currently gossiping payload
	get chan chan *eth.ExecutionPayloadEnvelope
	// channel to request clearing the currently gossiping payload
	clear chan struct{}
	// channel to request stopping the handling loop
	stop chan struct{}

	currentPayload *eth.ExecutionPayloadEnvelope
	ctx            context.Context
	net            Network
	log            log.Logger
	metrics        Metrics
}

// To avoid import cycles, we define a new Network interface here
// this interface is compatable with driver.Network
type Network interface {
	PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
}

// To avoid import cycles, we define a new Metrics interface here
// this interface is compatable with driver.Metrics
type Metrics interface {
	RecordPublishingError()
}

func NewAsyncGossiper(ctx context.Context, net Network, log log.Logger, metrics Metrics) *SimpleAsyncGossiper {
	return &SimpleAsyncGossiper{
		running: atomic.Bool{},
		set:     make(chan *eth.ExecutionPayloadEnvelope),
		get:     make(chan chan *eth.ExecutionPayloadEnvelope),
		clear:   make(chan struct{}),
		stop:    make(chan struct{}),

		currentPayload: nil,
		net:            net,
		ctx:            ctx,
		log:            log,
		metrics:        metrics,
	}
}

// Gossip is a synchronous function to store and gossip a payload
// it blocks until the payload can be taken by the async routine
func (p *SimpleAsyncGossiper) Gossip(payload *eth.ExecutionPayloadEnvelope) {
	p.set <- payload
}

// Get is a synchronous function to get the currently held payload
// it blocks until the async routine is able to return the payload
func (p *SimpleAsyncGossiper) Get() *eth.ExecutionPayloadEnvelope {
	c := make(chan *eth.ExecutionPayloadEnvelope)
	p.get <- c
	return <-c
}

// Clear is a synchronous function to clear the currently gossiping payload
// it blocks until the signal to clear is picked up by the async routine
func (p *SimpleAsyncGossiper) Clear() {
	p.clear <- struct{}{}
}

// Stop is a synchronous function to stop the async routine
// it blocks until the async routine accepts the signal
func (p *SimpleAsyncGossiper) Stop() {
	p.stop <- struct{}{}
}

// Start starts the AsyncGossiper's gossiping loop on a separate goroutine
// each behavior of the loop is handled by a select case on a channel, plus an internal handler function call
func (p *SimpleAsyncGossiper) Start() {
	// if the gossiping is already running, return
	if p.running.Load() {
		return
	}
	p.running.Store(true)
	// else, start the handling loop
	go func() {
		defer p.running.Store(false)
		for {
			select {
			// new payloads to be gossiped are found in the `set` channel
			case payload := <-p.set:
				p.gossip(p.ctx, payload)
			// requests to get the current payload are found in the `get` channel
			case c := <-p.get:
				p.getPayload(c)
			// requests to clear the current payload are found in the `clear` channel
			case <-p.clear:
				p.clearPayload()
			// if the context is done, return
			case <-p.stop:
				return
			}
		}
	}()
}

// gossip is the internal handler function for gossiping the current payload
// and storing the payload in the async AsyncGossiper's state
// it is called by the Start loop when a new payload is set
// the payload is only stored if the publish is successful
func (p *SimpleAsyncGossiper) gossip(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) {
	if err := p.net.PublishL2Payload(ctx, payload); err == nil {
		p.currentPayload = payload
	} else {
		p.log.Warn("failed to publish newly created block",
			"id", payload.ExecutionPayload.ID(),
			"hash", payload.ExecutionPayload.BlockHash,
			"err", err)
		p.metrics.RecordPublishingError()
	}
}

// getPayload is the internal handler function for getting the current payload
// c is the channel the caller expects to receive the payload on
func (p *SimpleAsyncGossiper) getPayload(c chan *eth.ExecutionPayloadEnvelope) {
	c <- p.currentPayload
}

// clearPayload is the internal handler function for clearing the current payload
func (p *SimpleAsyncGossiper) clearPayload() {
	p.currentPayload = nil
}

// NoOpGossiper is a no-op implementation of AsyncGossiper
// it serves as a placeholder for when the AsyncGossiper is not needed
type NoOpGossiper struct{}

func (NoOpGossiper) Gossip(payload *eth.ExecutionPayloadEnvelope) {}
func (NoOpGossiper) Get() *eth.ExecutionPayloadEnvelope           { return nil }
func (NoOpGossiper) Clear()                                       {}
func (NoOpGossiper) Stop()                                        {}
func (NoOpGossiper) Start()                                       {}