Commit dbb1be88 authored by vicotor's avatar vicotor

update protocol

parent 80fdafa1
...@@ -141,7 +141,7 @@ func main() { ...@@ -141,7 +141,7 @@ func main() {
msg := &omanager.WorkerMessage{ msg := &omanager.WorkerMessage{
Message: &omanager.WorkerMessage_SubmitTaskResult{ Message: &omanager.WorkerMessage_SubmitTaskResult{
SubmitTaskResult: &omanager.SubmitTaskResult{ SubmitTaskResult: &omanager.SubmitTaskResult{
TaskUuid: b.PushTaskMessage.TaskUuid, TaskId: b.PushTaskMessage.TaskId,
ContainerSignature: make([]byte, 65), ContainerSignature: make([]byte, 65),
MinerSignature: make([]byte, 65), MinerSignature: make([]byte, 65),
TaskResultBody: []byte(demoResult), TaskResultBody: []byte(demoResult),
......
...@@ -3,7 +3,6 @@ local_host="127.0.0.1" ...@@ -3,7 +3,6 @@ local_host="127.0.0.1"
port=10001 port=10001
metrics_port = 28010 metrics_port = 28010
private_key = "E671C143A110C239B563F702E9F4017CA6B2B2912F675EED9AA4FED684EB30CC" private_key = "E671C143A110C239B563F702E9F4017CA6B2B2912F675EED9AA4FED684EB30CC"
enable_pay = false
[redis] [redis]
addr="127.0.0.1:6379" addr="127.0.0.1:6379"
...@@ -22,6 +21,7 @@ brokers="127.0.0.1:9092" ...@@ -22,6 +21,7 @@ brokers="127.0.0.1:9092"
receipt_topic="taskreceipt" receipt_topic="taskreceipt"
[ticker] [ticker]
#second
heart_beat = 10 heart_beat = 10
status_ticker = 10 status_ticker = 10
device_info_ticker = 120 device_info_ticker = 120
......
...@@ -33,6 +33,7 @@ func (n *NodeManagerService) ManagerList(ctx context.Context, request *omanager. ...@@ -33,6 +33,7 @@ func (n *NodeManagerService) ManagerList(ctx context.Context, request *omanager.
func (n *NodeManagerService) RegisterWorker(client omanager.NodeManagerService_RegisterWorkerServer) error { func (n *NodeManagerService) RegisterWorker(client omanager.NodeManagerService_RegisterWorkerServer) error {
uuid := utils.GetSnowflakeId() uuid := utils.GetSnowflakeId()
worker, err := n.node.wm.AddNewWorker(uuid, client) worker, err := n.node.wm.AddNewWorker(uuid, client)
if err != nil { if err != nil {
return err return err
...@@ -65,7 +66,7 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager ...@@ -65,7 +66,7 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager
} }
res := new(omanager.DispatchTaskResponse) res := new(omanager.DispatchTaskResponse)
res.TaskUuid = request.TaskData.TaskUuid res.TaskId = request.TaskData.TaskId
res.Miner = request.Miner res.Miner = request.Miner
return res, nil return res, nil
} }
...@@ -157,13 +157,13 @@ func (wm *WorkerManager) doCallback(hook string, response *odysseus.TaskResponse ...@@ -157,13 +157,13 @@ func (wm *WorkerManager) doCallback(hook string, response *odysseus.TaskResponse
if err != nil { if err != nil {
log.WithError(err).Error("marshal task response failed") log.WithError(err).Error("marshal task response failed")
} else { } else {
log.WithField("taskid", response.TaskUuid).Debug("marshal task response") log.WithField("task-id", response.TaskId).Debug("marshal task response")
} }
err = utils.Post(hook, d) err = utils.Post(hook, d)
if err != nil { if err != nil {
log.WithError(err).Error("post task result failed") log.WithError(err).Error("post task result failed")
} else { } else {
log.WithField("taskid", response.TaskUuid).Debug("post task result") log.WithField("task-id", response.TaskId).Debug("post task result")
} }
} }
...@@ -280,8 +280,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -280,8 +280,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
task := dtask.task task := dtask.task
taskMsg := new(omanager.ManagerMessage_PushTaskMessage) taskMsg := new(omanager.ManagerMessage_PushTaskMessage)
taskMsg.PushTaskMessage = &omanager.PushTaskMessage{ taskMsg.PushTaskMessage = &omanager.PushTaskMessage{
TaskUuid: task.TaskUuid, TaskId: task.TaskId,
TaskType: task.TaskType, TaskType: task.TaskType,
TaskKind: task.TaskKind,
Workload: uint64(task.TaskWorkload), Workload: uint64(task.TaskWorkload),
TaskCmd: task.TaskCmd, TaskCmd: task.TaskCmd,
TaskParam: task.TaskParam, TaskParam: task.TaskParam,
...@@ -290,7 +291,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -290,7 +291,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
callback = func(err error) bool { callback = func(err error) bool {
if err == nil { if err == nil {
// add task to cache. // add task to cache.
worker.recentTask.Add(task.TaskUuid, task) worker.recentTask.Add(task.TaskId, task)
} }
log.WithField("worker", worker.uuid).Info("dispatch task to worker") log.WithField("worker", worker.uuid).Info("dispatch task to worker")
...@@ -303,23 +304,23 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -303,23 +304,23 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
} }
case result := <-worker.resultCh: case result := <-worker.resultCh:
// verify result and make a new signature. // verify result and make a new signature.
data, exist := worker.recentTask.Get(result.TaskUuid) data, exist := worker.recentTask.Get(result.TaskId)
if !exist { if !exist {
log.WithField("worker", worker.uuid).Error("task not found for verify result") log.WithField("worker", worker.uuid).Error("task not found for verify result")
continue continue
} }
task := data.(*odysseus.TaskContent) task := data.(*odysseus.TaskContent)
if result.TaskUuid != task.TaskUuid { if result.TaskId != task.TaskId {
log.WithField("worker", worker.uuid).Error("task id not match") log.WithField("worker", worker.uuid).Error("task id not match")
continue continue
} }
log.WithField("task-uuid", task.TaskUuid).WithField("result", result).Debug("got task result") log.WithField("task-id", task.TaskId).WithField("result", result).Debug("got task result")
if result.IsSuccessed == false { if result.IsSuccessed == false {
taskResponse := &odysseus.TaskResponse{ taskResponse := &odysseus.TaskResponse{
TaskUuid: task.TaskUuid, TaskId: task.TaskId,
TaskResultHeader: result.TaskResultHeader, TaskResultHeader: result.TaskResultHeader,
TaskResultBody: result.TaskResultBody, TaskResultBody: result.TaskResultBody,
TaskUid: task.TaskUid, TaskUid: task.TaskUid,
...@@ -350,7 +351,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -350,7 +351,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result))) // miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
paramHash := crypto.Keccak256Hash(task.TaskParam) paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody) resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskUuid), paramHash[:], resultHash[:])) dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
minerPubkey, _ := utils.HexToPubkey(worker.publicKey) // todo: get miner pubkey minerPubkey, _ := utils.HexToPubkey(worker.publicKey) // todo: get miner pubkey
verified := ecdsa.VerifyASN1(minerPubkey, dataHash[:], result.MinerSignature) verified := ecdsa.VerifyASN1(minerPubkey, dataHash[:], result.MinerSignature)
log.WithField("minerSignatureVerify", verified).Debug("miner signature verify") log.WithField("minerSignatureVerify", verified).Debug("miner signature verify")
...@@ -365,7 +366,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -365,7 +366,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload)) //manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload))
paramHash := crypto.Keccak256Hash(task.TaskParam) paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody) resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskUuid), paramHash[:], resultHash[:], dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:],
worker.ProfitAccount().Bytes(), worker.WorkerAccount().Bytes(), result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes())) worker.ProfitAccount().Bytes(), worker.WorkerAccount().Bytes(), result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes()))
signature, err := wm.node.Sign(dataHash[:]) signature, err := wm.node.Sign(dataHash[:])
...@@ -376,20 +377,24 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -376,20 +377,24 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
proof := new(omanager.ManagerMessage_ProofTaskResult) proof := new(omanager.ManagerMessage_ProofTaskResult)
proof.ProofTaskResult = &omanager.ProofTaskResult{ proof.ProofTaskResult = &omanager.ProofTaskResult{
TaskUuid: result.TaskUuid, TaskId: result.TaskId,
ManagerSignature: signature, ManagerSignature: signature,
Workload: uint64(task.TaskWorkload), Workload: uint64(task.TaskWorkload),
ContainerPubkey: utils.CombineBytes(task.ContainerPubkey), ContainerPubkey: utils.CombineBytes(task.ContainerPubkey),
} }
log.WithFields(log.Fields{
"task-id": result.TaskId,
"workload": task.TaskWorkload,
}).Debug("send proof to worker")
msg.Message = proof msg.Message = proof
callback = func(err error) bool { callback = func(err error) bool {
if err == nil { if err == nil {
// remove task from cache. // remove task from cache.
worker.recentTask.Remove(result.TaskUuid) worker.recentTask.Remove(result.TaskId)
} }
taskResponse := &odysseus.TaskResponse{ taskResponse := &odysseus.TaskResponse{
TaskUuid: task.TaskUuid, TaskId: task.TaskId,
TaskResultHeader: result.TaskResultHeader, TaskResultHeader: result.TaskResultHeader,
TaskResultBody: result.TaskResultBody, TaskResultBody: result.TaskResultBody,
TaskUid: task.TaskUid, TaskUid: task.TaskUid,
...@@ -535,9 +540,9 @@ func (wm *WorkerManager) Payment(task *odysseus.TaskContent) error { ...@@ -535,9 +540,9 @@ func (wm *WorkerManager) Payment(task *odysseus.TaskContent) error {
func (wm *WorkerManager) makeReceipt(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult, err error) *odysseus.TaskReceipt { func (wm *WorkerManager) makeReceipt(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult, err error) *odysseus.TaskReceipt {
now := uint64(time.Now().UnixNano()) now := uint64(time.Now().UnixNano())
receipt := &odysseus.TaskReceipt{ receipt := &odysseus.TaskReceipt{
TaskUuid: task.TaskUuid,
TaskTimestamp: task.TaskTimestamp,
TaskId: task.TaskId, TaskId: task.TaskId,
TaskTimestamp: task.TaskTimestamp,
TaskType: task.TaskType,
TaskUid: task.TaskUid, TaskUid: task.TaskUid,
TaskWorkload: task.TaskWorkload, TaskWorkload: task.TaskWorkload,
TaskDuration: int64(now-task.TaskTimestamp) / 1000, TaskDuration: int64(now-task.TaskTimestamp) / 1000,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment