Odysseus / power-node · Commit fbce8155

Merge branch 'master' into test

Authored Apr 17, 2024 by duanjinfei
Parents: 876eff65, 44c177e8

Showing 12 changed files with 219 additions and 126 deletions (+219 −126)
.gitignore                    +1   −1
cmd/rootcmd.go               +20  −35
conf/config.go               +28  −13
config.json                   +1   −2
db/db.go                      +1   −1
largeModel/model_handler.go   +2   −1
models/node_manager.go        +2   −0
nm/monitor.go                 +2   −2
nm/msg_handler.go            +20  −16
nm/msg_resp.go               +56  −12
nm/task_handler.go           +73  −41
operate/docker.go            +13   −2
.gitignore  (+1 −1)

```diff
 .idea
 logs
 *.DS_Store
-mydb
+data
 keystore
 powerNode
\ No newline at end of file
```
cmd/rootcmd.go  (+20 −35)

```go
package main

import (
	"encoding/json"
	"example.com/m/conf"
	"example.com/m/log"
	"example.com/m/nm"
	"example.com/m/utils"
	"fmt"
	"github.com/astaxie/beego"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/fsnotify/fsnotify"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
	"io/ioutil"
	"os"
)
```

```diff
@@ -75,43 +71,32 @@ func initConfig() {
 	viper.AutomaticEnv()
-	// read the configuration file
-	if err := viper.ReadInConfig(); err != nil {
-		fmt.Println("Error reading config file:", err)
-		return
-	}
 	viper.WatchConfig()
-	configFilePath := viper.ConfigFileUsed()
-	if configFilePath == "" {
-		// handle error
-		log.Error("config file path is empty")
-		panic("config file path is empty")
+	viper.OnConfigChange(func(e fsnotify.Event) {
+		// callback invoked after the configuration file changes
+		log.Warn("The configuration file has been modified...........")
+		err := viper.Unmarshal(conf.GetConfig())
+		if err != nil {
+			log.WithError(err).Error("Viper unmarshal cfg error:")
+			panic(fmt.Errorf("Viper unmarshal conf failed, err:%s\n", err))
+		}
+		conf.GetConfig().UpdateFiledInfo()
+		log.Info("Config file changed success:", e.Name)
+	})
-	data, err := ioutil.ReadFile(configFilePath)
-	if err != nil {
-		// handle error
-		log.Error("Read cfg file error:", err)
-		panic("Read cfg file error")
+	// read the configuration file
+	if err := viper.ReadInConfig(); err != nil {
+		panic(fmt.Errorf("Error reading config file: %s ", err.Error()))
 	}
-	err = json.Unmarshal(data, conf.GetConfig())
+	err := viper.Unmarshal(conf.GetConfig())
 	if err != nil {
-		// handle error
-		log.Error("Json unmarshal cfg error:", err)
-		panic("Json unmarshal cfg error")
-	}
-	conf.GetConfig().HeartRespTimeMillis = conf.GetConfig().HeartRespTimeSecond * 60 * 60 * 1000
-	prvKey, err := utils.GetPrv()
-	if err != nil {
-		panic("get prv error or delete keystore after restart")
+		log.Error("Viper unmarshal cfg error:", err)
+		panic("Viper unmarshal cfg error")
 	}
-	conf.GetConfig().SignPrivateKey = prvKey
-	ecdsaPub := prvKey.PublicKey
-	conf.GetConfig().SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub))
-	log.Info("PublicKey", conf.GetConfig().SignPub)
-	publicAddr := crypto.PubkeyToAddress(ecdsaPub)
-	log.Info("publicAddr:", publicAddr)
-	conf.GetConfig().SignPublicAddress = publicAddr
+	conf.GetConfig().UpdateFiledInfo()
 }

 func Execute() {
 	...
```
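The change above replaces a one-shot ioutil.ReadFile + json.Unmarshal with viper's standard hot-reload pattern: watch the file, re-unmarshal in the change callback, and read once at startup. A minimal, self-contained sketch of that pattern (the Config struct and file name here are illustrative, not the project's):

```go
package main

import (
	"fmt"
	"log"

	"github.com/fsnotify/fsnotify"
	"github.com/spf13/viper"
)

// Illustrative config struct; mapstructure tags are what viper.Unmarshal reads.
type Config struct {
	ApiUrl         string `mapstructure:"api_url"`
	NodeManagerNum int64  `mapstructure:"node_manager_num"`
}

func main() {
	var cfg Config

	viper.SetConfigFile("config.json") // same idea as the project's config.json
	viper.AutomaticEnv()

	// Watch first, then read: the callback fires on every later file change.
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		if err := viper.Unmarshal(&cfg); err != nil {
			log.Printf("re-unmarshal after change failed: %v", err)
			return
		}
		log.Printf("config file changed: %s", e.Name)
	})

	if err := viper.ReadInConfig(); err != nil {
		log.Fatalf("read config: %v", err)
	}
	if err := viper.Unmarshal(&cfg); err != nil {
		log.Fatalf("unmarshal config: %v", err)
	}
	fmt.Printf("loaded: %+v\n", cfg)
	select {} // block so the watcher stays alive in this demo
}
```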
conf/config.go  (+28 −13)

```diff
@@ -3,8 +3,10 @@ package conf
 import (
 	"crypto/ecdsa"
+	"example.com/m/log"
+	"example.com/m/utils"
 	"fmt"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/crypto"
 	"net/url"
 )
 	...
@@ -17,19 +19,17 @@ type Config struct {
 	ExternalIp        string
 	SignPublicAddress common.Address
 	SignPrivateKey    *ecdsa.PrivateKey
-	NmSeed                   string  `json:"nm_seed"`
-	HeartRespTimeSecond      int64   `json:"heart_response"`
-	TaskValidatorTime        float64 `json:"task_validator_time"`
-	ContainerNum             int64   `json:"container_num"`
-	NodeManagerNum           int64   `json:"node_manager_num"`
-	ChainID                  int64   `json:"chain_id"`
-	ApiUrl                   string  `json:"api_url"`
-	ValidatorUrl             string  `json:"validator_url"`
-	OssUrl                   string  `json:"oss_url"`
-	WaitLastTaskExecTime     int64   `json:"wait_last_task_exec_time"`
-	OpSys                    string  `json:"op_sys"`
-	ReplicateImageNameSuffix string  `json:"replicate_image_name_suffix"`
-	IsStopLastContainer      bool    `json:"is_stop_last_container"`
+	NmSeed                   string  `json:"nm_seed" mapstructure:"nm_seed"`
+	HeartRespTimeSecond      int64   `json:"heart_response" mapstructure:"heart_response"`
+	NodeManagerNum           int64   `json:"node_manager_num" mapstructure:"node_manager_num"`
+	ChainID                  int64   `json:"chain_id" mapstructure:"chain_id"`
+	ApiUrl                   string  `json:"api_url" mapstructure:"api_url"`
+	ValidatorUrl             string  `json:"validator_url" mapstructure:"validator_url"`
+	OssUrl                   string  `json:"oss_url" mapstructure:"oss_url"`
+	WaitLastTaskExecTime     int64   `json:"wait_last_task_exec_time" mapstructure:"wait_last_task_exec_time"`
+	OpSys                    string  `json:"op_sys" mapstructure:"op_sys"`
+	ReplicateImageNameSuffix string  `json:"replicate_image_name_suffix" mapstructure:"replicate_image_name_suffix"`
+	IsStopLastContainer      bool    `json:"is_stop_last_container" mapstructure:"is_stop_last_container"`
 }
 
 var _cfg *Config = nil
 	...
```
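The reason every field gains a mapstructure tag: viper.Unmarshal decodes through the mapstructure library, which ignores `json:` tags, so a snake_case key like heart_response would never land in HeartRespTimeSecond under mapstructure's default case-insensitive field-name match. A small sketch of the failure and the fix, under that assumption:

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

type withoutTag struct {
	HeartRespTimeSecond int64 `json:"heart_response"` // json tag only: mapstructure ignores it
}

type withTag struct {
	HeartRespTimeSecond int64 `json:"heart_response" mapstructure:"heart_response"`
}

func main() {
	src := map[string]interface{}{"heart_response": int64(60)} // what viper hands the decoder

	var a withoutTag
	var b withTag
	mapstructure.Decode(src, &a)
	mapstructure.Decode(src, &b)

	fmt.Println(a.HeartRespTimeSecond) // 0  — key not matched
	fmt.Println(b.HeartRespTimeSecond) // 60 — matched via the mapstructure tag
}
```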
```diff
@@ -80,6 +80,21 @@ func (c *Config) SetOpSys(sys string) bool {
 	return true
 }
 
+func (c *Config) UpdateFiledInfo() {
+	c.HeartRespTimeMillis = c.HeartRespTimeSecond * 60 * 60 * 1000
+	prvKey, err := utils.GetPrv()
+	if err != nil {
+		panic("get prv error or delete keystore after restart")
+	}
+	c.SignPrivateKey = prvKey
+	ecdsaPub := prvKey.PublicKey
+	c.SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub))
+	log.Info("PublicKey", c.SignPub)
+	publicAddr := crypto.PubkeyToAddress(ecdsaPub)
+	log.Info("publicAddr:", publicAddr)
+	c.SignPublicAddress = publicAddr
+}
 
 func checkDockerServer(rawURL string) (bool, string) {
 	if rawURL == "" {
 		return true, fmt.Sprintf("tcp://%s:%s", "host.docker.internal", "2375")
 	...
```
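UpdateFiledInfo centralizes what initConfig used to inline: derive the uncompressed public key and the address from the miner's ECDSA key. Note in passing that HeartRespTimeSecond * 60 * 60 * 1000 multiplies by 3,600,000, an hours-to-milliseconds factor; a literal seconds-to-milliseconds conversion would be * 1000. A standalone sketch of the key derivation with go-ethereum, using a throwaway key instead of the project's keystore-backed utils.GetPrv:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// Stand-in for utils.GetPrv(): a fresh key instead of the keystore-backed one.
	prvKey, err := crypto.GenerateKey()
	if err != nil {
		panic(err)
	}

	ecdsaPub := prvKey.PublicKey
	signPub := common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub)) // 65-byte uncompressed key, hex
	publicAddr := crypto.PubkeyToAddress(ecdsaPub)              // keccak256(pubkey)[12:]

	fmt.Println("PublicKey:", signPub)
	fmt.Println("publicAddr:", publicAddr.Hex())
}
```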
config.json  (+1 −2)

```diff
@@ -3,8 +3,7 @@
   "api_url": "https://dev.aigic.ai/admin/api/task/taskheat",
   "node_manager_num": 1,
   "heart_response": 60,
-  "task_validator_time": 1,
-  "container_num": 1,
   "share_gpu_memory_usage": 80,
   "chain_id": 100,
   "validator_url": "18.167.203.17:20011",
   "oss_url": "https://tmp-file.aigic.ai/api/v1/upload",
 	...
```
db/db.go  (+1 −1)

```diff
@@ -12,7 +12,7 @@ var err error
 func init() {
 	// open or create a LevelDB database
-	dbInstance, err = leveldb.OpenFile("mydb", nil)
+	dbInstance, err = leveldb.OpenFile("data/mydb", nil)
 	if err != nil {
 		log.Error("Leveldb open file failed: ", err)
 	}
 	...
```
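The only change here is the database path (mydb → data/mydb, matching the data entry added to .gitignore). For reference, a minimal sketch of the API this file relies on, assuming the syndtr/goleveldb driver that leveldb.OpenFile usually refers to in Go projects:

```go
package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
)

func main() {
	// OpenFile creates the directory (and database) if it does not exist yet.
	db, err := leveldb.OpenFile("data/mydb", nil)
	if err != nil {
		panic(fmt.Errorf("leveldb open file failed: %w", err))
	}
	defer db.Close()

	if err := db.Put([]byte("taskId:1"), []byte("done"), nil); err != nil {
		panic(err)
	}
	val, err := db.Get([]byte("taskId:1"), nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(val)) // "done"
}
```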
largeModel/model_handler.go  (+2 −1)

```diff
@@ -83,12 +83,13 @@ func (m *ModelHandler) MonitorModelInfo() {
 			}
 		} else {
 			log.WithField("name", modelInfo.ImageName).Info("The image name is already")
+			m.dockerOp.BootUpModelId[modelInfo.ImageName] = modelInfo.TaskId
 			reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
 		}
 		m.dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
 	}
 	m.dockerOp.ModelsInfo = modelInfosResp
-	m.dockerOp.ReportTaskIds = reportTaskIds
+	m.dockerOp.ReportModelIds = reportTaskIds
 	err = os.WriteFile(m.modelsFileName, bodyBytes, 0644)
 	if err != nil {
 		log.WithError(err).Error("Error writing models.json")
 	...
```
models/node_manager.go  (+2 −0)

```diff
@@ -75,6 +75,8 @@ type ModelInfo struct {
 	//OutPutJson string `json:"out_put_json"`
 	FileExpiresTime string `json:"file_expires_time"`
 	PublishStatus   int    `json:"publish_status"`
+	EstimatExeTime  int64  `json:"estimat_exe_time"`
+	StartUpTime     int64  `json:"start_up_time"`
 }
 
 type HealthyCheck struct {
 	...
```
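The two new fields carry per-model timing estimates that GetAckResp (in nm/task_handler.go below) reads back into the task ack. A quick sketch of how such a payload decodes, with a hypothetical JSON snippet matching the new tags:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Subset of the ModelInfo struct above.
type ModelInfo struct {
	EstimatExeTime int64 `json:"estimat_exe_time"`
	StartUpTime    int64 `json:"start_up_time"`
}

func main() {
	// Hypothetical payload; the field names match the new json tags.
	raw := []byte(`{"estimat_exe_time": 45, "start_up_time": 120}`)

	var m ModelInfo
	if err := json.Unmarshal(raw, &m); err != nil {
		panic(err)
	}
	fmt.Println(m.StartUpTime, m.EstimatExeTime) // 120 45
}
```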
nm/monitor.go  (+2 −2)

```diff
@@ -57,8 +57,8 @@ func (m *MonitorNm) monitorNmClient() {
 	msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
 	log.Info("------------------------Send deviceInfo message ended------------------------")
-	if len(m.DockerOp.ReportTaskIds) == 0 {
-		params := utils.BuildParams(m.DockerOp.ReportTaskIds)
+	if len(m.DockerOp.ReportModelIds) == 0 {
+		params := utils.BuildParams(m.DockerOp.ReportModelIds)
 		msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
 	}
 	log.Info("------------------------Send once-off message ended------------------------")
 	...
```
nm/msg_handler.go  (+20 −16)

```diff
@@ -37,7 +37,6 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
 			log.Warn("handlerMsg -> node manager is not running")
 			return
 		}
 		heartbeatReq := rev.GetHeartbeatRequest()
 		if heartbeatReq != nil {
 			n.nodeManager.UpdateLastHeartTime(time.Now())
 	...
@@ -49,15 +48,10 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
 		taskMsg := rev.GetPushTaskMessage()
 		if taskMsg != nil {
-			params := utils.BuildParams(taskMsg.TaskId)
-			n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RespTaskAck, params)
-			go func(msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker, taskMsg *nodeManagerV1.PushTaskMessage) {
-				if !taskMsgWorker.DockerOp.IsHealthy {
-					//params := utils.BuildParams(taskMsgWorker.DockerOp.Reason)
-					//msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
-					return
-				}
+			go func(msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker, taskMsg *nodeManagerV1.PushTaskMessage) {
+				isCanExecute, bootUpTime, queueWaitTime, executeTime := taskMsgWorker.GetAckResp(taskMsg)
+				ackParams := utils.BuildParams(taskMsg.TaskId, isCanExecute, bootUpTime, queueWaitTime, executeTime)
+				msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RespTaskAck, ackParams)
 				taskMsgWorker.Wg.Add(1)
 				taskMsgWorker.TaskMsg <- taskMsg
 				taskMsgWorker.Wg.Wait()
 	...
@@ -88,15 +82,13 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
 					taskExecRes.TaskExecError = fmt.Sprintf("worker:%s-%s-%s", conf.GetConfig().SignPublicAddress.Hex(), "Task exec error", taskExecRes.TaskExecError)
 				}
 				reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
-				params := utils.BuildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
 				taskMsgWorker.Mutex.Lock()
+				taskResultParams := utils.BuildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
 				taskMsgWorker.Mutex.Unlock()
-				msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResultResp, params)
+				msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResultResp, taskResultParams)
 				log.Info("--------------taskMsg--------------:", taskMsg)
 			}(n.msgRespWorker, n.taskMsgWorker, taskMsg)
 			continue
 	...
@@ -184,8 +176,20 @@ func (n *NodeManagerHandler) ReportResourceMap(dockerOp *operate.DockerOp) {
 	for {
 		select {
 		case <-ticker.C:
-			if len(dockerOp.ReportTaskIds) > 0 {
-				params := utils.BuildParams(dockerOp.ReportTaskIds)
+			if len(dockerOp.ReportModelIds) > 0 {
+				bootUpModelIds := make([]uint64, 0)
+				containers := dockerOp.ListContainer()
+				if containers != nil && len(containers) > 0 {
+					for _, container := range containers {
+						if container.State == "running" {
+							taskId := dockerOp.BootUpModelId[container.Image]
+							if taskId != 0 {
+								bootUpModelIds = append(bootUpModelIds, taskId)
+							}
+						}
+					}
+				}
+				params := utils.BuildParams(dockerOp.ReportModelIds, bootUpModelIds)
 				n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResourceMapRes, params)
 				ticker = time.NewTicker(time.Minute * 10)
 			}
 	...
```
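Both this file and nm/msg_resp.go move values around as params ...interface{}: the handler packs positional arguments with utils.BuildParams and the response builder unpacks them with type assertions. A minimal sketch of that convention (BuildParams here is an assumed trivial variadic passthrough, not the project's actual implementation):

```go
package main

import "fmt"

// Assumed shape of utils.BuildParams: pack variadic args into a slice.
func BuildParams(params ...interface{}) []interface{} {
	return params
}

// Mirrors RespTaskAck's positional unpacking: order and types must match exactly.
func respTaskAck(params ...interface{}) string {
	taskId := params[0].(string)
	canExecute := params[1].(bool)
	bootUpTime := params[2].(int64)
	queueWaitTime := params[3].(int64)
	executeTime := params[4].(int64)
	return fmt.Sprintf("task=%s can=%t boot=%d wait=%d exec=%d",
		taskId, canExecute, bootUpTime, queueWaitTime, executeTime)
}

func main() {
	ackParams := BuildParams("task-1", true, int64(120), int64(0), int64(45))
	fmt.Println(respTaskAck(ackParams...))
}
```

The positional coupling is the fragile part of this design: GetAckResp's return order must stay in lock-step with RespTaskAck's assertions, or the type assertion panics at runtime.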
nm/msg_resp.go  (+56 −12)

```go
package nm

import (
	"bytes"
	"encoding/binary"
	"example.com/m/conf"
	"example.com/m/log"
	"example.com/m/models"
	"fmt"
	"github.com/docker/docker/libnetwork/bitmap"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	nodemanagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
	"github.com/shirou/gopsutil/cpu"
	"strconv"
	"time"
)

type WorkerMsgHandler func(params ...interface{}) *nodemanagerV1.WorkerMessage
```

```diff
@@ -77,26 +82,45 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
 func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
 	log.Info("Submit resource map response received params: ", params)
-	taskIdIndexes := params[0].([]uint64)
-	b := bitmap.New(1000000000)
-	for i := 0; i < len(taskIdIndexes); i++ {
-		taskIdIndex := taskIdIndexes[i]
-		err := b.Set(taskIdIndex)
+	existTaskIdIndexes := params[0].([]uint64)
+	existMap := bitmap.New(1000000000)
+	for i := 0; i < len(existTaskIdIndexes); i++ {
+		taskIdIndex := existTaskIdIndexes[i]
+		err := existMap.Set(taskIdIndex)
 		if err != nil {
 			log.WithField("taskId index", taskIdIndex).WithField("error", err).Error("Error setting task id index")
 			return nil
 		}
 	}
+	existImage, err := existMap.MarshalBinary()
+	if err != nil {
+		log.Error("bitmap marshal binary failed with error: ", err)
+		return nil
+	}
+	bootUpModelIdIndexes := params[1].([]uint64)
+	bootUpMap := bitmap.New(1000000000)
+	for i := 0; i < len(bootUpModelIdIndexes); i++ {
+		taskIdIndex := bootUpModelIdIndexes[i]
+		err := bootUpMap.Set(taskIdIndex)
+		if err != nil {
+			log.WithField("taskId index", taskIdIndex).WithField("error", err).Error("Error setting task id index")
+			return nil
+		}
+	}
-	binary, err := b.MarshalBinary()
+	bootUpImage, err := bootUpMap.MarshalBinary()
 	if err != nil {
 		log.Error("bitmap marshal binary failed with error: ", err)
 		return nil
 	}
-	log.WithField("", binary).Info("Bit map binary byte")
+	log.WithField("", existImage).Info("Bit map binary byte")
 	heartRes := &nodemanagerV1.WorkerMessage{
 		Message: &nodemanagerV1.WorkerMessage_ResourceMap{
 			ResourceMap: &nodemanagerV1.SubmitResourceMap{
-				ResourceMap: binary,
+				ResourceMap: existImage,
+				BootupMap:   bootUpImage,
 			},
 		},
 	}
 	...
```
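SubmitResourceMapRes now serializes two bitmaps instead of one: the set of model ids present on the node (existMap) and the subset whose containers are currently booted (bootUpMap), with each model id used as a bit ordinal. A self-contained sketch using the same libnetwork bitmap calls the function itself makes:

```go
package main

import (
	"fmt"

	"github.com/docker/docker/libnetwork/bitmap"
)

func main() {
	existIds := []uint64{3, 17, 42} // models present on this node
	bootedIds := []uint64{17}       // models with a running container

	existMap := bitmap.New(1_000_000_000)
	for _, id := range existIds {
		if err := existMap.Set(id); err != nil {
			panic(err)
		}
	}
	bootUpMap := bitmap.New(1_000_000_000)
	for _, id := range bootedIds {
		if err := bootUpMap.Set(id); err != nil {
			panic(err)
		}
	}

	existImage, err := existMap.MarshalBinary()
	if err != nil {
		panic(err)
	}
	bootUpImage, err := bootUpMap.MarshalBinary()
	if err != nil {
		panic(err)
	}
	// The serialized form stays small for sparse maps, which is why a
	// billion-bit range is affordable here.
	fmt.Println(len(existImage), len(bootUpImage))
}
```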
```diff
@@ -106,12 +130,24 @@ func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
 func RegisterInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
 	log.Info("Register info response received params:", params)
 	nowTimeStamp := time.Now().Unix()
 	byteSlice := make([]byte, 8)
 	binary.BigEndian.PutUint64(byteSlice, uint64(nowTimeStamp))
 	signHash := crypto.Keccak256Hash(
 		bytes.NewBufferString(conf.GetConfig().GetExternalIp()).Bytes(),
 		bytes.NewBufferString(conf.GetConfig().SignPub).Bytes(),
 		bytes.NewBufferString(conf.GetConfig().BenefitAddress).Bytes(),
 		byteSlice)
 	log.WithField("hash", signHash.String()).Info("register message sign result")
 	sign, _ := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
 	log.Info("register message sign:", common.Bytes2Hex(sign))
 	nodeInfoRes := &nodemanagerV1.WorkerMessage{
 		Message: &nodemanagerV1.WorkerMessage_RegisteMessage{
 			RegisteMessage: &nodemanagerV1.RegisteMessage{
 				DeviceIp:        conf.GetConfig().GetExternalIp(),
 				MinerPubkey:     conf.GetConfig().SignPub,
 				BenefitAddress:  conf.GetConfig().BenefitAddress,
 				Timestamp:       nowTimeStamp,
 				DeviceSignature: sign,
 			},
 		},
 	}
 	...
```
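The register message signs keccak256(externalIp ‖ signPub ‖ benefitAddress ‖ bigEndian(timestamp)) with the miner key. A sketch of the same sign-then-verify round trip with go-ethereum primitives, using a throwaway key and illustrative stand-ins for the config values:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	prvKey, _ := crypto.GenerateKey() // stand-in for the keystore key

	// Illustrative stand-ins for the config values used in RegisterInfoResp.
	externalIp := "203.0.113.7"
	signPub := "04abc..."
	benefitAddress := "0x0000000000000000000000000000000000000001"

	ts := make([]byte, 8)
	binary.BigEndian.PutUint64(ts, uint64(time.Now().Unix()))

	signHash := crypto.Keccak256Hash(
		bytes.NewBufferString(externalIp).Bytes(),
		bytes.NewBufferString(signPub).Bytes(),
		bytes.NewBufferString(benefitAddress).Bytes(),
		ts,
	)
	sig, err := crypto.Sign(signHash.Bytes(), prvKey)
	if err != nil {
		panic(err)
	}

	// The receiver can recover the signing key from the 65-byte signature.
	recovered, err := crypto.SigToPub(signHash.Bytes(), sig)
	if err != nil {
		panic(err)
	}
	fmt.Println(crypto.PubkeyToAddress(*recovered) ==
		crypto.PubkeyToAddress(prvKey.PublicKey)) // true
}
```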
```diff
@@ -252,10 +288,18 @@ func FetchStandardTaskResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
 func RespTaskAck(params ...interface{}) *nodemanagerV1.WorkerMessage {
 	taskId := params[0].(string)
+	canExecute := params[1].(bool)
+	bootUpTime := params[2].(int64)
+	queueWaitTime := params[3].(int64)
+	executeTime := params[4].(int64)
 	taskAckMsgRes := &nodemanagerV1.WorkerMessage{
 		Message: &nodemanagerV1.WorkerMessage_SubmitTaskAck{
 			SubmitTaskAck: &nodemanagerV1.SubmitTaskAck{
 				TaskId:        taskId,
+				CanExecute:    canExecute,
+				BootUpTime:    bootUpTime,
+				QueueWaitTime: queueWaitTime,
+				ExecuteTime:   executeTime,
 			},
 		},
 	}
 	...
```
nm/task_handler.go  (+73 −41)

```diff
@@ -27,16 +27,15 @@ import (
 type TaskWorker struct {
 	Wg       *sync.WaitGroup
 	Mutex    *sync.Mutex
 	LruCache *lru.Cache
 	DockerOp *operate.DockerOp
 	CmdOp    *operate.Command
 	TaskMsg  chan *nodeManagerV1.PushTaskMessage
 	IsExecAiTask       bool
 	IsExecStandardTask bool
-	ExecTaskIdIsSuccess *sync.Map
-	oldTaskImageName    string
-	oldTaskId           string
+	ExecTaskIdIsFinished                  *sync.Map
+	lastExecTaskId, lastExecTaskImageName string
+	lastExecTaskStartTime                 time.Time
 }
 
 type TaskOp struct {
 	...
@@ -53,12 +52,11 @@ type TaskOp struct {
 func NewTaskWorker(op *operate.DockerOp) *TaskWorker {
 	return &TaskWorker{
 		Wg:           &sync.WaitGroup{},
 		Mutex:        &sync.Mutex{},
 		LruCache:     lru.New(100),
 		DockerOp:     op,
 		TaskMsg:      make(chan *nodeManagerV1.PushTaskMessage, 0),
 		IsExecAiTask: false,
-		ExecTaskIdIsSuccess: &sync.Map{},
+		ExecTaskIdIsFinished: &sync.Map{},
 	}
 }
 	...
@@ -126,6 +124,7 @@ func (t *TaskWorker) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
 func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
 	defer t.Wg.Done()
+	t.lastExecTaskStartTime = time.Now()
 	t.checkLastTaskExecStatus(taskMsg)
 	log.Info("check last task exec status successful")
 	taskOp := &TaskOp{
 	...
@@ -150,12 +149,12 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 	if err != nil {
 		log.Errorf("failed to unmarshal task cmd: %s", err.Error())
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "failed to unmarshal task cmd: %s", err.Error())
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	taskOp.taskCmd.ImageName = fmt.Sprintf("%s-%s", taskOp.taskCmd.ImageName, conf.GetConfig().OpSys)
 	log.Info("received task cmd :", taskOp.taskCmd)
-	log.WithField("t.oldTaskImageName", t.oldTaskImageName).WithField("newTaskImageName", taskOp.taskCmd.ImageName).Info("task image info")
+	log.WithField("t.lastExecTaskImageName", t.lastExecTaskImageName).WithField("newTaskImageName", taskOp.taskCmd.ImageName).Info("task image info")
 	if taskMsg.TaskKind != baseV1.TaskKind_StandardTask && conf.GetConfig().IsStopLastContainer {
 		t.checkIsStopContainer(taskOp.taskCmd)
 	}
 	...
@@ -165,14 +164,14 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 	if imageId == "" {
 		log.Error("The image is not found:", taskOp.taskCmd.ImageName)
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskOp.taskCmd.ImageName)
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	err = json.Unmarshal(taskMsg.TaskParam, taskOp.taskParam)
 	if err != nil {
 		log.WithField("err", err).Error("Error unmarshalling task parameter")
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Error unmarshalling task parameter", err.Error())
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	running, _, _ := t.foundImageIsRunning(imageId)
 	...
@@ -183,19 +182,19 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 		if err != nil {
 			log.Errorf("Create and start container failed: %s", err.Error())
 			taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
-			t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+			t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 			return
 		}
 		log.Infof("Started container with ID %s", containerId)
 	}
 	if err = taskOp.waitContainerRunning(t, imageId); err != nil {
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	if err = taskOp.waitReqContainerOk(t.DockerOp); err != nil {
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	endAfterTaskTime := time.Since(taskOp.startBeforeTaskTime)
 	...
@@ -206,11 +205,44 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 	} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
 		t.IsExecStandardTask = false
 	}
-	t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+	t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 	//log.WithField("result", taskExecResult).Info("lru cache storage task result")
 	log.Info("----------------------Compute task exec done--------------------------------")
 }
 
+func (t *TaskWorker) GetAckResp(taskMsg *nodeManagerV1.PushTaskMessage) (isCanExecute bool, bootUpTime, queueWaitTime, executeTime int64) {
+	if t.IsExecStandardTask {
+		isCanExecute = true
+		return
+	}
+	taskCmd := &models.TaskCmd{}
+	err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
+	if err != nil {
+		log.Errorf("failed to unmarshal task cmd: %s", err.Error())
+		return
+	}
+	value, ok := t.ExecTaskIdIsFinished.Load(t.lastExecTaskId)
+	if !ok {
+		log.WithField("task id", t.lastExecTaskId).Warn("task exec is not finished")
+		return
+	}
+	isSuccess := value.(bool)
+	log.WithField("isSuccess", isSuccess).Info("Task exec info")
+	if !isSuccess && !t.lastExecTaskStartTime.IsZero() {
+		lastTaskImageInfo := t.DockerOp.GetImageInfo(t.lastExecTaskImageName)
+		since := time.Since(t.lastExecTaskStartTime)
+		queueWaitTime = lastTaskImageInfo.EstimatExeTime - int64(since.Seconds())
+		if queueWaitTime < 0 {
+			queueWaitTime = lastTaskImageInfo.EstimatExeTime
+		}
+	}
+	isCanExecute = true
+	modelInfo := t.DockerOp.GetImageInfo(taskCmd.ImageName)
+	bootUpTime = modelInfo.StartUpTime
+	executeTime = modelInfo.EstimatExeTime
+	return
+}
+
 func (t *TaskWorker) foundTaskImage(taskCmd *models.TaskCmd) (imageId string) {
 	images, err := t.DockerOp.PsImages()
 	if err != nil {
 	...
@@ -267,29 +299,29 @@ func (t *TaskWorker) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMess
 	} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
 		t.IsExecStandardTask = true
 	}
-	if t.oldTaskId != taskMsg.TaskId {
+	if t.lastExecTaskId != taskMsg.TaskId {
 		now := time.Now()
 		for {
 			since := time.Since(now)
 			if int64(since.Seconds()) > conf.GetConfig().WaitLastTaskExecTime {
-				log.WithField("taskId", t.oldTaskId).Info("Waiting for last task execution ending")
-				t.oldTaskId = taskMsg.TaskId
+				log.WithField("taskId", t.lastExecTaskId).Info("Waiting for last task execution ending")
+				t.lastExecTaskId = taskMsg.TaskId
 				break
 			}
-			if t.oldTaskId == "" {
-				t.oldTaskId = taskMsg.TaskId
+			if t.lastExecTaskId == "" {
+				t.lastExecTaskId = taskMsg.TaskId
 				break
 			}
-			value, ok := t.ExecTaskIdIsSuccess.Load(t.oldTaskId)
+			value, ok := t.ExecTaskIdIsFinished.Load(t.lastExecTaskId)
 			//log.WithField("isSuccess", value).Info("Task id exec info")
 			if !ok {
-				//log.WithField("task id", t.oldTaskId).Warn("task exec is not finished")
+				//log.WithField("task id", t.lastExecTaskId).Warn("task exec is not finished")
 				continue
 			}
 			isSuccess := value.(bool)
 			if isSuccess {
-				t.oldTaskId = taskMsg.TaskId
-				log.WithField("taskId", t.oldTaskId).Info("Task exec success")
+				t.lastExecTaskId = taskMsg.TaskId
+				log.WithField("taskId", t.lastExecTaskId).Info("Task exec success")
 				break
 			}
 		}
 	...
@@ -297,7 +329,7 @@ func (t *TaskWorker) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMess
 }
 
 func (t *TaskWorker) checkIsStopContainer(taskCmd *models.TaskCmd) {
-	if t.oldTaskImageName != "" && t.oldTaskImageName != taskCmd.ImageName {
+	if t.lastExecTaskImageName != "" && t.lastExecTaskImageName != taskCmd.ImageName {
 		//todo: stop the standard-task container
 		containers := t.DockerOp.ListContainer()
 		for _, container := range containers {
 	...
@@ -305,17 +337,17 @@ func (t *TaskWorker) checkIsStopContainer(taskCmd *models.TaskCmd) {
 			if len(split) == 1 {
 				container.Image = fmt.Sprintf("%s:%s", container.Image, "latest")
 			}
-			log.WithField("containerImageName", container.Image).WithField("t.oldTaskImageName", t.oldTaskImageName).Info("match image")
-			if container.Image == t.oldTaskImageName && container.State == "running" {
+			log.WithField("containerImageName", container.Image).WithField("t.lastExecTaskImageName", t.lastExecTaskImageName).Info("match image")
+			if container.Image == t.lastExecTaskImageName && container.State == "running" {
 				t.DockerOp.StopContainer(container.ID)
 				log.WithField("Image name", container.Image).Info("Stopping container")
-				//t.DockerOp.RunningImages[t.oldTaskImageName] = false
+				//t.DockerOp.RunningImages[t.lastExecTaskImageName] = false
 				break
 			}
 		}
-		t.oldTaskImageName = taskCmd.ImageName
+		t.lastExecTaskImageName = taskCmd.ImageName
 	} else {
-		t.oldTaskImageName = taskCmd.ImageName
+		t.lastExecTaskImageName = taskCmd.ImageName
 	}
 }
 	...
```
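The queue-wait estimate in the new GetAckResp is the interesting arithmetic: remaining wait = the previous task's estimated execution time minus how long it has already been running, falling back to the full estimate if the subtraction goes negative. A tiny isolated sketch of that calculation:

```go
package main

import (
	"fmt"
	"time"
)

// estimateQueueWait mirrors the logic in GetAckResp: given when the last task
// started and its estimated execution time in seconds, estimate the wait.
func estimateQueueWait(lastStart time.Time, estimatExeTime int64) int64 {
	since := time.Since(lastStart)
	queueWaitTime := estimatExeTime - int64(since.Seconds())
	if queueWaitTime < 0 {
		// Overdue: fall back to the full estimate rather than reporting zero.
		queueWaitTime = estimatExeTime
	}
	return queueWaitTime
}

func main() {
	start := time.Now().Add(-30 * time.Second) // last task has run for 30s
	fmt.Println(estimateQueueWait(start, 45))  // ~15s left
	fmt.Println(estimateQueueWait(start, 20))  // overdue: falls back to 20
}
```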
operate/docker.go  (+13 −2)

```diff
@@ -29,7 +29,8 @@ type DockerOp struct {
 	UsedExternalPort map[int64]bool
 	SignApi          map[string]string
 	ModelsInfo       []*models.ModelInfo
 	ReportTaskIds    []uint64
+	ReportModelIds   []uint64
+	BootUpModelId    map[string]uint64
 	//RunningImages map[string]bool
 }
 	...
@@ -52,11 +53,21 @@ func NewDockerOp() *DockerOp {
 		SignApi:          make(map[string]string, 0),
 		ModelsInfo:       make([]*models.ModelInfo, 1000),
 		UsedExternalPort: make(map[int64]bool, 0),
 		ReportTaskIds:    make([]uint64, 0),
+		ReportModelIds:   make([]uint64, 0),
+		BootUpModelId:    make(map[string]uint64, 0),
 		//RunningImages: make(map[string]bool, 0),
 	}
 }
 
+func (d *DockerOp) GetImageInfo(imageName string) *models.ModelInfo {
+	for _, info := range d.ModelsInfo {
+		if info.ImageName == imageName {
+			return info
+		}
+	}
+	return nil
+}
 
 func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, taskRes []byte) []byte {
 	reqBody := &models.TaskReq{
 		TaskId: taskMsg.TaskId,
 	...
```
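One caveat with the new GetImageInfo: NewDockerOp pre-sizes ModelsInfo with make([]*models.ModelInfo, 1000), which is a slice of 1000 nil pointers, so reading info.ImageName before MonitorModelInfo has replaced the slice would dereference nil. A sketch of the same lookup with an explicit nil guard:

```go
package main

import "fmt"

type ModelInfo struct {
	ImageName   string
	StartUpTime int64
}

// getImageInfo mirrors DockerOp.GetImageInfo but skips nil entries, which
// matters while the slice still holds its make([]*ModelInfo, 1000) zero values.
func getImageInfo(modelsInfo []*ModelInfo, imageName string) *ModelInfo {
	for _, info := range modelsInfo {
		if info == nil {
			continue
		}
		if info.ImageName == imageName {
			return info
		}
	}
	return nil
}

func main() {
	models := make([]*ModelInfo, 1000) // all nil, as in NewDockerOp
	models[0] = &ModelInfo{ImageName: "sdxl-linux", StartUpTime: 120}

	if info := getImageInfo(models, "sdxl-linux"); info != nil {
		fmt.Println(info.StartUpTime)
	}
}
```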