Commit 3c69e8a1 authored by Francis Li, committed by GitHub

Add retry to avoid port in use error (#10062)

parent a3cc8f27
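For context: the e2e failover setup probes for free ports with findAvailablePort up front and only binds them later, so a concurrently running test can grab a port in between and setup dies with a "port in use" error. This commit turns the setup helpers into error-returning functions and wraps them in the op-service/retry helpers, so a transient port collision simply triggers another attempt. Below is a minimal, self-contained sketch of the retry semantics, using retry.Do and retry.FixedStrategy exactly as they appear in this diff; the failing closure is a stand-in for the real setup and simulates two port collisions before succeeding.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/retry"
)

func main() {
	ctx := context.Background()
	// Same strategy shape as the diff: a fixed 50ms wait between attempts.
	strategy := &retry.FixedStrategy{Dur: 50 * time.Millisecond}

	attempts := 0
	// retry.Do re-runs the closure until it returns a nil error or the
	// attempt budget (5 here, mirroring maxSetupRetries) is exhausted.
	port, err := retry.Do(ctx, 5, strategy, func() (int, error) {
		attempts++
		if attempts < 3 {
			return 0, errors.New("bind: address already in use") // simulated collision
		}
		return 8545, nil
	})
	fmt.Println(port, err, attempts) // 8545 <nil> 3
}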
@@ -25,6 +25,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
 	oplog "github.com/ethereum-optimism/optimism/op-service/log"
+	"github.com/ethereum-optimism/optimism/op-service/retry"
 	oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
 	"github.com/ethereum-optimism/optimism/op-service/sources"
 	"github.com/ethereum-optimism/optimism/op-service/testlog"
@@ -37,8 +38,12 @@ const (
 	VerifierName = "verifier"
 
 	localhost = "127.0.0.1"
+
+	maxSetupRetries = 5
 )
 
+var retryStrategy = &retry.FixedStrategy{Dur: 50 * time.Millisecond}
+
 type conductor struct {
 	service *con.OpConductor
 	client  conrpc.API
@@ -58,36 +63,10 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
 	InitParallel(t)
 	ctx := context.Background()
 
-	conductorRpcPorts := map[string]int{
-		Sequencer1Name: findAvailablePort(t),
-		Sequencer2Name: findAvailablePort(t),
-		Sequencer3Name: findAvailablePort(t),
-	}
-
-	// 3 sequencers, 1 verifier, 1 active sequencer.
-	cfg := sequencerFailoverSystemConfig(t, conductorRpcPorts)
-	sys, err := cfg.Start(t)
-	require.NoError(t, err)
-
-	// 3 conductors that connect to 1 sequencer each.
-	conductors := make(map[string]*conductor)
-
-	// initialize all conductors in paused mode
-	conductorCfgs := []struct {
-		name      string
-		port      int
-		bootstrap bool
-	}{
-		{Sequencer1Name, conductorRpcPorts[Sequencer1Name], true}, // one in bootstrap mode so that we can form a cluster.
-		{Sequencer2Name, conductorRpcPorts[Sequencer2Name], false},
-		{Sequencer3Name, conductorRpcPorts[Sequencer3Name], false},
-	}
-	for _, cfg := range conductorCfgs {
-		cfg := cfg
-		nodeRPC := sys.RollupNodes[cfg.name].HTTPEndpoint()
-		engineRPC := sys.EthInstances[cfg.name].HTTPEndpoint()
-		conductors[cfg.name] = setupConductor(t, cfg.name, t.TempDir(), nodeRPC, engineRPC, cfg.port, cfg.bootstrap, *sys.RollupConfig)
-	}
+	sys, conductors, err := retry.Do2(ctx, maxSetupRetries, retryStrategy, func() (*System, map[string]*conductor, error) {
+		return setupHAInfra(t, ctx)
+	})
+	require.NoError(t, err, "Expected to successfully setup sequencers and conductors after retry")
 
 	// form a cluster
 	c1 := conductors[Sequencer1Name]
@@ -147,15 +126,73 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
 	return sys, conductors
 }
 
+func setupHAInfra(t *testing.T, ctx context.Context) (*System, map[string]*conductor, error) {
+	startTime := time.Now()
+
+	var sys *System
+	var conductors map[string]*conductor
+	var err error
+
+	// clean up if setup fails due to port in use.
+	defer func() {
+		if err != nil {
+			if sys != nil {
+				sys.Close()
+			}
+			for _, c := range conductors {
+				if serr := c.service.Stop(ctx); serr != nil {
+					t.Log("Failed to stop conductor", "error", serr)
+				}
+			}
+		}
+		t.Logf("setupHAInfra took %s\n", time.Since(startTime))
+	}()
+
+	conductorRpcPorts := map[string]int{
+		Sequencer1Name: findAvailablePort(t),
+		Sequencer2Name: findAvailablePort(t),
+		Sequencer3Name: findAvailablePort(t),
+	}
+
+	// 3 sequencers, 1 verifier, 1 active sequencer.
+	cfg := sequencerFailoverSystemConfig(t, conductorRpcPorts)
+	if sys, err = cfg.Start(t); err != nil {
+		return nil, nil, err
+	}
+
+	// 3 conductors that connect to 1 sequencer each.
+	conductors = make(map[string]*conductor)
+
+	// initialize all conductors in paused mode
+	conductorCfgs := []struct {
+		name      string
+		port      int
+		bootstrap bool
+	}{
+		{Sequencer1Name, conductorRpcPorts[Sequencer1Name], true}, // one in bootstrap mode so that we can form a cluster.
+		{Sequencer2Name, conductorRpcPorts[Sequencer2Name], false},
+		{Sequencer3Name, conductorRpcPorts[Sequencer3Name], false},
+	}
+	for _, cfg := range conductorCfgs {
+		cfg := cfg
+		nodeRPC := sys.RollupNodes[cfg.name].HTTPEndpoint()
+		engineRPC := sys.EthInstances[cfg.name].HTTPEndpoint()
+		if conductors[cfg.name], err = setupConductor(t, cfg.name, t.TempDir(), nodeRPC, engineRPC, cfg.port, cfg.bootstrap, *sys.RollupConfig); err != nil {
+			return nil, nil, err
+		}
+	}
+	return sys, conductors, nil
+}
+
 func setupConductor(
 	t *testing.T,
 	serverID, dir, nodeRPC, engineRPC string,
 	rpcPort int,
 	bootstrap bool,
 	rollupCfg rollup.Config,
-) *conductor {
+) (*conductor, error) {
+	// It is unfortunately not possible to pass 0 as the consensus port and get back
+	// the actual assigned port from the raft implementation, so we find an available
+	// port and pass it in explicitly to avoid test flakiness (port-in-use errors).
 	consensusPort := findAvailablePort(t)
 	cfg := con.Config{
 		ConsensusAddr: localhost,
@@ -189,12 +226,19 @@ func setupConductor(
 	ctx := context.Background()
 	service, err := con.New(ctx, &cfg, testlog.Logger(t, log.LevelInfo), "0.0.1")
-	require.NoError(t, err)
+	if err != nil {
+		return nil, err
+	}
 
 	err = service.Start(ctx)
-	require.NoError(t, err)
+	if err != nil {
+		return nil, err
+	}
 
 	rawClient, err := rpc.DialContext(ctx, service.HTTPEndpoint())
-	require.NoError(t, err)
+	if err != nil {
+		return nil, err
+	}
 
 	client := conrpc.NewAPIClient(rawClient)
 	return &conductor{
@@ -202,7 +246,7 @@ func setupConductor(
 		client:        client,
 		consensusPort: consensusPort,
 		rpcPort:       rpcPort,
-	}
+	}, nil
 }
 
 func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) {
...
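findAvailablePort itself lives elsewhere in the test helpers and is not part of this diff. The sketch below is a hypothetical implementation of the usual technique (bind to port 0, read back the kernel-assigned port, close the listener), included to show why the tests can still collide: the racy window opens once the listener is closed and lasts until the service under test re-binds the port, and a parallel test can claim the port in that gap. That is exactly the window the new retry loop covers.

package op_e2e_sketch // hypothetical package for this illustration

import (
	"net"
	"testing"

	"github.com/stretchr/testify/require"
)

// findAvailablePort is a hypothetical sketch, not the real helper.
// Binding to ":0" asks the kernel for a free port; closing the listener
// releases the port so the caller's service can bind it, which is the
// race this commit works around with retries.
func findAvailablePort(t *testing.T) int {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port
}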
@@ -8,6 +8,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/ethereum-optimism/optimism/op-conductor/consensus"
+	"github.com/ethereum-optimism/optimism/op-service/retry"
 )
 
 // [Category: Initial Setup]
@@ -95,7 +96,8 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) {
 	// Test AddServerAsNonvoter; we do not start a new sequencer just for this purpose, but reuse Sequencer3's RPC to start the conductor.
 	// This is fine, as this mainly tests the conductor's ability to add itself to the raft consensus cluster as a nonvoter.
 	t.Log("Testing AddServerAsNonvoter")
-	nonvoter := setupConductor(
+	nonvoter, err := retry.Do[*conductor](ctx, maxSetupRetries, retryStrategy, func() (*conductor, error) {
+		return setupConductor(
 	t, VerifierName, t.TempDir(),
 	sys.RollupEndpoint(Sequencer3Name),
 	sys.NodeEndpoint(Sequencer3Name),
@@ -103,6 +105,8 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) {
 	false,
 	*sys.RollupConfig,
 	)
+	})
+	require.NoError(t, err)
 
 	err = leader.client.AddServerAsNonvoter(ctx, VerifierName, nonvoter.ConsensusEndpoint())
 	require.NoError(t, err, "Expected leader to add non-voter")
...
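One detail worth noting in setupHAInfra above: the body assigns sys, conductors, and err with plain `=` rather than `:=`, because the deferred cleanup closure reads those function-scoped variables; `:=` would shadow them and silently turn the cleanup into a no-op. Here is a compact, runnable illustration of the same pattern, with net listeners standing in for the system and conductors (a hypothetical example, not code from this diff; the second bind deliberately reuses the first address to force the port-in-use error).

package main

import (
	"fmt"
	"net"
)

// acquire binds two listeners; the second bind is forced to fail with
// "address already in use". As in setupHAInfra, the deferred closure
// inspects the outer err and releases what was already acquired, so a
// retried attempt starts from a clean slate.
func acquire() (net.Listener, net.Listener, error) {
	var a, b net.Listener
	var err error
	// clean up if setup fails partway through, mirroring setupHAInfra.
	defer func() {
		if err != nil && a != nil {
			a.Close() // free the port for the next attempt
		}
	}()
	if a, err = net.Listen("tcp", "127.0.0.1:0"); err != nil {
		return nil, nil, err
	}
	// same host:port as a — guaranteed to collide
	if b, err = net.Listen("tcp", a.Addr().String()); err != nil {
		return nil, nil, err
	}
	return a, b, nil
}

func main() {
	_, _, err := acquire()
	fmt.Println(err) // listen tcp 127.0.0.1:<port>: bind: address already in use
}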