Commit 5b4b486b authored by duanjinfei's avatar duanjinfei

init commit

parent 26dbb045
Pipeline #668 failed with stages
.idea
logs
*.DS_Store
*/mydb/
\ No newline at end of file
package main
import "runtime"
// main is the miner CLI entry point: it pins GOMAXPROCS to the machine's
// core count and hands control to the cobra command tree via Execute.
func main() {
	cores := runtime.NumCPU()
	// NOTE: GOMAXPROCS already defaults to NumCPU since Go 1.5; the explicit
	// call is kept for parity with the original behavior.
	runtime.GOMAXPROCS(cores)
	Execute()
}
package main
import (
"fmt"
"github.com/spf13/cobra"
)
// versionDetail controls whether the verbose version string is printed.
var versionDetail bool

func init() {
	RootCmd.AddCommand(versionCmd)
	// BUG FIX: the original dereferenced the *bool returned by BoolP at init
	// time, copying the default value before flag parsing ever ran, so
	// --detail/-d could never change versionDetail. BoolVarP binds the flag
	// directly to the variable so the parsed value is observed at Run time.
	versionCmd.Flags().BoolVarP(&versionDetail, "detail", "d", true, "Print detail version info")
}

// versionCmd prints the binary version; with --detail it prints the verbose form.
var versionCmd = &cobra.Command{
	Use:   "version",
	Short: "Print version number",
	Long:  ``,
	Run: func(cmd *cobra.Command, args []string) {
		if versionDetail {
			fmt.Println("detail version:v1.0.0")
		} else {
			fmt.Println("version:v1.0.0")
		}
	},
}
package main
import (
"encoding/json"
"example.com/m/conf"
"example.com/m/log"
"example.com/m/nm"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"io/ioutil"
"os"
)
var (
	// routineCount is the number of worker goroutines used for task handling;
	// set via the persistent --routine flag (default 2).
	routineCount uint
	// _cfg holds the configuration loaded by initConfig.
	_cfg *conf.Config = nil
)

// RootCmd represents the base command when called without any subcommands.
// Running the bare binary starts the node-manager monitor loop.
var RootCmd = &cobra.Command{
	Use:   "Miner",
	Short: "The miner command-line interface",
	Long:  ``,
	Run: func(cmd *cobra.Command, args []string) {
		nm.StartMonitor()
	},
	// Uncomment the following line if your bare application
	// has an action associated with it:
	// Run: func(cmd *cobra.Command, args []string) { },
}

func init() {
	// initConfig runs once cobra begins command execution, before Run.
	cobra.OnInitialize(initConfig)
	RootCmd.PersistentFlags().UintVar(&routineCount, "routine", 2, "routine count for corrupt do task")
}
// initConfig reads in conf file and ENV variables if set.
// It loads ./config.json via viper, unmarshals it into _cfg, derives the
// heartbeat timeout in milliseconds, and precomputes the signing key material
// (private key, hex public key, and public address).
func initConfig() {
	// Look for a file named "config" with a .json extension in the CWD.
	viper.SetConfigName("config")
	viper.SetConfigType("json")
	viper.AddConfigPath(".")
	if err := viper.ReadInConfig(); err != nil {
		fmt.Println("Error reading config file:", err)
		return
	}
	configFilePath := viper.ConfigFileUsed()
	if configFilePath == "" {
		log.Error("config file path is empty")
		panic("config file path is empty")
	}
	data, err := ioutil.ReadFile(configFilePath)
	if err != nil {
		log.Error("Read cfg file error:", err)
		panic("Read cfg file error")
	}
	if err = json.Unmarshal(data, &_cfg); err != nil {
		log.Error("Json unmarshal cfg error:", err)
		panic("Json unmarshal cfg error")
	}
	// BUG FIX: HeartRespTimeSecond is expressed in seconds, so milliseconds
	// are seconds * 1000. The original multiplied by 60*60*1000 (i.e. treated
	// the value as hours), turning a 30s timeout into 30h.
	_cfg.HeartRespTimeMillis = _cfg.HeartRespTimeSecond * 1000
	prvKey, err := crypto.HexToECDSA(_cfg.SignPrv)
	if err != nil {
		// BUG FIX: the original returned silently here, leaving
		// SignPrivateKey nil and deferring the failure to sign time.
		log.Error("Parse sign private key failed:", err)
		return
	}
	_cfg.SignPrivateKey = prvKey
	ecdsaPub := prvKey.PublicKey
	publicAddr := crypto.PubkeyToAddress(ecdsaPub)
	_cfg.SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub))
	log.Info(_cfg.SignPub)
	_cfg.SignPublicAddress = publicAddr
}
// Execute runs the root command tree and terminates the process with a
// non-zero exit code when command execution fails.
func Execute() {
	err := RootCmd.Execute()
	if err == nil {
		return
	}
	log.Error("root cmd execute failed", err)
	os.Exit(-1)
}
appname = node-server
httpport = 8888
runmode = dev
autorender = false
copyrequestbody = true
\ No newline at end of file
package conf
import (
"crypto/ecdsa"
"encoding/json"
"example.com/m/log"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/spf13/viper"
"io/ioutil"
)
// Config is the process-wide configuration, loaded from config.json.
// Fields without a json tag are derived at load time, not read from the file.
type Config struct {
	NmSeed              string `json:"nm_seed"`     // seed node-manager endpoint (host:port)
	SignPrv             string `json:"sign_prv"`    // hex-encoded ECDSA private key (secp256k1)
	SignPrivateKey      *ecdsa.PrivateKey           // derived from SignPrv at load time
	SignPub             string                      // hex-encoded uncompressed public key, derived
	SignPublicAddress   common.Address              // Ethereum-style address derived from the key
	DockerServer        string `json:"docker_server"`       // docker daemon endpoint, e.g. tcp://host:2375
	HeartRespTimeSecond int64  `json:"heart_response"`      // heartbeat response timeout, in seconds
	HeartRespTimeMillis int64                               // derived from HeartRespTimeSecond
	TaskValidatorTime   float64 `json:"task_validator_time"`
	BenefitAddress      string  `json:"benefit_address"` // address receiving mining rewards
	DockerSignApi       string  `json:"docker_sign_api"` // container signing endpoint URL
	ContainerNum        int64   `json:"container_num"`   // max concurrently running task containers
}
// _cfg is the singleton configuration populated by init and exposed via GetConfig.
var _cfg *Config = nil

// init loads ../config.json via viper, unmarshals it into _cfg, derives the
// heartbeat timeout in milliseconds, and precomputes signing key material.
// NOTE(review): this duplicates cmd's initConfig almost verbatim (with a
// different search path) — consider extracting a shared loader.
func init() {
	// Look for a file named "config" with a .json extension in the parent dir.
	viper.SetConfigName("config")
	viper.SetConfigType("json")
	viper.AddConfigPath("../")
	if err := viper.ReadInConfig(); err != nil {
		fmt.Println("Error reading config file:", err)
		return
	}
	configFilePath := viper.ConfigFileUsed()
	if configFilePath == "" {
		log.Error("config file path is empty")
		panic("config file path is empty")
	}
	data, err := ioutil.ReadFile(configFilePath)
	if err != nil {
		log.Error("Read cfg file error:", err)
		panic("Read cfg file error")
	}
	if err = json.Unmarshal(data, &_cfg); err != nil {
		log.Error("Json unmarshal cfg error:", err)
		panic("Json unmarshal cfg error")
	}
	// BUG FIX: seconds -> milliseconds is *1000; the original's *60*60*1000
	// treated HeartRespTimeSecond as hours.
	_cfg.HeartRespTimeMillis = _cfg.HeartRespTimeSecond * 1000
	prvKey, err := crypto.HexToECDSA(_cfg.SignPrv)
	if err != nil {
		// BUG FIX: surface the parse failure instead of returning silently
		// with a nil SignPrivateKey.
		log.Error("Parse sign private key failed:", err)
		return
	}
	_cfg.SignPrivateKey = prvKey
	ecdsaPub := prvKey.PublicKey
	publicAddr := crypto.PubkeyToAddress(ecdsaPub)
	_cfg.SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub))
	log.Info(_cfg.SignPub)
	_cfg.SignPublicAddress = publicAddr
}
// GetConfig returns the package-level configuration loaded during init.
// It is nil if loading failed before the unmarshal step.
func GetConfig() *Config { return _cfg }
{
"nm_seed": "192.168.1.241:10001",
"sign_prv": "0e80b06d24d7543b3e2520c91d25997bcf5e0e9e6361910cea6ab268c2db3600",
"docker_server": "tcp://192.168.1.120:2375",
"benefit_address": "0x84A3175be614F5886f99Da506dF08682DF530739",
"heart_response": 30,
"task_validator_time": 1,
"docker_sign_api":"http://192.168.1.120:8888/llm/test/get/sign",
"container_num": 1
}
\ No newline at end of file
package controllers
import (
"github.com/astaxie/beego"
"github.com/astaxie/beego/logs"
)
// BaseController embeds beego.Controller and adds a shared JSON responder.
type BaseController struct {
	beego.Controller
}

// ResponseInfo logs msg/result at a level chosen by code (500 -> error,
// 200 -> info), stores result as the JSON payload, and serves it.
// NOTE(review): codes other than 200/500 fall through both cases, so
// Data["json"] is never set and an empty body is served — confirm whether
// a default branch is wanted.
func (d *BaseController) ResponseInfo(code int, msg interface{}, result interface{}) {
	switch code {
	case 500:
		logs.Error(msg, result)
		d.Data["json"] = result
	case 200:
		logs.Info(msg, result)
		d.Data["json"] = result
	}
	d.ServeJSON()
}
package controllers
import "example.com/m/nm"
// NodeController exposes node-manager related HTTP endpoints.
type NodeController struct {
	BaseController
}

// SetNmSeed acknowledges a seed-update request.
// NOTE(review): the handler currently performs no action before responding;
// presumably the seed update itself is still to be implemented — confirm.
func (c *NodeController) SetNmSeed() {
	c.ResponseInfo(200, "set nm seed successful", "")
}

// GetNodeManager returns the currently selected node manager.
func (c *NodeController) GetNodeManager() {
	manager := nm.GetNodeManager()
	// BUG FIX: both messages were copy-pasted "sign successful"; they now
	// describe the endpoint that actually ran.
	c.ResponseInfo(200, "get node manager successful", manager)
}
package db
import (
"example.com/m/log"
"fmt"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
// dbInstance is the package-wide LevelDB handle opened during init.
var dbInstance *leveldb.DB

// err holds the result of the init-time open; package functions must not
// reuse it for their own error reporting.
var err error

// init opens (or creates) the LevelDB database backing this package.
func init() {
	// 打开或创建一个LevelDB数据库 (open or create the LevelDB database)
	dbInstance, err = leveldb.OpenFile("mydb", nil)
	if err != nil {
		log.Error("Leveldb open file failed: ", err)
	}
	// BUG FIX: the original deferred dbInstance.Close() here. A defer inside
	// init runs as soon as init returns, so the database was closed before
	// any Put/Get/Delete/NewIterator could use it and every later call
	// failed with "leveldb: closed". The handle must stay open for the
	// process lifetime; close it explicitly on shutdown if required.
}
// Put stores value under key and reports any storage error.
func Put(key string, value []byte) error {
	if putErr := dbInstance.Put([]byte(key), value, nil); putErr != nil {
		log.Error("Leveldb put data failed:", putErr)
		return putErr
	}
	return nil
}
// NewIterator returns an iterator over the entire key space.
// The caller is responsible for releasing the iterator when done.
func NewIterator() (iterator.Iterator, error) {
	iter := dbInstance.NewIterator(nil, nil)
	// BUG FIX: the original logged and returned the stale package-level `err`
	// (left over from init) instead of the iterator's own error, so a real
	// iterator failure could be reported as nil.
	if iterErr := iter.Error(); iterErr != nil {
		log.Error("Leveldb new iterator failed:", iterErr)
		return nil, iterErr
	}
	return iter, nil
}
// Get fetches the value stored under key.
func Get(key string) ([]byte, error) {
	data, getErr := dbInstance.Get([]byte(key), nil)
	if getErr != nil {
		log.Error("Leveldb get data failed:", getErr)
		return nil, getErr
	}
	// BUG FIX: the original fmt.Printf dumped every fetched value to stdout —
	// leftover debugging. Route it through the structured logger at debug
	// level instead so it can be filtered by log level.
	log.Debug("Leveldb get value: ", string(data))
	return data, nil
}
// Delete removes key from the database and reports any storage error.
func Delete(key []byte) error {
	if delErr := dbInstance.Delete(key, nil); delErr != nil {
		log.Error("Leveldb del data failed:", delErr)
		return delErr
	}
	return nil
}
module example.com/m
go 1.19
require (
github.com/astaxie/beego v1.12.3
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1
github.com/docker/docker v24.0.7+incompatible
github.com/ethereum/go-ethereum v1.13.10
github.com/go-cmd/cmd v1.4.2
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/kardianos/service v1.2.2
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/odysseus/odysseus-protocol v0.0.1
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
google.golang.org/grpc v1.60.1
)
require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.5.1 // indirect
)
replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol
This diff is collapsed.
package log
import (
"context"
"github.com/lestrrat-go/file-rotatelogs"
"github.com/rifflock/lfshook"
"github.com/sirupsen/logrus"
"os"
"path"
"time"
)
var (
	// mlog is the package-wide logrus logger every exported helper delegates to.
	mlog = logrus.New()
)

// LogConfig describes logger setup: rotated-file retention count, output
// directory, and minimum level (parsed by logrus, e.g. "debug", "info").
type LogConfig struct {
	Save  uint   `json:"save"`  // number of rotated files to keep per level
	Path  string `json:"path"`  // directory receiving the per-level log files
	Level string `json:"level"` // minimum level name understood by logrus
}
// InitLog configures the package logger: stdout output, the level parsed from
// logConfig.Level, a full-timestamp text formatter, and per-level rotating
// file hooks rooted at logConfig.Path. Panics if the level name is invalid.
func InitLog(logConfig LogConfig) {
	mlog.Out = os.Stdout
	var level logrus.Level
	if err := level.UnmarshalText([]byte(logConfig.Level)); err != nil {
		mlog.Panicf("set log level failed: %v", err)
	}
	mlog.SetLevel(level)
	mlog.Formatter = &logrus.TextFormatter{FullTimestamp: true, TimestampFormat: "2006-01-2 15:04:05.000"}
	localFilesystemLogger(mlog, logConfig.Path, logConfig.Save)
}
// logWriter builds a daily-rotating writer for one level, keeping `save`
// rotated files and maintaining a level-named symlink to the current file.
// Panics if the writer cannot be created.
func logWriter(logPath string, level string, save uint) *rotatelogs.RotateLogs {
	target := path.Join(logPath, level)
	w, err := rotatelogs.New(
		target+".%Y%m%d",
		rotatelogs.WithLinkName(target),
		rotatelogs.WithRotationCount(save),
		rotatelogs.WithRotationTime(24*time.Hour),
	)
	if err != nil {
		panic(err)
	}
	return w
}
// localFilesystemLogger attaches a file hook that routes each logrus level to
// its own rotating file under logPath, using the same text formatter as the
// console output.
func localFilesystemLogger(log *logrus.Logger, logPath string, save uint) {
	writers := lfshook.WriterMap{
		logrus.DebugLevel: logWriter(logPath, "debug", save), // one file per level
		logrus.InfoLevel:  logWriter(logPath, "info", save),
		logrus.WarnLevel:  logWriter(logPath, "warn", save),
		logrus.ErrorLevel: logWriter(logPath, "error", save),
		logrus.FatalLevel: logWriter(logPath, "fatal", save),
		logrus.PanicLevel: logWriter(logPath, "panic", save),
	}
	formatter := &logrus.TextFormatter{FullTimestamp: true, TimestampFormat: "2006-01-2 15:04:05.000"}
	log.AddHook(lfshook.NewHook(writers, formatter))
}
// WithField allocates a new entry and adds a field to it.
// Debug, Print, Info, Warn, Error, Fatal or Panic must be then applied to
// this new returned entry.
// If you want multiple fields, use `WithFields`.
func WithField(key string, value interface{}) *logrus.Entry {
	return mlog.WithField(key, value)
}

// WithFields adds a struct of fields to the log entry. All it does is call
// `WithField` for each `Field`.
func WithFields(fields logrus.Fields) *logrus.Entry {
	return mlog.WithFields(fields)
}

// WithError adds an error as a single field to the log entry. All it does is
// call `WithError` for the given `error`.
func WithError(err error) *logrus.Entry {
	return mlog.WithError(err)
}

// WithContext adds a context to the log entry.
func WithContext(ctx context.Context) *logrus.Entry {
	return mlog.WithContext(ctx)
}

// WithTime overrides the time of the log entry.
func WithTime(t time.Time) *logrus.Entry {
	return mlog.WithTime(t)
}
// Formatted logging helpers. Each delegates directly to the package-level
// logrus logger at the corresponding level.

// Logf logs a formatted message at the given level.
func Logf(level logrus.Level, format string, args ...interface{}) {
	mlog.Logf(level, format, args...)
}

// Tracef logs a formatted message at trace level.
func Tracef(format string, args ...interface{}) {
	mlog.Tracef(format, args...)
}

// Debugf logs a formatted message at debug level.
func Debugf(format string, args ...interface{}) {
	mlog.Debugf(format, args...)
}

// Infof logs a formatted message at info level.
func Infof(format string, args ...interface{}) {
	mlog.Infof(format, args...)
}

// Printf logs a formatted message at the logger's print level.
func Printf(format string, args ...interface{}) {
	mlog.Printf(format, args...)
}

// Warnf logs a formatted message at warn level.
func Warnf(format string, args ...interface{}) {
	mlog.Warnf(format, args...)
}

// Warningf is an alias of Warnf.
func Warningf(format string, args ...interface{}) {
	mlog.Warningf(format, args...)
}
// Errorf logs a formatted message at error level.
func Errorf(format string, args ...interface{}) {
	// BUG FIX: the original called mlog.Errorf(format, args) — passing the
	// []interface{} as one argument instead of expanding it with `...`,
	// which printed "[a b c]" and broke every format verb.
	mlog.Errorf(format, args...)
}
// Fatalf logs a formatted message at fatal level, then the logger exits the process.
func Fatalf(format string, args ...interface{}) {
	mlog.Fatalf(format, args...)
}

// Panicf logs a formatted message at panic level, then the logger panics.
func Panicf(format string, args ...interface{}) {
	mlog.Panicf(format, args...)
}
// Unformatted logging helpers. Each delegates directly to the package-level
// logrus logger at the corresponding level.

// Log logs its arguments at the given level.
func Log(level logrus.Level, args ...interface{}) {
	mlog.Log(level, args...)
}

// LogFn logs the result of fn at the given level (lazy evaluation).
func LogFn(level logrus.Level, fn logrus.LogFunction) {
	mlog.LogFn(level, fn)
}

// Trace logs at trace level.
func Trace(args ...interface{}) {
	mlog.Trace(args...)
}

// Debug logs at debug level.
func Debug(args ...interface{}) {
	mlog.Debug(args...)
}

// Info logs at info level.
func Info(args ...interface{}) {
	mlog.Info(args...)
}

// Print logs at the logger's print level.
func Print(args ...interface{}) {
	mlog.Print(args...)
}

// Warn logs at warn level.
func Warn(args ...interface{}) {
	mlog.Warn(args...)
}

// Warning is an alias of Warn.
func Warning(args ...interface{}) {
	mlog.Warning(args...)
}

// Error logs at error level.
func Error(args ...interface{}) {
	mlog.Error(args...)
}

// Fatal logs at fatal level, then the logger exits the process.
func Fatal(args ...interface{}) {
	mlog.Fatal(args...)
}

// Panic logs at panic level, then the logger panics.
func Panic(args ...interface{}) {
	mlog.Panic(args...)
}

// Lazy (function-valued) logging helpers: fn is only evaluated when the
// level is enabled.

// TraceFn logs the result of fn at trace level.
func TraceFn(fn logrus.LogFunction) {
	mlog.TraceFn(fn)
}

// DebugFn logs the result of fn at debug level.
func DebugFn(fn logrus.LogFunction) {
	mlog.DebugFn(fn)
}

// InfoFn logs the result of fn at info level.
func InfoFn(fn logrus.LogFunction) {
	mlog.InfoFn(fn)
}

// PrintFn logs the result of fn at the logger's print level.
func PrintFn(fn logrus.LogFunction) {
	mlog.PrintFn(fn)
}

// WarnFn logs the result of fn at warn level.
func WarnFn(fn logrus.LogFunction) {
	mlog.WarnFn(fn)
}

// WarningFn is an alias of WarnFn.
func WarningFn(fn logrus.LogFunction) {
	mlog.WarningFn(fn)
}

// ErrorFn logs the result of fn at error level.
func ErrorFn(fn logrus.LogFunction) {
	mlog.ErrorFn(fn)
}

// FatalFn logs the result of fn at fatal level, then the logger exits.
func FatalFn(fn logrus.LogFunction) {
	mlog.FatalFn(fn)
}

// PanicFn logs the result of fn at panic level, then the logger panics.
func PanicFn(fn logrus.LogFunction) {
	mlog.PanicFn(fn)
}
// Line-terminated logging helpers. Each delegates directly to the
// package-level logrus logger at the corresponding level.

// Logln logs its arguments at the given level with spaces between operands.
func Logln(level logrus.Level, args ...interface{}) {
	mlog.Logln(level, args...)
}

// Traceln logs at trace level.
func Traceln(args ...interface{}) {
	mlog.Traceln(args...)
}

// Debugln logs at debug level.
func Debugln(args ...interface{}) {
	mlog.Debugln(args...)
}

// Infoln logs at info level.
func Infoln(args ...interface{}) {
	mlog.Infoln(args...)
}

// Println logs at the logger's print level.
func Println(args ...interface{}) {
	mlog.Println(args...)
}

// Warnln logs at warn level.
func Warnln(args ...interface{}) {
	mlog.Warnln(args...)
}

// Warningln is an alias of Warnln.
func Warningln(args ...interface{}) {
	mlog.Warningln(args...)
}

// Errorln logs at error level.
func Errorln(args ...interface{}) {
	mlog.Errorln(args...)
}

// Fatalln logs at fatal level, then the logger exits the process.
func Fatalln(args ...interface{}) {
	mlog.Fatalln(args...)
}

// Panicln logs at panic level, then the logger panics.
func Panicln(args ...interface{}) {
	mlog.Panicln(args...)
}
package main
import (
"example.com/m/log"
"example.com/m/nm"
"github.com/astaxie/beego"
)
// main boots the node server: it initializes rotating-file logging, starts
// the node-manager monitor loop, then serves HTTP via beego (blocking).
func main() {
	log.InitLog(log.LogConfig{Path: "logs", Level: "debug", Save: 3})
	nm.StartMonitor()
	beego.Run()
}
package models
// Task-category names and signing/hash map keys shared across the node.
const (
	// Task categories understood by the compute handlers.
	AiPaint  = "aipaint"
	Chat     = "chat"
	Picture  = "picture"
	Language = "language"
	// Keys identifying who produced a signature or hash.
	ContainerSign = "container"
	MinerSign     = "miner"
	ReqHash       = "reqHash"
	RespHash      = "respHash"
)
package models
import (
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"sync"
)
// TaskCmd is the command payload pushed with a task: the docker image to run,
// the port mapping to apply, and the container API URL template to call.
type TaskCmd struct {
	ImageName string     `json:"image_name"`
	DockerCmd *DockerCmd `json:"docker_cmd"`
	ApiUrl    string     `json:"api_url"` // format string; the host port is substituted in
}

// DockerCmd describes the container-to-host port mapping for a task container.
type DockerCmd struct {
	ContainerPort string `json:"container_port"`
	HostIp        string
	HostPort      string
}

// TaskReq identifies a task when requesting its container signature.
type TaskReq struct {
	TaskId string `json:"task_id"`
}

// ContainerSignStruct carries a container-produced signature.
type ContainerSignStruct struct {
	Sign []byte `json:"sign"`
}

// ModelInfo describes a runnable model image and its resource footprint.
type ModelInfo struct {
	TaskId       uint64 `json:"task_id"`
	User         string `json:"user"`
	Pwd          string `json:"pwd"`
	Repository   string `json:"repository"`
	SignUrl      string `json:"sign_url"`
	ImageName    string `json:"image_name"`
	DiskSize     int64  `json:"disk_size"`
	MemorySize   int64  `json:"memory_size"`
	IsImageExist bool   `json:"is_image_delete"` // NOTE(review): field name and json tag disagree — confirm intent
}

// ComputeResult is the JSON envelope returned by task containers.
type ComputeResult struct {
	Code    string `json:"code"`
	Msg     string `json:"msg"`
	Content string `json:"content"`
}

// NodeManagerClient wraps a node-manager gRPC client with mutex-guarded
// heartbeat/status bookkeeping. Access LastHeartTime and Status only through
// the accessor methods to stay race-free.
type NodeManagerClient struct {
	mutex         sync.Mutex
	LastHeartTime int64
	PublicKey     string
	Endpoint      string
	Client        nodeManagerV1.NodeManagerServiceClient
	Status        bool
}
// GetLastHeartTime returns the timestamp of the most recent heartbeat,
// guarded by the client mutex.
func (n *NodeManagerClient) GetLastHeartTime() int64 {
	n.mutex.Lock()
	last := n.LastHeartTime
	n.mutex.Unlock()
	return last
}
// UpdateLastHeartTime records the timestamp of the most recent heartbeat,
// guarded by the client mutex.
func (n *NodeManagerClient) UpdateLastHeartTime(time int64) {
	n.mutex.Lock()
	defer n.mutex.Unlock()
	n.LastHeartTime = time
}
// GetStatus reports whether the node-manager connection is considered
// healthy, guarded by the client mutex.
func (n *NodeManagerClient) GetStatus() bool {
	n.mutex.Lock()
	healthy := n.Status
	n.mutex.Unlock()
	return healthy
}
// UpdateStatus sets the health flag for this node-manager connection,
// guarded by the client mutex.
func (n *NodeManagerClient) UpdateStatus(status bool) {
	n.mutex.Lock()
	defer n.mutex.Unlock()
	n.Status = status
}
package models
MANIFEST-000063
MANIFEST-000061
This diff is collapsed.
package nm
import (
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"fmt"
"github.com/docker/docker/libnetwork/bitmap"
nodemanagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
)
// WorkerMsgHandler builds an outbound worker message from captured parameters.
type WorkerMsgHandler func(params ...interface{}) *nodemanagerV1.WorkerMessage

// RespMsgHandler is one queued response job: the handler to invoke, its
// parameters, the stream to send on, and the owning client to mark unhealthy
// on send failure.
type RespMsgHandler struct {
	nodeManager  *models.NodeManagerClient
	workerClient nodemanagerV1.NodeManagerService_RegisterWorkerClient
	handler      WorkerMsgHandler
	params       []interface{}
}

// RespMsgWorker serializes response jobs through a single channel so messages
// are sent one at a time by SendMsg.
type RespMsgWorker struct {
	MsgPool chan *RespMsgHandler
}
// NewMsgRespWorker builds a response worker backed by an unbuffered job queue,
// so RegisterMsgResp blocks until SendMsg is ready to take the job.
func NewMsgRespWorker() *RespMsgWorker {
	worker := &RespMsgWorker{MsgPool: make(chan *RespMsgHandler)}
	return worker
}
// RegisterMsgResp enqueues a response job to be built by handler(params...)
// and sent on workerClient. Blocks until the send loop accepts the job.
func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, workerClient nodemanagerV1.NodeManagerService_RegisterWorkerClient, handler WorkerMsgHandler, params []interface{}) {
	job := &RespMsgHandler{
		nodeManager:  nodeManager,
		workerClient: workerClient,
		handler:      handler,
		params:       params,
	}
	o.MsgPool <- job
}
// SendMsg drains the job queue forever: each job's handler builds the worker
// message, which is then streamed to the node manager. On a send failure the
// owning client is flagged unhealthy and the loop continues with later jobs.
// NOTE(review): there is no shutdown/cancellation path — this goroutine runs
// for the process lifetime; confirm that is intended.
func (o *RespMsgWorker) SendMsg() {
	for {
		select {
		case pool := <-o.MsgPool:
			{
				// Build the outbound message from the captured parameters,
				// then push it over the registration stream.
				workerMsg := pool.handler(pool.params...)
				err := pool.workerClient.SendMsg(workerMsg)
				if err != nil {
					log.Error("Send heartbeat msg error:", err)
					pool.nodeManager.UpdateStatus(false)
					continue
				}
				log.Info("Worker client send message successfully")
			}
		}
	}
}
// HeartbeatResp builds a heartbeat-response worker message echoing the server
// timestamp passed as params[0] (uint64).
func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
	log.Info("Heartbeat response received params: ", params)
	serverTimestamp := params[0].(uint64)
	resp := &nodemanagerV1.HeartbeatResponse{Timestamp: serverTimestamp}
	return &nodemanagerV1.WorkerMessage{
		Message: &nodemanagerV1.WorkerMessage_HeartbeatResponse{
			HeartbeatResponse: resp,
		},
	}
}
// SubmitResourceMapRes builds a resource-map submission message. params[0]
// (uint64) sizes the bitmap that is serialized into the message.
func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
	// BUG FIX: the log line said "Heartbeat response" — copy-paste from
	// HeartbeatResp; it now names this handler.
	log.Info("Submit resource map received params: ", params)
	taskIdIndex := params[0].(uint64)
	b := bitmap.New(taskIdIndex)
	binary, err := b.MarshalBinary()
	if err != nil {
		// BUG FIX: the original swallowed this error and returned nil
		// silently, making the dropped submission impossible to diagnose.
		log.Error("Marshal resource bitmap failed: ", err)
		return nil
	}
	return &nodemanagerV1.WorkerMessage{
		Message: &nodemanagerV1.WorkerMessage_ResourceMap{
			ResourceMap: &nodemanagerV1.SubmitResourceMap{
				ResourceMap: binary,
			},
		},
	}
}
// DeviceInfoResp builds the device-inventory message sent to the node
// manager: miner public key, benefit address, and the device list.
// NOTE(review): the device entries below are hard-coded placeholders — the
// real CPU probing is commented out; the reported hardware does not reflect
// the host. DeviceSignature is also empty. Confirm before relying on this
// inventory server-side. The fmt.Sprint calls on constant strings are
// redundant and can be dropped once fmt has another use in this file.
func DeviceInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
	log.Info("Device info response received params:", params)
	devices := make([]*nodemanagerV1.DeviceInfo, 0)
	//cpuInfos, err := cpu.Info()
	//if err != nil {
	//	log.Error("Error getting CPU info: ", err)
	//	return nil
	//}
	//for i, cpuInfo := range cpuInfos {
	//	cpuInfo := &nodemanagerV1.DeviceInfo{
	//		DeviceType:  fmt.Sprintf("cpu-%d", i),
	//		DeviceModel: cpuInfo.ModelName,
	//		DevicePower: 12,
	//		DeviceParam: strconv.FormatFloat(cpuInfo.Mhz, 'f', 2, 64),
	//	}
	//	devices = append(devices, cpuInfo)
	//}
	// Placeholder CPU entry.
	cpuInfo := &nodemanagerV1.DeviceInfo{
		DeviceType:  "cpu-0",
		DeviceModel: "xl",
		DevicePower: 12,
		DeviceParam: "2150",
	}
	devices = append(devices, cpuInfo)
	// Placeholder GPU entry.
	gpuInfo := &nodemanagerV1.DeviceInfo{
		DeviceType:  fmt.Sprint("gpu-0"),
		DeviceModel: "Nvidia",
		DevicePower: 12,
		DeviceParam: "1200",
	}
	devices = append(devices, gpuInfo)
	// Placeholder memory entry.
	memInfo := &nodemanagerV1.DeviceInfo{
		DeviceType:  fmt.Sprint("mem-0"),
		DeviceModel: "Micron",
		DevicePower: 12,
		DeviceParam: "1200",
	}
	devices = append(devices, memInfo)
	deviceInfoRes := &nodemanagerV1.WorkerMessage{
		Message: &nodemanagerV1.WorkerMessage_DeviceInfo{
			DeviceInfo: &nodemanagerV1.DeviceInfoResponse{
				MinerPubkey:     conf.GetConfig().SignPub,
				BenefitAddress:  conf.GetConfig().BenefitAddress,
				Devices:         devices,
				DeviceSignature: []byte(""),
			},
		},
	}
	return deviceInfoRes
}
// DeviceUsageResp builds an (empty) device-usage response message.
func DeviceUsageResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
	log.Info("DeviceUsageResp params :", params)
	usage := &nodemanagerV1.DeviceUsageResponse{}
	return &nodemanagerV1.WorkerMessage{
		Message: &nodemanagerV1.WorkerMessage_DeviceUsage{DeviceUsage: usage},
	}
}
// StatusResp builds a status response message with a fixed "0" device status.
func StatusResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
	log.Info("Status resp received params:", params)
	status := &nodemanagerV1.StatusResponse{DeviceStatus: []byte("0")}
	return &nodemanagerV1.WorkerMessage{
		Message: &nodemanagerV1.WorkerMessage_Status{Status: status},
	}
}
// GoodbyeResp builds a goodbye message; params[0] (string), when present,
// is used as the disconnect reason.
func GoodbyeResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
	log.Info("Goodbye resp received params:", params)
	var reason string
	if len(params) > 0 {
		reason = params[0].(string)
	}
	return &nodemanagerV1.WorkerMessage{
		Message: &nodemanagerV1.WorkerMessage_GoodbyeMessage{
			GoodbyeMessage: &nodemanagerV1.GoodbyeMessage{Reason: reason},
		},
	}
}
// SubmitResultResp builds a task-result submission from positional params:
// [0] task id (string), [1] container signature ([]byte), [2] miner signature
// ([]byte), [3] task result bytes ([]byte), [4] success flag (bool).
// NOTE(review): the type assertions are unchecked and will panic if a caller
// passes params in the wrong order or type — confirm all call sites.
func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
	//log.Info("Handler task submit result resp received params:", params)
	taskId := params[0].(string)
	containerSign := params[1].([]byte)
	minerSign := params[2].([]byte)
	taskResult := params[3].([]byte)
	isSuccess := params[4].(bool)
	submitResultMsgRes := &nodemanagerV1.WorkerMessage{
		Message: &nodemanagerV1.WorkerMessage_SubmitTaskResult{
			SubmitTaskResult: &nodemanagerV1.SubmitTaskResult{
				TaskUuid:           taskId,
				ContainerSignature: containerSign,
				MinerSignature:     minerSign,
				TaskResult:         taskResult,
				IsSuccessed:        isSuccess,
			},
		},
	}
	return submitResultMsgRes
}
This diff is collapsed.
package nm
import (
"bytes"
"crypto/ecdsa"
cryptoRand "crypto/rand"
"encoding/json"
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
"fmt"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/groupcache/lru"
baseV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
"math/rand"
"net/http"
"strconv"
"sync"
"time"
)
// TaskHandler consumes pushed task messages and executes them (system,
// compute, or custom) against the local docker daemon.
// NOTE(review): TaskResp and TaskIsSuccess are plain maps written by the
// multiple goroutines spawned in HandlerTask — this is a data race; they
// should be guarded by a mutex or replaced with sync.Map.
type TaskHandler struct {
	Wg            *sync.WaitGroup
	LruCache      *lru.Cache // recently-seen task cache (capacity 100)
	DockerOp      *operate.DockerOp
	CmdOp         *operate.Command
	TaskMsg       chan *nodeManagerV1.PushTaskMessage // incoming task queue
	TaskResp      map[string][]byte                   // task uuid -> raw container response
	TaskIsSuccess map[string]bool                     // task uuid -> completion flag
	HttpClient    *http.Client
}
// NewTaskWorker builds a TaskHandler bound to the given docker operator,
// with an unbuffered task queue and empty result maps.
func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
	handler := &TaskHandler{
		Wg:            &sync.WaitGroup{},
		LruCache:      lru.New(100),
		DockerOp:      op,
		TaskMsg:       make(chan *nodeManagerV1.PushTaskMessage),
		TaskResp:      map[string][]byte{},
		TaskIsSuccess: map[string]bool{},
		HttpClient:    &http.Client{},
	}
	return handler
}
// HandlerTask spawns runCount worker goroutines that consume TaskMsg and
// dispatch each message by task type to the matching handler.
// NOTE(review): the goroutines have no shutdown path and the handlers write
// the shared TaskResp/TaskIsSuccess maps concurrently when runCount > 1 —
// confirm whether a mutex or single worker is intended.
func (t *TaskHandler) HandlerTask(runCount int) {
	for i := 0; i < runCount; i++ {
		go func(t *TaskHandler) {
			for {
				select {
				case taskMsg := <-t.TaskMsg:
					{
						switch taskMsg.TaskType {
						case baseV1.TaskType_SystemTask:
							{
								//command := operate.GetSystemCommand(taskMsg.TaskCmd, taskMsg.TaskParam, taskMsg.TaskUuid+".sh")
								t.SystemTaskHandler(taskMsg)
							}
						case baseV1.TaskType_ComputeTask:
							{
								t.ComputeTaskHandler(taskMsg)
							}
						case baseV1.TaskType_CustomTask:
							{
								t.CustomTaskHandler(taskMsg)
							}
						}
					}
				}
			}
		}(t)
	}
}
// SystemTaskHandler handles a system task. Currently a stub that only logs.
// NOTE(review): Wg.Done() is called with no visible matching Wg.Add — confirm
// the Add happens at the dispatch site, otherwise this panics on a zero counter.
func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
	defer t.Wg.Done()
	log.Info("received systemTask--------------------------------")
}
// ComputeTaskHandler executes a compute task: it picks an unused host port,
// ensures a container for the task image is running, posts the task payload
// to the container's HTTP API, and records the container's response in
// TaskResp / TaskIsSuccess keyed by task uuid.
func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
	defer t.Wg.Done()
	t.TaskResp[taskMsg.TaskUuid] = nil
	t.TaskIsSuccess[taskMsg.TaskUuid] = false
	reader := bytes.NewReader(taskMsg.TaskParam)
	taskCmd := &models.TaskCmd{}
	err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
	if err != nil {
		log.Errorf("failed to unmarshal task cmd: %s", err.Error())
		return
	}
	// Pick a random host port in [10000, 20000] that is not already in use.
	// BUG FIX: the seed is now set once before the loop; reseeding with
	// UnixNano on every iteration can produce the same port repeatedly
	// within one clock tick and spin needlessly.
	rand.Seed(time.Now().UnixNano())
	var externalPort int64
	for {
		externalPort = rand.Int63n(10001) + 10000
		fmt.Println("t.DockerOp.UsedExternalPort[externalPort]:", t.DockerOp.UsedExternalPort[externalPort])
		if !t.DockerOp.UsedExternalPort[externalPort] {
			break
		}
		// NOTE(review): if every port in range were in use this loop would
		// spin forever; acceptable only while ContainerNum stays small.
	}
	taskCmd.DockerCmd.HostPort = strconv.FormatInt(externalPort, 10)
	images, err := t.DockerOp.PsImages()
	if err != nil {
		log.Error("Ps images failed:", err)
		return
	}
	imageId := ""
	for _, image := range images {
		// BUG FIX: untagged images have an empty RepoTags slice and
		// indexing [0] would panic; skip them.
		if len(image.RepoTags) == 0 {
			continue
		}
		if image.RepoTags[0] == taskCmd.ImageName {
			imageId = image.ID
			break
		}
		log.Println(image.ID)
	}
	containers := t.DockerOp.ListContainer()
	isImageRunExist := false
	for _, container := range containers {
		if container.ImageID == imageId {
			// The ApiUrl template receives the already-published host port.
			taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, container.Ports[0].PublicPort)
			isImageRunExist = true
			break
		}
	}
	if !isImageRunExist {
		taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, externalPort)
		if int64(len(containers)) == conf.GetConfig().ContainerNum {
			// TODO: eviction policy is undecided; for now the oldest listed
			// container is stopped to stay within ContainerNum.
			t.DockerOp.StopAndDeleteContainer(containers[0].ID)
		}
		containerId, err := t.DockerOp.CreateAndStartContainer(taskCmd.ImageName, taskCmd.DockerCmd)
		if err != nil {
			log.Errorf("Create and start container failed: %s", err.Error())
			return
		}
		log.Infof("Started container with ID %s", containerId)
	}
	post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reader)
	if err != nil {
		log.Error("Http client post error: ", err)
		return
	}
	// BUG FIX: the response body was never closed, leaking the underlying
	// connection on every task.
	defer post.Body.Close()
	if post.StatusCode == http.StatusOK {
		readBody, err := io.ReadAll(post.Body)
		if err != nil {
			log.Error("received error: ", err)
			return
		}
		res := &models.ComputeResult{}
		err = json.Unmarshal(readBody, res)
		if err != nil {
			log.Error("received error: ", err)
			return
		}
		if res.Code == "200" {
			log.Info(string(readBody))
			t.TaskResp[taskMsg.TaskUuid] = readBody
			t.TaskIsSuccess[taskMsg.TaskUuid] = true
		}
	}
	log.Info("received computeTask--------------------------------")
}
// CustomTaskHandler handles a custom task. Currently it only verifies the
// docker daemon is reachable by listing images, then logs receipt.
func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
	defer t.Wg.Done()
	if _, err := t.DockerOp.PsImages(); err != nil {
		log.Error("custome task handler docker op ps images failed: ", err)
		return
	}
	log.Info("received customTask--------------------------------")
}
// GetMinerSign hashes the task request and response, then signs
// keccak256(taskUuid || reqHash || respHash) with the miner's private key.
// Returns (reqHash, respHash, signature); on signing failure the signature
// is nil and the error is logged.
func (t *TaskHandler) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResult []byte) ([]byte, []byte, []byte) {
	reqHash := crypto.Keccak256Hash(msg.TaskParam)
	respHash := crypto.Keccak256Hash(taskResult)
	signHash := crypto.Keccak256Hash(bytes.NewBufferString(msg.TaskUuid).Bytes(), reqHash.Bytes(), respHash.Bytes())
	sign, err := ecdsa.SignASN1(cryptoRand.Reader, conf.GetConfig().SignPrivateKey, signHash.Bytes())
	if err != nil {
		// BUG FIX: the original logged the unrelated string "custom task
		// handler" and dropped err entirely; log what failed and why.
		log.Error("Sign miner result failed: ", err)
	}
	return reqHash.Bytes(), respHash.Bytes(), sign
}
package operate
import (
"example.com/m/conf"
"example.com/m/log"
"github.com/docker/docker/client"
nodemanagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
witnessv1 "github.com/odysseus/odysseus-protocol/gen/proto/go/witness/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// ConnNmGrpc dials endpoint and returns a node-manager service client, or
// nil when the connection cannot be established.
func ConnNmGrpc(endpoint string) nodemanagerV1.NodeManagerServiceClient {
	conn := connGrpc(endpoint)
	if conn == nil {
		return nil
	}
	return nodemanagerV1.NewNodeManagerServiceClient(conn)
}
// ConnValidatorGrpc dials endpoint and returns a witness service client, or
// nil when the connection cannot be established.
func ConnValidatorGrpc(endpoint string) witnessv1.WitnessServiceClient {
	conn := connGrpc(endpoint)
	if conn == nil {
		return nil
	}
	return witnessv1.NewWitnessServiceClient(conn)
}
// GetDockerClient builds a docker API client against the configured daemon
// endpoint, negotiating the API version automatically.
func GetDockerClient() (*client.Client, error) {
	dockerClient, err := client.NewClientWithOpts(client.WithAPIVersionNegotiation(), client.WithHost(conf.GetConfig().DockerServer))
	if err == nil {
		return dockerClient, nil
	}
	log.Error("Error create docker client: ", err)
	return nil, err
}
// connGrpc dials endpoint without TLS, allowing messages up to 1 GiB in both
// directions. Returns nil (after logging) when the dial fails.
func connGrpc(endpoint string) *grpc.ClientConn {
	const maxMsgSize = 1024 * 1024 * 1024 // 1 GiB send/receive cap
	conn, err := grpc.Dial(endpoint,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(maxMsgSize),
			grpc.MaxCallSendMsgSize(maxMsgSize),
		),
	)
	if err != nil {
		log.Error("Dial error:", err)
		return nil
	}
	return conn
}
package operate
import (
"example.com/m/log"
"fmt"
"github.com/go-cmd/cmd"
"os/exec"
)
// Command describes an executable action: either a named binary with raw
// parameters, or a shell script on disk (used by ExecShell).
type Command struct {
	cmd        string // executable name or raw command string
	params     []byte // serialized parameters for the command
	scriptPath string // path to a shell script, consumed by ExecShell
}

// GetSystemCommand builds a Command for a system task.
// BUG FIX: the original discarded both arguments and always returned an
// empty Command; the fields are now carried so callers can execute it.
func GetSystemCommand(cmd string, params []byte) *Command {
	return &Command{cmd: cmd, params: params}
}
// ExecCommand runs `name param...` via go-cmd, blocks until it completes,
// and logs stdout/stderr. Errors are logged, not returned.
func (c *Command) ExecCommand(name string, param []string) {
	// Build the command and wait for the status channel to report completion.
	run := cmd.NewCmd(name, param...)
	status := <-run.Start()
	if status.Error != nil {
		fmt.Println("Error executing command:", status.Error)
		return
	}
	// Surface captured output through the structured logger.
	if len(status.Stdout) > 0 {
		log.Info("Command output:", status.Stdout)
	}
	if len(status.Stderr) > 0 {
		log.Error("Command error:", status.Stderr)
	}
}
// ExecShell runs the Command's script via `sh` and logs its combined
// stdout/stderr. Errors are logged, not returned.
func (c *Command) ExecShell() {
	run := exec.Command("sh", c.scriptPath)
	combined, err := run.CombinedOutput()
	if err != nil {
		log.Error("Error executing script:", err)
		return
	}
	log.Info("Script output:", string(combined))
}
package operate
import (
"bytes"
"context"
"encoding/json"
"example.com/m/log"
"example.com/m/models"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/go-connections/nat"
"github.com/ethereum/go-ethereum/common"
nodemanagerv1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
"net/http"
"os"
"time"
)
// httpClient is shared by all DockerOp HTTP calls (e.g. container signing).
// NOTE(review): it has no timeout configured — a hung container API blocks
// the caller indefinitely; consider &http.Client{Timeout: ...}.
var httpClient *http.Client

// DockerOp wraps a docker API client plus bookkeeping for task containers.
type DockerOp struct {
	IsHealthy        bool   // false when the docker daemon was unreachable at construction
	Reason           string // human-readable reason when IsHealthy is false
	dockerClient     *client.Client
	UsedExternalPort map[int64]bool    // host ports already assigned to containers
	SignApi          map[string]string // image name -> container signing endpoint
	ModelsInfo       []*models.ModelInfo
	ModelTaskIdChan  chan uint64
}

func init() {
	httpClient = &http.Client{}
}
// NewDockerOp connects to the configured docker daemon. On failure it returns
// an unhealthy DockerOp carrying the reason instead of an error, so callers
// can inspect IsHealthy.
func NewDockerOp() *DockerOp {
	dockerClient, err := GetDockerClient()
	if err != nil {
		unhealthy := &DockerOp{
			IsHealthy: false,
			Reason:    fmt.Sprintf("The connect docker client failed reason:%s", err.Error()),
		}
		return unhealthy
	}
	op := &DockerOp{
		IsHealthy:        true,
		Reason:           "",
		dockerClient:     dockerClient,
		SignApi:          map[string]string{},
		ModelsInfo:       []*models.ModelInfo{},
		UsedExternalPort: map[int64]bool{},
		ModelTaskIdChan:  make(chan uint64),
	}
	return op
}
// GetContainerSign asks the task container's signing endpoint to sign the
// given task result and returns the signature bytes, or nil on any failure.
func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage) []byte {
	reqBody := &models.TaskReq{
		TaskId: taskMsg.TaskUuid,
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		log.Error("Unable to marshal task info: ", err.Error())
		return nil
	}
	taskCmd := &models.TaskCmd{}
	err = json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
	if err != nil {
		log.Errorf("failed to unmarshal task cmd: %s", err.Error())
		return nil
	}
	// Call the per-image signing API registered in SignApi.
	request, err := http.NewRequest("POST", d.SignApi[taskCmd.ImageName], bytes.NewReader(body))
	if err != nil {
		log.Error("New http request failed: ", err)
		return nil
	}
	resp, err := httpClient.Do(request)
	if err != nil {
		log.Error("HTTP request failed: ", err)
		return nil
	}
	// BUG FIX: the response body was never closed, leaking the connection on
	// every signing call.
	defer resp.Body.Close()
	all, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Error("Failed to read docker response body:", err)
		return nil
	}
	res := &models.ComputeResult{}
	err = json.Unmarshal(all, res)
	if err != nil {
		log.Error("Failed to parse docker response body")
		return nil
	}
	sign := res.Content
	log.Info("Container sign:", sign)
	// The endpoint returns the signature hex-encoded in Content.
	return common.Hex2Bytes(sign)
}
// ContainerIsRunning reports whether the container identified by containerId
// is in the running state according to the Docker daemon. A failed inspect
// counts as not running.
func (d *DockerOp) ContainerIsRunning(containerId string) bool {
	info := d.inspectContainer(containerId)
	return info != nil && info.State.Running
}
// ListContainer fetches all containers from the Docker daemon, logs a short
// summary line for each, and returns the raw list (nil on error).
func (d *DockerOp) ListContainer() []types.Container {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	list, err := d.dockerClient.ContainerList(ctx, types.ContainerListOptions{})
	if err != nil {
		log.Error("Get container list failed:", err)
		return nil
	}
	for i := range list {
		entry := &list[i]
		created := time.Unix(entry.Created, 0).Format("2006-01-02 15:04:05")
		log.Infof("ID: %s, Image: %s, Status: %s , CreateTime: %s \n", entry.ID[:10], entry.Image, entry.Status, created)
	}
	return list
}
// CreateAndStartContainer creates a container from imageName, starts it,
// waits (up to the 60s context deadline) for it to leave the running state,
// and copies its stdout log to this process' stdout/stderr. It returns the
// container id.
//
// Daemon errors are returned to the caller instead of panicking (a crashed
// container must not take the miner down), and the log stream is closed.
func (d *DockerOp) CreateAndStartContainer(imageName string, dockerCmd *models.DockerCmd) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
	defer cancel()
	containerId, err := d.CreateContainer(imageName, dockerCmd)
	if err != nil {
		log.Error("Error creating container image failed: ", err)
		return "", err
	}
	// Start failure is logged but we still fall through to ContainerWait,
	// matching the original control flow.
	if !d.StartContainer(containerId) {
		log.Error("start container failed:", containerId)
	}
	statusCh, errCh := d.dockerClient.ContainerWait(ctx, containerId, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return "", err
		}
	case <-statusCh:
	}
	out, err := d.dockerClient.ContainerLogs(ctx, containerId, types.ContainerLogsOptions{ShowStdout: true})
	if err != nil {
		return "", err
	}
	// Close the log stream so the HTTP connection to the daemon is released
	// (it was leaked before).
	defer func() {
		if cerr := out.Close(); cerr != nil {
			log.Error("close container log stream failed:", cerr)
		}
	}()
	if _, err = stdcopy.StdCopy(os.Stdout, os.Stderr, out); err != nil {
		log.Error("std out put failed:", err)
		return "", err
	}
	return containerId, nil
}
// CreateContainer creates (but does not start) a container from imageName,
// binding dockerCmd.ContainerPort/tcp on the container to
// dockerCmd.HostIp:HostPort on the host. The container is configured to
// auto-remove when it stops. It returns the new container id.
func (d *DockerOp) CreateContainer(imageName string, dockerCmd *models.DockerCmd) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	portBinds := []nat.PortBinding{
		{
			HostIP:   dockerCmd.HostIp,
			HostPort: dockerCmd.HostPort,
		},
	}
	// Single literal; the original built an empty nat.PortMap and then
	// immediately overwrote it with make().
	portMap := nat.PortMap{
		nat.Port(dockerCmd.ContainerPort + "/tcp"): portBinds,
	}
	resp, err := d.dockerClient.ContainerCreate(ctx, &container.Config{
		Image: imageName,
	}, &container.HostConfig{
		PortBindings: portMap,
		AutoRemove:   true, // 容器停止后自动删除 -> remove automatically once stopped
	}, nil, nil, "")
	if err != nil {
		log.Error("Error creating container image failed: ", err)
		return "", err
	}
	containerId := resp.ID
	log.Info("Container created with ID:", containerId)
	return containerId, nil
}
// StartContainer asks the daemon to start the given container and reports
// whether the start call succeeded.
func (d *DockerOp) StartContainer(containerID string) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	err := d.dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{})
	if err != nil {
		log.Error("Start container failed:", err)
		return false
	}
	log.Info("Container started successfully.")
	return true
}
// StopContainer asks the daemon to stop the given container and reports
// whether the stop call succeeded. A failure is only Info-logged because it
// usually means the container already stopped or never existed.
func (d *DockerOp) StopContainer(containerID string) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	err := d.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{})
	if err != nil {
		log.Info("Error stopping container:", err)
		return false
	}
	log.Info("Container stopped successfully.")
	return true
}
// StopAndDeleteContainer stops the container and, if the stop succeeded,
// removes it. It returns whether the stop (and thus the whole operation)
// succeeded — the original always returned true and logged success even
// when the stop had failed.
func (d *DockerOp) StopAndDeleteContainer(containerID string) bool {
	if !d.StopContainer(containerID) {
		return false
	}
	d.RmContainer(containerID)
	log.Info("Container stopped and deleted successfully.")
	return true
}
// RmContainer removes the given container. A failure is logged rather than
// panicking: an already-removed container (e.g. one created with
// AutoRemove) must not crash the miner.
func (d *DockerOp) RmContainer(containerID string) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	if err := d.dockerClient.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{}); err != nil {
		log.Error("Container remove failed: ", err)
		return
	}
	log.Info("Container deleted successfully.")
}
// PsImages lists the images available on the Docker host, logging the short
// id and parent id of each one, and returns the summaries.
func (d *DockerOp) PsImages() ([]types.ImageSummary, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	summaries, err := d.dockerClient.ImageList(ctx, types.ImageListOptions{})
	if err != nil {
		return nil, err
	}
	for i := range summaries {
		log.Infof("%s %s\n", summaries[i].ID[:10], summaries[i].ParentID)
	}
	return summaries, nil
}
// PsImageNameMap returns a set keyed by the first repo tag of every local
// image. Untagged (dangling) images are skipped — indexing RepoTags[0] on
// them panicked in the original.
func (d *DockerOp) PsImageNameMap() (map[string]bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	images, err := d.dockerClient.ImageList(ctx, types.ImageListOptions{})
	if err != nil {
		return nil, err
	}
	res := make(map[string]bool, len(images))
	for _, image := range images {
		if len(image.RepoTags) == 0 {
			continue // dangling image: no tag to use as a key
		}
		res[image.RepoTags[0]] = true
	}
	return res, nil
}
// PullImage pulls info.ImageName from its registry and streams the pull
// progress to stdout. Failures are logged, not returned.
func (d *DockerOp) PullImage(info *models.ModelInfo) {
	// Pulling a multi-GB model image takes far longer than the 10s deadline
	// the other docker calls use; the original 10s context cancelled most
	// real pulls mid-transfer.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*30)
	defer cancel()
	out, err := d.dockerClient.ImagePull(ctx, info.ImageName, types.ImagePullOptions{})
	if err != nil {
		log.Errorf("Error pulling image from %s: %v", info.ImageName, err)
		return
	}
	defer func(out io.ReadCloser) {
		if err := out.Close(); err != nil {
			log.Error("io close failed:", err)
		}
	}(out)
	// The pull only completes once the progress stream is fully consumed.
	if _, err = io.Copy(os.Stdout, out); err != nil {
		log.Error("pull image info and read failed:", err)
		return
	}
	log.Info("Image pulled successfully.")
}
// RmImage deletes the image with the given id. A failure (missing or in-use
// image) is logged rather than panicking so it cannot crash the miner.
func (d *DockerOp) RmImage(imageId string) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	if _, err := d.dockerClient.ImageRemove(ctx, imageId, types.ImageRemoveOptions{}); err != nil {
		log.Error("Image remove failed: ", err)
		return
	}
	log.Info("Image deleted successfully.")
}
// inspectContainer returns the full inspect data for containerId, or nil if
// the inspect call fails.
func (d *DockerOp) inspectContainer(containerId string) *types.ContainerJSON {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	containerJson, err := d.dockerClient.ContainerInspect(ctx, containerId)
	if err != nil {
		log.Error("Container inspect failed: ", err)
		return nil
	}
	// Fixed copy-pasted log text: nothing is deleted here, we only inspect.
	log.Info("Container inspected successfully.")
	return &containerJson
}
package routers
import (
"example.com/m/controllers"
"github.com/astaxie/beego"
)
// init registers the beego HTTP route that lets containers (and tests)
// request a signature from the node controller via POST.
func init() {
	beego.Router("/llm/test/get/sign", &controllers.NodeController{}, "post:GetContainerSign")
}
package test
import (
"example.com/m/db"
"reflect"
"testing"
)
// TestGet runs a (currently empty) table of cases against db.Get, checking
// both the error expectation and the returned value.
func TestGet(t *testing.T) {
	type args struct {
		key string
	}
	type testCase struct {
		name    string
		args    args
		want    []byte
		wantErr bool
	}
	var cases []testCase
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := db.Get(tc.args.key)
			if (err != nil) != tc.wantErr {
				t.Errorf("Get() error = %v, wantErr %v", err, tc.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("Get() got = %v, want %v", got, tc.want)
			}
		})
	}
}
// TestPut runs a (currently empty) table of cases against db.Put, checking
// only the error expectation.
func TestPut(t *testing.T) {
	type args struct {
		key   string
		value []byte
	}
	type testCase struct {
		name    string
		args    args
		wantErr bool
	}
	var cases []testCase
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := db.Put(tc.args.key, tc.args.value)
			if (err != nil) != tc.wantErr {
				t.Errorf("Put() error = %v, wantErr %v", err, tc.wantErr)
			}
		})
	}
}
package test
import (
"encoding/json"
"example.com/m/models"
"example.com/m/nm"
"example.com/m/operate"
"fmt"
"github.com/golang/groupcache/lru"
basev1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"net/http"
"sync"
"testing"
)
// TestTaskHandler_computeTaskHandler is an integration-style test: it builds
// a real DockerOp and a TaskHandler and pushes one hand-crafted compute task
// through ComputeTaskHandler. It needs a reachable Docker daemon, the
// llm-server:latest image and the hard-coded sign endpoint host, so it is
// not a self-contained unit test. TODO confirm those prerequisites before
// relying on it in CI.
func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
	type fields struct {
		wg            *sync.WaitGroup
		lruCache      *lru.Cache
		DockerOp      *operate.DockerOp
		CmdOp         *operate.Command
		TaskMsg       chan *nodeManagerV1.PushTaskMessage
		TaskResp      map[string][]byte
		TaskIsSuccess map[string]bool
		HttpClient    *http.Client
	}
	type args struct {
		taskMsg *nodeManagerV1.PushTaskMessage
	}
	// Task command: which image to run, how to map its port, and the URL
	// template of the container's sign endpoint.
	m := &models.TaskCmd{
		ImageName: "llm-server:latest",
		DockerCmd: &models.DockerCmd{
			ContainerPort: "8888",
			HostIp:        "0.0.0.0",
			HostPort:      "",
		},
		ApiUrl: "http://192.168.1.120:%d/llm/test/get/sign",
	}
	marshal, err := json.Marshal(m)
	if err != nil {
		fmt.Println("Error marshalling:", err)
		return
	}
	taskParam := &models.TaskReq{
		TaskId: "22222",
	}
	taskParamBytes, err := json.Marshal(taskParam)
	if err != nil {
		fmt.Println("Error marshalling:", err)
		return
	}
	// The single task message pushed through the handler under test.
	n := args{
		taskMsg: &nodeManagerV1.PushTaskMessage{
			TaskUuid:  "1111",
			TaskType:  basev1.TaskType_ComputeTask,
			Workload:  111,
			TaskCmd:   string(marshal),
			TaskParam: taskParamBytes,
		},
	}
	tests := []struct {
		name   string
		fields fields
		args   args
	}{
		// TODO: Add test cases.
		{
			"test send task",
			fields{
				wg:            &sync.WaitGroup{},
				lruCache:      lru.New(100),
				DockerOp:      operate.NewDockerOp(),
				TaskMsg:       make(chan *nodeManagerV1.PushTaskMessage, 0),
				TaskResp:      make(map[string][]byte, 0),
				TaskIsSuccess: make(map[string]bool, 0),
				HttpClient:    &http.Client{},
			},
			n,
		},
	}
	for _, tt := range tests {
		t1.Run(tt.name, func(t1 *testing.T) {
			t := &nm.TaskHandler{
				Wg:            tt.fields.wg,
				LruCache:      tt.fields.lruCache,
				DockerOp:      tt.fields.DockerOp,
				TaskMsg:       tt.fields.TaskMsg,
				TaskResp:      tt.fields.TaskResp,
				TaskIsSuccess: tt.fields.TaskIsSuccess,
				HttpClient:    tt.fields.HttpClient,
			}
			// Add(1): ComputeTaskHandler presumably calls Done on the group —
			// TODO confirm against its implementation.
			tt.fields.wg.Add(1)
			t.ComputeTaskHandler(tt.args.taskMsg)
		})
	}
}
package validator
import (
"context"
"encoding/json"
"example.com/m/conf"
"example.com/m/db"
"example.com/m/log"
"example.com/m/operate"
witnessV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/witness/v1"
"google.golang.org/grpc"
"math/rand"
"time"
)
// ProofWorker collects task proofs, persists them to leveldb, and commits
// batches of them to the validator on a randomized schedule.
type ProofWorker struct {
	lastCommitTime   time.Time               // when the last batch was handed off for commit
	productProofChan chan *witnessV1.Proof   // freshly produced proofs awaiting persistence
	consumeProofChan chan []*witnessV1.Proof // persisted batches awaiting push to the validator
	isCommitProof    map[string]bool         // task id -> already queued for commit
}
// NewProofWorker returns a ProofWorker ready for ProofStorage/CommitWitness.
// isCommitProof must be initialized here: ProofStorage writes into it, and a
// write to a nil map panics — the original constructor left it nil.
func NewProofWorker() *ProofWorker {
	return &ProofWorker{
		lastCommitTime:   time.Now(),
		productProofChan: make(chan *witnessV1.Proof),
		consumeProofChan: make(chan []*witnessV1.Proof),
		isCommitProof:    make(map[string]bool),
	}
}
// ProductProof assembles a witness proof from the task's hashes and the
// three signatures and queues it for persistence. It blocks until the
// storage goroutine receives the proof.
func (p *ProofWorker) ProductProof(taskId string, workLoad uint64, reqHash []byte, respHash []byte, containerSign, minerSign, nmSign []byte) {
	proof := &witnessV1.Proof{
		Workload:           workLoad,
		TaskId:             taskId,
		ReqHash:            reqHash,
		RespHash:           respHash,
		ContainerSignature: containerSign,
		MinerSignature:     minerSign,
		ManagerSignature:   nmSign,
	}
	p.productProofChan <- proof
}
// ProofStorage runs two loops: a goroutine that persists every produced
// proof to leveldb, and a foreground loop that, once per commit interval and
// inside a randomized minute-of-hour window, collects the not-yet-committed
// proofs and hands them to CommitWitness via consumeProofChan.
func (p *ProofWorker) ProofStorage() {
	// Bounds (minute of the hour) of the randomized commit window.
	min := 40
	max := 59
	go func(productProofChan chan *witnessV1.Proof) {
		for proof := range productProofChan {
			proofByte, err := json.Marshal(proof)
			if err != nil {
				// continue, not return: one bad proof must not kill the
				// storage goroutine for the life of the process.
				log.Error("Failed to marshal proof: ", err)
				continue
			}
			if err := db.Put(proof.TaskId, proofByte); err != nil {
				log.Error("leveldb put proof failed: ", err)
				continue
			}
			// NOTE(review): isCommitProof is also read/written by the loop
			// below from another goroutine — guard it with a mutex if both
			// can touch it concurrently.
			p.isCommitProof[proof.TaskId] = false
		}
	}(p.productProofChan)
	for {
		// Sleep to avoid a 100% CPU busy-spin; once a second is plenty for a
		// minute-granularity schedule.
		time.Sleep(time.Second)
		since := time.Since(p.lastCommitTime)
		// ">=" instead of "==": two float64 hour counts are almost never
		// exactly equal, so the original comparison practically never fired.
		if since.Hours() >= conf.GetConfig().TaskValidatorTime {
			nowTime := time.Now()
			rand.Seed(nowTime.UnixNano())
			// Random minute in [min, max] so miners do not all commit at once.
			randomNumber := rand.Intn(max-min+1) + min
			if nowTime.Minute() > min && nowTime.Minute() < randomNumber {
				proofs := make([]*witnessV1.Proof, 0)
				iter, err := db.NewIterator()
				if err != nil {
					log.Error("db new iterator failed: ", err)
					continue
				}
				// todo: 数据堆积越多,可能循环的次数越多,对性能有影响
				// (the more data piles up, the longer this scan takes)
				for iter.Next() {
					proof := &witnessV1.Proof{}
					if err := json.Unmarshal(iter.Value(), proof); err != nil {
						log.Error("Error parsing proof from database: ", err)
						return
					}
					if p.isCommitProof[proof.TaskId] {
						continue // already queued in an earlier batch
					}
					p.isCommitProof[proof.TaskId] = true
					proofs = append(proofs, proof)
				}
				p.lastCommitTime = nowTime
				p.consumeProofChan <- proofs
			}
		}
	}
}
// CommitWitness pushes collected proof batches to the validator over gRPC.
// It blocks forever consuming p.consumeProofChan.
func (p *ProofWorker) CommitWitness() {
	validatorClient := operate.ConnValidatorGrpc("")
	for proofs := range p.consumeProofChan {
		proofsReq := &witnessV1.PushProofRequest{
			Proofs:        proofs,
			MinerAddress:  conf.GetConfig().SignPublicAddress.Hex(),
			RewardAddress: conf.GetConfig().BenefitAddress,
		}
		pushProof, err := validatorClient.PushProof(context.Background(), proofsReq, grpc.EmptyCallOption{})
		if err != nil {
			// continue, not return: one failed push must not permanently
			// stop the commit worker.
			log.Error("Push proof failed :", err)
			continue
		}
		// Infof, not Info: the original passed a %v verb to Info, which
		// printed the verb text literally instead of formatting the value.
		log.Infof("Push proof response received : %v", pushProof.GetWorkload())
	}
}
package win
import (
"example.com/m/log"
"github.com/kardianos/service"
"os"
"time"
)
// program implements the kardianos/service Interface (Start/Stop hooks).
type program struct{}
// Run registers the miner as a system service and executes the service
// action given as the first command-line argument: install, uninstall,
// start, or stop. With no argument (or an unknown one) it does nothing.
func Run() {
	srvConfig := &service.Config{
		Name:        "NodeService",
		DisplayName: "NodeServiceRun",
		Description: "The service is miner for node",
	}
	prg := &program{}
	s, err := service.New(prg, srvConfig)
	if err != nil {
		log.Error("New service failed: ", err)
		return
	}
	if len(os.Args) <= 1 {
		return
	}
	// The four actions differed only in the call and the log text; the
	// shared report logic lives in reportServiceAction.
	switch os.Args[1] {
	case "install":
		reportServiceAction(s.Install(), "安装服务失败: ", "安装服务成功")
	case "uninstall":
		reportServiceAction(s.Uninstall(), "卸载服务失败: ", "卸载服务成功")
	case "start":
		reportServiceAction(s.Start(), "运行服务失败: ", "运行服务成功")
	case "stop":
		reportServiceAction(s.Stop(), "停止服务失败: ", "停止服务成功")
	}
}

// reportServiceAction logs failMsg plus the error when err is non-nil,
// otherwise logs okMsg.
func reportServiceAction(err error, failMsg, okMsg string) {
	if err != nil {
		log.Error(failMsg, err.Error())
	} else {
		log.Info(okMsg)
	}
}
// Start is the service.Interface hook called by the service manager when the
// service starts. It must not block, so the work loop runs in a goroutine.
func (p *program) Start(s service.Service) error {
	log.Info("service start...")
	go p.run()
	return nil
}
// run is the service work loop: a once-per-second keep-alive heartbeat
// written to the log. It never returns.
func (p *program) run() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for range ticker.C {
		log.Info("running")
	}
}
// Stop is the service.Interface hook invoked by the service manager when the
// service is being stopped. The original called s.Stop() from inside this
// callback, which re-enters service control while a stop is already in
// progress; the hook only needs to wind down our own work and return.
func (p *program) Stop(s service.Service) error {
	log.Info("service stop...")
	return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment