sync_client.go 5.38 KB
Newer Older
clabby's avatar
clabby committed
1 2 3
package sources

import (
clabby's avatar
clabby committed
4 5
	"context"
	"errors"
6 7
	"fmt"
	"io"
clabby's avatar
clabby committed
8
	"sync"
9
	"time"
clabby's avatar
clabby committed
10

clabby's avatar
clabby committed
11
	"github.com/ethereum-optimism/optimism/op-node/client"
clabby's avatar
clabby committed
12
	"github.com/ethereum-optimism/optimism/op-node/eth"
clabby's avatar
clabby committed
13 14
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/sources/caching"
15 16
	"github.com/ethereum-optimism/optimism/op-service/backoff"

clabby's avatar
clabby committed
17
	"github.com/ethereum/go-ethereum/log"
clabby's avatar
clabby committed
18
	"github.com/libp2p/go-libp2p/core/peer"
clabby's avatar
clabby committed
19 20
)

clabby's avatar
clabby committed
21 22 23 24 25
// ErrNoUnsafeL2PayloadChannel is returned when a payload sink is required but none was provided.
var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not be nil")

// RpcSyncPeer is a mock PeerID for the RPC sync client.
var RpcSyncPeer peer.ID = "ALT_RPC_SYNC"

26 27
// receivePayload queues the received payload for processing.
// This may return an error if there's no capacity for the payload.
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error

30 31 32
// RPCSync is a sync source that fetches L2 payloads over an RPC connection,
// as an alternative to P2P sync.
type RPCSync interface {
	io.Closer
	// Start starts an additional worker syncing job
	Start() error
	// RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface.
	RequestL2Range(ctx context.Context, start, end uint64) error
}

clabby's avatar
clabby committed
38 39 40
// SyncClient implements RPCSync: it fetches unsafe L2 payloads by block number
// from a backup RPC and hands them to a receivePayload callback.
type SyncClient struct {
	*L2Client

	// requests buffers the block numbers scheduled to be fetched.
	requests chan uint64

	// resCtx is shared by all workers; resCancel (called in Close) stops them.
	resCtx    context.Context
	resCancel context.CancelFunc

	// receivePayload delivers each fetched payload to the driver.
	receivePayload receivePayload
	// wg tracks running event-loop workers so Close can wait for them to exit.
	wg             sync.WaitGroup
}
clabby's avatar
clabby committed
49

clabby's avatar
clabby committed
50 51 52 53 54 55 56 57 58 59
// SyncClientConfig configures a SyncClient; it currently only wraps the L2 client configuration.
type SyncClientConfig struct {
	L2ClientConfig
}

// SyncClientDefaultConfig returns a SyncClientConfig based on the default L2
// client configuration for the given rollup config and trust setting.
func SyncClientDefaultConfig(config *rollup.Config, trustRPC bool) *SyncClientConfig {
	l2Defaults := L2ClientDefaultConfig(config, trustRPC)
	return &SyncClientConfig{L2ClientConfig: *l2Defaults}
}

clabby's avatar
clabby committed
60
func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, metrics caching.Metrics, config *SyncClientConfig) (*SyncClient, error) {
clabby's avatar
clabby committed
61 62 63 64
	l2Client, err := NewL2Client(client, log, metrics, &config.L2ClientConfig)
	if err != nil {
		return nil, err
	}
65 66
	// This resource context is shared between all workers that may be started
	resCtx, resCancel := context.WithCancel(context.Background())
clabby's avatar
clabby committed
67
	return &SyncClient{
68 69 70
		L2Client:       l2Client,
		resCtx:         resCtx,
		resCancel:      resCancel,
71
		requests:       make(chan uint64, 128),
72
		receivePayload: receiver,
clabby's avatar
clabby committed
73 74
	}, nil
}
clabby's avatar
clabby committed
75

76
// Start starts the syncing background work. This may not be called after Close().
func (s *SyncClient) Start() error {
	// TODO(CLI-3635): we can start multiple event loop runners as workers, to parallelize the work
	// wg is incremented before the goroutine starts so Close always waits for it.
	s.wg.Add(1)
	go s.eventLoop()
	return nil
}

84
// Close sends a signal to close all concurrent syncing work.
// It cancels the shared resource context, then blocks until every worker has exited.
func (s *SyncClient) Close() error {
	s.resCancel()
	s.wg.Wait()
	return nil
}

91 92 93
func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) error {
	// Drain previous requests now that we have new information
	for len(s.requests) > 0 {
94 95 96 97 98
		select { // in case requests is being read at the same time, don't block on draining it.
		case <-s.requests:
		default:
			break
		}
99 100 101 102 103 104
	}

	// TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method.

	s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start)

105
	for i := start; i < end; i++ {
106
		select {
107
		case s.requests <- i:
108 109 110 111 112 113 114
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

clabby's avatar
clabby committed
115 116 117
// eventLoop is the main event loop for the sync client.
func (s *SyncClient) eventLoop() {
	defer s.wg.Done()
clabby's avatar
clabby committed
118
	s.log.Info("Starting sync client event loop")
clabby's avatar
clabby committed
119

120 121 122 123 124 125
	backoffStrategy := &backoff.ExponentialStrategy{
		Min:       1000,
		Max:       20_000,
		MaxJitter: 250,
	}

clabby's avatar
clabby committed
126 127
	for {
		select {
128 129
		case <-s.resCtx.Done():
			s.log.Debug("Shutting down RPC sync worker")
clabby's avatar
clabby committed
130
			return
131 132 133 134 135 136 137 138 139
		case reqNum := <-s.requests:
			err := backoff.DoCtx(s.resCtx, 5, backoffStrategy, func() error {
				// Limit the maximum time for fetching payloads
				ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10)
				defer cancel()
				// We are only fetching one block at a time here.
				return s.fetchUnsafeBlockFromRpc(ctx, reqNum)
			})
			if err != nil {
140 141 142
				if err == s.resCtx.Err() {
					return
				}
143 144
				s.log.Error("failed syncing L2 block via RPC", "err", err, "num", reqNum)
				// Reschedule at end of queue
145
				select {
146
				case s.requests <- reqNum:
147 148 149 150
				default:
					// drop syncing job if we are too busy with sync jobs already.
				}
			}
clabby's avatar
clabby committed
151 152 153 154 155 156
		}
	}
}

// fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC.
// WARNING: This function fails silently (aside from warning logs).
clabby's avatar
clabby committed
157 158 159
//
// Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
160
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) error {
clabby's avatar
clabby committed
161
	s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber)
clabby's avatar
clabby committed
162 163 164

	payload, err := s.PayloadByNumber(ctx, blockNumber)
	if err != nil {
165
		return fmt.Errorf("failed to fetch payload by number (%d): %w", blockNumber, err)
clabby's avatar
clabby committed
166
	}
167
	// Note: the underlying RPC client used for syncing verifies the execution payload blockhash, if set to untrusted.
clabby's avatar
clabby committed
168

clabby's avatar
clabby committed
169
	s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID())
clabby's avatar
clabby committed
170 171

	// Send the retrieved payload to the `unsafeL2Payloads` channel.
clabby's avatar
clabby committed
172
	if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil {
173
		return fmt.Errorf("failed to send payload %s into the driver's unsafeL2Payloads channel: %w", payload.ID(), err)
clabby's avatar
clabby committed
174
	} else {
175 176
		s.log.Debug("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
		return nil
clabby's avatar
clabby committed
177
	}
clabby's avatar
clabby committed
178
}