Commit 54f6d8e6 authored by vicotor's avatar vicotor

add mongo

parent 2ae53176
...@@ -33,4 +33,4 @@ demoworker: ...@@ -33,4 +33,4 @@ demoworker:
clean: clean:
rm -fr build/* rm -fr build/*
docker: docker:
docker build -t manager:${TAG} . docker build --no-cache -t manager:${TAG} .
...@@ -9,6 +9,7 @@ worker_signature_expired_time=1 ...@@ -9,6 +9,7 @@ worker_signature_expired_time=1
[mongodb] [mongodb]
url="mongodb://0.0.0.0:27017" url="mongodb://0.0.0.0:27017"
database="ai"
user="admin" user="admin"
password="admin" password="admin"
......
...@@ -30,9 +30,10 @@ type RedisConfig struct { ...@@ -30,9 +30,10 @@ type RedisConfig struct {
} }
type MongoDbConfig struct { type MongoDbConfig struct {
Url string `json:"url" toml:"url"` Url string `json:"url" toml:"url"`
User string `json:"user" toml:"user"` Database string `json:"database" toml:"database"`
Passwd string `json:"password" toml:"password"` User string `json:"user" toml:"user"`
Passwd string `json:"password" toml:"password"`
} }
type TickerConfig struct { type TickerConfig struct {
......
...@@ -200,3 +200,7 @@ func (n *Node) PayForFee(uid int64, fee int64) error { ...@@ -200,3 +200,7 @@ func (n *Node) PayForFee(uid int64, fee int64) error {
func (n *Node) Cache() *cachedata.CacheData { func (n *Node) Cache() *cachedata.CacheData {
return n.cache return n.cache
} }
func (n *Node) Mongo() *mongo.Client {
return n.mogo
}
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"github.com/odysseus/service-registry/registry" "github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/mongo"
"golang.org/x/crypto/sha3" "golang.org/x/crypto/sha3"
"math/big" "math/big"
"regexp" "regexp"
...@@ -54,6 +55,7 @@ type NodeInterface interface { ...@@ -54,6 +55,7 @@ type NodeInterface interface {
Sign(hash []byte) ([]byte, error) Sign(hash []byte) ([]byte, error)
Cache() *cachedata.CacheData Cache() *cachedata.CacheData
PayForFee(uid int64, fee int64) error PayForFee(uid int64, fee int64) error
Mongo() *mongo.Client
} }
type WorkerManager struct { type WorkerManager struct {
...@@ -68,8 +70,8 @@ type WorkerManager struct { ...@@ -68,8 +70,8 @@ type WorkerManager struct {
wkRwLock sync.RWMutex wkRwLock sync.RWMutex
quit chan struct{} quit chan struct{}
node NodeInterface node NodeInterface
std *standardlib.StandardTasks std *standardlib.StandardTasks
workerInfoOperator *operator.WorkerInfoOperator workerInfoOperator *operator.WorkerInfoOperator
workerInstalledOperator *operator.WorkerInstalledOperator workerInstalledOperator *operator.WorkerInstalledOperator
workerRunningOperator *operator.WorkerRunningOperator workerRunningOperator *operator.WorkerRunningOperator
...@@ -77,14 +79,17 @@ type WorkerManager struct { ...@@ -77,14 +79,17 @@ type WorkerManager struct {
func NewWorkerManager(rdb *redis.Client, node NodeInterface) *WorkerManager { func NewWorkerManager(rdb *redis.Client, node NodeInterface) *WorkerManager {
return &WorkerManager{ return &WorkerManager{
heartBeat: make(map[int64]int64), heartBeat: make(map[int64]int64),
workerReg: make(map[int64]*registry.Registry), workerReg: make(map[int64]*registry.Registry),
workers: make(map[int64]*Worker), workers: make(map[int64]*Worker),
workid: make(map[string]*Worker), workid: make(map[string]*Worker),
quit: make(chan struct{}), quit: make(chan struct{}),
rdb: rdb, rdb: rdb,
node: node, node: node,
std: standardlib.NewStandardTasks(), std: standardlib.NewStandardTasks(),
workerInfoOperator: operator.NewDBWorker(node.Mongo(), config.GetConfig().Mongo.Database),
workerInstalledOperator: operator.NewDBWorkerInstalled(node.Mongo(), config.GetConfig().Mongo.Database),
workerRunningOperator: operator.NewDBWorkerRunning(node.Mongo(), config.GetConfig().Mongo.Database),
} }
} }
......
...@@ -7,6 +7,8 @@ import ( ...@@ -7,6 +7,8 @@ import (
"fmt" "fmt"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/odysseus/mogo/operator"
"github.com/odysseus/mogo/types"
"github.com/odysseus/nodemanager/config" "github.com/odysseus/nodemanager/config"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strconv" "strconv"
...@@ -76,6 +78,16 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error { ...@@ -76,6 +78,16 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
} }
} }
} }
_, err := wm.workerInfoOperator.InsertWorker(context.Background(), &operator.WorkerInfo{
WorkerId: worker.WorkerAccount().String(),
NodeInfo: types.PbToNodeInfo(worker.info.nodeInfo.Info),
Models: types.PbToModelInfo(worker.info.nodeInfo.Models),
Hardware: types.PbToHardwareInfo(worker.info.nodeInfo.Hardware),
})
if err != nil {
log.WithError(err).Error("insert worker info failed")
return err
}
return nil return nil
} }
...@@ -185,6 +197,11 @@ func (wm *WorkerManager) InActiveWorker(worker *Worker) { ...@@ -185,6 +197,11 @@ func (wm *WorkerManager) InActiveWorker(worker *Worker) {
wm.rdb.Del(context.Background(), workerBootedResourceInfoKey(worker)) wm.rdb.Del(context.Background(), workerBootedResourceInfoKey(worker))
if worker.info.nodeInfo != nil { if worker.info.nodeInfo != nil {
wm.rmWorkerFromSets(worker, worker.info.nodeInfo.Info.BenefitAddress) wm.rmWorkerFromSets(worker, worker.info.nodeInfo.Info.BenefitAddress)
n, err := wm.workerInfoOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
if err != nil {
log.WithError(err).Error("delete worker info failed")
}
log.Debugf("delete worker info count %d", n)
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment