package node

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/hashicorp/go-multierror"

	"github.com/ethereum-optimism/optimism/op-node/client"
	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum-optimism/optimism/op-node/metrics"
	"github.com/ethereum-optimism/optimism/op-node/p2p"
	"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
	"github.com/ethereum-optimism/optimism/op-node/sources"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
)

type OpNode struct {
	log        log.Logger
	appVersion string
	metrics    *metrics.Metrics
	l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error)
	l1Source   *sources.L1Client     // L1 Client to fetch data from
	l2Driver   *driver.Driver        // L2 Engine to Sync
	l2Source   *sources.EngineClient // L2 Execution Engine RPC bindings
	server     *rpcServer            // RPC server hosting the rollup-node API
	p2pNode    *p2p.NodeP2P          // P2P node functionality
	p2pSigner  p2p.Signer            // p2p gossip application messages will be signed with this signer
	tracer     Tracer                // tracer to get events for testing/debugging

	// some resources cannot be stopped directly, like the p2p gossipsub router (not our design),
	// and depend on this ctx to be closed.
	resourcesCtx   context.Context
	resourcesClose context.CancelFunc
}

// The OpNode handles incoming gossip
var _ p2p.GossipIn = (*OpNode)(nil)

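// New creates a new OpNode from the given config: it validates the config and sets up
// the L1 source, the L2 engine client and driver, the optional p2p stack, and the
// RPC/metrics servers. The node does not drive the L2 engine until Start is called,
// and Close releases all resources again.
//
// Typical lifecycle (sketch, error handling elided):
//
//	n, _ := node.New(ctx, cfg, logger, snapshotLog, version, m)
//	_ = n.Start(ctx)
//	// ... run ...
//	_ = n.Close()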
func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logger, appVersion string, m *metrics.Metrics) (*OpNode, error) {
	if err := cfg.Check(); err != nil {
		return nil, err
	}

	n := &OpNode{
		log:        log,
		appVersion: appVersion,
		metrics:    m,
	}
	// not a context leak, gossipsub is closed with a context.
	n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())

	err := n.init(ctx, cfg, snapshotLog)
	if err != nil {
		// ensure we always close the node resources if we fail to initialize the node.
		if closeErr := n.Close(); closeErr != nil {
			return nil, multierror.Append(err, closeErr)
		}
		return nil, err
	}
	return n, nil
}

func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
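	// Sub-systems are initialized in dependency order: the tracer and L1 client first,
	// then the L2 client and driver (which need the L1 source), then p2p, and finally
	// the externally reachable RPC and metrics servers.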
	if err := n.initTracer(ctx, cfg); err != nil {
		return err
	}
	if err := n.initL1(ctx, cfg); err != nil {
		return err
	}
	if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
		return err
	}
	if err := n.initP2PSigner(ctx, cfg); err != nil {
		return err
	}
	if err := n.initP2P(ctx, cfg); err != nil {
		return err
	}
	// Only expose the server at the end, ensuring all RPC backend components are initialized.
	if err := n.initRPCServer(ctx, cfg); err != nil {
		return err
	}
	if err := n.initMetricsServer(ctx, cfg); err != nil {
		return err
	}
	return nil
}

func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error {
	if cfg.Tracer != nil {
		n.tracer = cfg.Tracer
	} else {
		n.tracer = new(noOpTracer)
	}
	return nil
}

func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
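	// Connect to the L1 RPC endpoint; the trustRPC flag from the config is forwarded to
	// the L1 client config and controls how strictly fetched data is verified.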
	l1Node, trustRPC, err := cfg.L1.Setup(ctx, n.log)
	if err != nil {
		return fmt.Errorf("failed to get L1 RPC client: %w", err)
	}

	n.l1Source, err = sources.NewL1Client(
		client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache,
		sources.L1ClientDefaultConfig(&cfg.Rollup, trustRPC))
	if err != nil {
		return fmt.Errorf("failed to create L1 source: %v", err)
	}

	// Keep subscribed to the L1 heads, which keeps the L1 maintainer pointing to the best headers to sync
	n.l1HeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
		if err != nil {
			n.log.Warn("resubscribing after failed L1 subscription", "err", err)
		}
		return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1Head)
	})
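	// Surface subscription errors; the Err channel is closed once the subscription is unsubscribed.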
	go func() {
		err, ok := <-n.l1HeadsSub.Err()
		if !ok {
			return
		}
		n.log.Error("l1 heads subscription error", "err", err)
	}()
	return nil
}

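// initL2 sets up the Engine API client for the L2 execution engine and the driver that
// syncs the L2 chain by deriving it from L1.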
func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
	rpcClient, err := cfg.L2.Setup(ctx, n.log)
	if err != nil {
		return fmt.Errorf("failed to setup L2 execution-engine RPC client: %w", err)
	}

	n.l2Source, err = sources.NewEngineClient(
		client.NewInstrumentedRPC(rpcClient, n.metrics), n.log, n.metrics.L2SourceCache,
		sources.EngineClientDefaultConfig(&cfg.Rollup),
	)
	if err != nil {
		return fmt.Errorf("failed to create Engine client: %w", err)
	}

	n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n.log, snapshotLog, n.metrics)

	return nil
}

func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
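	// The rollup-node RPC server is backed by the L2 client and driver; the p2p and
	// admin APIs are only attached when the corresponding features are enabled.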
	var err error
	n.server, err = newRPCServer(ctx, &cfg.RPC, &cfg.Rollup, n.l2Source.L2Client, n.l2Driver, n.log, n.appVersion, n.metrics)
	if err != nil {
		return err
	}
	if n.p2pNode != nil {
		n.server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics))
	}
	if cfg.RPC.EnableAdmin {
		n.server.EnableAdminAPI(newAdminAPI(n.l2Driver, n.metrics))
	}
	n.log.Info("Starting JSON-RPC server")
	if err := n.server.Start(); err != nil {
		return fmt.Errorf("unable to start RPC server: %w", err)
	}
	return nil
}

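// initMetricsServer starts the metrics server in the background, if metrics are enabled in the config.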
func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
	if !cfg.Metrics.Enabled {
		n.log.Info("metrics disabled")
		return nil
	}
	n.log.Info("starting metrics server", "addr", cfg.Metrics.ListenAddr, "port", cfg.Metrics.ListenPort)
	go func() {
		if err := n.metrics.Serve(ctx, cfg.Metrics.ListenAddr, cfg.Metrics.ListenPort); err != nil {
			log.Crit("error starting metrics server", "err", err)
		}
	}()
	return nil
}

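// initP2P is optional: without a p2p config the node runs without gossip. When enabled,
// the discovery process is started alongside the p2p node if discv5 is configured.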
func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
	if cfg.P2P != nil {
		p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n)
		if err != nil {
			return err
		}
		n.p2pNode = p2pNode
		if n.p2pNode.Dv5Udp() != nil {
			go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers())
		}
	}
	return nil
}

func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
	// the p2p signer setup is optional
	if cfg.P2PSigner == nil {
		return nil
	}
	// p2pSigner may still be nil: the setup may not create any signer, as the signer is optional
	var err error
	n.p2pSigner, err = cfg.P2PSigner.SetupSigner(ctx)
	return err
}

func (n *OpNode) Start(ctx context.Context) error {
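	// Only the driver needs to be started here; the L1 heads subscription, the p2p node,
	// and the RPC/metrics servers are already running after New.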
	n.log.Info("Starting execution engine driver")
	// Request initial head update, default to genesis otherwise
	reqCtx, reqCancel := context.WithTimeout(ctx, time.Second*10)
	// start driving engine: sync blocks by deriving them from L1 and driving them into the engine
	err := n.l2Driver.Start(reqCtx)
	reqCancel()
	if err != nil {
		n.log.Error("Could not start a rollup node", "err", err)
		return err
	}

	return nil
}

func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
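	// Called by the L1 heads subscription (see initL1) whenever a new L1 head is observed.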
	n.tracer.OnNewL1Head(ctx, sig)

	// Pass on the event to the L2 Engine
	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()
	if err := n.l2Driver.OnL1Head(ctx, sig); err != nil {
		n.log.Warn("failed to notify engine driver of L1 head change", "err", err)
	}
}

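// PublishL2Payload gossips a locally produced execution payload to the p2p network,
// signed with the configured p2p signer. It is a no-op when p2p is disabled.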
func (n *OpNode) PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error {
	n.tracer.OnPublishL2Payload(ctx, payload)

	// publish to p2p, if we are running p2p at all
	if n.p2pNode != nil {
		if n.p2pSigner == nil {
			return fmt.Errorf("node has no p2p signer, payload %s cannot be published", payload.ID())
		}
		n.log.Info("Publishing signed execution payload on p2p", "id", payload.ID())
		return n.p2pNode.GossipOut().PublishL2Payload(ctx, payload, n.p2pSigner)
	}
	// if p2p is not enabled then we just don't publish the payload
	return nil
}

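// OnUnsafeL2Payload handles signed execution payloads received from the p2p network and
// forwards them to the L2 driver as unsafe blocks.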
func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error {
	// ignore if it's from ourselves
	if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
		return nil
	}

	n.tracer.OnUnsafeL2Payload(ctx, from, payload)

	n.log.Info("Received signed execution payload from p2p", "id", payload.ID(), "peer", from)

	// Pass on the event to the L2 Engine
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
	if err := n.l2Driver.OnUnsafeL2Payload(ctx, payload); err != nil {
		n.log.Warn("failed to notify engine driver of new L2 payload", "err", err, "id", payload.ID())
	}

	return nil
}

func (n *OpNode) P2P() p2p.Node {
	return n.p2pNode
}

// Close closes all resources.
func (n *OpNode) Close() error {
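	// Shut down in roughly reverse order of initialization: servers and p2p first, then
	// background resources and the driver, and finally the L2 and L1 RPC clients.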
	var result *multierror.Error

	if n.server != nil {
		n.server.Stop()
	}
	if n.p2pNode != nil {
		if err := n.p2pNode.Close(); err != nil {
			result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err))
		}
	}
	if n.p2pSigner != nil {
		if err := n.p2pSigner.Close(); err != nil {
			result = multierror.Append(result, fmt.Errorf("failed to close p2p signer: %w", err))
		}
	}

	if n.resourcesClose != nil {
		n.resourcesClose()
	}

	// stop L1 heads feed
	if n.l1HeadsSub != nil {
		n.l1HeadsSub.Unsubscribe()
	}

	// close L2 driver
	if n.l2Driver != nil {
		if err := n.l2Driver.Close(); err != nil {
			result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err))
		}
	}

	// close L2 engine RPC client
	if n.l2Source != nil {
		n.l2Source.Close()
	}

	// close L1 data source
	if n.l1Source != nil {
		n.l1Source.Close()
	}
	return result.ErrorOrNil()
}