sync_client.go 5.84 KB
Newer Older
clabby's avatar
clabby committed
1 2 3
package sources

import (
clabby's avatar
clabby committed
4 5
	"context"
	"errors"
6 7
	"fmt"
	"io"
clabby's avatar
clabby committed
8
	"sync"
9
	"time"
clabby's avatar
clabby committed
10

clabby's avatar
clabby committed
11 12 13
	"github.com/ethereum-optimism/optimism/op-node/client"
	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/sources/caching"
14
	"github.com/ethereum-optimism/optimism/op-service/eth"
15
	"github.com/ethereum-optimism/optimism/op-service/retry"
16

clabby's avatar
clabby committed
17
	"github.com/ethereum/go-ethereum/log"
clabby's avatar
clabby committed
18
	"github.com/libp2p/go-libp2p/core/peer"
clabby's avatar
clabby committed
19 20
)

clabby's avatar
clabby committed
21 22 23 24 25
var (
	// ErrNoUnsafeL2PayloadChannel is a sentinel error stating that the
	// unsafeL2Payloads channel must not be nil.
	ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not be nil")

	// RpcSyncPeer is the placeholder peer ID under which the RPC sync client
	// reports the payloads it fetched.
	RpcSyncPeer peer.ID = "ALT_RPC_SYNC"
)

26 27
// receivePayload queues the received payload for processing.
// This may return an error if there's no capacity for the payload.
clabby's avatar
clabby committed
28 29
type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error

30 31 32
type RPCSync interface {
	io.Closer
	// Start starts an additional worker syncing job
clabby's avatar
clabby committed
33
	Start() error
34
	// RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface.
35
	RequestL2Range(ctx context.Context, start uint64, end eth.L2BlockRef) error
clabby's avatar
clabby committed
36 37
}

38
// SyncClient implements the driver AltSync interface, including support for fetching an open-ended chain of L2 blocks.
clabby's avatar
clabby committed
39 40 41
type SyncClient struct {
	*L2Client

42
	requests chan uint64
43 44 45 46 47 48 49

	resCtx    context.Context
	resCancel context.CancelFunc

	receivePayload receivePayload
	wg             sync.WaitGroup
}
clabby's avatar
clabby committed
50

clabby's avatar
clabby committed
51 52 53 54 55 56 57 58 59 60
// SyncClientConfig holds the configuration of a SyncClient. It currently
// consists solely of the embedded L2ClientConfig.
type SyncClientConfig struct {
	L2ClientConfig
}

// SyncClientDefaultConfig returns a SyncClientConfig that wraps a copy of the
// default L2 client configuration produced by L2ClientDefaultConfig for the
// given rollup config and trustRPC setting.
func SyncClientDefaultConfig(config *rollup.Config, trustRPC bool) *SyncClientConfig {
	l2Defaults := L2ClientDefaultConfig(config, trustRPC)
	return &SyncClientConfig{L2ClientConfig: *l2Defaults}
}

clabby's avatar
clabby committed
61
func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, metrics caching.Metrics, config *SyncClientConfig) (*SyncClient, error) {
clabby's avatar
clabby committed
62 63 64 65
	l2Client, err := NewL2Client(client, log, metrics, &config.L2ClientConfig)
	if err != nil {
		return nil, err
	}
66 67
	// This resource context is shared between all workers that may be started
	resCtx, resCancel := context.WithCancel(context.Background())
clabby's avatar
clabby committed
68
	return &SyncClient{
69 70 71
		L2Client:       l2Client,
		resCtx:         resCtx,
		resCancel:      resCancel,
72
		requests:       make(chan uint64, 128),
73
		receivePayload: receiver,
clabby's avatar
clabby committed
74 75
	}, nil
}
clabby's avatar
clabby committed
76

77
// Start starts the syncing background work. This may not be called after Close().
clabby's avatar
clabby committed
78
func (s *SyncClient) Start() error {
79
	// TODO(CLI-3635): we can start multiple event loop runners as workers, to parallelize the work
clabby's avatar
clabby committed
80 81 82 83 84
	s.wg.Add(1)
	go s.eventLoop()
	return nil
}

85
// Close sends a signal to close all concurrent syncing work.
clabby's avatar
clabby committed
86
func (s *SyncClient) Close() error {
87
	s.resCancel()
clabby's avatar
clabby committed
88 89 90 91
	s.wg.Wait()
	return nil
}

92
func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
93 94
	// Drain previous requests now that we have new information
	for len(s.requests) > 0 {
95 96 97 98 99
		select { // in case requests is being read at the same time, don't block on draining it.
		case <-s.requests:
		default:
			break
		}
100 101
	}

102 103 104 105 106 107 108 109 110 111 112 113
	endNum := end.Number
	if end == (eth.L2BlockRef{}) {
		n, err := s.rollupCfg.TargetBlockNumber(uint64(time.Now().Unix()))
		if err != nil {
			return err
		}
		if n <= start.Number {
			return nil
		}
		endNum = n
	}

114 115
	// TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method.

116
	s.log.Info("Scheduling to fetch trailing missing payloads from backup RPC", "start", start, "end", endNum, "size", endNum-start.Number-1)
117

118
	for i := start.Number + 1; i < endNum; i++ {
119
		select {
120
		case s.requests <- i:
121 122 123 124 125 126 127
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

clabby's avatar
clabby committed
128 129 130
// eventLoop is the main event loop for the sync client.
func (s *SyncClient) eventLoop() {
	defer s.wg.Done()
clabby's avatar
clabby committed
131
	s.log.Info("Starting sync client event loop")
clabby's avatar
clabby committed
132

133
	backoffStrategy := &retry.ExponentialStrategy{
134 135 136
		Min:       1000 * time.Millisecond,
		Max:       20_000 * time.Millisecond,
		MaxJitter: 250 * time.Millisecond,
137 138
	}

clabby's avatar
clabby committed
139 140
	for {
		select {
141 142
		case <-s.resCtx.Done():
			s.log.Debug("Shutting down RPC sync worker")
clabby's avatar
clabby committed
143
			return
144
		case reqNum := <-s.requests:
145
			_, err := retry.Do(s.resCtx, 5, backoffStrategy, func() (interface{}, error) {
146 147 148 149
				// Limit the maximum time for fetching payloads
				ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10)
				defer cancel()
				// We are only fetching one block at a time here.
150
				return nil, s.fetchUnsafeBlockFromRpc(ctx, reqNum)
151 152
			})
			if err != nil {
153 154 155
				if err == s.resCtx.Err() {
					return
				}
156 157
				s.log.Error("failed syncing L2 block via RPC", "err", err, "num", reqNum)
				// Reschedule at end of queue
158
				select {
159
				case s.requests <- reqNum:
160 161 162 163
				default:
					// drop syncing job if we are too busy with sync jobs already.
				}
			}
clabby's avatar
clabby committed
164 165 166 167 168 169
		}
	}
}

// fetchUnsafeBlockFromRpc attempts to fetch an unsafe execution payload from the backup unsafe sync RPC.
// WARNING: This function fails silently (aside from warning logs).
clabby's avatar
clabby committed
170 171 172
//
// Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now,
// the `eth_getBlockByNumber` method is more widely available.
173
func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) error {
clabby's avatar
clabby committed
174
	s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber)
clabby's avatar
clabby committed
175 176 177

	payload, err := s.PayloadByNumber(ctx, blockNumber)
	if err != nil {
178
		return fmt.Errorf("failed to fetch payload by number (%d): %w", blockNumber, err)
clabby's avatar
clabby committed
179
	}
180
	// Note: the underlying RPC client used for syncing verifies the execution payload blockhash, if set to untrusted.
clabby's avatar
clabby committed
181

clabby's avatar
clabby committed
182
	s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID())
clabby's avatar
clabby committed
183 184

	// Send the retrieved payload to the `unsafeL2Payloads` channel.
clabby's avatar
clabby committed
185
	if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil {
186
		return fmt.Errorf("failed to send payload %s into the driver's unsafeL2Payloads channel: %w", payload.ID(), err)
clabby's avatar
clabby committed
187
	} else {
188 189
		s.log.Debug("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID())
		return nil
clabby's avatar
clabby committed
190
	}
clabby's avatar
clabby committed
191
}