Commit d83f12d3 authored by Francis Li's avatar Francis Li Committed by GitHub

feat(op-conductor): implement startup handshake (#12047)

* op-node waits establishes connection to conductor before starting in sequencer enabled mode

* Added conductor enabled api to op-node

* check node enabled conductor during conductor startup

* update logs

* Change back to lazy initialization

* Add method not found check
parent 5798c5f4
...@@ -25,6 +25,62 @@ func (_m *SequencerControl) EXPECT() *SequencerControl_Expecter { ...@@ -25,6 +25,62 @@ func (_m *SequencerControl) EXPECT() *SequencerControl_Expecter {
return &SequencerControl_Expecter{mock: &_m.Mock} return &SequencerControl_Expecter{mock: &_m.Mock}
} }
// ConductorEnabled provides a mock function with given fields: ctx
func (_m *SequencerControl) ConductorEnabled(ctx context.Context) (bool, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for ConductorEnabled")
}
var r0 bool
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (bool, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) bool); ok {
r0 = rf(ctx)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SequencerControl_ConductorEnabled_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ConductorEnabled'
type SequencerControl_ConductorEnabled_Call struct {
*mock.Call
}
// ConductorEnabled is a helper method to define mock.On call
// - ctx context.Context
func (_e *SequencerControl_Expecter) ConductorEnabled(ctx interface{}) *SequencerControl_ConductorEnabled_Call {
return &SequencerControl_ConductorEnabled_Call{Call: _e.mock.On("ConductorEnabled", ctx)}
}
func (_c *SequencerControl_ConductorEnabled_Call) Run(run func(ctx context.Context)) *SequencerControl_ConductorEnabled_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *SequencerControl_ConductorEnabled_Call) Return(_a0 bool, _a1 error) *SequencerControl_ConductorEnabled_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *SequencerControl_ConductorEnabled_Call) RunAndReturn(run func(context.Context) (bool, error)) *SequencerControl_ConductorEnabled_Call {
_c.Call.Return(run)
return _c
}
// LatestUnsafeBlock provides a mock function with given fields: ctx // LatestUnsafeBlock provides a mock function with given fields: ctx
func (_m *SequencerControl) LatestUnsafeBlock(ctx context.Context) (eth.BlockInfo, error) { func (_m *SequencerControl) LatestUnsafeBlock(ctx context.Context) (eth.BlockInfo, error) {
ret := _m.Called(ctx) ret := _m.Called(ctx)
......
...@@ -18,6 +18,7 @@ type SequencerControl interface { ...@@ -18,6 +18,7 @@ type SequencerControl interface {
SequencerActive(ctx context.Context) (bool, error) SequencerActive(ctx context.Context) (bool, error)
LatestUnsafeBlock(ctx context.Context) (eth.BlockInfo, error) LatestUnsafeBlock(ctx context.Context) (eth.BlockInfo, error)
PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
ConductorEnabled(ctx context.Context) (bool, error)
} }
// NewSequencerControl creates a new SequencerControl instance. // NewSequencerControl creates a new SequencerControl instance.
...@@ -59,3 +60,8 @@ func (s *sequencerController) SequencerActive(ctx context.Context) (bool, error) ...@@ -59,3 +60,8 @@ func (s *sequencerController) SequencerActive(ctx context.Context) (bool, error)
func (s *sequencerController) PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error { func (s *sequencerController) PostUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
return s.node.PostUnsafePayload(ctx, payload) return s.node.PostUnsafePayload(ctx, payload)
} }
// ConductorEnabled implements SequencerControl.
func (s *sequencerController) ConductorEnabled(ctx context.Context) (bool, error) {
return s.node.ConductorEnabled(ctx)
}
...@@ -28,6 +28,7 @@ import ( ...@@ -28,6 +28,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil" "github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/retry"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
) )
...@@ -140,6 +141,25 @@ func (c *OpConductor) initSequencerControl(ctx context.Context) error { ...@@ -140,6 +141,25 @@ func (c *OpConductor) initSequencerControl(ctx context.Context) error {
node := sources.NewRollupClient(nc) node := sources.NewRollupClient(nc)
c.ctrl = client.NewSequencerControl(exec, node) c.ctrl = client.NewSequencerControl(exec, node)
enabled, err := retry.Do(ctx, 60, retry.Fixed(5*time.Second), func() (bool, error) {
enabled, err := c.ctrl.ConductorEnabled(ctx)
if rpcErr, ok := err.(rpc.Error); ok {
errCode := rpcErr.ErrorCode()
errText := strings.ToLower(err.Error())
if errCode == -32601 || strings.Contains(errText, "method not found") { // method not found error
c.log.Warn("Warning: conductorEnabled method not found, please upgrade your op-node to the latest version, continuing...")
return true, nil
}
}
return enabled, err
})
if err != nil {
return errors.Wrap(err, "failed to connect to sequencer")
}
if !enabled {
return errors.New("conductor is not enabled on sequencer, exiting...")
}
return c.updateSequencerActiveStatus() return c.updateSequencerActiveStatus()
} }
......
...@@ -241,6 +241,10 @@ func (s *l2VerifierBackend) OnUnsafeL2Payload(ctx context.Context, envelope *eth ...@@ -241,6 +241,10 @@ func (s *l2VerifierBackend) OnUnsafeL2Payload(ctx context.Context, envelope *eth
return nil return nil
} }
func (s *l2VerifierBackend) ConductorEnabled(ctx context.Context) (bool, error) {
return false, nil
}
func (s *L2Verifier) DerivationMetricsTracer() *testutils.TestDerivationMetrics { func (s *L2Verifier) DerivationMetricsTracer() *testutils.TestDerivationMetrics {
return s.derivationMetrics return s.derivationMetrics
} }
......
...@@ -34,6 +34,7 @@ type driverClient interface { ...@@ -34,6 +34,7 @@ type driverClient interface {
SequencerActive(context.Context) (bool, error) SequencerActive(context.Context) (bool, error)
OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
OverrideLeader(ctx context.Context) error OverrideLeader(ctx context.Context) error
ConductorEnabled(ctx context.Context) (bool, error)
} }
type SafeDBReader interface { type SafeDBReader interface {
...@@ -98,6 +99,13 @@ func (n *adminAPI) OverrideLeader(ctx context.Context) error { ...@@ -98,6 +99,13 @@ func (n *adminAPI) OverrideLeader(ctx context.Context) error {
return n.dr.OverrideLeader(ctx) return n.dr.OverrideLeader(ctx)
} }
// ConductorEnabled returns true if the sequencer conductor is enabled.
func (n *adminAPI) ConductorEnabled(ctx context.Context) (bool, error) {
recordDur := n.M.RecordRPCServerRequest("admin_conductorEnabled")
defer recordDur()
return n.dr.ConductorEnabled(ctx)
}
type nodeAPI struct { type nodeAPI struct {
config *rollup.Config config *rollup.Config
client l2EthClient client l2EthClient
......
...@@ -32,7 +32,7 @@ type ConductorClient struct { ...@@ -32,7 +32,7 @@ type ConductorClient struct {
var _ conductor.SequencerConductor = &ConductorClient{} var _ conductor.SequencerConductor = &ConductorClient{}
// NewConductorClient returns a new conductor client for the op-conductor RPC service. // NewConductorClient returns a new conductor client for the op-conductor RPC service.
func NewConductorClient(cfg *Config, log log.Logger, metrics *metrics.Metrics) *ConductorClient { func NewConductorClient(cfg *Config, log log.Logger, metrics *metrics.Metrics) conductor.SequencerConductor {
return &ConductorClient{ return &ConductorClient{
cfg: cfg, cfg: cfg,
metrics: metrics, metrics: metrics,
...@@ -53,6 +53,11 @@ func (c *ConductorClient) initialize() error { ...@@ -53,6 +53,11 @@ func (c *ConductorClient) initialize() error {
return nil return nil
} }
// Enabled returns true if the conductor is enabled, and since the conductor client is initialized, the conductor is always enabled.
func (c *ConductorClient) Enabled(ctx context.Context) bool {
return true
}
// Leader returns true if this node is the leader sequencer. // Leader returns true if this node is the leader sequencer.
func (c *ConductorClient) Leader(ctx context.Context) (bool, error) { func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
if c.overrideLeader.Load() { if c.overrideLeader.Load() {
......
...@@ -287,6 +287,10 @@ func (c *mockDriverClient) OverrideLeader(ctx context.Context) error { ...@@ -287,6 +287,10 @@ func (c *mockDriverClient) OverrideLeader(ctx context.Context) error {
return c.Mock.MethodCalled("OverrideLeader").Get(0).(error) return c.Mock.MethodCalled("OverrideLeader").Get(0).(error)
} }
func (c *mockDriverClient) ConductorEnabled(ctx context.Context) (bool, error) {
return c.Mock.MethodCalled("ConductorEnabled").Get(0).(bool), nil
}
type mockSafeDBReader struct { type mockSafeDBReader struct {
mock.Mock mock.Mock
} }
......
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
// SequencerConductor is an interface for the driver to communicate with the sequencer conductor. // SequencerConductor is an interface for the driver to communicate with the sequencer conductor.
// It is used to determine if the current node is the active sequencer, and to commit unsafe payloads to the conductor log. // It is used to determine if the current node is the active sequencer, and to commit unsafe payloads to the conductor log.
type SequencerConductor interface { type SequencerConductor interface {
// Enabled returns true if the conductor is enabled.
Enabled(ctx context.Context) bool
// Leader returns true if this node is the leader sequencer. // Leader returns true if this node is the leader sequencer.
Leader(ctx context.Context) (bool, error) Leader(ctx context.Context) (bool, error)
// CommitUnsafePayload commits an unsafe payload to the conductor FSM. // CommitUnsafePayload commits an unsafe payload to the conductor FSM.
...@@ -24,6 +26,11 @@ type NoOpConductor struct{} ...@@ -24,6 +26,11 @@ type NoOpConductor struct{}
var _ SequencerConductor = &NoOpConductor{} var _ SequencerConductor = &NoOpConductor{}
// Enabled implements SequencerConductor.
func (c *NoOpConductor) Enabled(ctx context.Context) bool {
return false
}
// Leader returns true if this node is the leader sequencer. NoOpConductor always returns true. // Leader returns true if this node is the leader sequencer. NoOpConductor always returns true.
func (c *NoOpConductor) Leader(ctx context.Context) (bool, error) { func (c *NoOpConductor) Leader(ctx context.Context) (bool, error) {
return true, nil return true, nil
......
...@@ -483,6 +483,10 @@ func (s *Driver) OverrideLeader(ctx context.Context) error { ...@@ -483,6 +483,10 @@ func (s *Driver) OverrideLeader(ctx context.Context) error {
return s.sequencer.OverrideLeader(ctx) return s.sequencer.OverrideLeader(ctx)
} }
func (s *Driver) ConductorEnabled(ctx context.Context) (bool, error) {
return s.sequencer.ConductorEnabled(ctx), nil
}
// SyncStatus blocks the driver event loop and captures the syncing status. // SyncStatus blocks the driver event loop and captures the syncing status.
func (s *Driver) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) { func (s *Driver) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
return s.statusTracker.SyncStatus(), nil return s.statusTracker.SyncStatus(), nil
......
...@@ -48,4 +48,8 @@ func (ds DisabledSequencer) OverrideLeader(ctx context.Context) error { ...@@ -48,4 +48,8 @@ func (ds DisabledSequencer) OverrideLeader(ctx context.Context) error {
return ErrSequencerNotEnabled return ErrSequencerNotEnabled
} }
func (ds DisabledSequencer) ConductorEnabled(ctx context.Context) bool {
return false
}
func (ds DisabledSequencer) Close() {} func (ds DisabledSequencer) Close() {}
...@@ -19,5 +19,6 @@ type SequencerIface interface { ...@@ -19,5 +19,6 @@ type SequencerIface interface {
Stop(ctx context.Context) (hash common.Hash, err error) Stop(ctx context.Context) (hash common.Hash, err error)
SetMaxSafeLag(ctx context.Context, v uint64) error SetMaxSafeLag(ctx context.Context, v uint64) error
OverrideLeader(ctx context.Context) error OverrideLeader(ctx context.Context) error
ConductorEnabled(ctx context.Context) bool
Close() Close()
} }
...@@ -617,8 +617,6 @@ func (d *Sequencer) Init(ctx context.Context, active bool) error { ...@@ -617,8 +617,6 @@ func (d *Sequencer) Init(ctx context.Context, active bool) error {
d.emitter.Emit(engine.ForkchoiceRequestEvent{}) d.emitter.Emit(engine.ForkchoiceRequestEvent{})
if active { if active {
// TODO(#11121): should the conductor be checked on startup?
// The conductor was previously not being checked in this case, but that may be a bug.
return d.forceStart() return d.forceStart()
} else { } else {
if err := d.listener.SequencerStopped(); err != nil { if err := d.listener.SequencerStopped(); err != nil {
...@@ -712,6 +710,10 @@ func (d *Sequencer) OverrideLeader(ctx context.Context) error { ...@@ -712,6 +710,10 @@ func (d *Sequencer) OverrideLeader(ctx context.Context) error {
return d.conductor.OverrideLeader(ctx) return d.conductor.OverrideLeader(ctx)
} }
func (d *Sequencer) ConductorEnabled(ctx context.Context) bool {
return d.conductor.Enabled(ctx)
}
func (d *Sequencer) Close() { func (d *Sequencer) Close() {
d.conductor.Close() d.conductor.Close()
d.asyncGossip.Stop() d.asyncGossip.Stop()
......
...@@ -105,6 +105,10 @@ type FakeConductor struct { ...@@ -105,6 +105,10 @@ type FakeConductor struct {
var _ conductor.SequencerConductor = &FakeConductor{} var _ conductor.SequencerConductor = &FakeConductor{}
func (c *FakeConductor) Enabled(ctx context.Context) bool {
return true
}
func (c *FakeConductor) Leader(ctx context.Context) (bool, error) { func (c *FakeConductor) Leader(ctx context.Context) (bool, error) {
return c.leader, nil return c.leader, nil
} }
......
...@@ -74,6 +74,12 @@ func (r *RollupClient) OverrideLeader(ctx context.Context) error { ...@@ -74,6 +74,12 @@ func (r *RollupClient) OverrideLeader(ctx context.Context) error {
return r.rpc.CallContext(ctx, nil, "admin_overrideLeader") return r.rpc.CallContext(ctx, nil, "admin_overrideLeader")
} }
func (r *RollupClient) ConductorEnabled(ctx context.Context) (bool, error) {
var result bool
err := r.rpc.CallContext(ctx, &result, "admin_conductorEnabled")
return result, err
}
func (r *RollupClient) SetLogLevel(ctx context.Context, lvl slog.Level) error { func (r *RollupClient) SetLogLevel(ctx context.Context, lvl slog.Level) error {
return r.rpc.CallContext(ctx, nil, "admin_setLogLevel", lvl.String()) return r.rpc.CallContext(ctx, nil, "admin_setLogLevel", lvl.String())
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment