// SignalingController.go
// Last committed by duanjinfei.

package controllers

import (
	"encoding/json"
	"github.com/astaxie/beego"
	"github.com/ethereum/go-ethereum/accounts/abi/bind"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"
	"math/big"
	"schedulerNode/appNodeInterface"
	nodeCommon "schedulerNode/common"
	application "schedulerNode/contract/go_file"
	"schedulerNode/log"
	"schedulerNode/models"
	"schedulerNode/utils"
	"strings"
)

// SignalingController is the receiver for the signalling-related request
// handlers in this file. It embeds BaseController; the handlers call
// c.ResponseInfo and read c.Ctx, which are not declared on this type and are
// presumably provided through that embedding.
type SignalingController struct {
	BaseController
}

// GetConnectSignalingStatus answers whether the app node named by the request
// is currently connected to signalling.
//
// The request body is JSON-decoded into models.SignallingConnectStatusReq. The
// node's network record is then read from the application contract (RPC URL
// and contract address come from the beego config keys "chain::rpcUrl" and
// "contract_address::app_contract"), and the app node at the recorded
// ExternalIp/ServerPort is asked for its signalling connection status.
//
// Every branch that writes a response returns immediately afterwards, so the
// handler never continues with a nil client / contract binding and never
// writes a second response. (ResponseInfo is defined outside this file; the
// explicit returns are correct whether or not it aborts the request itself.)
func (c *SignalingController) GetConnectSignalingStatus() {
	// Decode the request sent by the front end.
	var req models.SignallingConnectStatusReq
	if err := json.Unmarshal(c.Ctx.Input.RequestBody, &req); err != nil {
		// NOTE(review): this branch uses a literal 500 while the others use
		// nodeCommon.FaileCode; kept as-is to avoid changing the response.
		c.ResponseInfo(500, "Error unmarshalling", err)
		return
	}

	rpcUrl := beego.AppConfig.String("chain::rpcUrl")
	appContract := beego.AppConfig.String("contract_address::app_contract")

	client, err := ethclient.Dial(rpcUrl)
	if err != nil {
		c.ResponseInfo(nodeCommon.FaileCode, "Error connecting", err)
		// Must return here: client is nil on error and the deferred Close
		// below would dereference it.
		return
	}
	defer client.Close()

	applicationInstance, err := application.NewApplication(common.HexToAddress(appContract), client)
	if err != nil {
		c.ResponseInfo(nodeCommon.FaileCode, "Error creating application", err)
		return
	}

	network, err := applicationInstance.GetNodeNetwork(&bind.CallOpts{}, req.NodeId)
	if err != nil {
		c.ResponseInfo(nodeCommon.FaileCode, "Error get NodeNetwork application", err)
		return
	}

	// Ask the app node for its signalling connection status.
	statusResp := appNodeInterface.GetSignallingConnectStatus(network.ExternalIp, network.ServerPort)
	if !statusResp.Status || statusResp.Code != nodeCommon.SuccessCode {
		// The query itself succeeded from the caller's point of view, hence
		// SuccessCode with a "disconnected" message.
		c.ResponseInfo(nodeCommon.SuccessCode, "disconnected", nil)
		return
	}
	c.ResponseInfo(nodeCommon.SuccessCode, "success", nil)
}

// GetAppNodeInfo picks an app node for the requested application, tells that
// node which signalling/ICE servers to use, and returns the node's connection
// details to the caller.
//
// Steps:
//  1. JSON-decode the request body into models.ConnectAppReq.
//  2. Read up to 100 node IDs for AppId from the application contract.
//  3. Take the first node whose ExternalIp satisfies utils.IsCreateVmCondition.
//  4. Send the signalling information to that node via appNodeInterface.
//  5. Respond with models.AppNodeInfoRep, including one TurnInfo per TURN entry.
//
// Every branch that writes a response returns immediately afterwards, so a
// failed step never falls through to later steps with nil or zero values.
// (ResponseInfo is defined outside this file; the explicit returns are correct
// whether or not it aborts the request itself.)
func (c *SignalingController) GetAppNodeInfo() {
	connectAppReq := &models.ConnectAppReq{}
	if err := json.Unmarshal(c.Ctx.Input.RequestBody, connectAppReq); err != nil {
		c.ResponseInfo(nodeCommon.FaileCode, "Error unmarshalling", err)
		return
	}

	rpcUrl := beego.AppConfig.String("chain::rpcUrl")
	appContract := beego.AppConfig.String("contract_address::app_contract")

	client, err := ethclient.Dial(rpcUrl)
	if err != nil {
		c.ResponseInfo(nodeCommon.FaileCode, "Error connecting", err)
		// client is nil on error; the deferred Close below must not run.
		return
	}
	defer client.Close()

	// Fetch the node IDs registered for this application.
	applicationInstance, err := application.NewApplication(common.HexToAddress(appContract), client)
	if err != nil {
		c.ResponseInfo(nodeCommon.FaileCode, "Error creating application", err)
		return
	}
	nodeIds, err := applicationInstance.GetNodeIds(&bind.CallOpts{}, connectAppReq.AppId, 0, 100)
	if err != nil {
		c.ResponseInfo(nodeCommon.FaileCode, "Error GetNodeIds ", err)
		return
	}
	if len(nodeIds) == 0 {
		c.ResponseInfo(nodeCommon.SuccessCode, "There is currently no node provided to launch the application", nil)
		return
	}

	// Look up each node's network record and keep the first eligible one.
	nodeNetworkInfo := models.AppNodeNetworkInfo{}
	found := false
	for _, nodeId := range nodeIds {
		network, err := applicationInstance.GetNodeNetwork(&bind.CallOpts{}, nodeId)
		if err != nil {
			c.ResponseInfo(nodeCommon.FaileCode, "Error GetAppNodeNetwork ", err)
			return
		}
		if !utils.IsCreateVmCondition(network.ExternalIp) {
			continue
		}
		nodeNetworkInfo.NodeId = nodeId
		nodeNetworkInfo.ExternalIp = network.ExternalIp
		nodeNetworkInfo.ServerPort = network.ServerPort
		nodeNetworkInfo.StunIp = network.StunIp
		nodeNetworkInfo.TurnIp = network.TurnIp
		nodeNetworkInfo.TurnUser = network.TurnUser
		nodeNetworkInfo.TurnPwd = network.TurnPwd
		nodeNetworkInfo.SignallingHttpIp = network.SignallingHttpIp
		nodeNetworkInfo.SignallingWsIp = network.SignallingWsIp
		found = true
		break
	}
	if !found {
		// No node passed the eligibility check. Without this guard the
		// zero-valued info would be sent on to the notification step, which
		// cannot work with an empty address.
		c.ResponseInfo(nodeCommon.SuccessCode, "There is currently no node provided to launch the application", nil)
		return
	}

	// TODO: fill in service information when the node record lacks it.
	updateServerInfo(&nodeNetworkInfo)

	// Notify the app node so it connects to the signalling node.
	sendSignallingReq := handlerSendSignallingInfo(&nodeNetworkInfo)
	rep := appNodeInterface.SendSignallingInfo(nodeNetworkInfo.ExternalIp, nodeNetworkInfo.ServerPort, sendSignallingReq)
	if rep.Code != nodeCommon.SuccessCode {
		log.Error("Error sending signalling information failed", rep.Msg)
		c.ResponseInfo(nodeCommon.FaileCode, "Notice signalling information failed", rep.Msg)
		return
	}

	// Build the response for the front end.
	res := models.AppNodeInfoRep{
		NodeID:           nodeNetworkInfo.NodeId,
		ExternalIp:       nodeNetworkInfo.ExternalIp,
		StunIp:           nodeNetworkInfo.StunIp,
		SignallingHttpIp: nodeNetworkInfo.SignallingHttpIp,
		SignallingWsIp:   nodeNetworkInfo.SignallingWsIp,
	}

	// TurnIp, TurnUser and TurnPwd are parallel lists. Only emit entries for
	// which all three are present so a short list cannot cause an
	// out-of-range index.
	turnCount := len(nodeNetworkInfo.TurnIp)
	if len(nodeNetworkInfo.TurnUser) < turnCount {
		turnCount = len(nodeNetworkInfo.TurnUser)
	}
	if len(nodeNetworkInfo.TurnPwd) < turnCount {
		turnCount = len(nodeNetworkInfo.TurnPwd)
	}
	turnInfoArr := make([]*models.TurnInfo, 0, turnCount)
	for i := 0; i < turnCount; i++ {
		turnInfoArr = append(turnInfoArr, &models.TurnInfo{
			TurnIp: nodeNetworkInfo.TurnIp[i],
			User:   nodeNetworkInfo.TurnUser[i],
			Pwd:    nodeNetworkInfo.TurnPwd[i],
		})
	}
	res.TurnInfo = turnInfoArr
	c.ResponseInfo(nodeCommon.SuccessCode, "success", res)
}

// handlerSendSignallingInfo converts a node's network record into the request
// that is sent to the app node.
//
// SignallingHttpIp and SignallingWsIp are split on ":"; the first segment of
// SignallingHttpIp becomes the MQTT broker and the second segment of each
// address is parsed as a base-10 port. A missing or non-numeric port yields a
// nil Port/WsPort instead of a panic. StunIp is passed through unchanged, and
// one TurnInfoReq is built per TURN entry for which TurnIp, TurnUser and
// TurnPwd are all present.
func handlerSendSignallingInfo(nodeNetworkInfo *models.AppNodeNetworkInfo) *models.SendSignallingReq {
	broker, httpPort := splitSignallingAddr(nodeNetworkInfo.SignallingHttpIp)
	// Only the port of the websocket address is used; the broker host comes
	// from the HTTP address.
	_, wsPort := splitSignallingAddr(nodeNetworkInfo.SignallingWsIp)

	// The three TURN lists are parallel; bound the loop by the shortest so a
	// short list cannot cause an out-of-range index.
	turnCount := len(nodeNetworkInfo.TurnIp)
	if len(nodeNetworkInfo.TurnUser) < turnCount {
		turnCount = len(nodeNetworkInfo.TurnUser)
	}
	if len(nodeNetworkInfo.TurnPwd) < turnCount {
		turnCount = len(nodeNetworkInfo.TurnPwd)
	}
	turnInfoReqArr := make([]*models.TurnInfoReq, 0, turnCount)
	for i := 0; i < turnCount; i++ {
		turnInfoReqArr = append(turnInfoReqArr, &models.TurnInfoReq{
			Url:        nodeNetworkInfo.TurnIp[i],
			UserName:   nodeNetworkInfo.TurnUser[i],
			Credential: nodeNetworkInfo.TurnPwd[i],
		})
	}

	return &models.SendSignallingReq{
		MQTTConnectInfo: &models.MQTTConnectInfo{
			Broker: broker,
			Port:   httpPort,
			WsPort: wsPort,
		},
		ICEConnectInfo: &models.ICEConnectInfo{
			Stuns: nodeNetworkInfo.StunIp,
			Turns: turnInfoReqArr,
		},
	}
}

// splitSignallingAddr splits addr on ":" and returns the first segment as the
// host and the second segment parsed as a base-10 integer port. The port is
// nil when addr contains no ":" or the second segment is not a valid integer.
func splitSignallingAddr(addr string) (string, *big.Int) {
	// strings.Split with a non-empty separator always returns at least one
	// element, so parts[0] is safe even for an empty addr.
	parts := strings.Split(addr, ":")
	if len(parts) < 2 {
		return parts[0], nil
	}
	port, ok := new(big.Int).SetString(parts[1], 10)
	if !ok {
		return parts[0], nil
	}
	return parts[0], port
}

// updateServerInfo is a stub for completing a node's service details. At
// present it evaluates three "is this missing?" conditions without acting on
// them, modifies nothing, and logs info.ExternalIp.
func updateServerInfo(info *models.AppNodeNetworkInfo) {
	// TODO: fetch the service information provided by the gateway contract.

	if turnMissing := len(info.TurnIp) == 0; turnMissing {
		// Placeholder: TurnIp is empty.
	}
	if stunMissing := len(info.StunIp) == 0; stunMissing {
		// Placeholder: StunIp is empty.
	}
	if signallingMissing := info.SignallingHttpIp == "" && info.SignallingWsIp == ""; signallingMissing {
		// Placeholder: neither signalling address is set.
	}

	log.Info("updateServer", info.ExternalIp)
}