Commit 466e0c38 authored by vicotor's avatar vicotor

update node manager

parent ed03355a
......@@ -4,4 +4,5 @@ const (
NODE_MANAGER_SET = "node_manager_set"
WORKER_STATUS_PREFIX = "worker_status_"
WORKER_QUEUE_PREFIX = "worker_queue_"
WORKER_DEVICE_INFO_PREFIX = "worker_device_info_"
)
......@@ -28,6 +28,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gomodule/redigo v2.0.0+incompatible // indirect
github.com/google/uuid v1.5.0 // indirect
......
......@@ -57,6 +57,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-redis/redis v6.14.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
......@@ -115,9 +117,11 @@ github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkL
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA=
github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ=
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
......
package server
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/nmregistry"
"github.com/odysseus/nodemanager/utils"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/odysseus/payment"
"github.com/odysseus/payment/cachedata"
"github.com/odysseus/payment/model"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
......@@ -20,7 +22,7 @@ type Node struct {
rdb *redis.Client
wm *WorkerManager
privk *ecdsa.PrivateKey
payment *payment.Payment
cache *cachedata.CacheData
}
func NewNode() *Node {
......@@ -30,11 +32,19 @@ func NewNode() *Node {
Password: redisConfig.Password,
DbIndex: redisConfig.DbIndex,
})
pay := payment.NewPayment(payment.RedisConnParam{
dbconf := config.GetConfig().DbConfig
pay := cachedata.NewCacheData(context.Background(), cachedata.RedisConnParam{
Addr: redisConfig.Addr,
Password: redisConfig.Password,
DbIndex: redisConfig.DbIndex,
}, model.DbConfig{
Host: dbconf.Host,
Port: dbconf.Port,
User: dbconf.User,
Passwd: dbconf.Passwd,
DbName: dbconf.DbName,
})
privk, err := utils.HexToPrivatekey(config.GetConfig().PrivateKey)
if err != nil {
log.WithError(err).Error("failed to parse node manager private key")
......@@ -43,7 +53,7 @@ func NewNode() *Node {
node := &Node{
rdb: rdb,
privk: privk,
payment: pay,
cache: pay,
apiServer: grpc.NewServer(grpc.MaxSendMsgSize(1024*1024*20), grpc.MaxRecvMsgSize(1024*1024*20)),
registry: nmregistry.NewRegistryService(config.GetConfig(), rdb, privk.PublicKey),
}
......
package server
import (
"context"
"bytes"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
......@@ -39,6 +42,7 @@ type Worker struct {
status []byte
usageInfo []*omanager.DeviceUsage
deviceInfo []*omanager.DeviceInfo
deviceInfoHash []byte
recentTask *lru.Cache
stream omanager.NodeManagerService_RegisterWorkerServer
}
......@@ -136,6 +140,16 @@ func (wm *WorkerManager) AddNewWorker(uuid int64, worker omanager.NodeManagerSer
type Callback func(err error) bool
func (wm *WorkerManager) doCallback(hook string, response *odysseus.TaskResponse) {
d, _ := proto.Marshal(response)
err := utils.Post(hook, d)
if err != nil {
log.WithError(err).Error("post task result failed")
} else {
log.WithField("taskid", response.TaskId).Debug("post task result")
}
}
func (wm *WorkerManager) manageWorker(worker *Worker) error {
log.WithField("worker", worker.uuid).Info("start manage worker")
......@@ -236,7 +250,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
taskMsg.PushTaskMessage = &omanager.PushTaskMessage{
TaskId: task.TaskId,
TaskType: task.TaskType,
Workload: 0,
Workload: uint64(task.TaskWorkload),
TaskCmd: task.TaskCmd,
TaskParam: task.TaskParam,
}
......@@ -267,13 +281,49 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
log.WithField("worker", worker.uuid).Error("task id not match")
continue
}
// todo: verify container_signature and miner_signature
if result.IsSuccessed == false {
taskResponse := &odysseus.TaskResponse{
TaskId: task.TaskId,
TaskResult: result.TaskResult,
TaskUid: task.TaskUid,
TaskFee: task.TaskFee,
TaskIsSucceed: false,
TaskError: "worker failed",
}
go wm.doCallback(task.TaskCallback, taskResponse)
continue
}
{
// verify container_signature and miner_signature
// container_signature = sign(hash(task_id+hash(task_param)+hash(task_result)))
paramHash := sha3.Sum256(task.TaskParam)
resultHash := sha3.Sum256(result.TaskResult)
dataHash := sha3.Sum256(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
containerPubkey, _ := utils.HexToPubkey(hex.EncodeToString(task.ContainerPubkey))
verified := ecdsa.VerifyASN1(containerPubkey, dataHash[:], result.ContainerSignature)
if !verified {
// todo: handle signature verify failed
}
}
{
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
paramHash := sha3.Sum256(task.TaskParam)
resultHash := sha3.Sum256(result.TaskResult)
dataHash := sha3.Sum256(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
minerPubkey, _ := utils.HexToPubkey(worker.publicKey) // todo: get miner pubkey
verified := ecdsa.VerifyASN1(minerPubkey, dataHash[:], result.MinerSignature)
if !verified {
// todo: handle signature verify failed
}
}
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload))
paramHash := sha3.Sum256(task.TaskParam)
resultHash := sha3.Sum256(result.TaskResult)
dataHash := sha3.Sum256(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:],
result.ContainerSignature, result.MinerSignature, big.NewInt(0).Bytes()))
//result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.Workload)).Bytes()))
result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes()))
signature, err := wm.node.Sign(dataHash[:])
if err != nil {
......@@ -285,7 +335,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
proof.ProofTaskResult = &omanager.ProofTaskResult{
TaskId: result.TaskId,
ManagerSignature: signature,
ContainerPubkey: utils.CombineBytes(task.ContainerPubkey),
}
callback = func(err error) bool {
if err == nil {
// remove task from cache.
......@@ -299,19 +351,11 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
TaskIsSucceed: true,
TaskError: "",
}
d, _ := proto.Marshal(taskResponse)
go func() {
err := utils.Post(task.TaskCallback, d)
if err != nil {
log.WithError(err).Error("post task result failed")
} else {
log.WithField("taskid", task.TaskId).Debug("post task result")
}
}()
go wm.doCallback(task.TaskCallback, taskResponse)
_ = wm.AddWorker(worker)
wm.Payment(task)
// todo: post event for task succeed or failed
return true
}
}
......@@ -367,10 +411,11 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
log.WithFields(log.Fields{
"worker": worker.uuid,
}).Debugf("receive worker device info:%v", msg.DeviceInfo.Devices)
if worker.deviceInfo == nil || worker.addr == "" {
// first time receive device info
{
// receive device info
worker.publicKey = msg.DeviceInfo.MinerPubkey
worker.deviceInfo = msg.DeviceInfo.Devices
if pubkey, err := utils.HexToPubkey(worker.publicKey); err != nil {
log.WithFields(log.Fields{
"worker": worker.uuid,
......@@ -380,6 +425,19 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
worker.addr = utils.PubkeyToAddress(pubkey)
}
if worker.addr != "" {
infoData, err := json.Marshal(msg.DeviceInfo.Devices)
if err != nil {
log.WithFields(log.Fields{
"worker": worker.uuid,
"error": err,
}).Error("marshal device info failed")
} else if len(infoData) > 0 {
infoHash := sha3.Sum256(infoData)
if bytes.Compare(infoHash[:], worker.deviceInfoHash) != 0 {
wm.UpdateWorkerDeviceInfo(worker, string(infoData))
}
worker.deviceInfoHash = infoHash[:]
}
wm.AddWorker(worker)
wm.SetWorkerAddr(worker, worker.addr)
}
......@@ -403,7 +461,9 @@ func (wm *WorkerManager) Payment(task *odysseus.TaskContent) error {
if config.GetConfig().EnablePay == true {
// pay for task.
fee, _ := strconv.ParseInt(task.TaskFee, 10, 64)
_, err := wm.node.payment.DecrBalance(context.Background(), task.TaskUid, fee)
uid, _ := strconv.ParseInt(task.TaskUid, 10, 64)
err := wm.node.cache.PayforFee(uid, fee)
if err != nil {
return err
}
......
......@@ -6,6 +6,12 @@ import (
"strconv"
)
func (wm *WorkerManager) UpdateWorkerDeviceInfo(worker *Worker, deviceInfos string) {
deviceInfoKey := config.WORKER_DEVICE_INFO_PREFIX + worker.addr
wm.rdb.Set(context.Background(), deviceInfoKey, deviceInfos, 0)
}
func (wm *WorkerManager) AddWorker(worker *Worker) error {
for _, device := range worker.deviceInfo {
// add device to redis
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment