Commit 103efd0d authored by Francis Li's avatar Francis Li Committed by GitHub

[op-conductor] rpc api / client / backend (#8874)

* Add API interface

* Finish API Server implementation

* Finish API client implementation

* Finish rpc server setup in conductor

* minor fixup

* Fix merge issue
parent 8ba2e1e1
......@@ -17,9 +17,12 @@ import (
"github.com/ethereum-optimism/optimism/op-conductor/client"
"github.com/ethereum-optimism/optimism/op-conductor/consensus"
"github.com/ethereum-optimism/optimism/op-conductor/health"
conductorrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
opp2p "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
opclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
)
......@@ -93,6 +96,9 @@ func (c *OpConductor) init(ctx context.Context) error {
if err := c.initHealthMonitor(ctx); err != nil {
return errors.Wrap(err, "failed to initialize health monitor")
}
if err := c.initRPCServer(ctx); err != nil {
return errors.Wrap(err, "failed to initialize rpc server")
}
return nil
}
......@@ -174,6 +180,23 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
return nil
}
func (oc *OpConductor) initRPCServer(ctx context.Context) error {
server := oprpc.NewServer(
oc.cfg.RPC.ListenAddr,
oc.cfg.RPC.ListenPort,
oc.version,
oprpc.WithLogger(oc.log),
)
api := conductorrpc.NewAPIBackend(oc.log, oc)
server.AddAPI(rpc.API{
Namespace: conductorrpc.RPCNamespace,
Version: oc.version,
Service: api,
})
oc.rpcServer = server
return nil
}
// OpConductor represents a full conductor instance and its resources, it does:
// 1. performs health checks on sequencer
// 2. participate in consensus protocol for leader election
......@@ -210,6 +233,8 @@ type OpConductor struct {
stopped atomic.Bool
shutdownCtx context.Context
shutdownCancel context.CancelFunc
rpcServer *oprpc.Server
}
var _ cliapp.Lifecycle = (*OpConductor)(nil)
......@@ -222,6 +247,11 @@ func (oc *OpConductor) Start(ctx context.Context) error {
return errors.Wrap(err, "failed to start health monitor")
}
oc.log.Info("starting JSON-RPC server")
if err := oc.rpcServer.Start(); err != nil {
return errors.Wrap(err, "failed to start JSON-RPC server")
}
oc.shutdownCtx, oc.shutdownCancel = context.WithCancel(ctx)
oc.wg.Add(1)
go oc.loop()
......@@ -244,6 +274,10 @@ func (oc *OpConductor) Stop(ctx context.Context) error {
oc.shutdownCancel()
oc.wg.Wait()
if err := oc.rpcServer.Stop(); err != nil {
result = multierror.Append(result, errors.Wrap(err, "failed to stop rpc server"))
}
// stop health check
if err := oc.hmon.Stop(); err != nil {
result = multierror.Append(result, errors.Wrap(err, "failed to stop health monitor"))
......@@ -295,6 +329,46 @@ func (oc *OpConductor) Paused() bool {
return oc.paused.Load()
}
// Leader returns true if OpConductor is the leader.
func (oc *OpConductor) Leader(_ context.Context) bool {
return oc.cons.Leader()
}
// LeaderWithID returns the current leader's server ID and address.
func (oc *OpConductor) LeaderWithID(_ context.Context) (string, string) {
return oc.cons.LeaderWithID()
}
// AddServerAsVoter adds a server as a voter to the cluster.
func (oc *OpConductor) AddServerAsVoter(_ context.Context, id string, addr string) error {
return oc.cons.AddVoter(id, addr)
}
// AddServerAsNonvoter adds a server as a non-voter to the cluster. non-voter will not participate in leader election.
func (oc *OpConductor) AddServerAsNonvoter(_ context.Context, id string, addr string) error {
return oc.cons.AddNonVoter(id, addr)
}
// RemoveServer removes a server from the cluster.
func (oc *OpConductor) RemoveServer(_ context.Context, id string) error {
return oc.cons.RemoveServer(id)
}
// TransferLeader transfers leadership to another server.
func (oc *OpConductor) TransferLeader(_ context.Context) error {
return oc.cons.TransferLeader()
}
// TransferLeaderToServer transfers leadership to a specific server.
func (oc *OpConductor) TransferLeaderToServer(_ context.Context, id string, addr string) error {
return oc.cons.TransferLeaderTo(id, addr)
}
// CommitUnsafePayload commits a unsafe payload (lastest head) to the cluster FSM.
func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.ExecutionPayload) error {
return oc.cons.CommitUnsafePayload(payload)
}
func (oc *OpConductor) loop() {
defer oc.wg.Done()
......@@ -444,6 +518,9 @@ func (oc *OpConductor) startSequencer() error {
// When starting sequencer, we need to make sure that the current node has the latest unsafe head from the consensus protocol
// If not, then we wait for the unsafe head to catch up or gossip it to op-node manually from op-conductor.
unsafeInCons := oc.cons.LatestUnsafePayload()
if unsafeInCons == nil {
return errors.New("failed to get latest unsafe block from consensus")
}
unsafeInNode, err := oc.ctrl.LatestUnsafeBlock(context.Background())
if err != nil {
return errors.Wrap(err, "failed to get latest unsafe block from EL during startSequencer phase")
......@@ -460,7 +537,7 @@ func (oc *OpConductor) startSequencer() error {
if uint64(unsafeInCons.BlockNumber)-unsafeInNode.NumberU64() == 1 {
// tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay)
if err = oc.ctrl.PostUnsafePayload(context.Background(), &unsafeInCons); err != nil {
if err = oc.ctrl.PostUnsafePayload(context.Background(), unsafeInCons); err != nil {
oc.log.Error("failed to post unsafe head payload to op-node", "err", err)
}
}
......
......@@ -275,7 +275,7 @@ func (s *OpConductorTestSuite) TestScenario2() {
func (s *OpConductorTestSuite) TestScenario3() {
s.enableSynchronization()
mockPayload := eth.ExecutionPayload{
mockPayload := &eth.ExecutionPayload{
BlockNumber: 1,
Timestamp: hexutil.Uint64(time.Now().Unix()),
BlockHash: [32]byte{1, 2, 3},
......@@ -311,7 +311,7 @@ func (s *OpConductorTestSuite) TestScenario4() {
// unsafe in consensus is 1 block ahead of unsafe in sequencer, we try to post the unsafe payload to sequencer and return error to allow retry
// this is normal because the latest unsafe (in consensus) might not arrive at sequencer through p2p yet
mockPayload := eth.ExecutionPayload{
mockPayload := &eth.ExecutionPayload{
BlockNumber: 2,
Timestamp: hexutil.Uint64(time.Now().Unix()),
BlockHash: [32]byte{1, 2, 3},
......
......@@ -20,6 +20,8 @@ type Consensus interface {
LeaderCh() <-chan bool
// Leader returns if it is the leader of the cluster.
Leader() bool
// LeaderWithID returns the leader's server ID and address.
LeaderWithID() (string, string)
// ServerID returns the server ID of the consensus.
ServerID() string
// TransferLeader triggers leadership transfer to another member in the cluster.
......@@ -28,9 +30,9 @@ type Consensus interface {
TransferLeaderTo(id, addr string) error
// CommitPayload commits latest unsafe payload to the FSM.
CommitUnsafePayload(payload eth.ExecutionPayload) error
CommitUnsafePayload(payload *eth.ExecutionPayload) error
// LatestUnsafeBlock returns the latest unsafe payload from FSM.
LatestUnsafePayload() eth.ExecutionPayload
LatestUnsafePayload() *eth.ExecutionPayload
// Shutdown shuts down the consensus protocol client.
Shutdown() error
......
......@@ -107,11 +107,11 @@ func (_c *Consensus_AddVoter_Call) RunAndReturn(run func(string, string) error)
}
// CommitUnsafePayload provides a mock function with given fields: payload
func (_m *Consensus) CommitUnsafePayload(payload eth.ExecutionPayload) error {
func (_m *Consensus) CommitUnsafePayload(payload *eth.ExecutionPayload) error {
ret := _m.Called(payload)
var r0 error
if rf, ok := ret.Get(0).(func(eth.ExecutionPayload) error); ok {
if rf, ok := ret.Get(0).(func(*eth.ExecutionPayload) error); ok {
r0 = rf(payload)
} else {
r0 = ret.Error(0)
......@@ -126,14 +126,14 @@ type Consensus_CommitUnsafePayload_Call struct {
}
// CommitUnsafePayload is a helper method to define mock.On call
// - payload eth.ExecutionPayload
// - payload *eth.ExecutionPayload
func (_e *Consensus_Expecter) CommitUnsafePayload(payload interface{}) *Consensus_CommitUnsafePayload_Call {
return &Consensus_CommitUnsafePayload_Call{Call: _e.mock.On("CommitUnsafePayload", payload)}
}
func (_c *Consensus_CommitUnsafePayload_Call) Run(run func(payload eth.ExecutionPayload)) *Consensus_CommitUnsafePayload_Call {
func (_c *Consensus_CommitUnsafePayload_Call) Run(run func(payload *eth.ExecutionPayload)) *Consensus_CommitUnsafePayload_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(eth.ExecutionPayload))
run(args[0].(*eth.ExecutionPayload))
})
return _c
}
......@@ -143,7 +143,7 @@ func (_c *Consensus_CommitUnsafePayload_Call) Return(_a0 error) *Consensus_Commi
return _c
}
func (_c *Consensus_CommitUnsafePayload_Call) RunAndReturn(run func(eth.ExecutionPayload) error) *Consensus_CommitUnsafePayload_Call {
func (_c *Consensus_CommitUnsafePayload_Call) RunAndReturn(run func(*eth.ExecutionPayload) error) *Consensus_CommitUnsafePayload_Call {
_c.Call.Return(run)
return _c
}
......@@ -191,14 +191,16 @@ func (_c *Consensus_DemoteVoter_Call) RunAndReturn(run func(string) error) *Cons
}
// LatestUnsafePayload provides a mock function with given fields:
func (_m *Consensus) LatestUnsafePayload() eth.ExecutionPayload {
func (_m *Consensus) LatestUnsafePayload() *eth.ExecutionPayload {
ret := _m.Called()
var r0 eth.ExecutionPayload
if rf, ok := ret.Get(0).(func() eth.ExecutionPayload); ok {
var r0 *eth.ExecutionPayload
if rf, ok := ret.Get(0).(func() *eth.ExecutionPayload); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(eth.ExecutionPayload)
if ret.Get(0) != nil {
r0 = ret.Get(0).(*eth.ExecutionPayload)
}
}
return r0
......@@ -221,12 +223,12 @@ func (_c *Consensus_LatestUnsafePayload_Call) Run(run func()) *Consensus_LatestU
return _c
}
func (_c *Consensus_LatestUnsafePayload_Call) Return(_a0 eth.ExecutionPayload) *Consensus_LatestUnsafePayload_Call {
func (_c *Consensus_LatestUnsafePayload_Call) Return(_a0 *eth.ExecutionPayload) *Consensus_LatestUnsafePayload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_LatestUnsafePayload_Call) RunAndReturn(run func() eth.ExecutionPayload) *Consensus_LatestUnsafePayload_Call {
func (_c *Consensus_LatestUnsafePayload_Call) RunAndReturn(run func() *eth.ExecutionPayload) *Consensus_LatestUnsafePayload_Call {
_c.Call.Return(run)
return _c
}
......@@ -315,6 +317,57 @@ func (_c *Consensus_LeaderCh_Call) RunAndReturn(run func() <-chan bool) *Consens
return _c
}
// LeaderWithID provides a mock function with given fields:
func (_m *Consensus) LeaderWithID() (string, string) {
ret := _m.Called()
var r0 string
var r1 string
if rf, ok := ret.Get(0).(func() (string, string)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func() string); ok {
r1 = rf()
} else {
r1 = ret.Get(1).(string)
}
return r0, r1
}
// Consensus_LeaderWithID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LeaderWithID'
type Consensus_LeaderWithID_Call struct {
*mock.Call
}
// LeaderWithID is a helper method to define mock.On call
func (_e *Consensus_Expecter) LeaderWithID() *Consensus_LeaderWithID_Call {
return &Consensus_LeaderWithID_Call{Call: _e.mock.On("LeaderWithID")}
}
func (_c *Consensus_LeaderWithID_Call) Run(run func()) *Consensus_LeaderWithID_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Consensus_LeaderWithID_Call) Return(_a0 string, _a1 string) *Consensus_LeaderWithID_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Consensus_LeaderWithID_Call) RunAndReturn(run func() (string, string)) *Consensus_LeaderWithID_Call {
_c.Call.Return(run)
return _c
}
// RemoveServer provides a mock function with given fields: id
func (_m *Consensus) RemoveServer(id string) error {
ret := _m.Called(id)
......
......@@ -144,6 +144,12 @@ func (rc *RaftConsensus) Leader() bool {
return id == rc.serverID
}
// LeaderWithID implements Consensus, it returns the leader's server ID and address.
func (rc *RaftConsensus) LeaderWithID() (string, string) {
addr, id := rc.r.LeaderWithID()
return string(id), string(addr)
}
// LeaderCh implements Consensus, it returns a channel that will be notified when leadership status changes (true = leader, false = follower).
func (rc *RaftConsensus) LeaderCh() <-chan bool {
return rc.r.LeaderCh()
......@@ -196,7 +202,7 @@ func (rc *RaftConsensus) Shutdown() error {
}
// CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM.
func (rc *RaftConsensus) CommitUnsafePayload(payload eth.ExecutionPayload) error {
func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayload) error {
blockVersion := eth.BlockV1
if rc.rollupCfg.IsCanyon(uint64(payload.Timestamp)) {
blockVersion = eth.BlockV2
......@@ -204,7 +210,7 @@ func (rc *RaftConsensus) CommitUnsafePayload(payload eth.ExecutionPayload) error
data := unsafeHeadData{
version: blockVersion,
payload: payload,
payload: *payload,
}
var buf bytes.Buffer
......@@ -221,6 +227,7 @@ func (rc *RaftConsensus) CommitUnsafePayload(payload eth.ExecutionPayload) error
}
// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM.
func (rc *RaftConsensus) LatestUnsafePayload() eth.ExecutionPayload {
return rc.unsafeTracker.UnsafeHead()
func (rc *RaftConsensus) LatestUnsafePayload() *eth.ExecutionPayload {
payload := rc.unsafeTracker.UnsafeHead()
return &payload
}
......@@ -36,7 +36,7 @@ func TestCommitAndRead(t *testing.T) {
<-cons.LeaderCh()
// eth.BlockV1
payload := eth.ExecutionPayload{
payload := &eth.ExecutionPayload{
BlockNumber: 1,
Timestamp: hexutil.Uint64(now - 20),
Transactions: []eth.Data{},
......@@ -50,7 +50,7 @@ func TestCommitAndRead(t *testing.T) {
require.Equal(t, payload, unsafeHead)
// eth.BlockV2
payload = eth.ExecutionPayload{
payload = &eth.ExecutionPayload{
BlockNumber: 2,
Timestamp: hexutil.Uint64(time.Now().Unix()),
Transactions: []eth.Data{},
......
package rpc
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type ServerInfo struct {
ID string `json:"id"`
Addr string `json:"addr"`
}
// API defines the interface for the op-conductor API.
type API interface {
// Pause pauses op-conductor.
Pause(ctx context.Context) error
// Resume resumes op-conductor.
Resume(ctx context.Context) error
// Consensus related APIs
// Leader returns true if the server is the leader.
Leader(ctx context.Context) (bool, error)
// LeaderWithID returns the current leader's server info.
LeaderWithID(ctx context.Context) (*ServerInfo, error)
// AddServerAsVoter adds a server as a voter to the cluster.
AddServerAsVoter(ctx context.Context, id string, addr string) error
// AddServerAsNonvoter adds a server as a non-voter to the cluster. non-voter will not participate in leader election.
AddServerAsNonvoter(ctx context.Context, id string, addr string) error
// RemoveServer removes a server from the cluster.
RemoveServer(ctx context.Context, id string) error
// TransferLeader transfers leadership to another server.
TransferLeader(ctx context.Context) error
// TransferLeaderToServer transfers leadership to a specific server.
TransferLeaderToServer(ctx context.Context, id string, addr string) error
// APIs called by op-node
// Active returns true if op-conductor is active.
Active(ctx context.Context) (bool, error)
// CommitUnsafePayload commits a unsafe payload (lastest head) to the consensus layer.
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error
}
package rpc
import (
"context"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type conductor interface {
Pause(ctx context.Context) error
Resume(ctx context.Context) error
Paused() bool
Stopped() bool
Leader(ctx context.Context) bool
LeaderWithID(ctx context.Context) (string, string)
AddServerAsVoter(ctx context.Context, id string, addr string) error
AddServerAsNonvoter(ctx context.Context, id string, addr string) error
RemoveServer(ctx context.Context, id string) error
TransferLeader(ctx context.Context) error
TransferLeaderToServer(ctx context.Context, id string, addr string) error
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error
}
// APIBackend is the backend implementation of the API.
// TODO: (https://github.com/ethereum-optimism/protocol-quest/issues/45) Add metrics tracer here.
// TODO: (https://github.com/ethereum-optimism/protocol-quest/issues/44) add tests after e2e setup.
type APIBackend struct {
log log.Logger
con conductor
}
// NewAPIBackend creates a new APIBackend instance.
func NewAPIBackend(log log.Logger, con conductor) *APIBackend {
return &APIBackend{
log: log,
con: con,
}
}
var _ API = (*APIBackend)(nil)
// Active implements API.
func (api *APIBackend) Active(_ context.Context) (bool, error) {
return !api.con.Stopped() && !api.con.Paused(), nil
}
// AddServerAsNonvoter implements API.
func (api *APIBackend) AddServerAsNonvoter(ctx context.Context, id string, addr string) error {
return api.con.AddServerAsNonvoter(ctx, id, addr)
}
// AddServerAsVoter implements API.
func (api *APIBackend) AddServerAsVoter(ctx context.Context, id string, addr string) error {
return api.con.AddServerAsVoter(ctx, id, addr)
}
// CommitUnsafePayload implements API.
func (api *APIBackend) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error {
return api.con.CommitUnsafePayload(ctx, payload)
}
// Leader implements API, returns true if current conductor is leader of the cluster.
func (api *APIBackend) Leader(ctx context.Context) (bool, error) {
return api.con.Leader(ctx), nil
}
// LeaderWithID implements API, returns the leader's server ID and address (not necessarily the current conductor).
func (api *APIBackend) LeaderWithID(ctx context.Context) (*ServerInfo, error) {
id, addr := api.con.LeaderWithID(ctx)
return &ServerInfo{
ID: id,
Addr: addr,
}, nil
}
// Pause implements API.
func (api *APIBackend) Pause(ctx context.Context) error {
return api.con.Pause(ctx)
}
// RemoveServer implements API.
func (api *APIBackend) RemoveServer(ctx context.Context, id string) error {
return api.con.RemoveServer(ctx, id)
}
// Resume implements API.
func (api *APIBackend) Resume(ctx context.Context) error {
return api.con.Resume(ctx)
}
// TransferLeader implements API.
func (api *APIBackend) TransferLeader(ctx context.Context) error {
return api.con.TransferLeader(ctx)
}
// TransferLeaderToServer implements API.
func (api *APIBackend) TransferLeaderToServer(ctx context.Context, id string, addr string) error {
return api.con.TransferLeaderToServer(ctx, id, addr)
}
package rpc
import (
"context"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
var RPCNamespace = "conductor"
// APIClient provides a client for calling API methods.
type APIClient struct {
c *rpc.Client
}
var _ API = (*APIClient)(nil)
// NewAPIClient creates a new APIClient instance.
func NewAPIClient(c *rpc.Client) *APIClient {
return &APIClient{c: c}
}
func prefixRPC(method string) string {
return RPCNamespace + "_" + method
}
// Active implements API.
func (c *APIClient) Active(ctx context.Context) (bool, error) {
var active bool
err := c.c.CallContext(ctx, &active, prefixRPC("active"))
return active, err
}
// AddServerAsNonvoter implements API.
func (c *APIClient) AddServerAsNonvoter(ctx context.Context, id string, addr string) error {
return c.c.CallContext(ctx, nil, prefixRPC("addServerAsNonvoter"), id, addr)
}
// AddServerAsVoter implements API.
func (c *APIClient) AddServerAsVoter(ctx context.Context, id string, addr string) error {
return c.c.CallContext(ctx, nil, prefixRPC("addServerAsVoter"), id, addr)
}
// CommitUnsafePayload implements API.
func (c *APIClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload) error {
return c.c.CallContext(ctx, nil, prefixRPC("commitUnsafePayload"), payload)
}
// Leader implements API.
func (c *APIClient) Leader(ctx context.Context) (bool, error) {
var leader bool
err := c.c.CallContext(ctx, &leader, prefixRPC("leader"))
return leader, err
}
// LeaderWithID implements API.
func (c *APIClient) LeaderWithID(ctx context.Context) (*ServerInfo, error) {
var info *ServerInfo
err := c.c.CallContext(ctx, &info, prefixRPC("leaderWithID"))
return info, err
}
// Pause implements API.
func (c *APIClient) Pause(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("pause"))
}
// RemoveServer implements API.
func (c *APIClient) RemoveServer(ctx context.Context, id string) error {
return c.c.CallContext(ctx, nil, prefixRPC("removeServer"), id)
}
// Resume implements API.
func (c *APIClient) Resume(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("resume"))
}
// TransferLeader implements API.
func (c *APIClient) TransferLeader(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("transferLeader"))
}
// TransferLeaderToServer implements API.
func (c *APIClient) TransferLeaderToServer(ctx context.Context, id string, addr string) error {
return c.c.CallContext(ctx, nil, prefixRPC("transferLeaderToServer"), id, addr)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment