Commit 3772cd82 authored by duanjinfei's avatar duanjinfei

Merge branch 'master' into test

parents 761e2641 d7b442d3
package models
import (
"github.com/golang/groupcache/lru"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"sync"
"time"
......@@ -134,3 +135,37 @@ func (n *NodeManagerClient) UpdateStatus(status bool) {
defer n.mutex.Unlock()
n.Status = status
}
// SafeLruCache is a thread-safe wrapper around lru.Cache
type SafeLruCache struct {
mu sync.Mutex
cache *lru.Cache
}
// NewSafeLruCache creates a new SafeLruCache
func NewSafeLruCache(maxEntries int) *SafeLruCache {
return &SafeLruCache{
cache: lru.New(maxEntries),
}
}
// Add adds a value to the cache
func (sc *SafeLruCache) Add(key lru.Key, value interface{}) {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.cache.Add(key, value)
}
// Get retrieves a value from the cache
func (sc *SafeLruCache) Get(key lru.Key) (interface{}, bool) {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.cache.Get(key)
}
// Remove removes a value from the cache
func (sc *SafeLruCache) Remove(key lru.Key) {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.cache.Remove(key)
}
......@@ -85,13 +85,13 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
isSuccess = false
taskExecRes.TaskExecError = fmt.Sprintf("worker:%s-%s-%s", conf.GetConfig().SignPublicAddress.Hex(), "Task exec error", taskExecRes.TaskExecError)
}
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
_, _, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
taskResultParams := utils.BuildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResultResp, taskResultParams)
log.Info("--------------taskMsg--------------:", taskMsg)
}(n.msgRespWorker, n.taskMsgWorker, taskMsg)
......
......@@ -11,7 +11,6 @@ import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/groupcache/lru"
baseV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
......@@ -27,7 +26,7 @@ import (
type TaskWorker struct {
Wg *sync.WaitGroup
LruCache *lru.Cache
LruCache *models.SafeLruCache
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
......@@ -53,7 +52,7 @@ type TaskOp struct {
func NewTaskWorker(op *operate.DockerOp) *TaskWorker {
return &TaskWorker{
Wg: &sync.WaitGroup{},
LruCache: lru.New(100),
LruCache: models.NewSafeLruCache(100),
DockerOp: op,
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
IsExecAiTask: false,
......@@ -271,17 +270,22 @@ func (t *TaskWorker) foundTaskImage(taskCmd *models.TaskCmd) (imageId string) {
func (t *TaskWorker) foundImageIsRunning(imageName string) (bool, string) {
containers := t.DockerOp.ListContainer()
netWorkInfoArr := make([]string, 0)
for _, container := range containers {
if container.Image == imageName && container.State == "running" {
networks := container.NetworkSettings.Networks
ip := ""
for _, endPoint := range networks {
ip = endPoint.IPAddress
log.Warn("Container network ip:", ip)
netWorkInfoArr = append(netWorkInfoArr, endPoint.IPAddress)
}
return true, ip
}
}
if len(netWorkInfoArr) > 0 {
rand.Seed(time.Now().UnixNano())
randomIndex := rand.Intn(len(netWorkInfoArr))
ip := netWorkInfoArr[randomIndex]
log.Warn("Container network ip:", ip)
return true, ip
}
return false, ""
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment