Commit 783e5e2c authored by vicotor's avatar vicotor

add standard task

parent dbb1be88
package server
import (
"crypto/ecdsa"
"errors"
"github.com/ethereum/go-ethereum/crypto"
"github.com/odysseus/nodemanager/standardlib"
"github.com/odysseus/nodemanager/utils"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
log "github.com/sirupsen/logrus"
"math/big"
)
func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
switch task.TaskKind {
case odysseus.TaskKind_ComputeTask:
return wm.computeTaskResult(worker, task, result)
case odysseus.TaskKind_StandardTask:
return wm.standardTaskResult(worker, task, result)
}
return nil, errors.New("unsupport task kind")
}
func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
log.WithField("task-id", task.TaskId).WithField("result", result).Debug("got task result")
if result.IsSuccessed == false {
taskResponse := &odysseus.TaskResponse{
TaskId: task.TaskId,
TaskResultHeader: result.TaskResultHeader,
TaskResultBody: result.TaskResultBody,
TaskUid: task.TaskUid,
TaskFee: task.TaskFee,
TaskIsSucceed: false,
TaskError: "worker failed",
}
receipt := wm.makeReceipt(worker, task, result, errors.New("worker failed"))
wm.node.PostResult(receipt)
go wm.doCallback(task.TaskCallback, taskResponse)
return nil, nil
}
//{
// // verify container_signature and miner_signature
// // container_signature = sign(hash(task_id+hash(task_param)+hash(task_result)))
// paramHash := crypto.Keccak256Hash(task.TaskParam)
// resultHash := crypto.Keccak256Hash(result.TaskResult)
// dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskUuid), paramHash[:], resultHash[:]))
// containerPubkey, _ := utils.HexToPubkey(hex.EncodeToString(task.ContainerPubkey))
// verified := ecdsa.VerifyASN1(containerPubkey, dataHash[:], result.ContainerSignature)
// if !verified {
// // todo: handle signature verify failed
// }
//}
{
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
minerPubkey, _ := utils.HexToPubkey(worker.publicKey) // todo: get miner pubkey
verified := ecdsa.VerifyASN1(minerPubkey, dataHash[:], result.MinerSignature)
log.WithField("minerSignatureVerify", verified).Debug("miner signature verify")
if !verified {
// todo: handle signature verify failed
}
}
receipt := wm.makeReceipt(worker, task, result, Succeed)
wm.node.PostResult(receipt)
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload))
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:],
worker.ProfitAccount().Bytes(), worker.WorkerAccount().Bytes(), result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes()))
signature, err := wm.node.Sign(dataHash[:])
if err != nil {
log.WithError(err).Error("sign result failed")
return nil, err
}
proof := new(omanager.ManagerMessage_ProofTaskResult)
proof.ProofTaskResult = &omanager.ProofTaskResult{
TaskId: result.TaskId,
ManagerSignature: signature,
Workload: uint64(task.TaskWorkload),
ContainerPubkey: utils.CombineBytes(task.ContainerPubkey),
}
log.WithFields(log.Fields{
"task-id": result.TaskId,
"workload": task.TaskWorkload,
}).Debug("send proof to worker")
return proof, nil
}
func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
log.WithField("task-id", task.TaskId).WithField("result", result).Debug("got standard task result")
if result.IsSuccessed == false {
receipt := wm.makeReceipt(worker, task, result, errors.New("worker failed"))
wm.node.PostResult(receipt)
return nil, nil
}
stdlib := standardlib.GetStdLib(task.TaskType)
if stdlib == nil {
log.WithField("task-id", task.TaskId).Error("not found stdlib to verify")
return nil, errors.New("not found stdlib to verify")
}
if stdlib.VerifyResult(string(task.TaskParam), result.TaskResultBody) == true {
log.WithField("task-id", task.TaskId).Debug("stdlib to verify passed")
} else {
log.WithField("task-id", task.TaskId).Debug("stdlib to verify failed")
return nil, errors.New("stdlib to verify failed")
}
//{
// // verify container_signature and miner_signature
// // container_signature = sign(hash(task_id+hash(task_param)+hash(task_result)))
// paramHash := crypto.Keccak256Hash(task.TaskParam)
// resultHash := crypto.Keccak256Hash(result.TaskResult)
// dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskUuid), paramHash[:], resultHash[:]))
// containerPubkey, _ := utils.HexToPubkey(hex.EncodeToString(task.ContainerPubkey))
// verified := ecdsa.VerifyASN1(containerPubkey, dataHash[:], result.ContainerSignature)
// if !verified {
// // todo: handle signature verify failed
// }
//}
{
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
minerPubkey, _ := utils.HexToPubkey(worker.publicKey) // todo: get miner pubkey
verified := ecdsa.VerifyASN1(minerPubkey, dataHash[:], result.MinerSignature)
log.WithField("minerSignatureVerify", verified).Debug("miner signature verify")
if !verified {
// todo: handle signature verify failed
}
}
taskResponse := &odysseus.TaskResponse{
TaskId: task.TaskId,
TaskResultHeader: result.TaskResultHeader,
TaskResultBody: result.TaskResultBody,
TaskUid: task.TaskUid,
TaskFee: task.TaskFee,
TaskIsSucceed: true,
TaskError: "",
}
go wm.doCallback(task.TaskCallback, taskResponse)
receipt := wm.makeReceipt(worker, task, result, Succeed)
wm.node.PostResult(receipt)
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload))
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:],
worker.ProfitAccount().Bytes(), worker.WorkerAccount().Bytes(), result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes()))
signature, err := wm.node.Sign(dataHash[:])
if err != nil {
log.WithError(err).Error("sign result failed")
return nil, err
}
proof := new(omanager.ManagerMessage_ProofTaskResult)
proof.ProofTaskResult = &omanager.ProofTaskResult{
TaskId: result.TaskId,
ManagerSignature: signature,
Workload: uint64(task.TaskWorkload),
ContainerPubkey: utils.CombineBytes(task.ContainerPubkey),
}
log.WithFields(log.Fields{
"task-id": result.TaskId,
"workload": task.TaskWorkload,
}).Debug("send proof to worker")
return proof, nil
}
......@@ -2,22 +2,21 @@ package server
import (
"bytes"
"crypto/ecdsa"
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru"
"github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/standardlib"
"github.com/odysseus/nodemanager/utils"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3"
"math/big"
"strconv"
"sync"
"time"
......@@ -316,93 +315,15 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
continue
}
log.WithField("task-id", task.TaskId).WithField("result", result).Debug("got task result")
if result.IsSuccessed == false {
taskResponse := &odysseus.TaskResponse{
TaskId: task.TaskId,
TaskResultHeader: result.TaskResultHeader,
TaskResultBody: result.TaskResultBody,
TaskUid: task.TaskUid,
TaskFee: task.TaskFee,
TaskIsSucceed: false,
TaskError: "worker failed",
}
receipt := wm.makeReceipt(worker, task, result, errors.New("worker failed"))
wm.node.PostResult(receipt)
go wm.doCallback(task.TaskCallback, taskResponse)
continue
}
//{
// // verify container_signature and miner_signature
// // container_signature = sign(hash(task_id+hash(task_param)+hash(task_result)))
// paramHash := crypto.Keccak256Hash(task.TaskParam)
// resultHash := crypto.Keccak256Hash(result.TaskResult)
// dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskUuid), paramHash[:], resultHash[:]))
// containerPubkey, _ := utils.HexToPubkey(hex.EncodeToString(task.ContainerPubkey))
// verified := ecdsa.VerifyASN1(containerPubkey, dataHash[:], result.ContainerSignature)
// if !verified {
// // todo: handle signature verify failed
// }
//}
{
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
minerPubkey, _ := utils.HexToPubkey(worker.publicKey) // todo: get miner pubkey
verified := ecdsa.VerifyASN1(minerPubkey, dataHash[:], result.MinerSignature)
log.WithField("minerSignatureVerify", verified).Debug("miner signature verify")
if !verified {
// todo: handle signature verify failed
}
}
receipt := wm.makeReceipt(worker, task, result, Succeed)
wm.node.PostResult(receipt)
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload))
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:],
worker.ProfitAccount().Bytes(), worker.WorkerAccount().Bytes(), result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes()))
signature, err := wm.node.Sign(dataHash[:])
proof, err := wm.taskResult(worker, task, result)
if err != nil {
log.WithError(err).Error("sign result failed")
continue
}
proof := new(omanager.ManagerMessage_ProofTaskResult)
proof.ProofTaskResult = &omanager.ProofTaskResult{
TaskId: result.TaskId,
ManagerSignature: signature,
Workload: uint64(task.TaskWorkload),
ContainerPubkey: utils.CombineBytes(task.ContainerPubkey),
}
log.WithFields(log.Fields{
"task-id": result.TaskId,
"workload": task.TaskWorkload,
}).Debug("send proof to worker")
msg.Message = proof
callback = func(err error) bool {
if err == nil {
// remove task from cache.
worker.recentTask.Remove(result.TaskId)
}
taskResponse := &odysseus.TaskResponse{
TaskId: task.TaskId,
TaskResultHeader: result.TaskResultHeader,
TaskResultBody: result.TaskResultBody,
TaskUid: task.TaskUid,
TaskFee: task.TaskFee,
TaskIsSucceed: true,
TaskError: "",
}
go wm.doCallback(task.TaskCallback, taskResponse)
// remove task from cache.
worker.recentTask.Remove(result.TaskId)
_ = wm.AddWorkerSingle(worker)
......@@ -465,6 +386,36 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
l.WithFields(log.Fields{
"worker-addr": worker.addr,
}).Debugf("receive worker resource map:%v", msg.ResourceMap)
case *omanager.WorkerMessage_FetchStandardTask:
l.WithFields(log.Fields{
"worker-addr": worker.addr,
}).Debugf("receive worker fetch std task request:%v", msg.FetchStandardTask.TaskType)
tasks := standardlib.GetStdTaskList()
pushTask := standardlib.StdTask{}
for _, task := range tasks {
if task.TaskType == msg.FetchStandardTask.TaskType {
stdlib := standardlib.GetStdLib(task.TaskType)
if stdlib == nil {
continue
}
pushTask = task
pushTask.TaskId = uuid.NewString()
param, err := stdlib.GenerateParam(0)
if err != nil {
continue
}
pushTask.TaskParam = []byte(param)
pushTask.TaskInLen = int32(len(param))
pushTask.TaskKind = odysseus.TaskKind_StandardTask
pushTask.TaskFee = "0"
worker.taskCh <- &dispatchTask{
task: &pushTask.TaskContent,
errCh: make(chan error, 1),
}
break
}
}
case *omanager.WorkerMessage_DeviceInfo:
// todo: handler worker device info
l.WithFields(log.Fields{
......
package standardlib
import (
"bytes"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"math/rand"
)
type GenEthAddr struct {
}
func (g *GenEthAddr) GenerateParam(difficult int) (string, error) {
m := rand.Intn(256) // random prefix 2 length
for i := 0; i < difficult; i++ {
n := rand.Intn(256) // append prefix 2 length
m = m<<8 | n
}
return fmt.Sprintf("%02x", m), nil
}
func (g *GenEthAddr) VerifyResult(param string, result []byte) bool {
pk, err := crypto.ToECDSA(result)
if err != nil {
return false
}
addr := crypto.PubkeyToAddress(pk.PublicKey)
pm := common.Hex2Bytes(param)
if len(param) > len(addr.Bytes()) {
return false
}
return bytes.Compare(addr.Bytes()[:len(pm)], pm) == 0
}
package standardlib
import (
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"sync"
)
type StdLib interface {
GenerateParam(difficult int) (string, error)
VerifyResult(param string, result []byte) bool
}
var (
standardLibMap = sync.Map{}
)
func RegisterStdLib(taskType uint64, lib StdLib) {
standardLibMap.Store(taskType, lib)
}
func GetStdLib(taskType uint64) StdLib {
if lib, ok := standardLibMap.Load(taskType); ok {
return lib.(StdLib)
}
return nil
}
type StdTask struct {
odysseus.TaskContent
}
func GetStdTaskList() []StdTask {
return []StdTask{}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment