Commit d10b9420 authored by vicotor's avatar vicotor

add task proof post

parent c412f1d0
remote_host=""
local_host="127.0.0.1"
remote_host="192.168.1.112"
local_host="192.168.1.112"
port=10001
metrics_port = 28010
private_key = "E671C143A110C239B563F702E9F4017CA6B2B2912F675EED9AA4FED684EB30CC"
standard_task_file = "standardtask.json"
standard_task_file = "/Users/luxq/work/wuban/nodemanager/standardtask.json"
[redis]
addr="127.0.0.1:6379"
......@@ -20,6 +20,7 @@ database="liuxuzhong"
[kafka]
brokers="127.0.0.1:9092"
receipt_topic="taskreceipt"
proof_topic="taskproof"
[ticker]
#second
......
......@@ -19,6 +19,7 @@ type KafkaConfig struct {
Brokers string `json:"brokers" toml:"brokers"`
ReceiptTopic string `json:"receipt_topic" toml:"receipt_topic"`
TaskTopic string `json:"task_topic" toml:"task_topic"`
ProofTopic string `json:"proof_topic" toml:"proof_topic"`
}
type RedisConfig struct {
......
......@@ -31,6 +31,7 @@ type Node struct {
cache *cachedata.CacheData
kafkaProducer sarama.AsyncProducer
taskResultCh chan *basev1.TaskReceipt
taskProofCh chan *basev1.TaskProof
}
func NewNode() *Node {
......@@ -84,6 +85,7 @@ func NewNode() *Node {
apiServer: grpc.NewServer(grpc.MaxSendMsgSize(1024*1024*20), grpc.MaxRecvMsgSize(1024*1024*20)),
kafkaProducer: producer,
taskResultCh: make(chan *basev1.TaskReceipt, 100000),
taskProofCh: make(chan *basev1.TaskProof, 100000),
}
node.wm = NewWorkerManager(rdb, node)
......@@ -138,6 +140,16 @@ func (n *Node) PostResult(result *basev1.TaskReceipt) {
n.taskResultCh <- result
}
func (n *Node) PostProof(proof *basev1.TaskProof) {
defer func() {
// handler recover
if err := recover(); err != nil {
log.WithError(err.(error)).Error("post result panic")
}
}()
n.taskProofCh <- proof
}
func (n *Node) postLoop() {
for {
select {
......@@ -148,6 +160,13 @@ func (n *Node) postLoop() {
if err := utils.FireTaskReceipt(n.kafkaProducer, receipt, config.GetConfig().Kafka.ReceiptTopic); err != nil {
log.WithError(err).Error("fire task receipt to kafka failed")
}
case proof, ok := <-n.taskProofCh:
if !ok {
return
}
if err := utils.FireTaskProof(n.kafkaProducer, proof, config.GetConfig().Kafka.ProofTopic); err != nil {
log.WithError(err).Error("fire task receipt to kafka failed")
}
}
}
}
......@@ -157,4 +176,5 @@ func (n *Node) Stop() {
n.register.Stop()
n.apiServer.Stop()
close(n.taskResultCh)
close(n.taskProofCh)
}
......@@ -12,7 +12,7 @@ import (
"time"
)
func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*odysseus.TaskProof, error) {
switch task.TaskKind {
case odysseus.TaskKind_ComputeTask:
return wm.computeTaskResult(worker, task, result)
......@@ -23,7 +23,7 @@ func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent,
return nil, errors.New("unsupport task kind")
}
func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*odysseus.TaskProof, error) {
if worker.info.nodeInfo == nil {
return nil, errors.New("unknown worker node info")
}
......@@ -105,14 +105,15 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
return nil, err
}
proof := new(omanager.ManagerMessage_ProofTaskResult)
proof.ProofTaskResult = &omanager.ProofTaskResult{
TaskId: result.TaskId,
ManagerSignature: signature,
Timestamp: uint64(now),
Workload: uint64(task.TaskWorkload),
ContainerPubkey: utils.CombineBytes(task.ContainerPubkey),
}
proof := wm.makeTaskProof(worker, task, taskProof{
paramHash: paramHash.Bytes(),
resultHash: resultHash.Bytes(),
finishTime: uint64(now),
nmSignature: signature,
containerSignature: result.ContainerSignature,
minerSignature: result.MinerSignature,
})
wm.node.PostProof(proof)
log.WithFields(log.Fields{
"task-id": result.TaskId,
"workload": task.TaskWorkload,
......@@ -120,7 +121,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
return proof, nil
}
func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*odysseus.TaskProof, error) {
if worker.info.nodeInfo == nil {
return nil, errors.New("unknown worker node info")
}
......@@ -193,18 +194,19 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
log.WithError(err).Error("sign result failed")
return nil, err
}
proof := wm.makeTaskProof(worker, task, taskProof{
paramHash: paramHash.Bytes(),
resultHash: resultHash.Bytes(),
finishTime: uint64(now),
nmSignature: signature,
containerSignature: result.ContainerSignature,
minerSignature: result.MinerSignature,
})
wm.node.PostProof(proof)
proof := new(omanager.ManagerMessage_ProofTaskResult)
proof.ProofTaskResult = &omanager.ProofTaskResult{
TaskId: result.TaskId,
ManagerSignature: signature,
Timestamp: uint64(now),
Workload: uint64(task.TaskWorkload),
ContainerPubkey: utils.CombineBytes(task.ContainerPubkey),
}
log.WithFields(log.Fields{
"task-id": result.TaskId,
"workload": task.TaskWorkload,
}).Debug("send proof to worker")
}).Debug("send proof to kafka")
return proof, nil
}
......@@ -2,6 +2,7 @@ package server
import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
......@@ -397,17 +398,13 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
if err != nil {
continue
}
msg.Message = proof
_ = proof
callback = func(err error) bool {
// remove task from cache.
worker.recentTask.Remove(result.TaskId)
if task.TaskKind != odysseus.TaskKind_StandardTask {
_ = wm.AddWorkerSingle(worker)
wm.Payment(task)
}
return true
}
}
if msg != nil {
......@@ -668,3 +665,29 @@ func (wm *WorkerManager) makeReceipt(worker *Worker, task *odysseus.TaskContent,
}
return receipt
}
type taskProof struct {
paramHash []byte
resultHash []byte
finishTime uint64
nmSignature []byte
containerSignature []byte
minerSignature []byte
}
func (wm *WorkerManager) makeTaskProof(worker *Worker, task *odysseus.TaskContent, t taskProof) *odysseus.TaskProof {
proof := &odysseus.TaskProof{
TaskId: task.TaskId,
TaskFinishTimestamp: t.finishTime,
TaskType: task.TaskType,
TaskWorkload: uint64(task.TaskWorkload),
TaskReqHash: hex.EncodeToString(t.paramHash),
TaskRespHash: hex.EncodeToString(t.resultHash),
TaskManagerSignature: hex.EncodeToString(t.nmSignature),
TaskContainerSignature: hex.EncodeToString(t.containerSignature),
TaskMinerSignature: hex.EncodeToString(t.minerSignature),
TaskWorkerAccount: worker.workerAddr,
TaskProfitAccount: worker.info.nodeInfo.BenefitAddress,
}
return proof
}
......@@ -31,10 +31,6 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
}
func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceipt, topic string) error {
// Check time for partition key
//var partitionKey sarama.StringEncoder
//partitionKey = sarama.StringEncoder(time.Now().Format("2006-01-02"))
b, err := proto.Marshal(receipt)
if err != nil {
return err
......@@ -42,10 +38,22 @@ func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceip
value := sarama.ByteEncoder(b)
producer.Input() <- &sarama.ProducerMessage{
//Key: partitionKey,
Topic: topic,
Value: value,
}
return nil
}
func FireTaskProof(producer sarama.AsyncProducer, proof *odysseus.TaskProof, topic string) error {
b, err := proto.Marshal(proof)
if err != nil {
return err
}
value := sarama.ByteEncoder(b)
producer.Input() <- &sarama.ProducerMessage{
Topic: topic,
Value: value,
}
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment