package node

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/hashicorp/go-multierror"
	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-node/client"
	"github.com/ethereum-optimism/optimism/op-node/metrics"
	"github.com/ethereum-optimism/optimism/op-node/p2p"
	"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
	"github.com/ethereum-optimism/optimism/op-node/sources"
	"github.com/ethereum-optimism/optimism/op-service/eth"
)

type OpNode struct {
	log        log.Logger
	appVersion string
	metrics    *metrics.Metrics

	l1HeadsSub     ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error)
	l1SafeSub      ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
	l1FinalizedSub ethereum.Subscription // Subscription to get L1 finalized blocks (polling)

	l1Source  *sources.L1Client     // L1 Client to fetch data from
	l2Driver  *driver.Driver        // L2 Driver to sync the L2 execution engine
	l2Source  *sources.EngineClient // L2 Execution Engine RPC bindings
	rpcSync   *sources.SyncClient   // Alt-sync RPC client, optional (may be nil)
	server    *rpcServer            // RPC server hosting the rollup-node API
	p2pNode   *p2p.NodeP2P          // P2P node functionality
	p2pSigner p2p.Signer            // p2p gossip application messages will be signed with this signer
	tracer    Tracer                // tracer to get events for testing/debugging
	runCfg    *RuntimeConfig        // runtime configurables

	// some resources cannot be stopped directly, like the p2p gossipsub router (not our design),
	// and depend on this ctx to be closed.
	resourcesCtx   context.Context
	resourcesClose context.CancelFunc
}

// The OpNode handles incoming gossip
var _ p2p.GossipIn = (*OpNode)(nil)

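// New creates a new OpNode: it validates the config, initializes every sub-component
// (L1/L2 clients, runtime config, driver, optional RPC sync, p2p, RPC and metrics servers),
// and releases any partially-initialized resources if initialization fails.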
func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logger, appVersion string, m *metrics.Metrics) (*OpNode, error) {
	if err := cfg.Check(); err != nil {
		return nil, err
	}

	n := &OpNode{
		log:        log,
		appVersion: appVersion,
		metrics:    m,
	}
	// not a context leak, gossipsub is closed with a context.
	n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())

	err := n.init(ctx, cfg, snapshotLog)
	if err != nil {
		log.Error("Error initializing the rollup node", "err", err)
		// ensure we always close the node resources if we fail to initialize the node.
		if closeErr := n.Close(); closeErr != nil {
			return nil, multierror.Append(err, closeErr)
		}
		return nil, err
	}
	return n, nil
}

func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
	if err := n.initTracer(ctx, cfg); err != nil {
		return fmt.Errorf("failed to init the tracer: %w", err)
	}
	if err := n.initL1(ctx, cfg); err != nil {
		return fmt.Errorf("failed to init L1: %w", err)
	}
	if err := n.initRuntimeConfig(ctx, cfg); err != nil {
		return fmt.Errorf("failed to init the runtime config: %w", err)
	}
	if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
		return fmt.Errorf("failed to init L2: %w", err)
	}
	if err := n.initRPCSync(ctx, cfg); err != nil {
		return fmt.Errorf("failed to init RPC sync: %w", err)
	}
	if err := n.initP2PSigner(ctx, cfg); err != nil {
		return fmt.Errorf("failed to init the P2P signer: %w", err)
	}
	if err := n.initP2P(ctx, cfg); err != nil {
		return fmt.Errorf("failed to init the P2P stack: %w", err)
	}
	// Only expose the server at the end, ensuring all RPC backend components are initialized.
	if err := n.initRPCServer(ctx, cfg); err != nil {
		return fmt.Errorf("failed to init the RPC server: %w", err)
	}
	if err := n.initMetricsServer(ctx, cfg); err != nil {
		return fmt.Errorf("failed to init the metrics server: %w", err)
	}
	return nil
}

func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error {
	if cfg.Tracer != nil {
		n.tracer = cfg.Tracer
	} else {
		n.tracer = new(noOpTracer)
	}
	return nil
}

func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
	l1Node, rpcCfg, err := cfg.L1.Setup(ctx, n.log, &cfg.Rollup)
	if err != nil {
		return fmt.Errorf("failed to get L1 RPC client: %w", err)
	}

	n.l1Source, err = sources.NewL1Client(
		client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, rpcCfg)
	if err != nil {
		return fmt.Errorf("failed to create L1 source: %w", err)
	}

	if err := cfg.Rollup.ValidateL1Config(ctx, n.l1Source); err != nil {
		return fmt.Errorf("failed to validate the L1 config: %w", err)
	}

	// Keep subscribed to the L1 heads, which keeps the L1 maintainer pointing to the best headers to sync
	n.l1HeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
		if err != nil {
			n.log.Warn("resubscribing after failed L1 subscription", "err", err)
		}
		return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1Head)
	})
	go func() {
		err, ok := <-n.l1HeadsSub.Err()
		if !ok {
			return
		}
		n.log.Error("l1 heads subscription error", "err", err)
	}()

	// Poll for the safe L1 block and finalized block,
	// which only change once per epoch at most and may be delayed.
	n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Safe, eth.Safe,
		cfg.L1EpochPollInterval, time.Second*10)
	n.l1FinalizedSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Finalized, eth.Finalized,
		cfg.L1EpochPollInterval, time.Second*10)
	return nil
}

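// initRuntimeConfig loads the runtime config from the current L1 head,
// retrying up to 5 times before giving up.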
func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
	// attempt to load runtime config, repeat N times
	n.runCfg = NewRuntimeConfig(n.log, n.l1Source, &cfg.Rollup)

	for i := 0; i < 5; i++ {
		fetchCtx, fetchCancel := context.WithTimeout(ctx, time.Second*10)
		l1Head, err := n.l1Source.L1BlockRefByLabel(fetchCtx, eth.Unsafe)
		fetchCancel()
		if err != nil {
			n.log.Error("failed to fetch L1 head for runtime config initialization", "err", err)
			continue
		}

		fetchCtx, fetchCancel = context.WithTimeout(ctx, time.Second*10)
		err = n.runCfg.Load(fetchCtx, l1Head)
		fetchCancel()
		if err != nil {
			n.log.Error("failed to fetch runtime config data", "err", err)
			continue
		}

		return nil
	}

	return errors.New("failed to load runtime configuration after multiple attempts")
}

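// initL2 sets up the L2 execution-engine RPC client, validates the L2 config against it,
// and constructs the driver that syncs the L2 chain.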
func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
	rpcClient, rpcCfg, err := cfg.L2.Setup(ctx, n.log, &cfg.Rollup)
	if err != nil {
		return fmt.Errorf("failed to setup L2 execution-engine RPC client: %w", err)
	}

	n.l2Source, err = sources.NewEngineClient(
		client.NewInstrumentedRPC(rpcClient, n.metrics), n.log, n.metrics.L2SourceCache, rpcCfg,
	)
	if err != nil {
		return fmt.Errorf("failed to create Engine client: %w", err)
	}

	if err := cfg.Rollup.ValidateL2Config(ctx, n.l2Source); err != nil {
		return err
	}

	n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence, &cfg.Sync)

	return nil
}

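// initRPCSync sets up the optional RPC client used for backup (alt-) sync of unsafe L2 payloads.
// It is a no-op if no L2 sync RPC endpoint is configured.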
func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error {
	rpcSyncClient, rpcCfg, err := cfg.L2Sync.Setup(ctx, n.log, &cfg.Rollup)
	if err != nil {
		return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
	}
	if rpcSyncClient == nil { // if no RPC client is configured to sync from, then don't add the RPC sync client
		return nil
	}
	syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, rpcCfg)
	if err != nil {
		return fmt.Errorf("failed to create sync client: %w", err)
	}
	n.rpcSync = syncClient
	return nil
}

func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
	server, err := newRPCServer(ctx, &cfg.RPC, &cfg.Rollup, n.l2Source.L2Client, n.l2Driver, n.log, n.appVersion, n.metrics)
	if err != nil {
		return err
	}
	if n.p2pNode != nil {
		server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics))
	}
	if cfg.RPC.EnableAdmin {
		server.EnableAdminAPI(NewAdminAPI(n.l2Driver, n.metrics))
		n.log.Info("Admin RPC enabled")
	}
	n.log.Info("Starting JSON-RPC server")
	if err := server.Start(); err != nil {
		return fmt.Errorf("unable to start RPC server: %w", err)
	}
	n.server = server
	return nil
}

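// initMetricsServer starts the metrics server in the background, if metrics are enabled.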
func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
	if !cfg.Metrics.Enabled {
		n.log.Info("metrics disabled")
		return nil
	}
	n.log.Info("starting metrics server", "addr", cfg.Metrics.ListenAddr, "port", cfg.Metrics.ListenPort)
	go func() {
		if err := n.metrics.Serve(ctx, cfg.Metrics.ListenAddr, cfg.Metrics.ListenPort); err != nil {
			log.Crit("error starting metrics server", "err", err)
		}
	}()
	return nil
}

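// initP2P sets up the p2p node and, if discovery is available, starts the discovery process
// in the background. It is a no-op if no p2p config is provided.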
func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
	if cfg.P2P != nil {
		p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics)
		if err != nil || p2pNode == nil {
			return err
		}
		n.p2pNode = p2pNode
		if n.p2pNode.Dv5Udp() != nil {
			go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers())
		}
	}
	return nil
}

func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
	// the p2p signer setup is optional
	if cfg.P2PSigner == nil {
		return nil
	}
	// p2pSigner may still be nil: the signer setup is optional and may not create any signer
	var err error
	n.p2pSigner, err = cfg.P2PSigner.SetupSigner(ctx)
	return err
}

func (n *OpNode) Start(ctx context.Context) error {
	n.log.Info("Starting execution engine driver")

	// start driving engine: sync blocks by deriving them from L1 and driving them into the engine
	if err := n.l2Driver.Start(); err != nil {
		n.log.Error("Could not start a rollup node", "err", err)
		return err
	}

	// If the backup unsafe sync client is enabled, start its event loop
	if n.rpcSync != nil {
		if err := n.rpcSync.Start(); err != nil {
			n.log.Error("Could not start the backup sync client", "err", err)
			return err
		}
		n.log.Info("Started L2-RPC sync service")
	}

	return nil
}

func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
	n.tracer.OnNewL1Head(ctx, sig)

	if n.l2Driver == nil {
		return
	}
	// Pass on the event to the L2 Engine
	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()
	if err := n.l2Driver.OnL1Head(ctx, sig); err != nil {
		n.log.Warn("failed to notify engine driver of L1 head change", "err", err)
	}
}

func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) {
	if n.l2Driver == nil {
		return
	}
	// Pass on the event to the L2 Engine
	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()
	if err := n.l2Driver.OnL1Safe(ctx, sig); err != nil {
		n.log.Warn("failed to notify engine driver of L1 safe block change", "err", err)
	}
}

func (n *OpNode) OnNewL1Finalized(ctx context.Context, sig eth.L1BlockRef) {
	if n.l2Driver == nil {
		return
	}
	// Pass on the event to the L2 Engine
	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()
	if err := n.l2Driver.OnL1Finalized(ctx, sig); err != nil {
		n.log.Warn("failed to notify engine driver of L1 finalized block change", "err", err)
	}
}

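// PublishL2Payload signs an execution payload and gossips it over p2p, if p2p is enabled.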
func (n *OpNode) PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error {
	n.tracer.OnPublishL2Payload(ctx, payload)

	// publish to p2p, if we are running p2p at all
	if n.p2pNode != nil {
		if n.p2pSigner == nil {
			return fmt.Errorf("node has no p2p signer, payload %s cannot be published", payload.ID())
		}
		n.log.Info("Publishing signed execution payload on p2p", "id", payload.ID())
		return n.p2pNode.GossipOut().PublishL2Payload(ctx, payload, n.p2pSigner)
	}
	// if p2p is not enabled then we just don't publish the payload
	return nil
}

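// OnUnsafeL2Payload handles execution payloads received from p2p gossip or the backup sync client:
// payloads gossiped by this node itself are ignored, all others are forwarded to the L2 driver.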
func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error {
	// ignore if it's from ourselves
	if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
		return nil
	}

	n.tracer.OnUnsafeL2Payload(ctx, from, payload)

	n.log.Info("Received signed execution payload from p2p", "id", payload.ID(), "peer", from)

	// Pass on the event to the L2 Engine
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
	if err := n.l2Driver.OnUnsafeL2Payload(ctx, payload); err != nil {
		n.log.Warn("failed to notify engine driver of new L2 payload", "err", err, "id", payload.ID())
	}

	return nil
}

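// RequestL2Range requests the given range of L2 blocks, preferring the backup RPC sync client
// and falling back to p2p alt-sync; requests that cannot be served by either are logged and dropped.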
func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
	if n.rpcSync != nil {
		return n.rpcSync.RequestL2Range(ctx, start, end)
	}
	if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
		if unixTimeStale(start.Time, 12*time.Hour) {
			n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time)
			return nil
		}
		return n.p2pNode.RequestL2Range(ctx, start, end)
	}
	n.log.Debug("ignoring request to sync L2 range, no sync method available", "start", start, "end", end)
	return nil
}

// unixTimeStale returns true if the unix timestamp is before the current time minus the supplied duration.
func unixTimeStale(timestamp uint64, duration time.Duration) bool {
	return time.Unix(int64(timestamp), 0).Before(time.Now().Add(-1 * duration))
}

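// P2P returns the p2p node of the OpNode; it is not set up if p2p is disabled.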
func (n *OpNode) P2P() p2p.Node {
	return n.p2pNode
}

// Close closes all resources.
func (n *OpNode) Close() error {
	var result *multierror.Error

	if n.server != nil {
		n.server.Stop()
	}
	if n.p2pNode != nil {
		if err := n.p2pNode.Close(); err != nil {
			result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err))
		}
	}
	if n.p2pSigner != nil {
		if err := n.p2pSigner.Close(); err != nil {
			result = multierror.Append(result, fmt.Errorf("failed to close p2p signer: %w", err))
		}
	}

	if n.resourcesClose != nil {
		n.resourcesClose()
	}

	// stop L1 heads feed
	if n.l1HeadsSub != nil {
		n.l1HeadsSub.Unsubscribe()
	}

	// close L2 driver
	if n.l2Driver != nil {
		if err := n.l2Driver.Close(); err != nil {
			result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err))
		}

		// If the L2 sync client is present & running, close it.
		if n.rpcSync != nil {
			if err := n.rpcSync.Close(); err != nil {
				result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err))
			}
		}
	}

	// close L2 engine RPC client
	if n.l2Source != nil {
		n.l2Source.Close()
	}

	// close L1 data source
	if n.l1Source != nil {
		n.l1Source.Close()
	}
	return result.ErrorOrNil()
}

func (n *OpNode) ListenAddr() string {
	return n.server.listenAddr.String()
}

func (n *OpNode) HTTPEndpoint() string {
	return fmt.Sprintf("http://%s", n.ListenAddr())
}