Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
P
power-node
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Odysseus
power-node
Commits
bdd9c319
Commit
bdd9c319
authored
Feb 03, 2024
by
duanjinfei
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update handler task res
parent
44aa3bda
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
98 additions
and
91 deletions
+98
-91
task_msg.go
nm/task_msg.go
+98
-91
No files found.
nm/task_msg.go
View file @
bdd9c319
...
...
@@ -2,16 +2,23 @@ package nm
import
(
"bytes"
"encoding/json"
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/groupcache/lru"
baseV1
"github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
nodeManagerV1
"github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
"math/rand"
"net/http"
"strconv"
"sync"
"time"
)
type
TaskHandler
struct
{
...
...
@@ -78,98 +85,98 @@ func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
func
(
t
*
TaskHandler
)
ComputeTaskHandler
(
taskMsg
*
nodeManagerV1
.
PushTaskMessage
)
{
defer
t
.
Wg
.
Done
()
t
.
TaskRespBody
[
taskMsg
.
TaskUuid
]
=
[]
byte
{}
t
.
TaskRespHeader
[
taskMsg
.
TaskUuid
]
=
[]
byte
{}
t
.
TaskRespBody
[
taskMsg
.
TaskUuid
]
=
nil
t
.
TaskRespHeader
[
taskMsg
.
TaskUuid
]
=
nil
t
.
TaskExecTime
[
taskMsg
.
TaskUuid
]
=
0
t
.
TaskIsSuccess
[
taskMsg
.
TaskUuid
]
=
tru
e
//
reader := bytes.NewReader(taskMsg.TaskParam)
//
taskCmd := &models.TaskCmd{}
//
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
//
if err != nil {
//
log.Errorf("failed to unmarshal task cmd: %s", err.Error())
//
return
//
}
//
//
images, err := t.DockerOp.PsImages()
//
if err != nil {
//
log.Error("Ps images failed:", err)
//
return
//
}
//
imageId := ""
//
isFound := false
//
for _, image := range images {
//
if isFound {
//
break
//
}
//
for _, tag := range image.RepoTags {
//
if tag == taskCmd.ImageName {
//
imageId = image.ID
//
isFound = true
//
break
//
}
//
}
//
log.Println(image.ID)
//
}
//
containers := t.DockerOp.ListContainer()
//
isImageRunExist := false
//
for _, container := range containers {
//
if container.ImageID == imageId {
//
taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, container.Ports[0].PublicPort)
//
isImageRunExist = true
//
break
//
}
//
}
//
if !isImageRunExist {
//
var externalPort int64
//
for {
//
// 设置种子以确保每次运行时生成不同的随机数序列
//
rand.Seed(time.Now().UnixNano())
//
// 生成一个介于 0 和 100 之间的随机整数
//
externalPort = rand.Int63n(10001) + 10000
//
log.Info("DockerOp UsedExternalPort :", t.DockerOp.UsedExternalPort[externalPort])
//
if t.DockerOp.UsedExternalPort[externalPort] {
//
continue
//
}
//
break
//
}
//
taskCmd.DockerCmd.HostPort = strconv.FormatInt(externalPort, 10)
//
taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, externalPort)
//
if int64(len(containers)) == conf.GetConfig().ContainerNum {
//
//todo: 待定,需要根据权重去停止哪个容器
//
t.DockerOp.StopAndDeleteContainer(containers[0].ID)
//
}
//
containerId, err := t.DockerOp.CreateAndStartContainer(taskCmd.ImageName, taskCmd.DockerCmd)
//
if err != nil {
//
log.Errorf("Create and start container failed: %s", err.Error())
//
return
//
}
//
log.Infof("Started container with ID %s", containerId)
//
}
//
//
startBeforeTaskTime := time.Now()
//
post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reader)
//
if err != nil {
//
log.Error("Http client post error: ", err)
//
return
//
}
//
endAfterTaskTime := time.Since(startBeforeTaskTime)
//
log.WithField("time", endAfterTaskTime.Seconds()).Info("Exec task end (second is units) :")
//
if post.StatusCode == http.StatusOK {
//
headers, err := json.Marshal(post.Header)
//
if err != nil {
//
log.Error("JSON marshal header error: ", err)
//
return
//
}
//
readBody, err := io.ReadAll(post.Body)
//
if err != nil {
//
log.Error("received error: ", err)
//
return
//
}
//
t.TaskRespHeader[taskMsg.TaskUuid] = headers
//
t.TaskRespBody[taskMsg.TaskUuid] = readBody
//
t.TaskIsSuccess[taskMsg.TaskUuid] = true
//
t.TaskExecTime[taskMsg.TaskUuid] = endAfterTaskTime.Microseconds()
//
}
t
.
TaskIsSuccess
[
taskMsg
.
TaskUuid
]
=
fals
e
reader
:=
bytes
.
NewReader
(
taskMsg
.
TaskParam
)
taskCmd
:=
&
models
.
TaskCmd
{}
err
:=
json
.
Unmarshal
(
bytes
.
NewBufferString
(
taskMsg
.
TaskCmd
)
.
Bytes
(),
taskCmd
)
if
err
!=
nil
{
log
.
Errorf
(
"failed to unmarshal task cmd: %s"
,
err
.
Error
())
return
}
images
,
err
:=
t
.
DockerOp
.
PsImages
()
if
err
!=
nil
{
log
.
Error
(
"Ps images failed:"
,
err
)
return
}
imageId
:=
""
isFound
:=
false
for
_
,
image
:=
range
images
{
if
isFound
{
break
}
for
_
,
tag
:=
range
image
.
RepoTags
{
if
tag
==
taskCmd
.
ImageName
{
imageId
=
image
.
ID
isFound
=
true
break
}
}
log
.
Println
(
image
.
ID
)
}
containers
:=
t
.
DockerOp
.
ListContainer
()
isImageRunExist
:=
false
for
_
,
container
:=
range
containers
{
if
container
.
ImageID
==
imageId
{
taskCmd
.
ApiUrl
=
fmt
.
Sprintf
(
taskCmd
.
ApiUrl
,
container
.
Ports
[
0
]
.
PublicPort
)
isImageRunExist
=
true
break
}
}
if
!
isImageRunExist
{
var
externalPort
int64
for
{
// 设置种子以确保每次运行时生成不同的随机数序列
rand
.
Seed
(
time
.
Now
()
.
UnixNano
())
// 生成一个介于 0 和 100 之间的随机整数
externalPort
=
rand
.
Int63n
(
10001
)
+
10000
log
.
Info
(
"DockerOp UsedExternalPort :"
,
t
.
DockerOp
.
UsedExternalPort
[
externalPort
])
if
t
.
DockerOp
.
UsedExternalPort
[
externalPort
]
{
continue
}
break
}
taskCmd
.
DockerCmd
.
HostPort
=
strconv
.
FormatInt
(
externalPort
,
10
)
taskCmd
.
ApiUrl
=
fmt
.
Sprintf
(
taskCmd
.
ApiUrl
,
externalPort
)
if
int64
(
len
(
containers
))
==
conf
.
GetConfig
()
.
ContainerNum
{
//todo: 待定,需要根据权重去停止哪个容器
t
.
DockerOp
.
StopAndDeleteContainer
(
containers
[
0
]
.
ID
)
}
containerId
,
err
:=
t
.
DockerOp
.
CreateAndStartContainer
(
taskCmd
.
ImageName
,
taskCmd
.
DockerCmd
)
if
err
!=
nil
{
log
.
Errorf
(
"Create and start container failed: %s"
,
err
.
Error
())
return
}
log
.
Infof
(
"Started container with ID %s"
,
containerId
)
}
startBeforeTaskTime
:=
time
.
Now
()
post
,
err
:=
t
.
HttpClient
.
Post
(
taskCmd
.
ApiUrl
,
"application/json"
,
reader
)
if
err
!=
nil
{
log
.
Error
(
"Http client post error: "
,
err
)
return
}
endAfterTaskTime
:=
time
.
Since
(
startBeforeTaskTime
)
log
.
WithField
(
"time"
,
endAfterTaskTime
.
Seconds
())
.
Info
(
"Exec task end (second is units) :"
)
if
post
.
StatusCode
==
http
.
StatusOK
{
headers
,
err
:=
json
.
Marshal
(
post
.
Header
)
if
err
!=
nil
{
log
.
Error
(
"JSON marshal header error: "
,
err
)
return
}
readBody
,
err
:=
io
.
ReadAll
(
post
.
Body
)
if
err
!=
nil
{
log
.
Error
(
"received error: "
,
err
)
return
}
t
.
TaskRespHeader
[
taskMsg
.
TaskUuid
]
=
headers
t
.
TaskRespBody
[
taskMsg
.
TaskUuid
]
=
readBody
t
.
TaskIsSuccess
[
taskMsg
.
TaskUuid
]
=
true
t
.
TaskExecTime
[
taskMsg
.
TaskUuid
]
=
endAfterTaskTime
.
Microseconds
()
}
log
.
Info
(
"received computeTask--------------------------------"
)
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment