Commit 3c0fe560 authored by Francis Li's avatar Francis Li Committed by GitHub

[op-conductor] part 1 - conductor main control loop (#8736)

* Add control loop logic and tests

* Cleanup code and tests
parent 789c9d34
...@@ -16,7 +16,11 @@ clean: ...@@ -16,7 +16,11 @@ clean:
test: test:
go test -v ./... go test -v ./...
generate-mocks:
go generate ./...
.PHONY: \ .PHONY: \
clean \ clean \
op-conductor \ op-conductor \
test test \
generate-mocks
// Code generated by mockery v2.28.1. DO NOT EDIT.
package mocks
import (
context "context"
common "github.com/ethereum/go-ethereum/common"
eth "github.com/ethereum-optimism/optimism/op-service/eth"
mock "github.com/stretchr/testify/mock"
)
// SequencerControl is an autogenerated mock type for the SequencerControl type
type SequencerControl struct {
mock.Mock
}
type SequencerControl_Expecter struct {
mock *mock.Mock
}
func (_m *SequencerControl) EXPECT() *SequencerControl_Expecter {
return &SequencerControl_Expecter{mock: &_m.Mock}
}
// LatestUnsafeBlock provides a mock function with given fields: ctx
func (_m *SequencerControl) LatestUnsafeBlock(ctx context.Context) (eth.BlockInfo, error) {
ret := _m.Called(ctx)
var r0 eth.BlockInfo
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (eth.BlockInfo, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) eth.BlockInfo); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(eth.BlockInfo)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SequencerControl_LatestUnsafeBlock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LatestUnsafeBlock'
type SequencerControl_LatestUnsafeBlock_Call struct {
*mock.Call
}
// LatestUnsafeBlock is a helper method to define mock.On call
// - ctx context.Context
func (_e *SequencerControl_Expecter) LatestUnsafeBlock(ctx interface{}) *SequencerControl_LatestUnsafeBlock_Call {
return &SequencerControl_LatestUnsafeBlock_Call{Call: _e.mock.On("LatestUnsafeBlock", ctx)}
}
func (_c *SequencerControl_LatestUnsafeBlock_Call) Run(run func(ctx context.Context)) *SequencerControl_LatestUnsafeBlock_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *SequencerControl_LatestUnsafeBlock_Call) Return(_a0 eth.BlockInfo, _a1 error) *SequencerControl_LatestUnsafeBlock_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *SequencerControl_LatestUnsafeBlock_Call) RunAndReturn(run func(context.Context) (eth.BlockInfo, error)) *SequencerControl_LatestUnsafeBlock_Call {
_c.Call.Return(run)
return _c
}
// PostUnsafePayload provides a mock function with given fields: ctx, payload
func (_m *SequencerControl) PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error {
ret := _m.Called(ctx, payload)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *eth.ExecutionPayload) error); ok {
r0 = rf(ctx, payload)
} else {
r0 = ret.Error(0)
}
return r0
}
// SequencerControl_PostUnsafePayload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PostUnsafePayload'
type SequencerControl_PostUnsafePayload_Call struct {
*mock.Call
}
// PostUnsafePayload is a helper method to define mock.On call
// - ctx context.Context
// - payload *eth.ExecutionPayload
func (_e *SequencerControl_Expecter) PostUnsafePayload(ctx interface{}, payload interface{}) *SequencerControl_PostUnsafePayload_Call {
return &SequencerControl_PostUnsafePayload_Call{Call: _e.mock.On("PostUnsafePayload", ctx, payload)}
}
func (_c *SequencerControl_PostUnsafePayload_Call) Run(run func(ctx context.Context, payload *eth.ExecutionPayload)) *SequencerControl_PostUnsafePayload_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*eth.ExecutionPayload))
})
return _c
}
func (_c *SequencerControl_PostUnsafePayload_Call) Return(_a0 error) *SequencerControl_PostUnsafePayload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *SequencerControl_PostUnsafePayload_Call) RunAndReturn(run func(context.Context, *eth.ExecutionPayload) error) *SequencerControl_PostUnsafePayload_Call {
_c.Call.Return(run)
return _c
}
// SequencerActive provides a mock function with given fields: ctx
func (_m *SequencerControl) SequencerActive(ctx context.Context) (bool, error) {
ret := _m.Called(ctx)
var r0 bool
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (bool, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) bool); ok {
r0 = rf(ctx)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SequencerControl_SequencerActive_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SequencerActive'
type SequencerControl_SequencerActive_Call struct {
*mock.Call
}
// SequencerActive is a helper method to define mock.On call
// - ctx context.Context
func (_e *SequencerControl_Expecter) SequencerActive(ctx interface{}) *SequencerControl_SequencerActive_Call {
return &SequencerControl_SequencerActive_Call{Call: _e.mock.On("SequencerActive", ctx)}
}
func (_c *SequencerControl_SequencerActive_Call) Run(run func(ctx context.Context)) *SequencerControl_SequencerActive_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *SequencerControl_SequencerActive_Call) Return(_a0 bool, _a1 error) *SequencerControl_SequencerActive_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *SequencerControl_SequencerActive_Call) RunAndReturn(run func(context.Context) (bool, error)) *SequencerControl_SequencerActive_Call {
_c.Call.Return(run)
return _c
}
// StartSequencer provides a mock function with given fields: ctx, hash
func (_m *SequencerControl) StartSequencer(ctx context.Context, hash common.Hash) error {
ret := _m.Called(ctx, hash)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, common.Hash) error); ok {
r0 = rf(ctx, hash)
} else {
r0 = ret.Error(0)
}
return r0
}
// SequencerControl_StartSequencer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartSequencer'
type SequencerControl_StartSequencer_Call struct {
*mock.Call
}
// StartSequencer is a helper method to define mock.On call
// - ctx context.Context
// - hash common.Hash
func (_e *SequencerControl_Expecter) StartSequencer(ctx interface{}, hash interface{}) *SequencerControl_StartSequencer_Call {
return &SequencerControl_StartSequencer_Call{Call: _e.mock.On("StartSequencer", ctx, hash)}
}
func (_c *SequencerControl_StartSequencer_Call) Run(run func(ctx context.Context, hash common.Hash)) *SequencerControl_StartSequencer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(common.Hash))
})
return _c
}
func (_c *SequencerControl_StartSequencer_Call) Return(_a0 error) *SequencerControl_StartSequencer_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *SequencerControl_StartSequencer_Call) RunAndReturn(run func(context.Context, common.Hash) error) *SequencerControl_StartSequencer_Call {
_c.Call.Return(run)
return _c
}
// StopSequencer provides a mock function with given fields: ctx
func (_m *SequencerControl) StopSequencer(ctx context.Context) (common.Hash, error) {
ret := _m.Called(ctx)
var r0 common.Hash
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (common.Hash, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) common.Hash); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(common.Hash)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SequencerControl_StopSequencer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StopSequencer'
type SequencerControl_StopSequencer_Call struct {
*mock.Call
}
// StopSequencer is a helper method to define mock.On call
// - ctx context.Context
func (_e *SequencerControl_Expecter) StopSequencer(ctx interface{}) *SequencerControl_StopSequencer_Call {
return &SequencerControl_StopSequencer_Call{Call: _e.mock.On("StopSequencer", ctx)}
}
func (_c *SequencerControl_StopSequencer_Call) Run(run func(ctx context.Context)) *SequencerControl_StopSequencer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *SequencerControl_StopSequencer_Call) Return(_a0 common.Hash, _a1 error) *SequencerControl_StopSequencer_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *SequencerControl_StopSequencer_Call) RunAndReturn(run func(context.Context) (common.Hash, error)) *SequencerControl_StopSequencer_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewSequencerControl interface {
mock.TestingT
Cleanup(func())
}
// NewSequencerControl creates a new instance of SequencerControl. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewSequencerControl(t mockConstructorTestingTNewSequencerControl) *SequencerControl {
mock := &SequencerControl{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
...@@ -10,9 +10,12 @@ import ( ...@@ -10,9 +10,12 @@ import (
) )
// SequencerControl defines the interface for controlling the sequencer. // SequencerControl defines the interface for controlling the sequencer.
//
//go:generate mockery --name SequencerControl --output mocks/ --with-expecter=true
type SequencerControl interface { type SequencerControl interface {
StartSequencer(ctx context.Context, hash common.Hash) error StartSequencer(ctx context.Context, hash common.Hash) error
StopSequencer(ctx context.Context) (common.Hash, error) StopSequencer(ctx context.Context) (common.Hash, error)
SequencerActive(ctx context.Context) (bool, error)
LatestUnsafeBlock(ctx context.Context) (eth.BlockInfo, error) LatestUnsafeBlock(ctx context.Context) (eth.BlockInfo, error)
PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error
} }
...@@ -47,6 +50,11 @@ func (s *sequencerController) StopSequencer(ctx context.Context) (common.Hash, e ...@@ -47,6 +50,11 @@ func (s *sequencerController) StopSequencer(ctx context.Context) (common.Hash, e
return s.node.StopSequencer(ctx) return s.node.StopSequencer(ctx)
} }
// SequencerActive implements SequencerControl.
func (s *sequencerController) SequencerActive(ctx context.Context) (bool, error) {
return s.node.SequencerActive(ctx)
}
// PostUnsafePayload implements SequencerControl. // PostUnsafePayload implements SequencerControl.
func (s *sequencerController) PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error { func (s *sequencerController) PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error {
return s.node.PostUnsafePayload(ctx, payload) return s.node.PostUnsafePayload(ctx, payload)
......
...@@ -38,6 +38,13 @@ type Config struct { ...@@ -38,6 +38,13 @@ type Config struct {
// ExecutionRPC is the HTTP provider URL for execution layer. // ExecutionRPC is the HTTP provider URL for execution layer.
ExecutionRPC string ExecutionRPC string
// Paused is true if the conductor should start in a paused state.
Paused bool
// HealthCheck is the health check configuration.
HealthCheck HealthCheckConfig
// RollupCfg is the rollup config.
RollupCfg rollup.Config RollupCfg rollup.Config
LogConfig oplog.CLIConfig LogConfig oplog.CLIConfig
...@@ -66,6 +73,9 @@ func (c *Config) Check() error { ...@@ -66,6 +73,9 @@ func (c *Config) Check() error {
if c.ExecutionRPC == "" { if c.ExecutionRPC == "" {
return fmt.Errorf("missing geth RPC") return fmt.Errorf("missing geth RPC")
} }
if err := c.HealthCheck.Check(); err != nil {
return errors.Wrap(err, "invalid health check config")
}
if err := c.RollupCfg.Check(); err != nil { if err := c.RollupCfg.Check(); err != nil {
return errors.Wrap(err, "invalid rollup config") return errors.Wrap(err, "invalid rollup config")
} }
...@@ -99,6 +109,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) { ...@@ -99,6 +109,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
RaftStorageDir: ctx.String(flags.RaftStorageDir.Name), RaftStorageDir: ctx.String(flags.RaftStorageDir.Name),
NodeRPC: ctx.String(flags.NodeRPC.Name), NodeRPC: ctx.String(flags.NodeRPC.Name),
ExecutionRPC: ctx.String(flags.ExecutionRPC.Name), ExecutionRPC: ctx.String(flags.ExecutionRPC.Name),
Paused: ctx.Bool(flags.Paused.Name),
HealthCheck: HealthCheckConfig{
Interval: ctx.Uint64(flags.HealthCheckInterval.Name),
SafeInterval: ctx.Uint64(flags.HealthCheckSafeInterval.Name),
MinPeerCount: ctx.Uint64(flags.HealthCheckMinPeerCount.Name),
},
RollupCfg: *rollupCfg, RollupCfg: *rollupCfg,
LogConfig: oplog.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
...@@ -106,3 +122,28 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) { ...@@ -106,3 +122,28 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
RPC: oprpc.ReadCLIConfig(ctx), RPC: oprpc.ReadCLIConfig(ctx),
}, nil }, nil
} }
// HealthCheckConfig defines health check configuration.
type HealthCheckConfig struct {
// Interval is the interval (in seconds) to check the health of the sequencer.
Interval uint64
// SafeInterval is the interval between safe head progression measured in seconds.
SafeInterval uint64
// MinPeerCount is the minimum number of peers required for the sequencer to be healthy.
MinPeerCount uint64
}
func (c *HealthCheckConfig) Check() error {
if c.Interval == 0 {
return fmt.Errorf("missing health check interval")
}
if c.SafeInterval == 0 {
return fmt.Errorf("missing safe interval")
}
if c.MinPeerCount == 0 {
return fmt.Errorf("missing minimum peer count")
}
return nil
}
...@@ -3,20 +3,44 @@ package conductor ...@@ -3,20 +3,44 @@ package conductor
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/ethereum-optimism/optimism/op-conductor/client" "github.com/ethereum-optimism/optimism/op-conductor/client"
"github.com/ethereum-optimism/optimism/op-conductor/consensus" "github.com/ethereum-optimism/optimism/op-conductor/consensus"
"github.com/ethereum-optimism/optimism/op-conductor/health"
opp2p "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
opclient "github.com/ethereum-optimism/optimism/op-service/client" opclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
) )
var (
ErrResumeTimeout = errors.New("timeout to resume conductor")
ErrPauseTimeout = errors.New("timeout to pause conductor")
ErrUnsafeHeadMismarch = errors.New("unsafe head mismatch")
)
// New creates a new OpConductor instance. // New creates a new OpConductor instance.
func New(ctx context.Context, cfg *Config, log log.Logger, version string) (*OpConductor, error) { func New(ctx context.Context, cfg *Config, log log.Logger, version string) (*OpConductor, error) {
return NewOpConductor(ctx, cfg, log, version, nil, nil, nil)
}
// NewOpConductor creates a new OpConductor instance.
func NewOpConductor(
ctx context.Context,
cfg *Config,
log log.Logger,
version string,
ctrl client.SequencerControl,
cons consensus.Consensus,
hmon health.HealthMonitor,
) (*OpConductor, error) {
if err := cfg.Check(); err != nil { if err := cfg.Check(); err != nil {
return nil, errors.Wrap(err, "invalid config") return nil, errors.Wrap(err, "invalid config")
} }
...@@ -25,7 +49,23 @@ func New(ctx context.Context, cfg *Config, log log.Logger, version string) (*OpC ...@@ -25,7 +49,23 @@ func New(ctx context.Context, cfg *Config, log log.Logger, version string) (*OpC
log: log, log: log,
version: version, version: version,
cfg: cfg, cfg: cfg,
pauseCh: make(chan struct{}),
pauseDoneCh: make(chan struct{}),
resumeCh: make(chan struct{}),
resumeDoneCh: make(chan struct{}),
actionCh: make(chan struct{}, 1),
ctrl: ctrl,
cons: cons,
hmon: hmon,
} }
oc.actionFn = oc.action
// explicitly set all atomic.Bool values
oc.leader.Store(false) // upon start, it should not be the leader unless specified otherwise by raft bootstrap, in that case, it'll receive a leadership update from consensus.
oc.healthy.Store(true) // default to healthy unless reported otherwise by health monitor.
oc.seqActive.Store(false) // explicitly set to false by default, the real value will be reported after sequencer control initialization.
oc.paused.Store(cfg.Paused)
oc.stopped.Store(false)
err := oc.init(ctx) err := oc.init(ctx)
if err != nil { if err != nil {
...@@ -47,17 +87,24 @@ func (c *OpConductor) init(ctx context.Context) error { ...@@ -47,17 +87,24 @@ func (c *OpConductor) init(ctx context.Context) error {
if err := c.initConsensus(ctx); err != nil { if err := c.initConsensus(ctx); err != nil {
return errors.Wrap(err, "failed to initialize consensus") return errors.Wrap(err, "failed to initialize consensus")
} }
if err := c.initHealthMonitor(ctx); err != nil {
return errors.Wrap(err, "failed to initialize health monitor")
}
return nil return nil
} }
func (c *OpConductor) initSequencerControl(ctx context.Context) error { func (c *OpConductor) initSequencerControl(ctx context.Context) error {
if c.ctrl != nil {
return nil
}
ec, err := opclient.NewRPC(ctx, c.log, c.cfg.ExecutionRPC) ec, err := opclient.NewRPC(ctx, c.log, c.cfg.ExecutionRPC)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to create geth rpc client") return errors.Wrap(err, "failed to create geth rpc client")
} }
gethCfg := sources.L2ClientDefaultConfig(&c.cfg.RollupCfg, true) execCfg := sources.L2ClientDefaultConfig(&c.cfg.RollupCfg, true)
// TODO: Add metrics tracer here. tracked by https://github.com/ethereum-optimism/protocol-quest/issues/45 // TODO: Add metrics tracer here. tracked by https://github.com/ethereum-optimism/protocol-quest/issues/45
geth, err := sources.NewEthClient(ec, c.log, nil, &gethCfg.EthClientConfig) exec, err := sources.NewEthClient(ec, c.log, nil, &execCfg.EthClientConfig)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to create geth client") return errors.Wrap(err, "failed to create geth client")
} }
...@@ -67,11 +114,22 @@ func (c *OpConductor) initSequencerControl(ctx context.Context) error { ...@@ -67,11 +114,22 @@ func (c *OpConductor) initSequencerControl(ctx context.Context) error {
return errors.Wrap(err, "failed to create node rpc client") return errors.Wrap(err, "failed to create node rpc client")
} }
node := sources.NewRollupClient(nc) node := sources.NewRollupClient(nc)
c.ctrl = client.NewSequencerControl(geth, node) c.ctrl = client.NewSequencerControl(exec, node)
active, err := c.ctrl.SequencerActive(ctx)
if err != nil {
return errors.Wrap(err, "failed to get sequencer active status")
}
c.seqActive.Store(active)
return nil return nil
} }
func (c *OpConductor) initConsensus(ctx context.Context) error { func (c *OpConductor) initConsensus(ctx context.Context) error {
if c.cons != nil {
return nil
}
serverAddr := fmt.Sprintf("%s:%d", c.cfg.ConsensusAddr, c.cfg.ConsensusPort) serverAddr := fmt.Sprintf("%s:%d", c.cfg.ConsensusAddr, c.cfg.ConsensusPort)
cons, err := consensus.NewRaftConsensus(c.log, c.cfg.RaftServerID, serverAddr, c.cfg.RaftStorageDir, c.cfg.RaftBootstrap, &c.cfg.RollupCfg) cons, err := consensus.NewRaftConsensus(c.log, c.cfg.RaftServerID, serverAddr, c.cfg.RaftStorageDir, c.cfg.RaftBootstrap, &c.cfg.RollupCfg)
if err != nil { if err != nil {
...@@ -81,15 +139,44 @@ func (c *OpConductor) initConsensus(ctx context.Context) error { ...@@ -81,15 +139,44 @@ func (c *OpConductor) initConsensus(ctx context.Context) error {
return nil return nil
} }
func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
if c.hmon != nil {
return nil
}
nc, err := opclient.NewRPC(ctx, c.log, c.cfg.NodeRPC)
if err != nil {
return errors.Wrap(err, "failed to create node rpc client")
}
node := sources.NewRollupClient(nc)
pc, err := rpc.DialContext(ctx, c.cfg.NodeRPC)
if err != nil {
return errors.Wrap(err, "failed to create p2p rpc client")
}
p2p := opp2p.NewClient(pc)
c.hmon = health.NewSequencerHealthMonitor(
c.log,
c.cfg.HealthCheck.Interval,
c.cfg.HealthCheck.SafeInterval,
c.cfg.HealthCheck.MinPeerCount,
&c.cfg.RollupCfg,
node,
p2p,
)
return nil
}
// OpConductor represents a full conductor instance and its resources, it does: // OpConductor represents a full conductor instance and its resources, it does:
// 1. performs health checks on sequencer // 1. performs health checks on sequencer
// 2. participate in consensus protocol for leader election // 2. participate in consensus protocol for leader election
// 3. and control sequencer state based on leader and sequencer health status. // 3. and control sequencer state based on leader, sequencer health and sequencer active status.
// //
// OpConductor has three states: // OpConductor has three states:
// 1. running: it is running normally, which executes control loop and participates in leader election. // 1. running: it is running normally, which executes control loop and participates in leader election.
// 2. paused: control loop (sequencer start/stop) is paused, but it still participates in leader election. // 2. paused: control loop (sequencer start/stop) is paused, but it still participates in leader election, and receives health updates.
// it is paused for disaster recovery situation
// 3. stopped: it is stopped, which means it is not participating in leader election and control loop. OpConductor cannot be started again from stopped mode. // 3. stopped: it is stopped, which means it is not participating in leader election and control loop. OpConductor cannot be started again from stopped mode.
type OpConductor struct { type OpConductor struct {
log log.Logger log log.Logger
...@@ -98,21 +185,169 @@ type OpConductor struct { ...@@ -98,21 +185,169 @@ type OpConductor struct {
ctrl client.SequencerControl ctrl client.SequencerControl
cons consensus.Consensus cons consensus.Consensus
hmon health.HealthMonitor
leader atomic.Bool
healthy atomic.Bool
seqActive atomic.Bool
actionFn func() // actionFn defines the action to be executed to bring the sequencer to the desired state.
wg sync.WaitGroup
pauseCh chan struct{}
pauseDoneCh chan struct{}
resumeCh chan struct{}
resumeDoneCh chan struct{}
actionCh chan struct{}
paused atomic.Bool
stopped atomic.Bool
shutdownCtx context.Context
shutdownCancel context.CancelFunc
} }
var _ cliapp.Lifecycle = (*OpConductor)(nil) var _ cliapp.Lifecycle = (*OpConductor)(nil)
// Start implements cliapp.Lifecycle. // Start implements cliapp.Lifecycle.
func (*OpConductor) Start(ctx context.Context) error { func (oc *OpConductor) Start(ctx context.Context) error {
panic("unimplemented") oc.log.Info("starting OpConductor")
if err := oc.hmon.Start(); err != nil {
return errors.Wrap(err, "failed to start health monitor")
}
oc.shutdownCtx, oc.shutdownCancel = context.WithCancel(ctx)
oc.wg.Add(1)
go oc.loop()
oc.log.Info("OpConductor started")
return nil
} }
// Stop implements cliapp.Lifecycle. // Stop implements cliapp.Lifecycle.
func (*OpConductor) Stop(ctx context.Context) error { func (oc *OpConductor) Stop(ctx context.Context) error {
panic("unimplemented") oc.log.Info("stopping OpConductor")
var result *multierror.Error
// close control loop
oc.shutdownCancel()
oc.wg.Wait()
// stop health check
if err := oc.hmon.Stop(); err != nil {
result = multierror.Append(result, errors.Wrap(err, "failed to stop health monitor"))
}
if err := oc.cons.Shutdown(); err != nil {
result = multierror.Append(result, errors.Wrap(err, "failed to shutdown consensus"))
}
if result.ErrorOrNil() != nil {
oc.log.Error("failed to stop OpConductor", "err", result.ErrorOrNil())
return result.ErrorOrNil()
}
oc.stopped.Store(true)
oc.log.Info("OpConductor stopped")
return nil
} }
// Stopped implements cliapp.Lifecycle. // Stopped implements cliapp.Lifecycle.
func (*OpConductor) Stopped() bool { func (oc *OpConductor) Stopped() bool {
panic("unimplemented") return oc.stopped.Load()
}
// Pause pauses the control loop of OpConductor, but still allows it to participate in leader election.
func (oc *OpConductor) Pause(ctx context.Context) error {
select {
case oc.pauseCh <- struct{}{}:
<-oc.pauseDoneCh
return nil
case <-ctx.Done():
return ErrPauseTimeout
}
}
// Resume resumes the control loop of OpConductor.
func (oc *OpConductor) Resume(ctx context.Context) error {
select {
case oc.resumeCh <- struct{}{}:
<-oc.resumeDoneCh
return nil
case <-ctx.Done():
return ErrResumeTimeout
}
}
// Paused returns true if OpConductor is paused.
func (oc *OpConductor) Paused() bool {
return oc.paused.Load()
}
func (oc *OpConductor) loop() {
defer oc.wg.Done()
healthUpdate := oc.hmon.Subscribe()
leaderUpdate := oc.cons.LeaderCh()
for {
select {
// We process status update (health, leadership) first regardless of the paused state.
// This way we could properly bring the sequencer to the desired state when resumed.
case healthy := <-healthUpdate:
oc.handleHealthUpdate(healthy)
case leader := <-leaderUpdate:
oc.handleLeaderUpdate(leader)
case <-oc.pauseCh:
oc.paused.Store(true)
oc.pauseDoneCh <- struct{}{}
case <-oc.resumeCh:
oc.paused.Store(false)
oc.resumeDoneCh <- struct{}{}
// queue an action to make sure sequencer is in the desired state after resume.
oc.queueAction()
case <-oc.shutdownCtx.Done():
return
// Handle control action last, so that when executing the action, we have the latest status and bring the sequencer to the desired state.
case <-oc.actionCh:
oc.actionFn()
}
}
}
func (oc *OpConductor) queueAction() {
select {
case oc.actionCh <- struct{}{}:
default:
// do nothing if there's an action queued already, this is fine because whenever an action is executed,
// it is guaranteed to have the latest status and bring the sequencer to the desired state.
}
}
// handleLeaderUpdate handles leadership update from consensus.
func (oc *OpConductor) handleLeaderUpdate(leader bool) {
oc.log.Info("Leadership status changed", "server", oc.cons.ServerID(), "leader", leader)
oc.leader.Store(leader)
oc.queueAction()
}
// handleHealthUpdate handles health update from health monitor.
func (oc *OpConductor) handleHealthUpdate(healthy bool) {
if !healthy {
oc.log.Error("Sequencer is unhealthy", "server", oc.cons.ServerID())
}
if healthy != oc.healthy.Load() {
oc.healthy.Store(healthy)
oc.queueAction()
}
}
// action tries to bring the sequencer to the desired state, a retry will be queued if any action failed.
func (oc *OpConductor) action() {
if oc.Paused() {
return
}
// TODO: (https://github.com/ethereum-optimism/protocol-quest/issues/47) implement
} }
package conductor
import (
"context"
"math/big"
"os"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
clientmocks "github.com/ethereum-optimism/optimism/op-conductor/client/mocks"
consensusmocks "github.com/ethereum-optimism/optimism/op-conductor/consensus/mocks"
healthmocks "github.com/ethereum-optimism/optimism/op-conductor/health/mocks"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func mockConfig(t *testing.T) Config {
now := uint64(time.Now().Unix())
dir, err := os.MkdirTemp("/tmp", "")
require.NoError(t, err)
return Config{
ConsensusAddr: "127.0.0.1",
ConsensusPort: 50050,
RaftServerID: "SequencerA",
RaftStorageDir: dir,
RaftBootstrap: false,
NodeRPC: "http://node:8545",
ExecutionRPC: "http://geth:8545",
HealthCheck: HealthCheckConfig{
Interval: 1,
SafeInterval: 5,
MinPeerCount: 1,
},
RollupCfg: rollup.Config{
Genesis: rollup.Genesis{
L1: eth.BlockID{
Hash: [32]byte{1, 2},
Number: 100,
},
L2: eth.BlockID{
Hash: [32]byte{2, 3},
Number: 0,
},
L2Time: now,
SystemConfig: eth.SystemConfig{
BatcherAddr: [20]byte{1},
Overhead: [32]byte{1},
Scalar: [32]byte{1},
GasLimit: 30000000,
},
},
BlockTime: 2,
MaxSequencerDrift: 600,
SeqWindowSize: 3600,
ChannelTimeout: 300,
L1ChainID: big.NewInt(1),
L2ChainID: big.NewInt(2),
CanyonTime: &now,
BatchInboxAddress: [20]byte{1, 2},
DepositContractAddress: [20]byte{2, 3},
L1SystemConfigAddress: [20]byte{3, 4},
ProtocolVersionsAddress: [20]byte{4, 5},
},
}
}
type OpConductorTestSuite struct {
suite.Suite
conductor *OpConductor
healthUpdateCh chan bool
leaderUpdateCh chan bool
ctx context.Context
log log.Logger
cfg Config
version string
ctrl *clientmocks.SequencerControl
cons *consensusmocks.Consensus
hmon *healthmocks.HealthMonitor
}
func (s *OpConductorTestSuite) SetupSuite() {
s.ctx = context.Background()
s.log = testlog.Logger(s.T(), log.LvlDebug)
s.cfg = mockConfig(s.T())
s.version = "v0.0.1"
s.ctrl = &clientmocks.SequencerControl{}
s.cons = &consensusmocks.Consensus{}
s.hmon = &healthmocks.HealthMonitor{}
s.cons.EXPECT().ServerID().Return("SequencerA")
}
func (s *OpConductorTestSuite) SetupTest() {
conductor, err := NewOpConductor(s.ctx, &s.cfg, s.log, s.version, s.ctrl, s.cons, s.hmon)
s.NoError(err)
s.conductor = conductor
s.healthUpdateCh = make(chan bool)
s.hmon.EXPECT().Start().Return(nil)
s.hmon.EXPECT().Subscribe().Return(s.healthUpdateCh)
s.leaderUpdateCh = make(chan bool)
s.cons.EXPECT().LeaderCh().Return(s.leaderUpdateCh)
err = s.conductor.Start(s.ctx)
s.NoError(err)
s.False(s.conductor.Stopped())
}
// Scenario 1: pause -> resume -> stop
func (s *OpConductorTestSuite) TestControlLoop1() {
// Pause
err := s.conductor.Pause(s.ctx)
s.NoError(err)
s.True(s.conductor.Paused())
// Send health update, make sure it can still be consumed.
s.healthUpdateCh <- true
// Resume
err = s.conductor.Resume(s.ctx)
s.NoError(err)
s.False(s.conductor.Paused())
// Stop
s.hmon.EXPECT().Stop().Return(nil)
s.cons.EXPECT().Shutdown().Return(nil)
err = s.conductor.Stop(s.ctx)
s.NoError(err)
s.True(s.conductor.Stopped())
}
// Scenario 2: pause -> pause -> resume -> resume
func (s *OpConductorTestSuite) TestControlLoop2() {
// Pause
err := s.conductor.Pause(s.ctx)
s.NoError(err)
s.True(s.conductor.Paused())
// Pause again, this shouldn't block or cause any other issues
err = s.conductor.Pause(s.ctx)
s.NoError(err)
s.True(s.conductor.Paused())
// Resume
err = s.conductor.Resume(s.ctx)
s.NoError(err)
s.False(s.conductor.Paused())
// Resume
err = s.conductor.Resume(s.ctx)
s.NoError(err)
s.False(s.conductor.Paused())
}
// Scenario 3: pause -> stop
func (s *OpConductorTestSuite) TestControlLoop3() {
// Pause
err := s.conductor.Pause(s.ctx)
s.NoError(err)
s.True(s.conductor.Paused())
// Stop
s.hmon.EXPECT().Stop().Return(nil)
s.cons.EXPECT().Shutdown().Return(nil)
err = s.conductor.Stop(s.ctx)
s.NoError(err)
s.True(s.conductor.Stopped())
}
func TestHealthMonitor(t *testing.T) {
suite.Run(t, new(OpConductorTestSuite))
}
...@@ -5,6 +5,8 @@ import ( ...@@ -5,6 +5,8 @@ import (
) )
// Consensus defines the consensus interface for leadership election. // Consensus defines the consensus interface for leadership election.
//
//go:generate mockery --name Consensus --output mocks/ --with-expecter=true
type Consensus interface { type Consensus interface {
// AddVoter adds a voting member into the cluster, voter is elegible to become leader. // AddVoter adds a voting member into the cluster, voter is elegible to become leader.
AddVoter(id, addr string) error AddVoter(id, addr string) error
......
// Code generated by mockery v2.28.1. DO NOT EDIT.
package mocks
import (
eth "github.com/ethereum-optimism/optimism/op-service/eth"
mock "github.com/stretchr/testify/mock"
)
// Consensus is an autogenerated mock type for the Consensus type
type Consensus struct {
mock.Mock
}
type Consensus_Expecter struct {
mock *mock.Mock
}
func (_m *Consensus) EXPECT() *Consensus_Expecter {
return &Consensus_Expecter{mock: &_m.Mock}
}
// AddNonVoter provides a mock function with given fields: id, addr
func (_m *Consensus) AddNonVoter(id string, addr string) error {
ret := _m.Called(id, addr)
var r0 error
if rf, ok := ret.Get(0).(func(string, string) error); ok {
r0 = rf(id, addr)
} else {
r0 = ret.Error(0)
}
return r0
}
// Consensus_AddNonVoter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddNonVoter'
type Consensus_AddNonVoter_Call struct {
*mock.Call
}
// AddNonVoter is a helper method to define mock.On call
// - id string
// - addr string
func (_e *Consensus_Expecter) AddNonVoter(id interface{}, addr interface{}) *Consensus_AddNonVoter_Call {
return &Consensus_AddNonVoter_Call{Call: _e.mock.On("AddNonVoter", id, addr)}
}
func (_c *Consensus_AddNonVoter_Call) Run(run func(id string, addr string)) *Consensus_AddNonVoter_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
})
return _c
}
func (_c *Consensus_AddNonVoter_Call) Return(_a0 error) *Consensus_AddNonVoter_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_AddNonVoter_Call) RunAndReturn(run func(string, string) error) *Consensus_AddNonVoter_Call {
_c.Call.Return(run)
return _c
}
// AddVoter provides a mock function with given fields: id, addr
func (_m *Consensus) AddVoter(id string, addr string) error {
ret := _m.Called(id, addr)
var r0 error
if rf, ok := ret.Get(0).(func(string, string) error); ok {
r0 = rf(id, addr)
} else {
r0 = ret.Error(0)
}
return r0
}
// Consensus_AddVoter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddVoter'
type Consensus_AddVoter_Call struct {
*mock.Call
}
// AddVoter is a helper method to define mock.On call
// - id string
// - addr string
func (_e *Consensus_Expecter) AddVoter(id interface{}, addr interface{}) *Consensus_AddVoter_Call {
return &Consensus_AddVoter_Call{Call: _e.mock.On("AddVoter", id, addr)}
}
func (_c *Consensus_AddVoter_Call) Run(run func(id string, addr string)) *Consensus_AddVoter_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
})
return _c
}
func (_c *Consensus_AddVoter_Call) Return(_a0 error) *Consensus_AddVoter_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_AddVoter_Call) RunAndReturn(run func(string, string) error) *Consensus_AddVoter_Call {
_c.Call.Return(run)
return _c
}
// CommitUnsafePayload provides a mock function with given fields: payload
func (_m *Consensus) CommitUnsafePayload(payload eth.ExecutionPayload) error {
ret := _m.Called(payload)
var r0 error
if rf, ok := ret.Get(0).(func(eth.ExecutionPayload) error); ok {
r0 = rf(payload)
} else {
r0 = ret.Error(0)
}
return r0
}
// Consensus_CommitUnsafePayload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CommitUnsafePayload'
type Consensus_CommitUnsafePayload_Call struct {
*mock.Call
}
// CommitUnsafePayload is a helper method to define mock.On call
// - payload eth.ExecutionPayload
func (_e *Consensus_Expecter) CommitUnsafePayload(payload interface{}) *Consensus_CommitUnsafePayload_Call {
return &Consensus_CommitUnsafePayload_Call{Call: _e.mock.On("CommitUnsafePayload", payload)}
}
func (_c *Consensus_CommitUnsafePayload_Call) Run(run func(payload eth.ExecutionPayload)) *Consensus_CommitUnsafePayload_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(eth.ExecutionPayload))
})
return _c
}
func (_c *Consensus_CommitUnsafePayload_Call) Return(_a0 error) *Consensus_CommitUnsafePayload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_CommitUnsafePayload_Call) RunAndReturn(run func(eth.ExecutionPayload) error) *Consensus_CommitUnsafePayload_Call {
_c.Call.Return(run)
return _c
}
// DemoteVoter provides a mock function with given fields: id
func (_m *Consensus) DemoteVoter(id string) error {
ret := _m.Called(id)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(id)
} else {
r0 = ret.Error(0)
}
return r0
}
// Consensus_DemoteVoter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DemoteVoter'
type Consensus_DemoteVoter_Call struct {
*mock.Call
}
// DemoteVoter is a helper method to define mock.On call
// - id string
func (_e *Consensus_Expecter) DemoteVoter(id interface{}) *Consensus_DemoteVoter_Call {
return &Consensus_DemoteVoter_Call{Call: _e.mock.On("DemoteVoter", id)}
}
func (_c *Consensus_DemoteVoter_Call) Run(run func(id string)) *Consensus_DemoteVoter_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *Consensus_DemoteVoter_Call) Return(_a0 error) *Consensus_DemoteVoter_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_DemoteVoter_Call) RunAndReturn(run func(string) error) *Consensus_DemoteVoter_Call {
_c.Call.Return(run)
return _c
}
// LatestUnsafePayload provides a mock function with given fields:
func (_m *Consensus) LatestUnsafePayload() eth.ExecutionPayload {
ret := _m.Called()
var r0 eth.ExecutionPayload
if rf, ok := ret.Get(0).(func() eth.ExecutionPayload); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(eth.ExecutionPayload)
}
return r0
}
// Consensus_LatestUnsafePayload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LatestUnsafePayload'
type Consensus_LatestUnsafePayload_Call struct {
*mock.Call
}
// LatestUnsafePayload is a helper method to define mock.On call
func (_e *Consensus_Expecter) LatestUnsafePayload() *Consensus_LatestUnsafePayload_Call {
return &Consensus_LatestUnsafePayload_Call{Call: _e.mock.On("LatestUnsafePayload")}
}
func (_c *Consensus_LatestUnsafePayload_Call) Run(run func()) *Consensus_LatestUnsafePayload_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Consensus_LatestUnsafePayload_Call) Return(_a0 eth.ExecutionPayload) *Consensus_LatestUnsafePayload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_LatestUnsafePayload_Call) RunAndReturn(run func() eth.ExecutionPayload) *Consensus_LatestUnsafePayload_Call {
_c.Call.Return(run)
return _c
}
// Leader provides a mock function with given fields:
func (_m *Consensus) Leader() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// Consensus_Leader_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Leader'
type Consensus_Leader_Call struct {
*mock.Call
}
// Leader is a helper method to define mock.On call
func (_e *Consensus_Expecter) Leader() *Consensus_Leader_Call {
return &Consensus_Leader_Call{Call: _e.mock.On("Leader")}
}
func (_c *Consensus_Leader_Call) Run(run func()) *Consensus_Leader_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Consensus_Leader_Call) Return(_a0 bool) *Consensus_Leader_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_Leader_Call) RunAndReturn(run func() bool) *Consensus_Leader_Call {
_c.Call.Return(run)
return _c
}
// LeaderCh provides a mock function with given fields:
func (_m *Consensus) LeaderCh() <-chan bool {
ret := _m.Called()
var r0 <-chan bool
if rf, ok := ret.Get(0).(func() <-chan bool); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan bool)
}
}
return r0
}
// Consensus_LeaderCh_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LeaderCh'
type Consensus_LeaderCh_Call struct {
*mock.Call
}
// LeaderCh is a helper method to define mock.On call
func (_e *Consensus_Expecter) LeaderCh() *Consensus_LeaderCh_Call {
return &Consensus_LeaderCh_Call{Call: _e.mock.On("LeaderCh")}
}
func (_c *Consensus_LeaderCh_Call) Run(run func()) *Consensus_LeaderCh_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Consensus_LeaderCh_Call) Return(_a0 <-chan bool) *Consensus_LeaderCh_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_LeaderCh_Call) RunAndReturn(run func() <-chan bool) *Consensus_LeaderCh_Call {
_c.Call.Return(run)
return _c
}
// RemoveServer provides a mock function with given fields: id
func (_m *Consensus) RemoveServer(id string) error {
ret := _m.Called(id)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(id)
} else {
r0 = ret.Error(0)
}
return r0
}
// Consensus_RemoveServer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveServer'
type Consensus_RemoveServer_Call struct {
*mock.Call
}
// RemoveServer is a helper method to define mock.On call
// - id string
func (_e *Consensus_Expecter) RemoveServer(id interface{}) *Consensus_RemoveServer_Call {
return &Consensus_RemoveServer_Call{Call: _e.mock.On("RemoveServer", id)}
}
func (_c *Consensus_RemoveServer_Call) Run(run func(id string)) *Consensus_RemoveServer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *Consensus_RemoveServer_Call) Return(_a0 error) *Consensus_RemoveServer_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_RemoveServer_Call) RunAndReturn(run func(string) error) *Consensus_RemoveServer_Call {
_c.Call.Return(run)
return _c
}
// ServerID provides a mock function with given fields:
func (_m *Consensus) ServerID() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// Consensus_ServerID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ServerID'
type Consensus_ServerID_Call struct {
*mock.Call
}
// ServerID is a helper method to define mock.On call
func (_e *Consensus_Expecter) ServerID() *Consensus_ServerID_Call {
return &Consensus_ServerID_Call{Call: _e.mock.On("ServerID")}
}
func (_c *Consensus_ServerID_Call) Run(run func()) *Consensus_ServerID_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Consensus_ServerID_Call) Return(_a0 string) *Consensus_ServerID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_ServerID_Call) RunAndReturn(run func() string) *Consensus_ServerID_Call {
_c.Call.Return(run)
return _c
}
// Shutdown provides a mock function with given fields:
func (_m *Consensus) Shutdown() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Consensus_Shutdown_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Shutdown'
type Consensus_Shutdown_Call struct {
*mock.Call
}
// Shutdown is a helper method to define mock.On call
func (_e *Consensus_Expecter) Shutdown() *Consensus_Shutdown_Call {
return &Consensus_Shutdown_Call{Call: _e.mock.On("Shutdown")}
}
func (_c *Consensus_Shutdown_Call) Run(run func()) *Consensus_Shutdown_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Consensus_Shutdown_Call) Return(_a0 error) *Consensus_Shutdown_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_Shutdown_Call) RunAndReturn(run func() error) *Consensus_Shutdown_Call {
_c.Call.Return(run)
return _c
}
// TransferLeader provides a mock function with given fields:
func (_m *Consensus) TransferLeader() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Consensus_TransferLeader_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TransferLeader'
type Consensus_TransferLeader_Call struct {
*mock.Call
}
// TransferLeader is a helper method to define mock.On call
func (_e *Consensus_Expecter) TransferLeader() *Consensus_TransferLeader_Call {
return &Consensus_TransferLeader_Call{Call: _e.mock.On("TransferLeader")}
}
func (_c *Consensus_TransferLeader_Call) Run(run func()) *Consensus_TransferLeader_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Consensus_TransferLeader_Call) Return(_a0 error) *Consensus_TransferLeader_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_TransferLeader_Call) RunAndReturn(run func() error) *Consensus_TransferLeader_Call {
_c.Call.Return(run)
return _c
}
// TransferLeaderTo provides a mock function with given fields: id, addr
func (_m *Consensus) TransferLeaderTo(id string, addr string) error {
ret := _m.Called(id, addr)
var r0 error
if rf, ok := ret.Get(0).(func(string, string) error); ok {
r0 = rf(id, addr)
} else {
r0 = ret.Error(0)
}
return r0
}
// Consensus_TransferLeaderTo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TransferLeaderTo'
type Consensus_TransferLeaderTo_Call struct {
*mock.Call
}
// TransferLeaderTo is a helper method to define mock.On call
// - id string
// - addr string
func (_e *Consensus_Expecter) TransferLeaderTo(id interface{}, addr interface{}) *Consensus_TransferLeaderTo_Call {
return &Consensus_TransferLeaderTo_Call{Call: _e.mock.On("TransferLeaderTo", id, addr)}
}
func (_c *Consensus_TransferLeaderTo_Call) Run(run func(id string, addr string)) *Consensus_TransferLeaderTo_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
})
return _c
}
func (_c *Consensus_TransferLeaderTo_Call) Return(_a0 error) *Consensus_TransferLeaderTo_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_TransferLeaderTo_Call) RunAndReturn(run func(string, string) error) *Consensus_TransferLeaderTo_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewConsensus interface {
mock.TestingT
Cleanup(func())
}
// NewConsensus creates a new instance of Consensus. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConsensus(t mockConstructorTestingTNewConsensus) *Consensus {
mock := &Consensus{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
...@@ -48,6 +48,27 @@ var ( ...@@ -48,6 +48,27 @@ var (
Usage: "HTTP provider URL for execution layer", Usage: "HTTP provider URL for execution layer",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "EXECUTION_RPC"), EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "EXECUTION_RPC"),
} }
HealthCheckInterval = &cli.Uint64Flag{
Name: "healthcheck.interval",
Usage: "Interval between health checks",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_INTERVAL"),
}
HealthCheckSafeInterval = &cli.Uint64Flag{
Name: "healthcheck.safe-interval",
Usage: "Interval between safe head progression measured in seconds",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_SAFE_INTERVAL"),
}
HealthCheckMinPeerCount = &cli.Uint64Flag{
Name: "healthcheck.min-peer-count",
Usage: "Minimum number of peers required to be considered healthy",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_MIN_PEER_COUNT"),
}
Paused = &cli.BoolFlag{
Name: "paused",
Usage: "Whether the conductor is paused",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "PAUSED"),
Value: false,
}
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -57,9 +78,14 @@ var requiredFlags = []cli.Flag{ ...@@ -57,9 +78,14 @@ var requiredFlags = []cli.Flag{
RaftStorageDir, RaftStorageDir,
NodeRPC, NodeRPC,
ExecutionRPC, ExecutionRPC,
HealthCheckInterval,
HealthCheckSafeInterval,
HealthCheckMinPeerCount,
} }
var optionalFlags = []cli.Flag{} var optionalFlags = []cli.Flag{
Paused,
}
func init() { func init() {
optionalFlags = append(optionalFlags, oprpc.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, oprpc.CLIFlags(EnvVarPrefix)...)
......
// Code generated by mockery v2.28.1. DO NOT EDIT.
package mocks
import mock "github.com/stretchr/testify/mock"
// HealthMonitor is an autogenerated mock type for the HealthMonitor type
type HealthMonitor struct {
mock.Mock
}
type HealthMonitor_Expecter struct {
mock *mock.Mock
}
func (_m *HealthMonitor) EXPECT() *HealthMonitor_Expecter {
return &HealthMonitor_Expecter{mock: &_m.Mock}
}
// Start provides a mock function with given fields:
func (_m *HealthMonitor) Start() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// HealthMonitor_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
type HealthMonitor_Start_Call struct {
*mock.Call
}
// Start is a helper method to define mock.On call
func (_e *HealthMonitor_Expecter) Start() *HealthMonitor_Start_Call {
return &HealthMonitor_Start_Call{Call: _e.mock.On("Start")}
}
func (_c *HealthMonitor_Start_Call) Run(run func()) *HealthMonitor_Start_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *HealthMonitor_Start_Call) Return(_a0 error) *HealthMonitor_Start_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *HealthMonitor_Start_Call) RunAndReturn(run func() error) *HealthMonitor_Start_Call {
_c.Call.Return(run)
return _c
}
// Stop provides a mock function with given fields:
func (_m *HealthMonitor) Stop() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// HealthMonitor_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
type HealthMonitor_Stop_Call struct {
*mock.Call
}
// Stop is a helper method to define mock.On call
func (_e *HealthMonitor_Expecter) Stop() *HealthMonitor_Stop_Call {
return &HealthMonitor_Stop_Call{Call: _e.mock.On("Stop")}
}
func (_c *HealthMonitor_Stop_Call) Run(run func()) *HealthMonitor_Stop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *HealthMonitor_Stop_Call) Return(_a0 error) *HealthMonitor_Stop_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *HealthMonitor_Stop_Call) RunAndReturn(run func() error) *HealthMonitor_Stop_Call {
_c.Call.Return(run)
return _c
}
// Subscribe provides a mock function with given fields:
func (_m *HealthMonitor) Subscribe() <-chan bool {
ret := _m.Called()
var r0 <-chan bool
if rf, ok := ret.Get(0).(func() <-chan bool); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan bool)
}
}
return r0
}
// HealthMonitor_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe'
type HealthMonitor_Subscribe_Call struct {
*mock.Call
}
// Subscribe is a helper method to define mock.On call
func (_e *HealthMonitor_Expecter) Subscribe() *HealthMonitor_Subscribe_Call {
return &HealthMonitor_Subscribe_Call{Call: _e.mock.On("Subscribe")}
}
func (_c *HealthMonitor_Subscribe_Call) Run(run func()) *HealthMonitor_Subscribe_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *HealthMonitor_Subscribe_Call) Return(_a0 <-chan bool) *HealthMonitor_Subscribe_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *HealthMonitor_Subscribe_Call) RunAndReturn(run func() <-chan bool) *HealthMonitor_Subscribe_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewHealthMonitor interface {
mock.TestingT
Cleanup(func())
}
// NewHealthMonitor creates a new instance of HealthMonitor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewHealthMonitor(t mockConstructorTestingTNewHealthMonitor) *HealthMonitor {
mock := &HealthMonitor{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
...@@ -13,6 +13,8 @@ import ( ...@@ -13,6 +13,8 @@ import (
) )
// HealthMonitor defines the interface for monitoring the health of the sequencer. // HealthMonitor defines the interface for monitoring the health of the sequencer.
//
//go:generate mockery --name HealthMonitor --output mocks/ --with-expecter=true
type HealthMonitor interface { type HealthMonitor interface {
// Subscribe returns a channel that will be notified for every health check. // Subscribe returns a channel that will be notified for every health check.
Subscribe() <-chan bool Subscribe() <-chan bool
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment