Commit 5e0dc463 authored by Francis Li, committed by GitHub

[op-conductor] fix flaky e2e tests (#9144)

* op-conductor: fix ha test flakiness

* 3rd try

* 4th try

* 1st try with everything ready

* 2nd try

* 3rd try

* 4th try
parent 73f92f90
@@ -116,6 +116,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
 		Paused: ctx.Bool(flags.Paused.Name),
 		HealthCheck: HealthCheckConfig{
 			Interval:       ctx.Uint64(flags.HealthCheckInterval.Name),
+			UnsafeInterval: ctx.Uint64(flags.HealthCheckUnsafeInterval.Name),
 			SafeInterval:   ctx.Uint64(flags.HealthCheckSafeInterval.Name),
 			MinPeerCount:   ctx.Uint64(flags.HealthCheckMinPeerCount.Name),
 		},
@@ -133,6 +134,9 @@ type HealthCheckConfig struct {
 	// Interval is the interval (in seconds) to check the health of the sequencer.
 	Interval uint64
+	// UnsafeInterval is the interval allowed between unsafe head and now in seconds.
+	UnsafeInterval uint64
 	// SafeInterval is the interval between safe head progression measured in seconds.
 	SafeInterval uint64
......
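For context, UnsafeInterval bounds how far (in seconds) the unsafe head may lag behind wall-clock time before the sequencer is reported unhealthy, while SafeInterval bounds safe-head progression. A hand-written config sketch with illustrative values, not taken from the commit:

	cfg := HealthCheckConfig{
		Interval:       1,  // run a health check every second
		UnsafeInterval: 5,  // unsafe head may lag wall-clock time by at most 5s
		SafeInterval:   30, // safe head must progress at least every 30s
		MinPeerCount:   2,
	}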
@@ -175,6 +175,7 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
 	c.hmon = health.NewSequencerHealthMonitor(
 		c.log,
 		c.cfg.HealthCheck.Interval,
+		c.cfg.HealthCheck.UnsafeInterval,
 		c.cfg.HealthCheck.SafeInterval,
 		c.cfg.HealthCheck.MinPeerCount,
 		&c.cfg.RollupCfg,
@@ -220,6 +221,12 @@ func (oc *OpConductor) initRPCServer(ctx context.Context) error {
 			Namespace: conductorrpc.NodeRPCNamespace,
 			Service:   nodeProxy,
 		})
+		nodeAdminProxy := conductorrpc.NewNodeAdminProxyBackend(oc.log, oc, nodeClient)
+		server.AddAPI(rpc.API{
+			Namespace: conductorrpc.NodeAdminRPCNamespace,
+			Service:   nodeAdminProxy,
+		})
 	}
 	oc.rpcServer = server
@@ -572,7 +579,6 @@ func (oc *OpConductor) startSequencer() error {
 		return errors.Wrap(err, "failed to get latest unsafe block from EL during startSequencer phase")
 	}
-	//if unsafeInCons.BlockHash != unsafeInNode.Hash() {
 	if unsafeInCons.ExecutionPayload.BlockHash != unsafeInNode.Hash() {
 		oc.log.Warn(
 			"latest unsafe block in consensus is not the same as the one in op-node",
......
@@ -37,6 +37,7 @@ func mockConfig(t *testing.T) Config {
 		Paused: false,
 		HealthCheck: HealthCheckConfig{
 			Interval:       1,
+			UnsafeInterval: 3,
 			SafeInterval:   5,
 			MinPeerCount:   1,
 		},
......
@@ -53,6 +53,11 @@ var (
 		Usage:   "Interval between health checks",
 		EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_INTERVAL"),
 	}
+	HealthCheckUnsafeInterval = &cli.Uint64Flag{
+		Name:    "healthcheck.unsafe-interval",
+		Usage:   "Interval allowed between unsafe head and now measured in seconds",
+		EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_UNSAFE_INTERVAL"),
+	}
 	HealthCheckSafeInterval = &cli.Uint64Flag{
 		Name:    "healthcheck.safe-interval",
 		Usage:   "Interval between safe head progression measured in seconds",
......
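As with the existing health-check flags, the new flag can also be set through an environment variable. Assuming EnvVarPrefix is OP_CONDUCTOR, as the other flags in this file suggest, PrefixEnvVar composes the variable name by joining prefix and suffix; a minimal sketch of the assumed behavior, not the op-service implementation itself:

	// Sketch of the assumed prefixing behavior, not the op-service implementation:
	// prefixEnvVar("OP_CONDUCTOR", "HEALTHCHECK_UNSAFE_INTERVAL")
	// -> []string{"OP_CONDUCTOR_HEALTHCHECK_UNSAFE_INTERVAL"}
	func prefixEnvVar(prefix, suffix string) []string {
		return []string{prefix + "_" + suffix}
	}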
@@ -28,13 +28,14 @@ type HealthMonitor interface {
 // interval is the interval between health checks measured in seconds.
 // safeInterval is the interval between safe head progress measured in seconds.
 // minPeerCount is the minimum number of peers required for the sequencer to be healthy.
-func NewSequencerHealthMonitor(log log.Logger, interval, safeInterval, minPeerCount uint64, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p p2p.API) HealthMonitor {
+func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInterval, minPeerCount uint64, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p p2p.API) HealthMonitor {
 	return &SequencerHealthMonitor{
 		log:            log,
 		done:           make(chan struct{}),
 		interval:       interval,
 		healthUpdateCh: make(chan bool),
 		rollupCfg:      rollupCfg,
+		unsafeInterval: unsafeInterval,
 		safeInterval:   safeInterval,
 		minPeerCount:   minPeerCount,
 		node:           node,
@@ -49,10 +50,13 @@ type SequencerHealthMonitor struct {
 	wg        sync.WaitGroup
 	rollupCfg *rollup.Config
+	unsafeInterval uint64
 	safeInterval   uint64
 	minPeerCount   uint64
 	interval       uint64
 	healthUpdateCh chan bool
+	lastSeenUnsafeNum  uint64
+	lastSeenUnsafeTime uint64
 	node dial.RollupClientInterface
 	p2p  p2p.API
@@ -104,8 +108,9 @@ func (hm *SequencerHealthMonitor) loop() {
-// healthCheck checks the health of the sequencer by 3 criteria:
+// healthCheck checks the health of the sequencer by 4 criteria:
 // 1. unsafe head is progressing per block time
-// 2. safe head is progressing every configured batch submission interval
-// 3. peer count is above the configured minimum
+// 2. unsafe head is not too far behind now (measured by unsafeInterval)
+// 3. safe head is progressing every configured batch submission interval
+// 4. peer count is above the configured minimum
 func (hm *SequencerHealthMonitor) healthCheck() bool {
 	ctx := context.Background()
 	status, err := hm.node.SyncStatus(ctx)
@@ -115,14 +120,48 @@ func (hm *SequencerHealthMonitor) healthCheck() bool {
 	}
 	now := uint64(time.Now().Unix())
-	// allow at most one block drift for unsafe head
-	if now-status.UnsafeL2.Time > hm.interval+hm.rollupCfg.BlockTime {
-		hm.log.Error("unsafe head is not progressing", "lastSeenUnsafeBlock", status.UnsafeL2)
+	if hm.lastSeenUnsafeNum != 0 {
+		diff := now - hm.lastSeenUnsafeTime
+		// how many blocks we expect to have seen, minus 1 to allow for timing jitter:
+		// if diff = 2.001s and block time = 2s, expecting a full block could cause a
+		// healthy sequencer to be flagged unhealthy.
+		blocks := diff/hm.rollupCfg.BlockTime - 1
+		if diff > hm.rollupCfg.BlockTime && blocks > status.UnsafeL2.Number-hm.lastSeenUnsafeNum {
+			hm.log.Error(
+				"unsafe head is not progressing as expected",
+				"now", now,
+				"unsafe_head_num", status.UnsafeL2.Number,
+				"last_seen_unsafe_num", hm.lastSeenUnsafeNum,
+				"last_seen_unsafe_time", hm.lastSeenUnsafeTime,
+				"unsafe_interval", hm.unsafeInterval,
+			)
+			return false
+		}
+	}
+	if status.UnsafeL2.Number > hm.lastSeenUnsafeNum {
+		hm.lastSeenUnsafeNum = status.UnsafeL2.Number
+		hm.lastSeenUnsafeTime = now
+	}
+
+	if now-status.UnsafeL2.Time > hm.unsafeInterval {
+		hm.log.Error(
+			"unsafe head is not progressing as expected",
+			"now", now,
+			"unsafe_head_num", status.UnsafeL2.Number,
+			"unsafe_head_time", status.UnsafeL2.Time,
+			"unsafe_interval", hm.unsafeInterval,
+		)
 		return false
 	}
 	if now-status.SafeL2.Time > hm.safeInterval {
-		hm.log.Error("safe head is not progressing", "safe_head_time", status.SafeL2.Time, "now", now)
+		hm.log.Error(
+			"safe head is not progressing as expected",
+			"now", now,
+			"safe_head_num", status.SafeL2.Number,
+			"safe_head_time", status.SafeL2.Time,
+			"safe_interval", hm.safeInterval,
+		)
 		return false
 	}
......
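The progression check is the subtle part of this change: it compares the blocks actually produced against the blocks expected from elapsed time, minus one block of tolerance. A self-contained sketch of the same arithmetic (the function and values are illustrative, not part of op-conductor):

	package main

	import "fmt"

	// unsafeProgressionOK mirrors the check above: given the seconds elapsed since
	// the last observed unsafe head and the number of new blocks seen since then,
	// decide whether the sequencer is still producing blocks fast enough.
	func unsafeProgressionOK(elapsed, blockTime, newBlocks uint64) bool {
		if elapsed <= blockTime {
			return true // too soon to expect a new block
		}
		// expect elapsed/blockTime blocks, minus 1 of tolerance: with elapsed just
		// over 2s and a 2s block time, demanding a full block would intermittently
		// flag a healthy sequencer.
		expected := elapsed/blockTime - 1
		return newBlocks >= expected
	}

	func main() {
		fmt.Println(unsafeProgressionOK(3, 2, 0))  // true: jitter tolerated
		fmt.Println(unsafeProgressionOK(10, 2, 1)) // false: ~4 blocks expected, saw 1
	}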
@@ -30,6 +30,7 @@ type HealthMonitorTestSuite struct {
 	rc *testutils.MockRollupClient
 	pc *p2pMocks.API
 	interval       uint64
+	unsafeInterval uint64
 	safeInterval   uint64
 	minPeerCount   uint64
 	rollupCfg      *rollup.Config
@@ -41,6 +42,7 @@ func (s *HealthMonitorTestSuite) SetupSuite() {
 	s.rc = &testutils.MockRollupClient{}
 	s.pc = &p2pMocks.API{}
 	s.interval = 1
+	s.unsafeInterval = 3
 	s.safeInterval = 5
 	s.minPeerCount = minPeerCount
 	s.rollupCfg = &rollup.Config{
@@ -49,7 +51,7 @@
 }
 
 func (s *HealthMonitorTestSuite) SetupTest() {
-	s.monitor = NewSequencerHealthMonitor(s.log, s.interval, s.safeInterval, s.minPeerCount, s.rollupCfg, s.rc, s.pc)
+	s.monitor = NewSequencerHealthMonitor(s.log, s.interval, s.unsafeInterval, s.safeInterval, s.minPeerCount, s.rollupCfg, s.rc, s.pc)
 	err := s.monitor.Start()
 	s.NoError(err)
 }
@@ -90,9 +92,11 @@ func (s *HealthMonitorTestSuite) TestUnhealthyUnsafeHeadNotProgressing() {
 	now := uint64(time.Now().Unix())
 	ss1 := &eth.SyncStatus{
 		UnsafeL2: eth.L2BlockRef{
+			Number: 5,
 			Time:   now - 1,
 		},
 		SafeL2: eth.L2BlockRef{
+			Number: 1,
 			Time:   now - 2,
 		},
 	}
......
@@ -3,15 +3,14 @@ package rpc
 import (
 	"context"
 	"errors"
-	"math/big"
+	"github.com/ethereum/go-ethereum/rpc"
 
+	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
-	"github.com/ethereum/go-ethereum/core/types"
 )
 
-var (
-	ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer")
-)
+var ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer")
 
 type ServerInfo struct {
 	ID string `json:"id"`
@@ -53,13 +52,19 @@ type API interface {
 // ExecutionProxyAPI defines the methods proxied to the execution rpc backend
 // This should include all methods that are called by op-batcher or op-proposer
 type ExecutionProxyAPI interface {
-	BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
+	GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error)
 }
 
 // NodeProxyAPI defines the methods proxied to the node rpc backend
 // This should include all methods that are called by op-batcher or op-proposer
 type NodeProxyAPI interface {
 	OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error)
-	SequencerActive(ctx context.Context) (bool, error)
 	SyncStatus(ctx context.Context) (*eth.SyncStatus, error)
+	RollupConfig(ctx context.Context) (*rollup.Config, error)
+}
+
+// NodeAdminProxyAPI defines the methods proxied to the node admin rpc backend
+// This should include all methods that are called by op-batcher or op-proposer
+type NodeAdminProxyAPI interface {
+	SequencerActive(ctx context.Context) (bool, error)
 }
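All three proxy backends share one ordering: forward the request first, then verify leadership, so a conductor that has just lost leadership never returns a successful proxied response. A generic sketch of that pattern (proxyCall is illustrative and not part of the package):

	// Illustrative only: a generic helper capturing the shared pattern of the
	// proxy backends in this package; it is not part of the actual code.
	package rpc

	import "context"

	func proxyCall[T any](ctx context.Context, isLeader func(context.Context) bool, call func(context.Context) (T, error)) (T, error) {
		var zero T
		result, err := call(ctx) // forward the request first
		if err != nil {
			return zero, err
		}
		if !isLeader(ctx) { // then verify leadership before returning success
			return zero, ErrNotLeader
		}
		return result, nil
	}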
@@ -2,11 +2,10 @@ package rpc
 import (
 	"context"
-	"math/big"
 
-	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/rpc"
 )
 
 var ExecutionRPCNamespace = "eth"
@@ -28,13 +27,14 @@ func NewExecutionProxyBackend(log log.Logger, con conductor, client *ethclient.Client)
 	}
 }
 
-func (api *ExecutionProxyBackend) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
-	block, err := api.client.BlockByNumber(ctx, number)
+func (api *ExecutionProxyBackend) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) {
+	var result map[string]interface{}
+	err := api.client.Client().Call(&result, "eth_getBlockByNumber", number, fullTx)
 	if err != nil {
 		return nil, err
 	}
 	if !api.con.Leader(ctx) {
 		return nil, ErrNotLeader
 	}
-	return block, nil
+	return result, nil
 }
package rpc

import (
	"context"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/sources"
)

var NodeAdminRPCNamespace = "admin"

// NodeAdminProxyBackend implements a node admin rpc proxy with a leadership check to make sure only the leader returns the result.
type NodeAdminProxyBackend struct {
	log    log.Logger
	con    conductor
	client *sources.RollupClient
}

var _ NodeAdminProxyAPI = (*NodeAdminProxyBackend)(nil)

// NewNodeAdminProxyBackend creates a new NodeAdminProxyBackend instance.
func NewNodeAdminProxyBackend(log log.Logger, con conductor, client *sources.RollupClient) NodeAdminProxyAPI {
	return &NodeAdminProxyBackend{
		log:    log,
		con:    con,
		client: client,
	}
}

func (api *NodeAdminProxyBackend) SequencerActive(ctx context.Context) (bool, error) {
	active, err := api.client.SequencerActive(ctx)
	if err != nil {
		return false, err
	}
	if !api.con.Leader(ctx) {
		return false, ErrNotLeader
	}
	return active, nil
}
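With the execution proxy change above and this new backend registered under the admin namespace in initRPCServer, any JSON-RPC client can exercise both proxied namespaces against a conductor endpoint. A self-contained usage sketch; the URL is illustrative (in the e2e tests it would be the conductor's RPCEndpoint()):

	package main

	import (
		"context"
		"fmt"

		"github.com/ethereum/go-ethereum/rpc"
	)

	func main() {
		ctx := context.Background()
		client, err := rpc.DialContext(ctx, "http://127.0.0.1:50001") // illustrative endpoint
		if err != nil {
			panic(err)
		}
		defer client.Close()

		// Proxied to the leader's execution client; ("latest", false) mirrors the
		// (rpc.BlockNumber, fullTx) parameters of GetBlockByNumber above.
		var block map[string]interface{}
		if err := client.CallContext(ctx, &block, "eth_getBlockByNumber", "latest", false); err != nil {
			panic(err) // a non-leader conductor responds with ErrNotLeader
		}
		fmt.Println("block number:", block["number"])

		// Proxied to the leader's op-node via the new admin namespace.
		var active bool
		if err := client.CallContext(ctx, &active, "admin_sequencerActive"); err != nil {
			panic(err)
		}
		fmt.Println("sequencer active:", active)
	}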
@@ -3,9 +3,11 @@ package rpc
 import (
 	"context"
 
-	"github.com/ethereum/go-ethereum/log"
-
+	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
 	"github.com/ethereum-optimism/optimism/op-service/sources"
+	"github.com/ethereum/go-ethereum/log"
 )
 
 var NodeRPCNamespace = "optimism"
@@ -49,13 +51,13 @@ func (api *NodeProxyBackend) OutputAtBlock(ctx context.Context, blockNum uint64)
 	return output, nil
 }
 
-func (api *NodeProxyBackend) SequencerActive(ctx context.Context) (bool, error) {
-	active, err := api.client.SequencerActive(ctx)
+func (api *NodeProxyBackend) RollupConfig(ctx context.Context) (*rollup.Config, error) {
+	config, err := api.client.RollupConfig(ctx)
 	if err != nil {
-		return false, err
+		return nil, err
 	}
 	if !api.con.Leader(ctx) {
-		return false, ErrNotLeader
+		return nil, ErrNotLeader
 	}
-	return active, err
+	return config, nil
 }
@@ -32,10 +32,10 @@ import (
 )
 
 const (
-	sequencer1Name = "sequencer1"
-	sequencer2Name = "sequencer2"
-	sequencer3Name = "sequencer3"
-	verifierName   = "verifier"
+	Sequencer1Name = "sequencer1"
+	Sequencer2Name = "sequencer2"
+	Sequencer3Name = "sequencer3"
+	VerifierName   = "verifier"
 	localhost      = "127.0.0.1"
 )
@@ -52,15 +52,21 @@ func (c *conductor) ConsensusEndpoint() string {
 }
 
 func (c *conductor) RPCEndpoint() string {
-	return fmt.Sprintf("%s:%d", localhost, c.rpcPort)
+	return fmt.Sprintf("http://%s:%d", localhost, c.rpcPort)
 }
 
 func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
 	InitParallel(t)
 	ctx := context.Background()
 
+	conductorRpcPorts := map[string]int{
+		Sequencer1Name: findAvailablePort(t),
+		Sequencer2Name: findAvailablePort(t),
+		Sequencer3Name: findAvailablePort(t),
+	}
+
 	// 3 sequencers, 1 verifier, 1 active sequencer.
-	cfg := sequencerFailoverSystemConfig(t)
+	cfg := sequencerFailoverSystemConfig(t, conductorRpcPorts)
 	sys, err := cfg.Start(t)
 	require.NoError(t, err)
@@ -70,45 +76,54 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
 	// initialize all conductors in paused mode
 	conductorCfgs := []struct {
 		name      string
+		port      int
 		bootstrap bool
 	}{
-		{sequencer1Name, true}, // one in bootstrap mode so that we can form a cluster.
-		{sequencer2Name, false},
-		{sequencer3Name, false},
+		{Sequencer1Name, conductorRpcPorts[Sequencer1Name], true}, // one in bootstrap mode so that we can form a cluster.
+		{Sequencer2Name, conductorRpcPorts[Sequencer2Name], false},
+		{Sequencer3Name, conductorRpcPorts[Sequencer3Name], false},
 	}
 	for _, cfg := range conductorCfgs {
 		cfg := cfg
 		nodePRC := sys.RollupNodes[cfg.name].HTTPEndpoint()
 		engineRPC := sys.EthInstances[cfg.name].HTTPEndpoint()
-		conductors[cfg.name] = setupConductor(t, cfg.name, t.TempDir(), nodePRC, engineRPC, cfg.bootstrap, *sys.RollupConfig)
+		conductors[cfg.name] = setupConductor(t, cfg.name, t.TempDir(), nodePRC, engineRPC, cfg.port, cfg.bootstrap, *sys.RollupConfig)
 	}
 
-	// 1 batcher that listens to all 3 sequencers, in started mode.
-	setupBatcher(t, sys, conductors)
-
 	// form a cluster
-	c1 := conductors[sequencer1Name]
-	c2 := conductors[sequencer2Name]
-	c3 := conductors[sequencer3Name]
+	c1 := conductors[Sequencer1Name]
+	c2 := conductors[Sequencer2Name]
+	c3 := conductors[Sequencer3Name]
 
 	require.NoError(t, waitForLeadershipChange(t, c1, true))
-	require.NoError(t, c1.client.AddServerAsVoter(ctx, sequencer2Name, c2.ConsensusEndpoint()))
-	require.NoError(t, c1.client.AddServerAsVoter(ctx, sequencer3Name, c3.ConsensusEndpoint()))
+	require.NoError(t, c1.client.AddServerAsVoter(ctx, Sequencer2Name, c2.ConsensusEndpoint()))
+	require.NoError(t, c1.client.AddServerAsVoter(ctx, Sequencer3Name, c3.ConsensusEndpoint()))
 	require.True(t, leader(t, ctx, c1))
 	require.False(t, leader(t, ctx, c2))
 	require.False(t, leader(t, ctx, c3))
 
+	// start sequencing on leader
+	lid, _ := findLeader(t, conductors)
+	unsafeHead, err := sys.Clients[lid].BlockByNumber(ctx, nil)
+	require.NoError(t, err)
+	require.Equal(t, uint64(0), unsafeHead.NumberU64())
+	require.NoError(t, sys.RollupClient(lid).StartSequencer(ctx, unsafeHead.Hash()))
+
+	// 1 batcher that listens to all 3 sequencers, in started mode.
+	setupBatcher(t, sys, conductors)
+
 	// weirdly, batcher does not submit a batch until unsafe block 9.
 	// It became normal after that and submits a batch every L1 block (2s) per configuration.
 	// Since our health monitor checks on safe head progression, wait for batcher to become normal before proceeding.
-	require.NoError(t, wait.ForNextSafeBlock(ctx, sys.Clients[sequencer1Name]))
-	require.NoError(t, wait.ForNextSafeBlock(ctx, sys.Clients[sequencer1Name]))
-	require.NoError(t, wait.ForNextSafeBlock(ctx, sys.Clients[sequencer1Name]))
+	require.NoError(t, wait.ForNextSafeBlock(ctx, sys.Clients[Sequencer1Name]))
+	require.NoError(t, wait.ForNextSafeBlock(ctx, sys.Clients[Sequencer1Name]))
 
 	// make sure conductor reports all sequencers as healthy, this means they're syncing correctly.
-	require.True(t, healthy(t, ctx, c1))
-	require.True(t, healthy(t, ctx, c2))
-	require.True(t, healthy(t, ctx, c3))
+	require.Eventually(t, func() bool {
+		return healthy(t, ctx, c1) &&
+			healthy(t, ctx, c2) &&
+			healthy(t, ctx, c3)
+	}, 30*time.Second, 500*time.Millisecond, "Expected sequencers to become healthy")
 
 	// unpause all conductors
 	require.NoError(t, c1.client.Resume(ctx))
@@ -120,9 +135,9 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
 	require.True(t, conductorActive(t, ctx, c2))
 	require.True(t, conductorActive(t, ctx, c3))
 
-	require.True(t, sequencerActive(t, ctx, sys.RollupClient(sequencer1Name)))
-	require.False(t, sequencerActive(t, ctx, sys.RollupClient(sequencer2Name)))
-	require.False(t, sequencerActive(t, ctx, sys.RollupClient(sequencer3Name)))
+	require.True(t, sequencerActive(t, ctx, sys.RollupClient(Sequencer1Name)))
+	require.False(t, sequencerActive(t, ctx, sys.RollupClient(Sequencer2Name)))
+	require.False(t, sequencerActive(t, ctx, sys.RollupClient(Sequencer3Name)))
 
 	require.True(t, healthy(t, ctx, c1))
 	require.True(t, healthy(t, ctx, c2))
@@ -134,13 +149,13 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
 func setupConductor(
 	t *testing.T,
 	serverID, dir, nodeRPC, engineRPC string,
+	rpcPort int,
 	bootstrap bool,
 	rollupCfg rollup.Config,
 ) *conductor {
 	// it's unfortunate that it is not possible to pass 0 as consensus port and get back the actual assigned port from raft implementation.
 	// So we find an available port and pass it in to avoid test flakiness (avoid port already in use error).
 	consensusPort := findAvailablePort(t)
-	rpcPort := findAvailablePort(t)
 	cfg := con.Config{
 		ConsensusAddr: localhost,
 		ConsensusPort: consensusPort,
@@ -152,8 +167,12 @@ func setupConductor(
 		Paused: true,
 		HealthCheck: con.HealthCheckConfig{
 			Interval:     1, // per test setup, l2 block time is 1s.
-			SafeInterval: 4, // per test setup (l1 block time = 2s, max channel duration = 1, 2s buffer)
 			MinPeerCount: 2, // per test setup, each sequencer has 2 peers
+			// CI is unstable in terms of the delay between now and the head time,
+			// so we set the unsafe interval to 30s to avoid flakiness.
+			// This is fine because the health monitor still has a separate progression check.
+			UnsafeInterval: 30,
+			SafeInterval:   30,
 		},
 		RollupCfg:      rollupCfg,
 		RPCEnableProxy: true,
@@ -186,25 +205,18 @@ func setupConductor(
 }
 
 func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) {
-	var batchType uint = derive.SingularBatchType
-	if sys.Cfg.DeployConfig.L2GenesisDeltaTimeOffset != nil && *sys.Cfg.DeployConfig.L2GenesisDeltaTimeOffset == hexutil.Uint64(0) {
-		batchType = derive.SpanBatchType
-	}
-	batcherMaxL1TxSizeBytes := sys.Cfg.BatcherMaxL1TxSizeBytes
-	if batcherMaxL1TxSizeBytes == 0 {
-		batcherMaxL1TxSizeBytes = 240_000
-	}
 	// enable active sequencer follow mode.
+	// in sequencer HA, all batcher / proposer requests will be proxied by conductor so that we can make sure
+	// that requests are always handled by leader.
 	l2EthRpc := strings.Join([]string{
-		conductors[sequencer1Name].RPCEndpoint(),
-		conductors[sequencer2Name].RPCEndpoint(),
-		conductors[sequencer3Name].RPCEndpoint(),
+		conductors[Sequencer1Name].RPCEndpoint(),
+		conductors[Sequencer2Name].RPCEndpoint(),
+		conductors[Sequencer3Name].RPCEndpoint(),
 	}, ",")
 	rollupRpc := strings.Join([]string{
-		conductors[sequencer1Name].RPCEndpoint(),
-		conductors[sequencer2Name].RPCEndpoint(),
-		conductors[sequencer3Name].RPCEndpoint(),
+		conductors[Sequencer1Name].RPCEndpoint(),
+		conductors[Sequencer2Name].RPCEndpoint(),
+		conductors[Sequencer3Name].RPCEndpoint(),
 	}, ",")
 	batcherCLIConfig := &bss.CLIConfig{
 		L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
@@ -212,21 +224,21 @@ func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) {
 		RollupRpc:              rollupRpc,
 		MaxPendingTransactions: 0,
 		MaxChannelDuration:     1,
-		MaxL1TxSize:            batcherMaxL1TxSizeBytes,
+		MaxL1TxSize:            240_000,
 		CompressorConfig: compressor.CLIConfig{
 			TargetL1TxSizeBytes: sys.Cfg.BatcherTargetL1TxSizeBytes,
 			TargetNumFrames:     1,
 			ApproxComprRatio:    0.4,
 		},
-		SubSafetyMargin: 0,
-		PollInterval:    50 * time.Millisecond,
+		SubSafetyMargin: 4,
+		PollInterval:    1 * time.Second,
 		TxMgrConfig:     newTxMgrConfig(sys.EthInstances["l1"].WSEndpoint(), sys.Cfg.Secrets.Batcher),
 		LogConfig: oplog.CLIConfig{
-			Level:  log.LvlInfo,
+			Level:  log.LvlDebug,
 			Format: oplog.FormatText,
 		},
 		Stopped:                      false,
-		BatchType:                    batchType,
+		BatchType:                    derive.SpanBatchType,
 		DataAvailabilityType:         batcherFlags.CalldataType,
 		ActiveSequencerCheckDuration: 0,
 	}
@@ -238,34 +250,38 @@ func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) {
 	sys.BatchSubmitter = batcher
 }
 
-func sequencerFailoverSystemConfig(t *testing.T) SystemConfig {
+func sequencerFailoverSystemConfig(t *testing.T, ports map[string]int) SystemConfig {
 	cfg := DefaultSystemConfig(t)
 	delete(cfg.Nodes, "sequencer")
-	cfg.Nodes[sequencer1Name] = sequencerCfg(true)
-	cfg.Nodes[sequencer2Name] = sequencerCfg(false)
-	cfg.Nodes[sequencer3Name] = sequencerCfg(false)
+	cfg.Nodes[Sequencer1Name] = sequencerCfg(ports[Sequencer1Name])
+	cfg.Nodes[Sequencer2Name] = sequencerCfg(ports[Sequencer2Name])
+	cfg.Nodes[Sequencer3Name] = sequencerCfg(ports[Sequencer3Name])
 
 	delete(cfg.Loggers, "sequencer")
-	cfg.Loggers[sequencer1Name] = testlog.Logger(t, log.LvlInfo).New("role", sequencer1Name)
-	cfg.Loggers[sequencer2Name] = testlog.Logger(t, log.LvlInfo).New("role", sequencer2Name)
-	cfg.Loggers[sequencer3Name] = testlog.Logger(t, log.LvlInfo).New("role", sequencer3Name)
+	cfg.Loggers[Sequencer1Name] = testlog.Logger(t, log.LvlInfo).New("role", Sequencer1Name)
+	cfg.Loggers[Sequencer2Name] = testlog.Logger(t, log.LvlInfo).New("role", Sequencer2Name)
+	cfg.Loggers[Sequencer3Name] = testlog.Logger(t, log.LvlInfo).New("role", Sequencer3Name)
 
 	cfg.P2PTopology = map[string][]string{
-		sequencer1Name: {sequencer2Name, sequencer3Name},
-		sequencer2Name: {sequencer3Name, verifierName},
-		sequencer3Name: {verifierName, sequencer1Name},
-		verifierName:   {sequencer1Name, sequencer2Name},
+		Sequencer1Name: {Sequencer2Name, Sequencer3Name},
+		Sequencer2Name: {Sequencer3Name, VerifierName},
+		Sequencer3Name: {VerifierName, Sequencer1Name},
+		VerifierName:   {Sequencer1Name, Sequencer2Name},
 	}
 
+	offset := hexutil.Uint64(0)
+	cfg.DeployConfig.L2GenesisDeltaTimeOffset = &offset
+	cfg.DeployConfig.L2GenesisEcotoneTimeOffset = &offset
+
 	return cfg
 }
 
-func sequencerCfg(sequencerEnabled bool) *rollupNode.Config {
+func sequencerCfg(rpcPort int) *rollupNode.Config {
 	return &rollupNode.Config{
 		Driver: driver.Config{
 			VerifierConfDepth:  0,
 			SequencerConfDepth: 0,
-			SequencerEnabled:   sequencerEnabled,
+			SequencerEnabled:   true,
+			SequencerStopped:   true,
 		},
 		// Submitter PrivKey is set in system start for rollup nodes where sequencer = true
 		RPC: rollupNode.RPCConfig{
@@ -277,6 +293,9 @@ func sequencerCfg(sequencerEnabled bool) *rollupNode.Config {
 		RuntimeConfigReloadInterval: time.Minute * 10,
 		ConfigPersistence:           &rollupNode.DisabledConfigPersistence{},
 		Sync:                        sync.Config{SyncMode: sync.CLSync},
+		ConductorEnabled:            true,
+		ConductorRpc:                fmt.Sprintf("http://%s:%d", localhost, rpcPort),
+		ConductorRpcTimeout:         1 * time.Second,
 	}
 }
@@ -332,7 +351,8 @@ func findAvailablePort(t *testing.T) int {
 		case <-ctx.Done():
 			t.Error("Failed to find available port")
 		default:
-			port := rand.Intn(65535-1024) + 1024 // Random port in the range 1024-65535
+			// private / ephemeral ports are in the range 49152-65535
+			port := rand.Intn(65535-49152) + 49152
 			addr := fmt.Sprintf("127.0.0.1:%d", port)
 			l, err := net.Listen("tcp", addr)
 			if err == nil {
@@ -342,3 +362,12 @@ func findAvailablePort(t *testing.T) int {
 		}
 	}
 }
+
+func findLeader(t *testing.T, conductors map[string]*conductor) (string, *conductor) {
+	for id, con := range conductors {
+		if leader(t, context.Background(), con) {
+			return id, con
+		}
+	}
+	return "", nil
+}
@@ -9,7 +9,6 @@ import (
 // [Category: Initial Setup]
 // In this test, we test that we can successfully setup a working cluster.
 func TestSequencerFailover_SetupCluster(t *testing.T) {
-	t.Skip("temporarily disable due to flakiness for now")
 	sys, conductors := setupSequencerFailoverTest(t)
 	defer sys.Close()
......