Commit e23d89f8 authored by vicotor's avatar vicotor

update log

parent 318381ff
package server package server
import ( import (
"encoding/hex"
"errors" "errors"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/odysseus/nodemanager/utils" "github.com/odysseus/nodemanager/utils"
...@@ -14,6 +13,7 @@ import ( ...@@ -14,6 +13,7 @@ import (
var ( var (
ErrExecuteFailed = errors.New("execute failed") ErrExecuteFailed = errors.New("execute failed")
ErrWorkerFailed = errors.New("worker failed")
) )
func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*odysseus.TaskProof, error) { func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*odysseus.TaskProof, error) {
...@@ -31,13 +31,13 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo ...@@ -31,13 +31,13 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
if worker.info.nodeInfo == nil { if worker.info.nodeInfo == nil {
return nil, errors.New("unknown worker node info") return nil, errors.New("unknown worker node info")
} }
log.WithFields(log.Fields{ //log.WithFields(log.Fields{
"task-id": task.TaskId, // "task-id": task.TaskId,
"task-type": task.TaskType, // "task-type": task.TaskType,
"task-kind": task.TaskKind, // "task-kind": task.TaskKind,
"task-success": result.IsSuccessed, // "task-success": result.IsSuccessed,
"task-execute-duration": result.TaskExecuteDuration, // "task-execute-duration": result.TaskExecuteDuration,
}).Debug("got task result") //}).Debug("got task result")
if result.IsSuccessed == false { if result.IsSuccessed == false {
taskResponse := &odysseus.TaskResponse{ taskResponse := &odysseus.TaskResponse{
...@@ -50,7 +50,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo ...@@ -50,7 +50,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
TaskResultCode: result.TaskResultCode, TaskResultCode: result.TaskResultCode,
TaskError: ErrExecuteFailed.Error(), TaskError: ErrExecuteFailed.Error(),
} }
receipt := wm.makeReceipt(worker, task, result, errors.New("worker failed")) receipt := wm.makeReceipt(worker, task, result, ErrWorkerFailed)
wm.node.PostResult(receipt) wm.node.PostResult(receipt)
go wm.doCallback(task.TaskCallback, taskResponse) go wm.doCallback(task.TaskCallback, taskResponse)
return nil, nil return nil, nil
...@@ -88,11 +88,9 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo ...@@ -88,11 +88,9 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
verified := crypto.VerifySignature(pubkey, dataHash[:], signature) verified := crypto.VerifySignature(pubkey, dataHash[:], signature)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"task-id": task.TaskId,
"containerSignatureVerify": verified, "containerSignatureVerify": verified,
"taskkind": task.TaskKind, "taskkind": task.TaskKind,
"dataHash": hex.EncodeToString(dataHash[:]),
"containerPubkey": hex.EncodeToString(pubkey),
"signature": hex.EncodeToString(signature),
}).Debug("container signature verify") }).Debug("container signature verify")
if !verified { if !verified {
// todo: handle signature verify failed // todo: handle signature verify failed
...@@ -108,6 +106,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo ...@@ -108,6 +106,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
} }
verified := crypto.VerifySignature(utils.FromHex(worker.info.nodeInfo.MinerPubkey), dataHash[:], signature) verified := crypto.VerifySignature(utils.FromHex(worker.info.nodeInfo.MinerPubkey), dataHash[:], signature)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"task-id": task.TaskId,
"minerSignatureVerify": verified, "minerSignatureVerify": verified,
"taskkind": task.TaskKind, "taskkind": task.TaskKind,
}).Debug("miner signature verify") }).Debug("miner signature verify")
...@@ -141,9 +140,8 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo ...@@ -141,9 +140,8 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
}) })
wm.node.PostProof(proof) wm.node.PostProof(proof)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"task-id": result.TaskId, "task-id": result.TaskId,
"workload": task.TaskWorkload, }).Debug("send proof to kafka")
}).Debug("send proof to worker")
return proof, nil return proof, nil
} }
...@@ -151,16 +149,16 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC ...@@ -151,16 +149,16 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
if worker.info.nodeInfo == nil { if worker.info.nodeInfo == nil {
return nil, errors.New("unknown worker node info") return nil, errors.New("unknown worker node info")
} }
log.WithFields(log.Fields{ //log.WithFields(log.Fields{
"task-id": task.TaskId, // "task-id": task.TaskId,
"task-type": task.TaskType, // "task-type": task.TaskType,
"task-kind": task.TaskKind, // "task-kind": task.TaskKind,
"task-success": result.IsSuccessed, // "task-success": result.IsSuccessed,
"task-execute-duration": result.TaskExecuteDuration, // "task-execute-duration": result.TaskExecuteDuration,
}).Debug("got task result") //}).Debug("got task result")
if result.IsSuccessed == false { if result.IsSuccessed == false {
receipt := wm.makeReceipt(worker, task, result, errors.New("worker failed")) receipt := wm.makeReceipt(worker, task, result, ErrWorkerFailed)
wm.node.PostResult(receipt) wm.node.PostResult(receipt)
return nil, nil return nil, nil
} }
...@@ -199,11 +197,9 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC ...@@ -199,11 +197,9 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
verified := crypto.VerifySignature(pubkey, dataHash[:], signature) verified := crypto.VerifySignature(pubkey, dataHash[:], signature)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"task-id": task.TaskId,
"containerSignatureVerify": verified, "containerSignatureVerify": verified,
"taskkind": task.TaskKind, "taskkind": task.TaskKind,
"dataHash": hex.EncodeToString(dataHash[:]),
"containerPubkey": hex.EncodeToString(pubkey),
"signature": hex.EncodeToString(signature),
}).Debug("container signature verify") }).Debug("container signature verify")
if !verified { if !verified {
// todo: handle signature verify failed // todo: handle signature verify failed
...@@ -219,6 +215,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC ...@@ -219,6 +215,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
} }
verified := crypto.VerifySignature(utils.FromHex(worker.info.nodeInfo.MinerPubkey), dataHash[:], signature) verified := crypto.VerifySignature(utils.FromHex(worker.info.nodeInfo.MinerPubkey), dataHash[:], signature)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"task-id": task.TaskId,
"minerSignatureVerify": verified, "minerSignatureVerify": verified,
"taskkind": task.TaskKind, "taskkind": task.TaskKind,
}).Debug("miner signature verify") }).Debug("miner signature verify")
...@@ -252,8 +249,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC ...@@ -252,8 +249,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
wm.node.PostProof(proof) wm.node.PostProof(proof)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"task-id": result.TaskId, "task-id": result.TaskId,
"workload": task.TaskWorkload,
}).Debug("send proof to kafka") }).Debug("send proof to kafka")
return proof, nil return proof, nil
} }
...@@ -177,7 +177,7 @@ func (wm *WorkerManager) AddNewWorker(id int64, worker omanager.NodeManagerServi ...@@ -177,7 +177,7 @@ func (wm *WorkerManager) AddNewWorker(id int64, worker omanager.NodeManagerServi
w := &Worker{ w := &Worker{
quit: make(chan interface{}), quit: make(chan interface{}),
taskCh: make(chan *dispatchTask), taskCh: make(chan *dispatchTask),
resultCh: make(chan *omanager.SubmitTaskResult, 10), resultCh: make(chan *omanager.SubmitTaskResult, 30),
uuid: id, uuid: id,
registed: false, registed: false,
...@@ -316,7 +316,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -316,7 +316,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
heartBeatTicker.Reset(heartBeatDuration) heartBeatTicker.Reset(heartBeatDuration)
msg.Message = hb msg.Message = hb
callback = func(err error) bool { callback = func(err error) bool {
log.WithField("worker", worker.uuid).Info("send hear beat to worker") log.WithField("worker", worker.uuid).Debug("send heart beat to worker")
return true return true
} }
...@@ -377,7 +377,11 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -377,7 +377,11 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
} }
} }
} }
log.WithField("worker", worker.uuid).Info("dispatch task to worker") log.WithFields(log.Fields{
"task": task.TaskId,
"worker": worker.uuid,
"worker addr": worker.workerAddr,
}).Info("dispatch task to worker")
select { select {
case dtask.errCh <- err: case dtask.errCh <- err:
...@@ -387,6 +391,12 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -387,6 +391,12 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
return true return true
} }
case result := <-worker.resultCh: case result := <-worker.resultCh:
log.WithFields(log.Fields{
"task": result.TaskId,
"worker addr": worker.workerAddr,
"success": result.IsSuccessed,
"duration": result.TaskExecuteDuration / 1000,
}).Info("got result from worker")
// verify result and make a new signature. // verify result and make a new signature.
data, exist := worker.recentTask.Get(result.TaskId) data, exist := worker.recentTask.Get(result.TaskId)
if !exist { if !exist {
...@@ -486,13 +496,13 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -486,13 +496,13 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
wm.UpdateHeartBeat(worker.uuid) wm.UpdateHeartBeat(worker.uuid)
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker-addr": worker.workerAddr, "worker-addr": worker.workerAddr,
"hearBeat": time.Now().Unix() - int64(msg.HeartbeatResponse.Timestamp), "heartBeat": time.Now().Unix() - int64(msg.HeartbeatResponse.Timestamp),
}).Debug("receive worker heartbeat") }).Debug("receive worker heartbeat")
case *omanager.WorkerMessage_NodeInfo: case *omanager.WorkerMessage_NodeInfo:
matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", msg.NodeInfo.DeviceIp) matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", msg.NodeInfo.DeviceIp)
if err != nil { if err != nil {
fmt.Println("ip匹配出现错误") l.WithField("nodeinfo.ip", msg.NodeInfo.DeviceIp).Error("ip匹配出现错误")
return //return
} }
if !matched { if !matched {
msg.NodeInfo.DeviceIp = "" msg.NodeInfo.DeviceIp = ""
...@@ -656,8 +666,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -656,8 +666,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
worker.registed = true worker.registed = true
matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", msg.RegisteMessage.DeviceIp) matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", msg.RegisteMessage.DeviceIp)
if err != nil { if err != nil {
fmt.Println("ip匹配出现错误") log.WithField("registed.ip", msg.RegisteMessage.DeviceIp).Error("ip匹配出现错误")
return
} }
if !matched { if !matched {
msg.RegisteMessage.DeviceIp = "" msg.RegisteMessage.DeviceIp = ""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment