sync_client.go 3.63 KB
Newer Older
clabby's avatar
clabby committed
1 2 3
package sources

import (
clabby's avatar
clabby committed
4 5 6 7
	"context"
	"errors"
	"sync"

clabby's avatar
clabby committed
8
	"github.com/ethereum-optimism/optimism/op-node/client"
clabby's avatar
clabby committed
9
	"github.com/ethereum-optimism/optimism/op-node/eth"
clabby's avatar
clabby committed
10 11 12
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/sources/caching"
	"github.com/ethereum/go-ethereum/log"
clabby's avatar
clabby committed
13
	"github.com/libp2p/go-libp2p/core/peer"
clabby's avatar
clabby committed
14 15
)

clabby's avatar
clabby committed
16 17 18 19 20 21 22
var (
	// ErrNoUnsafeL2PayloadChannel reports that the unsafeL2Payloads channel was nil.
	ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not be nil")

	// RpcSyncPeer is a mock PeerID for the RPC sync client.
	RpcSyncPeer = peer.ID("ALT_RPC_SYNC")
)

// receivePayload is the signature of the callback that fetched payloads are
// handed to. It receives a context, the ID of the peer the payload is
// attributed to, and the payload itself, and returns an error if the payload
// could not be accepted.
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error

clabby's avatar
clabby committed
23
// SyncClientInterface describes the operations offered by the sync client.
// It contains an unexported method, so types outside this package cannot
// implement it directly.
type SyncClientInterface interface {
	// Start begins the client's background processing.
	Start() error
	// Close stops the client's background processing.
	Close() error
	// fetchUnsafeBlockFromRpc requests the unsafe block with the given number.
	// It reports nothing back to the caller.
	fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64)
}

clabby's avatar
clabby committed
29 30
type SyncClient struct {
	*L2Client
clabby's avatar
clabby committed
31 32
	FetchUnsafeBlock chan uint64
	done             chan struct{}
clabby's avatar
clabby committed
33
	receivePayload   receivePayload
clabby's avatar
clabby committed
34
	wg               sync.WaitGroup
clabby's avatar
clabby committed
35 36
}

clabby's avatar
clabby committed
37 38
// Compile-time assertion that *SyncClient implements SyncClientInterface.
var _ SyncClientInterface = (*SyncClient)(nil)

clabby's avatar
clabby committed
39 40 41 42 43 44 45 46 47 48
// SyncClientConfig holds the configuration for a SyncClient.
// It embeds L2ClientConfig and adds no fields of its own.
type SyncClientConfig struct {
	L2ClientConfig
}

// SyncClientDefaultConfig builds a SyncClientConfig whose embedded
// L2ClientConfig is a copy of the value produced by L2ClientDefaultConfig for
// the same rollup config and trustRPC setting.
func SyncClientDefaultConfig(config *rollup.Config, trustRPC bool) *SyncClientConfig {
	l2Defaults := L2ClientDefaultConfig(config, trustRPC)
	return &SyncClientConfig{L2ClientConfig: *l2Defaults}
}

clabby's avatar
clabby committed
49
func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, metrics caching.Metrics, config *SyncClientConfig) (*SyncClient, error) {
clabby's avatar
clabby committed
50 51 52 53 54 55
	l2Client, err := NewL2Client(client, log, metrics, &config.L2ClientConfig)
	if err != nil {
		return nil, err
	}

	return &SyncClient{
clabby's avatar
clabby committed
56
		L2Client:         l2Client,
clabby's avatar
clabby committed
57
		FetchUnsafeBlock: make(chan uint64, 128),
clabby's avatar
clabby committed
58
		done:             make(chan struct{}),
clabby's avatar
clabby committed
59
		receivePayload:   receiver,
clabby's avatar
clabby committed
60 61
	}, nil
}
clabby's avatar
clabby committed
62 63 64

// Start starts up the state loop.
// The loop will have been started if err is not nil.
clabby's avatar
clabby committed
65
func (s *SyncClient) Start() error {
clabby's avatar
clabby committed
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
	s.wg.Add(1)
	go s.eventLoop()
	return nil
}

// Close signals the event loop to stop and waits for it to exit.
//
// The signal is delivered by closing the done channel rather than sending on
// it. A send on the unbuffered channel blocks until a receiver is ready, so it
// would hang forever if the loop was never started or had already exited, and
// it would stop only one loop if several had been started. Closing the channel
// wakes every receiver and never blocks.
//
// Calling Close again after it has returned is a no-op. Close is not safe to
// call concurrently with itself. It always returns nil.
func (s *SyncClient) Close() error {
	select {
	case <-s.done:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(s.done)
	}
	s.wg.Wait()
	return nil
}

// eventLoop is the main event loop for the sync client.
func (s *SyncClient) eventLoop() {
	defer s.wg.Done()
clabby's avatar
clabby committed
81
	s.log.Info("Starting sync client event loop")
clabby's avatar
clabby committed
82 83 84 85 86 87 88 89 90 91 92 93 94

	for {
		select {
		case <-s.done:
			return
		case blockNumber := <-s.FetchUnsafeBlock:
			s.fetchUnsafeBlockFromRpc(context.Background(), blockNumber)
		}
	}
}

// fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC.
// WARNING: This function fails silently (aside from warning logs).
clabby's avatar
clabby committed
95 96 97
//
// Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
clabby's avatar
clabby committed
98
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) {
clabby's avatar
clabby committed
99
	s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber)
clabby's avatar
clabby committed
100 101 102

	payload, err := s.PayloadByNumber(ctx, blockNumber)
	if err != nil {
clabby's avatar
clabby committed
103
		s.log.Warn("Failed to convert block to execution payload", "block number", blockNumber, "err", err)
clabby's avatar
clabby committed
104 105 106 107 108
		return
	}

	// Signature validation is not necessary here since the backup RPC is trusted.
	if _, ok := payload.CheckBlockHash(); !ok {
clabby's avatar
clabby committed
109
		s.log.Warn("Received invalid payload from backup RPC; invalid block hash", "payload", payload.ID())
clabby's avatar
clabby committed
110 111 112
		return
	}

clabby's avatar
clabby committed
113
	s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID())
clabby's avatar
clabby committed
114 115

	// Send the retrieved payload to the `unsafeL2Payloads` channel.
clabby's avatar
clabby committed
116 117 118 119 120 121
	if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil {
		s.log.Warn("Failed to send payload into the driver's unsafeL2Payloads channel", "payload", payload.ID(), "err", err)
		return
	} else {
		s.log.Info("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
	}
clabby's avatar
clabby committed
122
}