Commit e7058c94 authored by Zach Howard's avatar Zach Howard Committed by GitHub

op-conductor: adds execution and node rpc proxy backends (#9059)

parent b22a3481
......@@ -61,6 +61,9 @@ type CLIConfig struct {
// the data availability type to use for posting batches, e.g. blobs vs calldata.
DataAvailabilityType flags.DataAvailabilityType
// ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint.
ActiveSequencerCheckDuration time.Duration
TxMgrConfig txmgr.CLIConfig
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
......@@ -120,17 +123,18 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
PollInterval: ctx.Duration(flags.PollIntervalFlag.Name),
/* Optional Flags */
MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
}
}
......@@ -131,7 +131,7 @@ func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) er
if strings.Contains(cfg.RollupRpc, ",") && strings.Contains(cfg.L2EthRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
ethUrls := strings.Split(cfg.L2EthRpc, ",")
endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, bs.Log)
endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, cfg.ActiveSequencerCheckDuration, dial.DefaultDialTimeout, bs.Log)
} else {
endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
}
......
......@@ -93,6 +93,12 @@ var (
}(),
EnvVars: prefixEnvVars("DATA_AVAILABILITY_TYPE"),
}
ActiveSequencerCheckDurationFlag = &cli.DurationFlag{
Name: "active-sequencer-check-duration",
Usage: "The duration between checks to determine the active sequencer endpoint. ",
Value: 2 * time.Minute,
EnvVars: prefixEnvVars("ACTIVE_SEQUENCER_CHECK_DURATION"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
......@@ -113,6 +119,7 @@ var optionalFlags = []cli.Flag{
SequencerHDPathFlag,
BatchTypeFlag,
DataAvailabilityTypeFlag,
ActiveSequencerCheckDurationFlag,
}
func init() {
......
......@@ -48,6 +48,9 @@ type Config struct {
// RollupCfg is the rollup config.
RollupCfg rollup.Config
// RPCEnableProxy is true if the sequencer RPC proxy should be enabled.
RPCEnableProxy bool
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig
......@@ -116,11 +119,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
SafeInterval: ctx.Uint64(flags.HealthCheckSafeInterval.Name),
MinPeerCount: ctx.Uint64(flags.HealthCheckMinPeerCount.Name),
},
RollupCfg: *rollupCfg,
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
RollupCfg: *rollupCfg,
RPCEnableProxy: ctx.Bool(flags.RPCEnableProxy.Name),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
}, nil
}
......
......@@ -21,6 +21,7 @@ import (
opp2p "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
opclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
......@@ -198,6 +199,29 @@ func (oc *OpConductor) initRPCServer(ctx context.Context) error {
Version: oc.version,
Service: api,
})
if oc.cfg.RPCEnableProxy {
execClient, err := dial.DialEthClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.ExecutionRPC)
if err != nil {
return errors.Wrap(err, "failed to create execution rpc client")
}
executionProxy := conductorrpc.NewExecutionProxyBackend(oc.log, oc, execClient)
server.AddAPI(rpc.API{
Namespace: conductorrpc.ExecutionRPCNamespace,
Service: executionProxy,
})
nodeClient, err := dial.DialRollupClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.NodeRPC)
if err != nil {
return errors.Wrap(err, "failed to create node rpc client")
}
nodeProxy := conductorrpc.NewNodeProxyBackend(oc.log, oc, nodeClient)
server.AddAPI(rpc.API{
Namespace: conductorrpc.NodeRPCNamespace,
Service: nodeProxy,
})
}
oc.rpcServer = server
return nil
}
......
......@@ -70,6 +70,7 @@ func mockConfig(t *testing.T) Config {
L1SystemConfigAddress: [20]byte{3, 4},
ProtocolVersionsAddress: [20]byte{4, 5},
},
RPCEnableProxy: false,
}
}
......
......@@ -69,6 +69,12 @@ var (
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "PAUSED"),
Value: false,
}
RPCEnableProxy = &cli.BoolFlag{
Name: "rpc.enable-proxy",
Usage: "Enable the RPC proxy to underlying sequencer services",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RPC_ENABLE_PROXY"),
Value: true,
}
)
var requiredFlags = []cli.Flag{
......@@ -85,6 +91,7 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{
Paused,
RPCEnableProxy,
}
func init() {
......
......@@ -2,8 +2,15 @@ package rpc
import (
"context"
"errors"
"math/big"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types"
)
var (
ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer")
)
type ServerInfo struct {
......@@ -42,3 +49,17 @@ type API interface {
// CommitUnsafePayload commits a unsafe payload (lastest head) to the consensus layer.
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
}
// ExecutionProxyAPI defines the methods proxied to the execution rpc backend
// This should include all methods that are called by op-batcher or op-proposer
type ExecutionProxyAPI interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
}
// NodeProxyAPI defines the methods proxied to the node rpc backend
// This should include all methods that are called by op-batcher or op-proposer
type NodeProxyAPI interface {
OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error)
SequencerActive(ctx context.Context) (bool, error)
SyncStatus(ctx context.Context) (*eth.SyncStatus, error)
}
package rpc
import (
"context"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
var ExecutionRPCNamespace = "eth"
// ExecutionProxyBackend implements an execution rpc proxy with a leadership check before each call.
type ExecutionProxyBackend struct {
log log.Logger
con conductor
client *ethclient.Client
}
var _ ExecutionProxyAPI = (*ExecutionProxyBackend)(nil)
func NewExecutionProxyBackend(log log.Logger, con conductor, client *ethclient.Client) *ExecutionProxyBackend {
return &ExecutionProxyBackend{
log: log,
con: con,
client: client,
}
}
func (api *ExecutionProxyBackend) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
block, err := api.client.BlockByNumber(ctx, number)
if err != nil {
return nil, err
}
if !api.con.Leader(ctx) {
return nil, ErrNotLeader
}
return block, nil
}
package rpc
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/log"
)
var NodeRPCNamespace = "optimism"
// NodeProxyBackend implements a node rpc proxy with a leadership check before each call.
type NodeProxyBackend struct {
log log.Logger
con conductor
client *sources.RollupClient
}
var _ NodeProxyAPI = (*NodeProxyBackend)(nil)
func NewNodeProxyBackend(log log.Logger, con conductor, client *sources.RollupClient) *NodeProxyBackend {
return &NodeProxyBackend{
log: log,
con: con,
client: client,
}
}
func (api *NodeProxyBackend) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
status, err := api.client.SyncStatus(ctx)
if err != nil {
return nil, err
}
if !api.con.Leader(ctx) {
return nil, ErrNotLeader
}
return status, err
}
func (api *NodeProxyBackend) OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error) {
output, err := api.client.OutputAtBlock(ctx, blockNum)
if err != nil {
return nil, err
}
if !api.con.Leader(ctx) {
return nil, ErrNotLeader
}
return output, nil
}
func (api *NodeProxyBackend) SequencerActive(ctx context.Context) (bool, error) {
active, err := api.client.SequencerActive(ctx)
if err != nil {
return false, err
}
if !api.con.Leader(ctx) {
return false, ErrNotLeader
}
return active, err
}
......@@ -44,12 +44,17 @@ type conductor struct {
service *con.OpConductor
client conrpc.API
consensusPort int
rpcPort int
}
func (c *conductor) ConsensusEndpoint() string {
return fmt.Sprintf("%s:%d", localhost, c.consensusPort)
}
func (c *conductor) RPCEndpoint() string {
return fmt.Sprintf("%s:%d", localhost, c.rpcPort)
}
func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
InitParallel(t)
ctx := context.Background()
......@@ -59,9 +64,6 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
sys, err := cfg.Start(t)
require.NoError(t, err)
// 1 batcher that listens to all 3 sequencers, in started mode.
setupBatcher(t, sys)
// 3 conductors that connects to 1 sequencer each.
conductors := make(map[string]*conductor)
......@@ -81,6 +83,9 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
conductors[cfg.name] = setupConductor(t, cfg.name, t.TempDir(), nodePRC, engineRPC, cfg.bootstrap, *sys.RollupConfig)
}
// 1 batcher that listens to all 3 sequencers, in started mode.
setupBatcher(t, sys, conductors)
// form a cluster
c1 := conductors[sequencer1Name]
c2 := conductors[sequencer2Name]
......@@ -128,20 +133,21 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
func setupConductor(
t *testing.T,
serverID, dir, nodePRC, engineRPC string,
serverID, dir, nodeRPC, engineRPC string,
bootstrap bool,
rollupCfg rollup.Config,
) *conductor {
// it's unfortunate that it is not possible to pass 0 as consensus port and get back the actual assigned port from raft implementation.
// So we find an available port and pass it in to avoid test flakiness (avoid port already in use error).
consensusPort := findAvailablePort(t)
rpcPort := findAvailablePort(t)
cfg := con.Config{
ConsensusAddr: localhost,
ConsensusPort: consensusPort,
RaftServerID: serverID,
RaftStorageDir: dir,
RaftBootstrap: bootstrap,
NodeRPC: nodePRC,
NodeRPC: nodeRPC,
ExecutionRPC: engineRPC,
Paused: true,
HealthCheck: con.HealthCheckConfig{
......@@ -149,14 +155,15 @@ func setupConductor(
SafeInterval: 4, // per test setup (l1 block time = 2s, max channel duration = 1, 2s buffer)
MinPeerCount: 2, // per test setup, each sequencer has 2 peers
},
RollupCfg: rollupCfg,
RollupCfg: rollupCfg,
RPCEnableProxy: true,
LogConfig: oplog.CLIConfig{
Level: log.LvlInfo,
Color: false,
},
RPC: oprpc.CLIConfig{
ListenAddr: localhost,
ListenPort: 0,
ListenPort: rpcPort,
},
}
......@@ -174,10 +181,11 @@ func setupConductor(
service: service,
client: client,
consensusPort: consensusPort,
rpcPort: rpcPort,
}
}
func setupBatcher(t *testing.T, sys *System) {
func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) {
var batchType uint = derive.SingularBatchType
if sys.Cfg.DeployConfig.L2GenesisDeltaTimeOffset != nil && *sys.Cfg.DeployConfig.L2GenesisDeltaTimeOffset == hexutil.Uint64(0) {
batchType = derive.SpanBatchType
......@@ -189,14 +197,14 @@ func setupBatcher(t *testing.T, sys *System) {
// enable active sequencer follow mode.
l2EthRpc := strings.Join([]string{
sys.EthInstances[sequencer1Name].WSEndpoint(),
sys.EthInstances[sequencer2Name].WSEndpoint(),
sys.EthInstances[sequencer3Name].WSEndpoint(),
conductors[sequencer1Name].RPCEndpoint(),
conductors[sequencer2Name].RPCEndpoint(),
conductors[sequencer3Name].RPCEndpoint(),
}, ",")
rollupRpc := strings.Join([]string{
sys.RollupNodes[sequencer1Name].HTTPEndpoint(),
sys.RollupNodes[sequencer2Name].HTTPEndpoint(),
sys.RollupNodes[sequencer3Name].HTTPEndpoint(),
conductors[sequencer1Name].RPCEndpoint(),
conductors[sequencer2Name].RPCEndpoint(),
conductors[sequencer3Name].RPCEndpoint(),
}, ",")
batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
......@@ -217,9 +225,10 @@ func setupBatcher(t *testing.T, sys *System) {
Level: log.LvlInfo,
Format: oplog.FormatText,
},
Stopped: false,
BatchType: batchType,
DataAvailabilityType: batcherFlags.CalldataType,
Stopped: false,
BatchType: batchType,
DataAvailabilityType: batcherFlags.CalldataType,
ActiveSequencerCheckDuration: 0,
}
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
......
......@@ -69,6 +69,12 @@ var (
EnvVars: prefixEnvVars("DG_TYPE"),
Hidden: true,
}
ActiveSequencerCheckDurationFlag = &cli.DurationFlag{
Name: "active-sequencer-check-duration",
Usage: "The duration between checks to determine the active sequencer endpoint. ",
Value: 2 * time.Minute,
EnvVars: prefixEnvVars("ACTIVE_SEQUENCER_CHECK_DURATION"),
}
// Legacy Flags
L2OutputHDPathFlag = txmgr.L2OutputHDPathFlag
)
......@@ -86,6 +92,7 @@ var optionalFlags = []cli.Flag{
DisputeGameFactoryAddressFlag,
ProposalIntervalFlag,
DisputeGameTypeFlag,
ActiveSequencerCheckDurationFlag,
}
func init() {
......
......@@ -55,6 +55,9 @@ type CLIConfig struct {
// DisputeGameType is the type of dispute game to create when submitting an output proposal.
DisputeGameType uint8
// ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint.
ActiveSequencerCheckDuration time.Duration
}
func (c *CLIConfig) Check() error {
......@@ -94,13 +97,14 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
PollInterval: ctx.Duration(flags.PollIntervalFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
// Optional Flags
AllowNonFinalized: ctx.Bool(flags.AllowNonFinalizedFlag.Name),
RPCConfig: oprpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
DGFAddress: ctx.String(flags.DisputeGameFactoryAddressFlag.Name),
ProposalInterval: ctx.Duration(flags.ProposalIntervalFlag.Name),
DisputeGameType: uint8(ctx.Uint(flags.DisputeGameTypeFlag.Name)),
AllowNonFinalized: ctx.Bool(flags.AllowNonFinalizedFlag.Name),
RPCConfig: oprpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
DGFAddress: ctx.String(flags.DisputeGameFactoryAddressFlag.Name),
ProposalInterval: ctx.Duration(flags.ProposalIntervalFlag.Name),
DisputeGameType: uint8(ctx.Uint(flags.DisputeGameTypeFlag.Name)),
ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name),
}
}
......@@ -127,7 +127,7 @@ func (ps *ProposerService) initRPCClients(ctx context.Context, cfg *CLIConfig) e
var rollupProvider dial.RollupProvider
if strings.Contains(cfg.RollupRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, ps.Log)
rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, cfg.ActiveSequencerCheckDuration, dial.DefaultDialTimeout, ps.Log)
} else {
rollupProvider, err = dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment