Commit 22385de4 authored by vicotor's avatar vicotor

update standard lib and task

parent 783e5e2c
...@@ -3,6 +3,7 @@ local_host="127.0.0.1" ...@@ -3,6 +3,7 @@ local_host="127.0.0.1"
port=10001 port=10001
metrics_port = 28010 metrics_port = 28010
private_key = "E671C143A110C239B563F702E9F4017CA6B2B2912F675EED9AA4FED684EB30CC" private_key = "E671C143A110C239B563F702E9F4017CA6B2B2912F675EED9AA4FED684EB30CC"
standard_task_file = "standardtask.json"
[redis] [redis]
addr="127.0.0.1:6379" addr="127.0.0.1:6379"
......
...@@ -35,10 +35,11 @@ type TickerConfig struct { ...@@ -35,10 +35,11 @@ type TickerConfig struct {
} }
type Config struct { type Config struct {
PrivateKey string `json:"private_key" toml:"private_key"` PrivateKey string `json:"private_key" toml:"private_key"`
RemoteHost string `json:"remote_host" toml:"remote_host"` RemoteHost string `json:"remote_host" toml:"remote_host"`
LocalHost string `json:"local_host" toml:"local_host"` LocalHost string `json:"local_host" toml:"local_host"`
Port int `json:"port" toml:"port"` Port int `json:"port" toml:"port"`
StandardTaskFile string `json:"standard_task_file" toml:"standard_task_file"`
//Endpoint string `json:"endpoint" toml:"endpoint"` //Endpoint string `json:"endpoint" toml:"endpoint"`
MetricPort int `json:"metrics_port" toml:"metrics_port"` MetricPort int `json:"metrics_port" toml:"metrics_port"`
EnablePay bool `json:"enable_pay" toml:"enable_pay"` EnablePay bool `json:"enable_pay" toml:"enable_pay"`
...@@ -79,3 +80,7 @@ func (conf *Config) LocalEndpoint() string { ...@@ -79,3 +80,7 @@ func (conf *Config) LocalEndpoint() string {
func (conf *Config) ApiEndpoint() string { func (conf *Config) ApiEndpoint() string {
return fmt.Sprintf("0.0.0.0:%d", conf.Port) return fmt.Sprintf("0.0.0.0:%d", conf.Port)
} }
func (conf *Config) StandardTaskFilePath() string {
return conf.StandardTaskFile
}
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/odysseus/nodemanager/standardlib"
"github.com/odysseus/nodemanager/utils" "github.com/odysseus/nodemanager/utils"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
...@@ -22,7 +21,6 @@ func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent, ...@@ -22,7 +21,6 @@ func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent,
} }
return nil, errors.New("unsupport task kind") return nil, errors.New("unsupport task kind")
} }
func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) { func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
...@@ -108,7 +106,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC ...@@ -108,7 +106,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
return nil, nil return nil, nil
} }
stdlib := standardlib.GetStdLib(task.TaskType) stdlib := wm.std.GetStdLib(task.TaskType)
if stdlib == nil { if stdlib == nil {
log.WithField("task-id", task.TaskId).Error("not found stdlib to verify") log.WithField("task-id", task.TaskId).Error("not found stdlib to verify")
return nil, errors.New("not found stdlib to verify") return nil, errors.New("not found stdlib to verify")
......
...@@ -69,6 +69,7 @@ type WorkerManager struct { ...@@ -69,6 +69,7 @@ type WorkerManager struct {
quit chan struct{} quit chan struct{}
node *Node node *Node
std *standardlib.StandardTasks
} }
func NewWorkerManager(rdb *redis.Client, node *Node) *WorkerManager { func NewWorkerManager(rdb *redis.Client, node *Node) *WorkerManager {
...@@ -79,6 +80,7 @@ func NewWorkerManager(rdb *redis.Client, node *Node) *WorkerManager { ...@@ -79,6 +80,7 @@ func NewWorkerManager(rdb *redis.Client, node *Node) *WorkerManager {
quit: make(chan struct{}), quit: make(chan struct{}),
rdb: rdb, rdb: rdb,
node: node, node: node,
std: standardlib.NewStandardTasks(),
} }
} }
...@@ -390,30 +392,32 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -390,30 +392,32 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker-addr": worker.addr, "worker-addr": worker.addr,
}).Debugf("receive worker fetch std task request:%v", msg.FetchStandardTask.TaskType) }).Debugf("receive worker fetch std task request:%v", msg.FetchStandardTask.TaskType)
tasks := standardlib.GetStdTaskList()
pushTask := standardlib.StdTask{} pushTask := standardlib.StdTask{}
for _, task := range tasks { task, exist := wm.std.GetTask(msg.FetchStandardTask.TaskType)
if task.TaskType == msg.FetchStandardTask.TaskType { if exist {
stdlib := standardlib.GetStdLib(task.TaskType) stdlib := wm.std.GetStdLib(task.TaskType)
if stdlib == nil { if stdlib == nil {
continue l.WithField("task-type", task.TaskType).Warn("not found std lib")
} continue
pushTask = task }
pushTask.TaskId = uuid.NewString() pushTask = task
param, err := stdlib.GenerateParam(0) pushTask.TaskId = uuid.NewString()
if err != nil { param, err := stdlib.GenerateParam(0)
continue if err != nil {
} l.WithError(err).WithField("task-type", task.TaskType).Error("generate param failed")
pushTask.TaskParam = []byte(param) continue
pushTask.TaskInLen = int32(len(param)) }
pushTask.TaskKind = odysseus.TaskKind_StandardTask pushTask.TaskParam = []byte(param)
pushTask.TaskFee = "0" pushTask.TaskInLen = int32(len(param))
worker.taskCh <- &dispatchTask{ pushTask.TaskKind = odysseus.TaskKind_StandardTask
task: &pushTask.TaskContent, pushTask.TaskFee = "0"
errCh: make(chan error, 1), worker.taskCh <- &dispatchTask{
} task: &pushTask.TaskContent,
break errCh: make(chan error, 1),
} }
break
} else {
l.WithField("task-type", msg.FetchStandardTask.TaskType).Warn("not found std task")
} }
case *omanager.WorkerMessage_DeviceInfo: case *omanager.WorkerMessage_DeviceInfo:
......
...@@ -11,6 +11,10 @@ import ( ...@@ -11,6 +11,10 @@ import (
type GenEthAddr struct { type GenEthAddr struct {
} }
func init() {
RegisterStdLib("ethaddr", &GenEthAddr{})
}
func (g *GenEthAddr) GenerateParam(difficult int) (string, error) { func (g *GenEthAddr) GenerateParam(difficult int) (string, error) {
m := rand.Intn(256) // random prefix 2 length m := rand.Intn(256) // random prefix 2 length
for i := 0; i < difficult; i++ { for i := 0; i < difficult; i++ {
......
package standardlib package standardlib
import ( import (
"encoding/json"
"github.com/ethereum/go-ethereum/common"
"github.com/odysseus/nodemanager/config"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
log "github.com/sirupsen/logrus"
"os"
"sync" "sync"
) )
...@@ -10,16 +15,49 @@ type StdLib interface { ...@@ -10,16 +15,49 @@ type StdLib interface {
VerifyResult(param string, result []byte) bool VerifyResult(param string, result []byte) bool
} }
type StandardTasks struct {
tasks []StdTask
mux sync.Mutex
stdTasks sync.Map
}
var ( var (
standardLibMap = sync.Map{} standardLibMap = sync.Map{}
) )
func RegisterStdLib(taskType uint64, lib StdLib) { func RegisterStdLib(name string, lib StdLib) {
standardLibMap.Store(taskType, lib) standardLibMap.Store(name, lib)
}
func NewStandardTasks() *StandardTasks {
s := &StandardTasks{}
s.loadTasks()
return s
}
func (s *StandardTasks) loadTasks() {
s.tasks = loadStdTasks()
for _, task := range s.tasks {
s.stdTasks.Store(task.TaskType, task)
}
}
func (s *StandardTasks) GetTask(taskType uint64) (StdTask, bool) {
if task, ok := s.stdTasks.Load(taskType); ok {
return task.(StdTask), true
}
return StdTask{}, false
}
func (s *StandardTasks) GetStdLib(taskType uint64) StdLib {
if task, exist := s.GetTask(taskType); exist {
return getStdLib(task.Standlib)
}
return nil
} }
func GetStdLib(taskType uint64) StdLib { func getStdLib(name string) StdLib {
if lib, ok := standardLibMap.Load(taskType); ok { if lib, ok := standardLibMap.Load(name); ok {
return lib.(StdLib) return lib.(StdLib)
} }
return nil return nil
...@@ -27,8 +65,44 @@ func GetStdLib(taskType uint64) StdLib { ...@@ -27,8 +65,44 @@ func GetStdLib(taskType uint64) StdLib {
type StdTask struct { type StdTask struct {
odysseus.TaskContent odysseus.TaskContent
Standlib string
}
type defineStdTask struct {
TaskKind int `json:"task_kind,omitempty"`
TaskType uint64 `json:"task_type,omitempty"`
TaskCmd string `json:"task_cmd,omitempty"`
TaskParam string `json:"task_param,omitempty"`
TaskFee string `json:"task_fee,omitempty"`
TaskWorkload int64 `json:"task_workload,omitempty"`
ContainerPubkey string ` json:"container_pubkey,omitempty"`
} }
func GetStdTaskList() []StdTask { func loadStdTasks() []StdTask {
return []StdTask{} file := config.GetConfig().StandardTaskFilePath()
defStdTasks := make([]defineStdTask, 0)
// read content from file and json unmarshal to defStdTasks
data, err := os.ReadFile(file)
if err != nil {
log.WithError(err).Error("read standard task file failed")
return []StdTask{}
}
err = json.Unmarshal(data, &defStdTasks)
if err != nil {
log.WithError(err).Error("json unmarshal standard task file failed")
return []StdTask{}
}
stdTasks := make([]StdTask, len(defStdTasks))
for i, defStdTask := range defStdTasks {
stdTasks[i].TaskKind = odysseus.TaskKind_StandardTask
stdTasks[i].TaskType = defStdTask.TaskType
stdTasks[i].TaskCmd = defStdTask.TaskCmd
stdTasks[i].TaskParam = []byte(defStdTask.TaskParam)
stdTasks[i].TaskFee = defStdTask.TaskFee
stdTasks[i].TaskWorkload = defStdTask.TaskWorkload
stdTasks[i].ContainerPubkey = common.Hex2Bytes(defStdTask.ContainerPubkey)
stdTasks[i].Standlib = defStdTask.TaskCmd
}
return stdTasks
} }
[
{
"task_kind": 3,
"task_type":12,
"task_cmd":"{\"image_name\":\"demianhjw/ethaddr:latest\",\"docker_cmd\":{\"container_port\":\"5001\"},\"api_url\":\"http://127.0.0.1:%d/aigic\"}",
"task_timestamp":1708574058633402301,
"task_fee":"0",
"task_workload":100,
"standlib": "ethaddr"
}
]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment