Commit c836f1cb authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Remove multi L2 Engine Option (#2699)

* op-node: Remove multi L2 Engine Option

This stops the rollup node from driving multiple L2 engines at once.
This makes node lifecycle management easier.
It also has associated command line and testing changes.

* PR Fixups
Co-authored-by: default avatarmergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent ac062f8f
......@@ -335,9 +335,9 @@ func (cfg SystemConfig) start() (*System, error) {
L1NodeAddr: l1Node.WSEndpoint(),
L1TrustRPC: false,
}
rollupCfg.L2s = &rollupNode.L2EndpointsConfig{
L2EngineAddrs: []string{sys.nodes[name].WSAuthEndpoint()},
L2EngineJWTSecrets: [][32]byte{cfg.JWTSecret},
rollupCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: sys.nodes[name].WSAuthEndpoint(),
L2EngineJWTSecret: cfg.JWTSecret,
}
}
......
......@@ -19,9 +19,9 @@ var (
Value: "http://127.0.0.1:8545",
EnvVar: prefixEnvVar("L1_ETH_RPC"),
}
L2EngineAddrs = cli.StringSliceFlag{
L2EngineAddr = cli.StringFlag{
Name: "l2",
Usage: "Addresses of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required)",
Usage: "Address of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required)",
Required: true,
EnvVar: prefixEnvVar("L2_ENGINE_RPC"),
}
......@@ -50,13 +50,13 @@ var (
Usage: "Trust the L1 RPC, sync faster at risk of malicious/buggy RPC providing bad or inconsistent L1 data",
EnvVar: prefixEnvVar("L1_TRUST_RPC"),
}
L2EngineJWTSecret = cli.StringSliceFlag{
Name: "l2.jwt-secret",
Usage: "Paths to JWT secret keys, one per L2 endpoint, in the same order as the provided l2 addresses. " +
"Keys are 32 bytes, hex encoded in a file. A new key per endpoint will be generated if left empty.",
Required: false,
Value: &cli.StringSlice{},
EnvVar: prefixEnvVar("L2_ENGINE_AUTH"),
L2EngineJWTSecret = cli.StringFlag{
Name: "l2.jwt-secret",
Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.",
EnvVar: prefixEnvVar("L2_ENGINE_AUTH"),
Required: false,
Value: "",
Destination: new(string),
}
SequencingEnabledFlag = cli.BoolFlag{
Name: "sequencing.enabled",
......@@ -91,7 +91,7 @@ var (
var requiredFlags = []cli.Flag{
L1NodeAddr,
L2EngineAddrs,
L2EngineAddr,
RollupConfig,
RPCListenAddr,
RPCListenPort,
......
......@@ -10,9 +10,9 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
type L2EndpointsSetup interface {
type L2EndpointSetup interface {
// Setup a RPC client to a L2 execution engine to process rollup blocks with.
Setup(ctx context.Context, log log.Logger) (cl []*rpc.Client, err error)
Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, err error)
Check() error
}
......@@ -21,62 +21,53 @@ type L1EndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error)
}
type L2EndpointsConfig struct {
L2EngineAddrs []string // Addresses of L2 Engine JSON-RPC endpoints to use (engine and eth namespace required)
type L2EndpointConfig struct {
L2EngineAddr string // Address of L2 Engine JSON-RPC endpoint to use (engine and eth namespace required)
// JWT secrets for L2 Engine API authentication during HTTP or initial Websocket communication, one per L2 engine.
// JWT secrets for L2 Engine API authentication during HTTP or initial Websocket communication.
// Any value for an IPC connection.
L2EngineJWTSecrets [][32]byte
L2EngineJWTSecret [32]byte
}
var _ L2EndpointsSetup = (*L2EndpointsConfig)(nil)
var _ L2EndpointSetup = (*L2EndpointConfig)(nil)
func (cfg *L2EndpointsConfig) Check() error {
if len(cfg.L2EngineAddrs) == 0 {
return errors.New("need at least one L2 engine to connect to")
}
if len(cfg.L2EngineAddrs) != len(cfg.L2EngineJWTSecrets) {
return fmt.Errorf("have %d L2 engines, but %d authentication secrets", len(cfg.L2EngineAddrs), len(cfg.L2EngineJWTSecrets))
func (cfg *L2EndpointConfig) Check() error {
if cfg.L2EngineAddr == "" {
return errors.New("empty L2 Engine Address")
}
return nil
}
func (cfg *L2EndpointsConfig) Setup(ctx context.Context, log log.Logger) ([]*rpc.Client, error) {
func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
if err := cfg.Check(); err != nil {
return nil, err
}
var out []*rpc.Client
for i, addr := range cfg.L2EngineAddrs {
auth := rpc.NewJWTAuthProvider(cfg.L2EngineJWTSecrets[i])
l2Node, err := dialRPCClientWithBackoff(ctx, log, addr, auth)
if err != nil {
// close clients again if we cannot complete the full setup
for _, cl := range out {
cl.Close()
}
return out, err
}
out = append(out, l2Node)
auth := rpc.NewJWTAuthProvider(cfg.L2EngineJWTSecret)
l2Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L2EngineAddr, auth)
if err != nil {
return nil, err
}
return out, nil
return l2Node, nil
}
// PreparedL2Endpoints enables testing with in-process pre-setup RPC connections to L2 engines
type PreparedL2Endpoints struct {
Clients []*rpc.Client
Client *rpc.Client
}
func (p *PreparedL2Endpoints) Check() error {
if len(p.Clients) == 0 {
return errors.New("need at least one L2 engine to connect to")
if p.Client == nil {
return errors.New("client cannot be nil")
}
return nil
}
var _ L2EndpointsSetup = (*PreparedL2Endpoints)(nil)
var _ L2EndpointSetup = (*PreparedL2Endpoints)(nil)
func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) ([]*rpc.Client, error) {
return p.Clients, nil
func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
return p.Client, nil
}
type L1EndpointConfig struct {
......
......@@ -9,8 +9,8 @@ import (
)
type Config struct {
L1 L1EndpointSetup
L2s L2EndpointsSetup
L1 L1EndpointSetup
L2 L2EndpointSetup
Rollup rollup.Config
......@@ -36,7 +36,7 @@ type RPCConfig struct {
// Check verifies that the given configuration makes sense
func (cfg *Config) Check() error {
if err := cfg.L2s.Check(); err != nil {
if err := cfg.L2.Check(); err != nil {
return fmt.Errorf("l2 endpoint config error: %v", err)
}
if err := cfg.Rollup.Check(); err != nil {
......
......@@ -2,9 +2,7 @@ package node
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
......@@ -29,9 +27,8 @@ type OpNode struct {
appVersion string
l1HeadsSub ethereum.Subscription // Subscription to get L1 heads (automatically re-subscribes on error)
l1Source *l1.Source // Source to fetch data from (also implements the Downloader interface)
l2Lock sync.Mutex // Mutex to safely add and use different L2 resources in parallel
l2Engines []*driver.Driver // engines to keep synced
l2Nodes []*rpc.Client // L2 Execution Engines to close at shutdown
l2Engine *driver.Driver // L2 Engine to Sync
l2Node *rpc.Client // L2 Execution Engine RPC connections to close at shutdown
server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
......@@ -76,7 +73,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL1(ctx, cfg); err != nil {
return err
}
if err := n.initL2s(ctx, cfg, snapshotLog); err != nil {
if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return err
}
if err := n.initP2PSigner(ctx, cfg); err != nil {
......@@ -129,48 +126,26 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
return nil
}
// AttachEngine attaches an engine to the rollup node.
func (n *OpNode) AttachEngine(ctx context.Context, cfg *Config, tag string, cl *rpc.Client, snapshotLog log.Logger) error {
n.l2Lock.Lock()
defer n.l2Lock.Unlock()
engLog := n.log.New("engine", tag)
client, err := l2.NewSource(cl, &cfg.Rollup.Genesis, engLog)
func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
rpcClient, err := cfg.L2.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client: %w", err)
}
n.l2Node = rpcClient
client, err := l2.NewSource(rpcClient, &cfg.Rollup.Genesis, n.log)
if err != nil {
cl.Close()
return err
}
snap := snapshotLog.New("engine_addr", tag)
engine := driver.NewDriver(cfg.Rollup, client, n.l1Source, n, engLog, snap, cfg.Sequencer)
n.l2Nodes = append(n.l2Nodes, cl)
n.l2Engines = append(n.l2Engines, engine)
return nil
}
snap := snapshotLog.New()
n.l2Engine = driver.NewDriver(cfg.Rollup, client, n.l1Source, n, n.log, snap, cfg.Sequencer)
func (n *OpNode) initL2s(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
clients, err := cfg.L2s.Setup(ctx, n.log)
if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client(s): %v", err)
}
for i, cl := range clients {
if err := n.AttachEngine(ctx, cfg, fmt.Sprintf("eng_%d", i), cl, snapshotLog); err != nil {
return fmt.Errorf("failed to attach configured engine %d: %v", i, err)
}
}
return nil
}
func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
if len(n.l2Nodes) == 0 {
return errors.New("need at least one L2 node to serve rollup RPC")
}
l2Node := n.l2Nodes[0]
// TODO: attach the p2p node ID to the snapshot logger
client, err := l2.NewReadOnlySource(l2Node, &cfg.Rollup.Genesis, n.log)
client, err := l2.NewReadOnlySource(n.l2Node, &cfg.Rollup.Genesis, n.log)
if err != nil {
return err
}
......@@ -214,38 +189,30 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
}
func (n *OpNode) Start(ctx context.Context) error {
n.log.Info("Starting execution engine driver(s)")
for _, eng := range n.l2Engines {
// Request initial head update, default to genesis otherwise
reqCtx, reqCancel := context.WithTimeout(ctx, time.Second*10)
// start driving engine: sync blocks by deriving them from L1 and driving them into the engine
err := eng.Start(reqCtx)
reqCancel()
if err != nil {
n.log.Error("Could not start a rollup node", "err", err)
return err
}
n.log.Info("Starting execution engine driver")
// Request initial head update, default to genesis otherwise
reqCtx, reqCancel := context.WithTimeout(ctx, time.Second*10)
// start driving engine: sync blocks by deriving them from L1 and driving them into the engine
err := n.l2Engine.Start(reqCtx)
reqCancel()
if err != nil {
n.log.Error("Could not start a rollup node", "err", err)
return err
}
return nil
}
func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
n.l2Lock.Lock()
defer n.l2Lock.Unlock()
n.tracer.OnNewL1Head(ctx, sig)
// fan-out to all engine drivers
for _, eng := range n.l2Engines {
go func(eng *driver.Driver) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
if err := eng.OnL1Head(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 head change", "err", err)
}
}(eng)
// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
if err := n.l2Engine.OnL1Head(ctx, sig); err != nil {
n.log.Warn("failed to notify engine driver of L1 head change", "err", err)
}
}
func (n *OpNode) PublishL2Payload(ctx context.Context, payload *l2.ExecutionPayload) error {
......@@ -264,9 +231,6 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, payload *l2.ExecutionPayl
}
func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *l2.ExecutionPayload) error {
n.l2Lock.Lock()
defer n.l2Lock.Unlock()
// ignore if it's from ourselves
if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
return nil
......@@ -276,16 +240,13 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *l
n.log.Info("Received signed execution payload from p2p", "id", payload.ID(), "peer", from)
// fan-out to all engine drivers
for _, eng := range n.l2Engines {
go func(eng *driver.Driver) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
if err := eng.OnUnsafeL2Payload(ctx, payload); err != nil {
n.log.Warn("failed to notify engine driver of new L2 payload", "err", err, "id", payload.ID())
}
}(eng)
// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
if err := n.l2Engine.OnUnsafeL2Payload(ctx, payload); err != nil {
n.log.Warn("failed to notify engine driver of new L2 payload", "err", err, "id", payload.ID())
}
return nil
}
......@@ -302,12 +263,12 @@ func (n *OpNode) Close() error {
}
if n.p2pNode != nil {
if err := n.p2pNode.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %v", err))
result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err))
}
}
if n.p2pSigner != nil {
if err := n.p2pSigner.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p signer: %v", err))
result = multierror.Append(result, fmt.Errorf("failed to close p2p signer: %w", err))
}
}
......@@ -320,16 +281,18 @@ func (n *OpNode) Close() error {
n.l1HeadsSub.Unsubscribe()
}
// close L2 engines
for _, eng := range n.l2Engines {
if err := eng.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %v", err))
// close L2 engine
if n.l2Engine != nil {
if err := n.l2Engine.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine driver cleanly: %w", err))
}
}
// close L2 nodes
for _, n := range n.l2Nodes {
n.Close()
// close L2 node
if n.l2Node != nil {
n.l2Node.Close()
}
// close L1 data source
if n.l1Source != nil {
n.l1Source.Close()
......
......@@ -44,14 +44,14 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, fmt.Errorf("failed to load l1 endpoint info: %v", err)
}
l2Endpoints, err := NewL2EndpointsConfig(ctx, log)
l2Endpoint, err := NewL2EndpointConfig(ctx, log)
if err != nil {
return nil, fmt.Errorf("failed to load l2 endpoints info: %v", err)
}
cfg := &node.Config{
L1: l1Endpoint,
L2s: l2Endpoints,
L2: l2Endpoint,
Rollup: *rollupConfig,
Sequencer: enableSequencing,
RPC: node.RPCConfig{
......@@ -74,38 +74,33 @@ func NewL1EndpointConfig(ctx *cli.Context) (*node.L1EndpointConfig, error) {
}, nil
}
func NewL2EndpointsConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointsConfig, error) {
l2Addrs := ctx.GlobalStringSlice(flags.L2EngineAddrs.Name)
engineJWTSecrets := ctx.GlobalStringSlice(flags.L2EngineJWTSecret.Name)
var secrets [][32]byte
for i, fileName := range engineJWTSecrets {
fileName = strings.TrimSpace(fileName)
if fileName == "" {
return nil, fmt.Errorf("file-name of jwt secret %d is empty", i)
func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConfig, error) {
l2Addr := ctx.GlobalString(flags.L2EngineAddr.Name)
fileName := ctx.GlobalString(flags.L2EngineJWTSecret.Name)
var secret [32]byte
fileName = strings.TrimSpace(fileName)
if fileName == "" {
return nil, fmt.Errorf("file-name of jwt secret is empty")
}
if data, err := os.ReadFile(fileName); err == nil {
jwtSecret := common.FromHex(strings.TrimSpace(string(data)))
if len(jwtSecret) != 32 {
return nil, fmt.Errorf("invalid jwt secret in path %s, not 32 hex-formatted bytes", fileName)
}
copy(secret[:], jwtSecret)
} else {
log.Warn("Failed to read JWT secret from file, generating a new one now. Configure L2 geth with --authrpc.jwt-secret=" + fmt.Sprintf("%q", fileName))
if _, err := io.ReadFull(rand.Reader, secret[:]); err != nil {
return nil, fmt.Errorf("failed to generate jwt secret: %v", err)
}
if data, err := os.ReadFile(fileName); err == nil {
jwtSecret := common.FromHex(strings.TrimSpace(string(data)))
if len(jwtSecret) != 32 {
return nil, fmt.Errorf("invalid jwt secret in path %s, not 32 hex-formatted bytes", fileName)
}
var secret [32]byte
copy(secret[:], jwtSecret)
secrets = append(secrets, secret)
} else {
log.Warn("Failed to read JWT secret from file, generating a new one now. Configure L2 geth with --authrpc.jwt-secret=" + fmt.Sprintf("%q", fileName))
var secret [32]byte
if _, err := io.ReadFull(rand.Reader, secret[:]); err != nil {
return nil, fmt.Errorf("failed to generate jwt secret: %v", err)
}
secrets = append(secrets, secret)
if err := os.WriteFile(fileName, []byte(hexutil.Encode(secret[:])), 0600); err != nil {
return nil, err
}
if err := os.WriteFile(fileName, []byte(hexutil.Encode(secret[:])), 0600); err != nil {
return nil, err
}
}
return &node.L2EndpointsConfig{
L2EngineAddrs: l2Addrs,
L2EngineJWTSecrets: secrets,
return &node.L2EndpointConfig{
L2EngineAddr: l2Addr,
L2EngineJWTSecret: secret,
}, nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment