Commit 56624482 authored by protolambda, committed by GitHub

op-conductor,op-node: allow system to select port, make op-node wait for...

op-conductor,op-node: allow system to select port, make op-node wait for conductor endpoint (#12863)

* op-conductor,op-node: allow system to select port, make op-node wait for conductor endpoint

* op-conductor,op-node: debugging conductor test

* op-conductor: more debugging

* op-e2e: increase conductor timeout
parent 4c656b3a
......@@ -19,12 +19,19 @@ import (
)
type Config struct {
// ConsensusAddr is the address to listen for consensus connections.
// ConsensusAddr is the address, excluding port, to listen on for consensus connections.
// E.g. 0.0.0.0 to bind to the external-facing network interface.
ConsensusAddr string
// ConsensusPort is the port to listen for consensus connections.
// ConsensusPort is the port to listen on for consensus connections.
// If 0, the server binds to a port selected by the system.
ConsensusPort int
// ConsensusAdvertisedAddr is the network address, including port, to advertise to other peers.
// This is optional: if empty, the address that the server network transport binds to is used instead.
// E.g. local tests may use temporary addresses, rather than preset known addresses.
ConsensusAdvertisedAddr string
// RaftServerID is the unique ID for this server used by raft consensus.
RaftServerID string
......@@ -119,6 +126,9 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
return &Config{
ConsensusAddr: ctx.String(flags.ConsensusAddr.Name),
ConsensusPort: ctx.Int(flags.ConsensusPort.Name),
// The consensus server will advertise the address it binds to if this is empty/unspecified.
ConsensusAdvertisedAddr: ctx.String(flags.AdvertisedFullAddr.Name),
RaftBootstrap: ctx.Bool(flags.RaftBootstrap.Name),
RaftServerID: ctx.String(flags.RaftServerID.Name),
RaftStorageDir: ctx.String(flags.RaftStorageDir.Name),
......
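For illustration only (not part of this commit): a minimal sketch of how the new fields fit together when a caller wants the system to pick the consensus port and advertise the bound address. Field names come from the struct above; the con package alias and the literal values mirror the e2e test further down and are otherwise assumptions.

// Sketch, assuming: con "github.com/ethereum-optimism/optimism/op-conductor/conductor"
cfg := con.Config{
	ConsensusAddr:           "127.0.0.1", // bind address, excluding the port
	ConsensusPort:           0,           // 0: the system selects a free port
	ConsensusAdvertisedAddr: "",          // empty: advertise the address the server binds to
	RaftServerID:            "SequencerA",
	RaftStorageDir:          "/tmp/raft",
	RaftBootstrap:           true,
	// ... remaining fields (node/engine RPC, rollup config, etc.) omitted here.
}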
......@@ -169,10 +169,12 @@ func (c *OpConductor) initConsensus(ctx context.Context) error {
return nil
}
serverAddr := fmt.Sprintf("%s:%d", c.cfg.ConsensusAddr, c.cfg.ConsensusPort)
raftConsensusConfig := &consensus.RaftConsensusConfig{
ServerID: c.cfg.RaftServerID,
ServerAddr: serverAddr,
// AdvertisedAddr may be empty: the server will then default to what it binds to.
AdvertisedAddr: raft.ServerAddress(c.cfg.ConsensusAdvertisedAddr),
ListenAddr: c.cfg.ConsensusAddr,
ListenPort: c.cfg.ConsensusPort,
StorageDir: c.cfg.RaftStorageDir,
Bootstrap: c.cfg.RaftBootstrap,
RollupCfg: &c.cfg.RollupCfg,
......@@ -472,6 +474,12 @@ func (oc *OpConductor) Paused() bool {
return oc.paused.Load()
}
// ConsensusEndpoint returns the raft consensus server address to connect to.
func (oc *OpConductor) ConsensusEndpoint() string {
return oc.cons.Addr()
}
// HTTPEndpoint returns the HTTP RPC endpoint
func (oc *OpConductor) HTTPEndpoint() string {
if oc.rpcServer == nil {
return ""
......@@ -613,7 +621,8 @@ func (oc *OpConductor) handleHealthUpdate(hcerr error) {
oc.queueAction()
}
if oc.healthy.Swap(healthy) != healthy {
if old := oc.healthy.Swap(healthy); old != healthy {
oc.log.Info("Health state changed", "old", old, "new", healthy)
// queue an action if health status changed.
oc.queueAction()
}
......
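Aside (not from this diff): the rewritten health check above leans on the fact that atomic.Bool.Swap stores the new value and returns the previous one, so a state transition can be detected and logged in a single atomic step. A tiny self-contained sketch of that idiom:

// Assumes the standard library packages "fmt" and "sync/atomic".
func demoHealthSwap() {
	var healthy atomic.Bool
	healthy.Store(true)

	newState := false
	// Swap returns the old value; a change happened iff old != new.
	if old := healthy.Swap(newState); old != newState {
		fmt.Println("Health state changed", "old", old, "new", newState)
	}
}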
......@@ -30,7 +30,7 @@ func mockConfig(t *testing.T) Config {
now := uint64(time.Now().Unix())
return Config{
ConsensusAddr: "127.0.0.1",
ConsensusPort: 50050,
ConsensusPort: 0,
RaftServerID: "SequencerA",
RaftStorageDir: "/tmp/raft",
RaftBootstrap: false,
......
......@@ -42,6 +42,9 @@ type ServerInfo struct {
//
//go:generate mockery --name Consensus --output mocks/ --with-expecter=true
type Consensus interface {
// Addr returns the address of this consensus server.
// Internally the server may override what is advertised, or fall back to the address it listens on.
Addr() string
// AddVoter adds a voting member into the cluster, voter is eligible to become leader.
// If version is non-zero, this will only be applied if the current cluster version matches the expected version.
AddVoter(id, addr string, version uint64) error
......
// Code generated by mockery v2.39.1. DO NOT EDIT.
// Code generated by mockery v2.46.0. DO NOT EDIT.
package mocks
......@@ -118,6 +118,51 @@ func (_c *Consensus_AddVoter_Call) RunAndReturn(run func(string, string, uint64)
return _c
}
// Addr provides a mock function with given fields:
func (_m *Consensus) Addr() string {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Addr")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// Consensus_Addr_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Addr'
type Consensus_Addr_Call struct {
*mock.Call
}
// Addr is a helper method to define mock.On call
func (_e *Consensus_Expecter) Addr() *Consensus_Addr_Call {
return &Consensus_Addr_Call{Call: _e.mock.On("Addr")}
}
func (_c *Consensus_Addr_Call) Run(run func()) *Consensus_Addr_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Consensus_Addr_Call) Return(_a0 string) *Consensus_Addr_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_Addr_Call) RunAndReturn(run func() string) *Consensus_Addr_Call {
_c.Call.Return(run)
return _c
}
// ClusterMembership provides a mock function with given fields:
func (_m *Consensus) ClusterMembership() (*consensus.ClusterMembership, error) {
ret := _m.Called()
......
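A short usage sketch for the regenerated mock (illustrative only; mocks.NewConsensus and the EXPECT accessor are the constructors mockery generates alongside the with-expecter mock, and the endpoint string is made up): the expecter API added above lets a test stub the new Addr method.

// In a test:
cons := mocks.NewConsensus(t)
cons.EXPECT().Addr().Return("127.0.0.1:50050") // stub the advertised endpoint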
......@@ -29,12 +29,30 @@ type RaftConsensus struct {
serverID raft.ServerID
r *raft.Raft
transport *raft.NetworkTransport
// advertisedAddr is the host & port to contact this server.
// If empty, the address of the transport should be used instead.
advertisedAddr string
unsafeTracker *unsafeHeadTracker
}
type RaftConsensusConfig struct {
ServerID string
ServerAddr string
// AdvertisedAddr is the address to advertise,
// i.e. the address external raft peers use to contact us.
// If left empty, it defaults to the resulting
// local address that we bind the underlying transport to.
AdvertisedAddr raft.ServerAddress
// ListenPort is the port to bind the server to.
// This may be 0; the system will then select an available port.
ListenPort int
// ListenAddr is the address to bind the server to.
// E.g. use 0.0.0.0 to bind to an external-facing network interface.
ListenAddr string
StorageDir string
Bootstrap bool
RollupCfg *rollup.Config
......@@ -86,18 +104,31 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus,
return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q): %w`, baseDir, err)
}
addr, err := net.ResolveTCPAddr("tcp", cfg.ServerAddr)
var advertiseAddr net.Addr
if cfg.AdvertisedAddr == "" {
log.Warn("No advertised address specified. Advertising local address.")
} else {
x, err := net.ResolveTCPAddr("tcp", string(cfg.AdvertisedAddr))
if err != nil {
return nil, errors.Wrap(err, "failed to resolve tcp address")
return nil, fmt.Errorf("failed to resolve advertised TCP address %q: %w", string(cfg.AdvertisedAddr), err)
}
advertiseAddr = x
log.Info("Resolved advertising address", "adAddr", cfg.AdvertisedAddr,
"adIP", x.IP, "adPort", x.Port, "adZone", x.Zone)
}
bindAddr := fmt.Sprintf("%s:%d", cfg.ListenAddr, cfg.ListenPort)
log.Info("Binding raft server to network transport", "listenAddr", bindAddr)
maxConnPool := 10
timeout := 5 * time.Second
bindAddr := fmt.Sprintf("0.0.0.0:%d", addr.Port)
transport, err := raft.NewTCPTransportWithLogger(bindAddr, addr, maxConnPool, timeout, rc.Logger)
// When advertiseAddr == nil, the transport will use the local address that it is bound to.
transport, err := raft.NewTCPTransportWithLogger(bindAddr, advertiseAddr, maxConnPool, timeout, rc.Logger)
if err != nil {
return nil, errors.Wrap(err, "failed to create raft tcp transport")
}
log.Info("Raft server network transport is up", "addr", transport.LocalAddr())
fsm := NewUnsafeHeadTracker(log)
......@@ -110,11 +141,19 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus,
// If bootstrap = true, start raft in bootstrap mode. This allows the current node to elect itself as leader when there are no other participants,
// and allows other nodes to join the cluster.
if cfg.Bootstrap {
var advertisedAddr raft.ServerAddress
if cfg.AdvertisedAddr == "" {
advertisedAddr = transport.LocalAddr()
} else {
advertisedAddr = cfg.AdvertisedAddr
}
log.Info("Bootstrapping raft consensus cluster with self", "addr", advertisedAddr)
raftCfg := raft.Configuration{
Servers: []raft.Server{
{
ID: rc.LocalID,
Address: raft.ServerAddress(cfg.ServerAddr),
Address: advertisedAddr,
Suffrage: raft.Voter,
},
},
......@@ -132,9 +171,20 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus,
serverID: raft.ServerID(cfg.ServerID),
unsafeTracker: fsm,
rollupCfg: cfg.RollupCfg,
transport: transport,
}, nil
}
// Addr returns the address to contact this raft consensus server.
// If no explicit address to advertise was configured,
// the local network address that the raft-consensus server is listening on will be used.
func (rc *RaftConsensus) Addr() string {
if rc.advertisedAddr != "" {
return rc.advertisedAddr
}
return string(rc.transport.LocalAddr())
}
// AddNonVoter implements Consensus, it tries to add a non-voting member into the cluster.
func (rc *RaftConsensus) AddNonVoter(id string, addr string, version uint64) error {
if err := checkTCPPortOpen(addr); err != nil {
......
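Taken together, the new ListenAddr/ListenPort/AdvertisedAddr fields and the Addr() accessor let a caller bind to an ephemeral port and still learn the concrete endpoint afterwards. A hedged sketch (field and method names are from the diff above; the logger, rollup config, and storage path are placeholders, and the import paths are assumptions):

// Assumes: consensus "github.com/ethereum-optimism/optimism/op-conductor/consensus",
// plus the usual log.Logger and rollup.Config types.
func exampleRaftConsensus(logger log.Logger, rollupCfg *rollup.Config) (string, error) {
	cons, err := consensus.NewRaftConsensus(logger, &consensus.RaftConsensusConfig{
		ServerID:       "SequencerA",
		ListenAddr:     "127.0.0.1", // local test: don't bind to an external interface
		ListenPort:     0,           // let the system pick a free port
		AdvertisedAddr: "",          // empty: advertise whatever the transport binds to
		StorageDir:     "/tmp/raft",
		Bootstrap:      true,
		RollupCfg:      rollupCfg,
	})
	if err != nil {
		return "", err
	}
	// Addr() returns the advertised address if one was configured, otherwise
	// the transport's local (bound) address, e.g. "127.0.0.1:54321".
	return cons.Addr(), nil
}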
......@@ -28,7 +28,9 @@ func TestCommitAndRead(t *testing.T) {
}
raftConsensusConfig := &RaftConsensusConfig{
ServerID: "SequencerA",
ServerAddr: "127.0.0.1:0",
ListenPort: 0,
ListenAddr: "127.0.0.1", // local test, don't bind to external interface
AdvertisedAddr: "", // use local address that the server binds to
StorageDir: storageDir,
Bootstrap: true,
RollupCfg: rollupCfg,
......
......@@ -19,16 +19,22 @@ const EnvVarPrefix = "OP_CONDUCTOR"
var (
ConsensusAddr = &cli.StringFlag{
Name: "consensus.addr",
Usage: "Address to listen for consensus connections",
Usage: "Address (excluding port) to listen for consensus connections.",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "CONSENSUS_ADDR"),
Value: "127.0.0.1",
}
ConsensusPort = &cli.IntFlag{
Name: "consensus.port",
Usage: "Port to listen for consensus connections",
Usage: "Port to listen for consensus connections. May be 0 to let the system select a port.",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "CONSENSUS_PORT"),
Value: 50050,
}
AdvertisedFullAddr = &cli.StringFlag{
Name: "consensus.advertised",
Usage: "Full address (host and port) for other peers to contact the consensus server. Optional: if left empty, the local address is advertised.",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "CONSENSUS_ADVERTISED"),
Value: "",
}
RaftBootstrap = &cli.BoolFlag{
Name: "raft.bootstrap",
Usage: "If this node should bootstrap a new raft cluster",
......@@ -127,6 +133,7 @@ var requiredFlags = []cli.Flag{
}
var optionalFlags = []cli.Flag{
AdvertisedFullAddr,
Paused,
RPCEnableProxy,
RaftBootstrap,
......
......@@ -2,9 +2,8 @@ package conductor
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"strings"
"testing"
"time"
......@@ -53,26 +52,21 @@ var retryStrategy = &retry.FixedStrategy{Dur: 50 * time.Millisecond}
type conductor struct {
service *con.OpConductor
client conrpc.API
consensusPort int
rpcPort int
}
func (c *conductor) ConsensusEndpoint() string {
return fmt.Sprintf("%s:%d", localhost, c.consensusPort)
return c.service.ConsensusEndpoint()
}
func (c *conductor) RPCEndpoint() string {
return fmt.Sprintf("http://%s:%d", localhost, c.rpcPort)
return c.service.HTTPEndpoint()
}
func setupSequencerFailoverTest(t *testing.T) (*e2esys.System, map[string]*conductor, func()) {
op_e2e.InitParallel(t)
ctx := context.Background()
sys, conductors, err := retry.Do2(ctx, maxSetupRetries, retryStrategy, func() (*e2esys.System, map[string]*conductor, error) {
return setupHAInfra(t, ctx)
})
require.NoError(t, err, "Expected to successfully setup sequencers and conductors after retry")
sys, conductors := setupHAInfra(t, ctx)
// form a cluster
c1 := conductors[Sequencer1Name]
......@@ -143,79 +137,80 @@ func setupSequencerFailoverTest(t *testing.T) (*e2esys.System, map[string]*condu
}
}
func setupHAInfra(t *testing.T, ctx context.Context) (*e2esys.System, map[string]*conductor, error) {
func setupHAInfra(t *testing.T, ctx context.Context) (*e2esys.System, map[string]*conductor) {
startTime := time.Now()
var sys *e2esys.System
var conductors map[string]*conductor
var err error
// clean up if setup fails due to port in use.
defer func() {
if err != nil {
if sys != nil {
sys.Close()
}
t.Logf("setupHAInfra took %s\n", time.Since(startTime))
}()
for _, c := range conductors {
if c == nil || c.service == nil {
// pass. Sometimes we can get nil in this map
} else if serr := c.service.Stop(ctx); serr != nil {
t.Log("Failed to stop conductor", "error", serr)
conductorsReady := map[string]chan string{
Sequencer1Name: make(chan string, 1),
Sequencer2Name: make(chan string, 1),
Sequencer3Name: make(chan string, 1),
}
// The sequencer op-node & execution engine have to be up first, so that their endpoints are available.
// The conductor is then started afterwards, using the endpoints of the op-node and execution engine.
// The op-node, while starting, waits for the conductor to be up and running so it can get the conductor's endpoint.
// No endpoint is reserved or hardcoded this way, which avoids CI test flakes during setup.
conductorEndpointFn := func(ctx context.Context, name string) (endpoint string, err error) {
endpointCh, ok := conductorsReady[name]
if !ok {
return "", errors.New("conductor %s is not known")
}
select {
case <-ctx.Done():
return "", fmt.Errorf("failed to set up conductor timely: %w", err)
case endpoint := <-endpointCh:
return endpoint, nil
}
t.Logf("setupHAInfra took %s\n", time.Since(startTime))
}()
conductorRpcPorts := map[string]int{
Sequencer1Name: findAvailablePort(t),
Sequencer2Name: findAvailablePort(t),
Sequencer3Name: findAvailablePort(t),
}
// 3 sequencers, 1 verifier, 1 active sequencer.
cfg := sequencerFailoverSystemConfig(t, conductorRpcPorts)
if sys, err = cfg.Start(t); err != nil {
return nil, nil, err
}
cfg := sequencerFailoverSystemConfig(t, conductorEndpointFn)
// 3 conductors that connect to 1 sequencer each.
conductors = make(map[string]*conductor)
// sys is configured to close itself on test cleanup.
sys, err := cfg.Start(t)
require.NoError(t, err, "must start system")
out := make(map[string]*conductor)
// 3 conductors that connect to 1 sequencer each.
// initialize all conductors in paused mode
conductorCfgs := []struct {
name string
port int
bootstrap bool
}{
{Sequencer1Name, conductorRpcPorts[Sequencer1Name], true}, // one in bootstrap mode so that we can form a cluster.
{Sequencer2Name, conductorRpcPorts[Sequencer2Name], false},
{Sequencer3Name, conductorRpcPorts[Sequencer3Name], false},
{Sequencer1Name, true}, // one in bootstrap mode so that we can form a cluster.
{Sequencer2Name, false},
{Sequencer3Name, false},
}
for _, cfg := range conductorCfgs {
cfg := cfg
nodePRC := sys.RollupNodes[cfg.name].UserRPC().RPC()
engineRPC := sys.EthInstances[cfg.name].UserRPC().RPC()
if conductors[cfg.name], err = setupConductor(t, cfg.name, t.TempDir(), nodePRC, engineRPC, cfg.port, cfg.bootstrap, *sys.RollupConfig); err != nil {
return nil, nil, err
}
conduc, err := setupConductor(t, cfg.name, t.TempDir(), nodePRC, engineRPC, cfg.bootstrap, *sys.RollupConfig)
require.NoError(t, err, "failed to set up conductor %s", cfg.name)
out[cfg.name] = conduc
// Signal that the conductor RPC endpoint is ready
conductorsReady[cfg.name] <- conduc.RPCEndpoint()
}
return sys, conductors, nil
return sys, out
}
func setupConductor(
t *testing.T,
serverID, dir, nodeRPC, engineRPC string,
rpcPort int,
bootstrap bool,
rollupCfg rollup.Config,
) (*conductor, error) {
consensusPort := findAvailablePort(t)
cfg := con.Config{
ConsensusAddr: localhost,
ConsensusPort: consensusPort,
ConsensusPort: 0, // let the system select a port, avoid conflicts
ConsensusAdvertisedAddr: "", // use the local address we bind to
RaftServerID: serverID,
RaftStorageDir: dir,
RaftBootstrap: bootstrap,
......@@ -237,17 +232,18 @@ func setupConductor(
RollupCfg: rollupCfg,
RPCEnableProxy: true,
LogConfig: oplog.CLIConfig{
Level: log.LevelInfo,
Level: log.LevelDebug,
Color: false,
},
RPC: oprpc.CLIConfig{
ListenAddr: localhost,
ListenPort: rpcPort,
ListenPort: 0, // let the system select a port
},
}
logger := testlog.Logger(t, log.LevelDebug)
ctx := context.Background()
service, err := con.New(ctx, &cfg, testlog.Logger(t, log.LevelInfo), "0.0.1")
service, err := con.New(ctx, &cfg, logger, "0.0.1")
if err != nil {
return nil, err
}
......@@ -257,6 +253,8 @@ func setupConductor(
return nil, err
}
logger.Info("Started conductor", "nodeRPC", nodeRPC, "engineRPC", engineRPC)
rawClient, err := rpc.DialContext(ctx, service.HTTPEndpoint())
if err != nil {
return nil, err
......@@ -267,8 +265,6 @@ func setupConductor(
return &conductor{
service: service,
client: client,
consensusPort: consensusPort,
rpcPort: rpcPort,
}, nil
}
......@@ -316,12 +312,18 @@ func setupBatcher(t *testing.T, sys *e2esys.System, conductors map[string]*condu
sys.BatchSubmitter = batcher
}
func sequencerFailoverSystemConfig(t *testing.T, ports map[string]int) e2esys.SystemConfig {
func sequencerFailoverSystemConfig(t *testing.T, conductorRPCEndpoints func(ctx context.Context, name string) (string, error)) e2esys.SystemConfig {
cfg := e2esys.EcotoneSystemConfig(t, new(hexutil.Uint64))
delete(cfg.Nodes, "sequencer")
cfg.Nodes[Sequencer1Name] = sequencerCfg(ports[Sequencer1Name])
cfg.Nodes[Sequencer2Name] = sequencerCfg(ports[Sequencer2Name])
cfg.Nodes[Sequencer3Name] = sequencerCfg(ports[Sequencer3Name])
cfg.Nodes[Sequencer1Name] = sequencerCfg(func(ctx context.Context) (string, error) {
return conductorRPCEndpoints(ctx, Sequencer1Name)
})
cfg.Nodes[Sequencer2Name] = sequencerCfg(func(ctx context.Context) (string, error) {
return conductorRPCEndpoints(ctx, Sequencer2Name)
})
cfg.Nodes[Sequencer3Name] = sequencerCfg(func(ctx context.Context) (string, error) {
return conductorRPCEndpoints(ctx, Sequencer3Name)
})
delete(cfg.Loggers, "sequencer")
cfg.Loggers[Sequencer1Name] = testlog.Logger(t, log.LevelInfo).New("role", Sequencer1Name)
......@@ -338,7 +340,7 @@ func sequencerFailoverSystemConfig(t *testing.T, ports map[string]int) e2esys.Sy
return cfg
}
func sequencerCfg(rpcPort int) *rollupNode.Config {
func sequencerCfg(conductorRPCEndpoint rollupNode.ConductorRPCFunc) *rollupNode.Config {
return &rollupNode.Config{
Driver: driver.Config{
VerifierConfDepth: 0,
......@@ -357,8 +359,8 @@ func sequencerCfg(rpcPort int) *rollupNode.Config {
ConfigPersistence: &rollupNode.DisabledConfigPersistence{},
Sync: sync.Config{SyncMode: sync.CLSync},
ConductorEnabled: true,
ConductorRpc: fmt.Sprintf("http://%s:%d", localhost, rpcPort),
ConductorRpcTimeout: 1 * time.Second,
ConductorRpc: conductorRPCEndpoint,
ConductorRpcTimeout: 5 * time.Second,
}
}
......@@ -453,26 +455,6 @@ func sequencerActive(t *testing.T, ctx context.Context, rollupClient *sources.Ro
return active
}
func findAvailablePort(t *testing.T) int {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
t.Error("Failed to find available port")
default:
// private / ephemeral ports are in the range 49152-65535
port := rand.Intn(65535-49152) + 49152
addr := fmt.Sprintf("127.0.0.1:%d", port)
l, err := net.Listen("tcp", addr)
if err == nil {
l.Close() // Close the listener and return the port if it's available
return port
}
}
}
}
func findLeader(t *testing.T, conductors map[string]*conductor) (string, *conductor) {
for id, con := range conductors {
if leader(t, context.Background(), con) {
......
......@@ -103,7 +103,6 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) {
t, VerifierName, t.TempDir(),
sys.RollupEndpoint(Sequencer3Name).RPC(),
sys.NodeEndpoint(Sequencer3Name).RPC(),
findAvailablePort(t),
false,
*sys.RollupConfig,
)
......
......@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-service/retry"
)
......@@ -21,7 +22,8 @@ type ConductorClient struct {
cfg *Config
metrics *metrics.Metrics
log log.Logger
apiClient *conductorRpc.APIClient
apiClient locks.RWValue[*conductorRpc.APIClient]
// overrideLeader is used to override the leader check for disaster recovery purposes.
// During disaster situations where the cluster is unhealthy (no leader, only 1 or less nodes up),
......@@ -41,15 +43,23 @@ func NewConductorClient(cfg *Config, log log.Logger, metrics *metrics.Metrics) c
}
// Initialize initializes the conductor client.
func (c *ConductorClient) initialize() error {
if c.apiClient != nil {
func (c *ConductorClient) initialize(ctx context.Context) error {
c.apiClient.Lock()
defer c.apiClient.Unlock()
if c.apiClient.Value != nil {
return nil
}
conductorRpcClient, err := dial.DialRPCClientWithTimeout(context.Background(), time.Minute*1, c.log, c.cfg.ConductorRpc)
endpoint, err := retry.Do[string](ctx, 10, retry.Exponential(), func() (string, error) {
return c.cfg.ConductorRpc(ctx)
})
if err != nil {
return fmt.Errorf("no conductor RPC endpoint available: %w", err)
}
conductorRpcClient, err := dial.DialRPCClientWithTimeout(context.Background(), time.Minute*1, c.log, endpoint)
if err != nil {
return fmt.Errorf("failed to dial conductor RPC: %w", err)
}
c.apiClient = conductorRpc.NewAPIClient(conductorRpcClient)
c.apiClient.Value = conductorRpc.NewAPIClient(conductorRpcClient)
return nil
}
......@@ -64,7 +74,7 @@ func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
return true, nil
}
if err := c.initialize(); err != nil {
if err := c.initialize(ctx); err != nil {
return false, err
}
ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout)
......@@ -72,8 +82,11 @@ func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
isLeader, err := retry.Do(ctx, 2, retry.Fixed(50*time.Millisecond), func() (bool, error) {
record := c.metrics.RecordRPCClientRequest("conductor_leader")
result, err := c.apiClient.Leader(ctx)
result, err := c.apiClient.Get().Leader(ctx)
record(err)
if err != nil {
c.log.Error("Failed to check conductor for leadership", "err", err)
}
return result, err
})
return isLeader, err
......@@ -85,7 +98,7 @@ func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.
return nil
}
if err := c.initialize(); err != nil {
if err := c.initialize(ctx); err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout)
......@@ -93,7 +106,7 @@ func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.
err := retry.Do0(ctx, 2, retry.Fixed(50*time.Millisecond), func() error {
record := c.metrics.RecordRPCClientRequest("conductor_commitUnsafePayload")
err := c.apiClient.CommitUnsafePayload(ctx, payload)
err := c.apiClient.Get().CommitUnsafePayload(ctx, payload)
record(err)
return err
})
......@@ -107,9 +120,11 @@ func (c *ConductorClient) OverrideLeader(ctx context.Context) error {
}
func (c *ConductorClient) Close() {
if c.apiClient == nil {
c.apiClient.Lock()
defer c.apiClient.Unlock()
if c.apiClient.Value == nil {
return
}
c.apiClient.Close()
c.apiClient = nil
c.apiClient.Value.Close()
c.apiClient.Value = nil
}
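The apiClient field now goes through locks.RWValue so that lazy initialization and Close cannot race with concurrent RPC calls. A rough sketch of the access pattern, using only the operations that appear in this diff (Lock/Unlock, Value, Get); *http.Client stands in for the conductor API client, and the helper names are illustrative:

// Assumes: "net/http" and "github.com/ethereum-optimism/optimism/op-service/locks"
var apiClient locks.RWValue[*http.Client]

func initClient() {
	apiClient.Lock() // write lock: only one goroutine performs initialization
	defer apiClient.Unlock()
	if apiClient.Value != nil {
		return
	}
	apiClient.Value = &http.Client{}
}

func useClient() *http.Client {
	// Get() returns the current value under the read lock,
	// so callers can proceed concurrently with each other.
	return apiClient.Get()
}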
......@@ -69,13 +69,16 @@ type Config struct {
// Conductor is used to determine whether this node is the leader sequencer.
ConductorEnabled bool
ConductorRpc string
ConductorRpc ConductorRPCFunc
ConductorRpcTimeout time.Duration
// AltDA config
AltDA altda.CLIConfig
}
// ConductorRPCFunc retrieves the conductor RPC endpoint. The endpoint may not be available immediately.
type ConductorRPCFunc func(ctx context.Context) (string, error)
type RPCConfig struct {
ListenAddr string
ListenPort int
......
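ConductorRPCFunc turns the conductor endpoint into something resolved lazily: the CLI path below wires in a fixed string, while the e2e tests block until the conductor has started and published its endpoint. A small sketch of both shapes (illustrative only; the node package alias, endpoint value, and helper name are assumptions):

// Assumes: node "github.com/ethereum-optimism/optimism/op-node/node" and "context".

// Fixed endpoint, as the CLI flag path does further down in this diff.
var fixedEndpoint node.ConductorRPCFunc = func(ctx context.Context) (string, error) {
	return "http://127.0.0.1:50050", nil
}

// Deferred endpoint: the value is published on a channel once the conductor is up,
// which is the pattern the e2e test uses via conductorsReady.
func waitForEndpoint(ready <-chan string) node.ConductorRPCFunc {
	return func(ctx context.Context) (string, error) {
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case ep := <-ready:
			return ep, nil
		}
	}
}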
......@@ -445,6 +445,7 @@ func (n *OpNode) initRPCServer(cfg *Config) error {
if err := server.Start(); err != nil {
return fmt.Errorf("unable to start RPC server: %w", err)
}
n.log.Info("Started JSON-RPC server", "addr", server.Addr())
n.server = server
return nil
}
......
package opnode
import (
"context"
"crypto/rand"
"encoding/json"
"errors"
......@@ -80,7 +81,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
ctx.IsSet(flags.HeartbeatURLFlag.Name) {
log.Warn("Heartbeat functionality is not supported anymore, CLI flags will be removed in following release.")
}
conductorRPCEndpoint := ctx.String(flags.ConductorRpcFlag.Name)
cfg := &node.Config{
L1: l1Endpoint,
L2: l2Endpoint,
......@@ -109,7 +110,9 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
RollupHalt: haltOption,
ConductorEnabled: ctx.Bool(flags.ConductorEnabledFlag.Name),
ConductorRpc: ctx.String(flags.ConductorRpcFlag.Name),
ConductorRpc: func(context.Context) (string, error) {
return conductorRPCEndpoint, nil
},
ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name),
AltDA: altda.ReadCLIConfig(ctx),
......