Commit dc4a4687 authored by duanjinfei

update log print

parent 74d8203d
@@ -2,7 +2,6 @@ package db
import (
"example.com/m/log"
"fmt"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
@@ -62,8 +61,7 @@ func Get(key string) ([]byte, error) {
log.Error("Leveldb get data failed:", err)
return nil, err
}
fmt.Printf("Value: %s\n", data)
log.WithField("key", key).WithField("value", data).Info("leveldb data")
return data, nil
}
......
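The Get change above replaces an unstructured fmt.Printf with field-based logging. A minimal sketch of the same pattern, assuming the project's example.com/m/log wrapper exposes a logrus-style WithField API:

```go
package main

import (
	log "github.com/sirupsen/logrus"
)

func main() {
	key, data := "node/last_heart", []byte("1700000000")
	// Before: fmt.Printf("Value: %s\n", data) — unstructured, hard to filter.
	// After: key and value become searchable fields; converting []byte to
	// string keeps the value readable instead of printing a byte array.
	log.WithField("key", key).
		WithField("value", string(data)).
		Info("leveldb data")
}
```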
@@ -41,6 +41,11 @@ type ComputeResult struct {
Data string `json:"data"`
}
type BenefitAddressInfo struct {
Address string `json:"address"`
Timestamp int64 `json:"timestamp"`
}
type NodeManagerClient struct {
mutex sync.Mutex
LastHeartTime int64
......
@@ -74,7 +74,7 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
dockerOp.ModelsInfo = append(dockerOp.ModelsInfo, modelInfo)
}
ticker = time.NewTicker(time.Minute * 65)
ticker = time.NewTicker(time.Minute * 10)
}
}
}
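The polling interval drops from 65 minutes to 10. Since the loop reassigns ticker = time.NewTicker(...) each cycle, the previous ticker is never stopped; Go 1.15+ offers Ticker.Reset for exactly this. A hedged sketch of the loop shape (the short initial tick is an assumption about the surrounding startup path):

```go
package monitor

import (
	"log"
	"time"
)

// pollModels fetches model info shortly after startup and then every
// 10 minutes, reusing a single ticker via Reset instead of allocating
// a new one (and leaking the old one) on every cycle.
func pollModels(fetch func() error) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if err := fetch(); err != nil {
			log.Println("fetch model info failed:", err)
			continue
		}
		ticker.Reset(10 * time.Minute)
	}
}
```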
@@ -86,6 +86,7 @@ func reportModelInfo(nodeManager *models.NodeManagerClient,
select {
case taskId := <-dockerOp.ModelTaskIdChan:
if !nodeManager.Status {
log.WithField("endpoint", nodeManager.Endpoint).Error("Node manager is down , stop report model info")
return
}
params := buildParams(taskId)
......
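Since the select above has a single case, the loop can be written as a plain channel receive. A hedged simplification of reportModelInfo, with stand-in types for everything not shown in the diff:

```go
package report

import "log"

// Stand-in for models.NodeManagerClient (only the fields used here).
type NodeManagerClient struct {
	Status   bool
	Endpoint string
}

// reportModelInfo drains task ids and stops as soon as the node manager
// goes down; ranging over the channel is equivalent to the one-case select.
func reportModelInfo(nm *NodeManagerClient, taskIds <-chan string) {
	for taskId := range taskIds {
		if !nm.Status {
			log.Printf("node manager %s is down, stop reporting model info", nm.Endpoint)
			return
		}
		log.Printf("reporting model info for task %s", taskId) // stands in for buildParams + send
	}
}
```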
@@ -35,7 +35,7 @@ func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, w
handler: handler,
params: params,
}
log.Info("---------------------------------------Send register msg ------------------------------------")
log.Info("----------------register msg -------------")
}
func (o *RespMsgWorker) SendMsg() {
@@ -46,7 +46,7 @@ func (o *RespMsgWorker) SendMsg() {
workerMsg := pool.handler(pool.params...)
err := pool.workerClient.SendMsg(workerMsg)
if err != nil {
log.Error("Send heartbeat msg error:", err)
log.Error("Send msg to nm client failed:", err)
return
}
log.Info("Worker client send message successfully")
......
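For context, RegisterMsgResp queues a handler together with its params, and SendMsg later applies handler(params...) and ships the result. A hedged sketch of that queue, with assumed types where the diff does not show them:

```go
package worker

import "log"

// MsgHandler builds an outbound message from the params captured at
// registration time (the concrete message type is an assumption).
type MsgHandler func(params ...interface{}) interface{}

type queuedMsg struct {
	handler MsgHandler
	params  []interface{}
}

type RespMsgWorker struct {
	queue chan queuedMsg
}

func (o *RespMsgWorker) RegisterMsgResp(handler MsgHandler, params ...interface{}) {
	o.queue <- queuedMsg{handler: handler, params: params}
}

// SendMsg drains the queue, builds each message, and sends it; as in the
// diff, a send failure logs and returns.
func (o *RespMsgWorker) SendMsg(send func(msg interface{}) error) {
	for q := range o.queue {
		if err := send(q.handler(q.params...)); err != nil {
			log.Println("Send msg to nm client failed:", err)
			return
		}
		log.Println("Worker client sent message successfully")
	}
}
```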
@@ -51,17 +51,20 @@ func StartMonitor() {
for i := 0; i < len(nodeManagerArr); i++ {
// TODO: apply selection rules to the index and randomly choose one of the nodeManagers to connect to
if int64(connectNodeManagerCount) == conf.GetConfig().NodeManagerNum {
log.Warn("Nothing available node manager..................................")
break
}
randomNum := utils.GenerateRandomNumber(conf.GetConfig().SignPrivateKey, int64(len(nodeManagerArr)))
manager := nodeManagerArr[randomNum.Int64()]
if !manager.IsExist {
log.WithField("endpoint", manager.Info.Endpoint).Warn("node manager is not exist")
continue
}
if !manager.IsUsed {
isSuccess := inputNodeManagerChan(manager, nil)
if !isSuccess {
panic("Init input node manager chan failed")
log.Warn("Init input node manager chan failed")
continue
}
connectNodeManagerCount++
}
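Swapping the panic for a warn-and-continue keeps StartMonitor alive when one manager cannot be connected, letting the loop try another candidate. A hedged sketch of that selection pattern (math/rand stands in for the deterministic utils.GenerateRandomNumber):

```go
package monitor

import (
	"log"
	"math/rand"
)

type Manager struct {
	Endpoint string
	IsExist  bool
	IsUsed   bool
}

// pickAndConnect keeps drawing random candidates until `want` managers are
// connected, skipping missing ones and tolerating connect failures instead
// of panicking (connect stands in for inputNodeManagerChan).
func pickAndConnect(candidates []*Manager, want int, connect func(*Manager) bool) int {
	connected := 0
	for range candidates {
		if connected == want {
			break
		}
		m := candidates[rand.Intn(len(candidates))]
		if !m.IsExist {
			log.Printf("node manager %s does not exist", m.Endpoint)
			continue
		}
		if m.IsUsed {
			continue
		}
		if !connect(m) {
			log.Printf("init node manager chan failed: %s", m.Endpoint)
			continue
		}
		connected++
	}
	return connected
}
```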
@@ -73,7 +76,7 @@ func StartMonitor() {
case <-ticker.C:
log.Info("Monitoring node manager client thread start......")
for _, managerClient := range usedNodeManagerClient {
if !managerClient.GetStatus() {
if !managerClient.GetStatus() && !managerClient.IsDel {
log.Warn("The Node manager client is failed:", managerClient.Endpoint)
manager := getNodeManager(managerClient.Endpoint)
if manager == nil {
@@ -100,6 +103,7 @@ func StartMonitor() {
unUsedManager := unUsedNodeManagers[randomNum.Int64()]
isSuccess := inputNodeManagerChan(unUsedManager, nil)
if !isSuccess {
log.Warn("Connect unused node manager client error:", manager.Info.Endpoint)
break
}
}
@@ -135,6 +139,7 @@ func monitorWorker(op *operate.DockerOp) {
// Report image info
go reportModelInfo(nodeManager, worker, msgRespWorker, op)
log.Info("Report model info started")
// Proof storage
go proofWorker.ProofStorage()
@@ -145,6 +150,7 @@ func monitorWorker(op *operate.DockerOp) {
log.Info("Proof commit worker started")
go handlerStandardTask(nodeManager, worker, msgRespWorker, taskMsgWorker)
log.Info("Handler standard task worker started")
// Handle messages
for i := 0; i < 5; i++ {
@@ -265,27 +271,27 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
continue
}
nmSignMsg := rev.GetProofTaskResult()
if nmSignMsg != nil {
containerSign, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.ContainerSign)
nmResultMsg := rev.GetProofTaskResult()
if nmResultMsg != nil {
containerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ContainerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ContainerSign)
//}
minerSign, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.MinerSign)
minerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.MinerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.MinerSign)
//}
reqHash, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.ReqHash)
reqHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ReqHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ReqHash)
//}
respHash, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.RespHash)
respHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.RespHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.RespHash)
//}
taskType, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.TaskType)
proofWorker.ProductProof(nmSignMsg.TaskId, nmSignMsg.Workload, taskType.(uint64), reqHash.([]byte), respHash.([]byte), containerSign.([]byte), minerSign.([]byte), nmSignMsg.ManagerSignature)
log.Info(nmSignMsg)
taskType, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.TaskType)
proofWorker.ProductProof(nmResultMsg, taskType.(uint64), reqHash.([]byte), respHash.([]byte), containerSign.([]byte), minerSign.([]byte))
log.Info(nmResultMsg)
continue
}
......
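The block above pulls five values out of the LRU cache and type-asserts them with the ok checks commented out, so a cache miss would make taskType.(uint64) or the []byte assertions panic. A hedged sketch of a checked read (the two-value Get signature follows hashicorp/golang-lru):

```go
package handler

import "log"

// cache abstracts taskMsgWorker.LruCache for illustration.
type cache interface {
	Get(key interface{}) (interface{}, bool)
}

// getBytes looks up key and asserts []byte, reporting failure instead of
// panicking on a miss or a type mismatch.
func getBytes(c cache, key string) ([]byte, bool) {
	v, ok := c.Get(key)
	if !ok {
		log.Printf("LruCache.Get missed: %s", key)
		return nil, false
	}
	b, ok := v.([]byte)
	if !ok {
		log.Printf("LruCache value for %s is not []byte", key)
		return nil, false
	}
	return b, true
}
```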
@@ -92,12 +92,10 @@ func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
t.TaskRespBody[taskMsg.TaskId] = nil
t.TaskRespHeader[taskMsg.TaskId] = nil
t.TaskExecTime[taskMsg.TaskId] = 0
t.TaskIsSuccess[taskMsg.TaskId] = false
reader := bytes.NewReader(taskMsg.TaskParam)
taskCmd := &models.TaskCmd{}
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
if err != nil {
@@ -193,9 +191,10 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
startBeforeTaskTime := time.Now()
post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reader)
reqContainerBody := bytes.NewReader(taskMsg.TaskParam)
post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reqContainerBody)
if err != nil {
log.Error("Http client post error: ", err)
log.WithField("error:", err).Error("Http client post request container failed")
return
}
endAfterTaskTime := time.Since(startBeforeTaskTime)
@@ -224,23 +223,11 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Info("received computeTask--------------------------------")
}
func (t *TaskHandler) StandardTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
t.TaskRespBody[taskMsg.TaskId] = nil
t.TaskRespHeader[taskMsg.TaskId] = nil
t.TaskExecTime[taskMsg.TaskId] = 0
t.TaskIsSuccess[taskMsg.TaskId] = false
//todo: execute the standard task
t.IsExecStandardTask = false
log.Info("received customTask--------------------------------")
}
func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
_, err := t.DockerOp.PsImages()
if err != nil {
log.Error("custome task handler docker op ps images failed: ", err)
log.Error("custom task handler docker op ps images failed: ", err)
return
}
log.Info("received customTask--------------------------------")
......
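The ComputeTaskHandler fix builds reqContainerBody immediately before the POST. A bytes.Reader carries a read offset, so reusing a reader that was already consumed earlier in the handler would send an empty body. A minimal demonstration of why the fresh reader matters:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	payload := []byte(`{"prompt":"hi"}`)

	r := bytes.NewReader(payload)
	io.ReadAll(r) // something consumes the reader first (e.g. an earlier request)
	reused, _ := io.ReadAll(r)
	fmt.Printf("reused reader yields %d bytes\n", len(reused)) // 0: offset sits at EOF

	fresh := bytes.NewReader(payload) // what the fix does right before Post
	again, _ := io.ReadAll(fresh)
	fmt.Printf("fresh reader yields %d bytes\n", len(again)) // full body again
}
```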
@@ -29,7 +29,7 @@ func ConnValidatorGrpc(endpoint string) witnessv1.WitnessServiceClient {
func GetDockerClient() (*client.Client, error) {
dockerClient, err := client.NewClientWithOpts(client.WithAPIVersionNegotiation(), client.WithHost(conf.GetConfig().DockerServer))
if err != nil {
log.Error("Error create docker client: ", err)
log.WithField("error", err).Error("create docker client failed")
return nil, err
}
return dockerClient, nil
@@ -42,7 +42,7 @@ func connGrpc(endpoint string) *grpc.ClientConn {
grpc.MaxCallSendMsgSize(1024*1024*1024)),
)
if err != nil {
log.Error("Dial error:", err)
log.WithField("error:", err).Error("Connect node manager failed")
return nil
}
return dial
......
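For reference, the 1 GiB send limit shown above lives in the dial options. A hedged reconstruction of connGrpc (the plaintext credentials and matching receive limit are assumptions):

```go
package operate

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// connGrpc dials a node manager with large message limits so oversized
// task payloads are not rejected by the default 4 MiB cap.
func connGrpc(endpoint string) (*grpc.ClientConn, error) {
	return grpc.Dial(endpoint,
		grpc.WithTransportCredentials(insecure.NewCredentials()), // assumed plaintext
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(1024*1024*1024),
			grpc.MaxCallSendMsgSize(1024*1024*1024),
		),
	)
}
```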
@@ -74,7 +74,6 @@ func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, task
log.Errorf("failed to unmarshal task cmd: %s", err.Error())
return nil
}
// TODO: call the container API
request, err := http.NewRequest("POST", d.SignApi[taskCmd.ImageName], bytes.NewReader(body))
if err != nil {
log.Error("New http request failed: ", err)
......
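http.NewRequest only constructs the request; a hedged sketch of the full round trip to the container's sign endpoint (signURL stands in for d.SignApi[taskCmd.ImageName]; the JSON content type matches the body built above):

```go
package operate

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// postSign sends the task body to the sign API and returns the raw response.
func postSign(client *http.Client, signURL string, body []byte) ([]byte, error) {
	req, err := http.NewRequest(http.MethodPost, signURL, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("new http request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}
```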
@@ -19,7 +19,6 @@ func TestGenerateRandomNumber(t *testing.T) {
args args
want *big.Int
}{
// TODO: Add test cases.
{
"randomPrvTest",
args{
......
@@ -13,7 +13,7 @@ import (
"os"
)
const KEYPATH_PWD = "keystore"
const KeypadPwd = "keystore"
func GenerateRandomNumber(privateKey *ecdsa.PrivateKey, length int64) *big.Int {
// Generate a random number
@@ -27,7 +27,7 @@ func GenerateRandomNumber(privateKey *ecdsa.PrivateKey, length int64) *big.Int {
}
func GetPrv() (*ecdsa.PrivateKey, error) {
if _, err := os.Stat(KEYPATH_PWD); os.IsNotExist(err) {
if _, err := os.Stat(KeypadPwd); os.IsNotExist(err) {
//log.Info("Keystore not found. Generating a new one...")
// Generate a private key
privateKey, err := generatePrivateKey()
@@ -61,8 +61,8 @@ func generatePrivateKey() (*ecdsa.PrivateKey, error) {
}
func savePrivateKey(privateKey *ecdsa.PrivateKey) error {
ks := keystore.NewKeyStore(KEYPATH_PWD, keystore.StandardScryptN, keystore.StandardScryptP)
account, err := ks.ImportECDSA(privateKey, KEYPATH_PWD)
ks := keystore.NewKeyStore(KeypadPwd, keystore.StandardScryptN, keystore.StandardScryptP)
account, err := ks.ImportECDSA(privateKey, KeypadPwd)
if err != nil {
log.Fatal(err)
}
@@ -71,18 +71,18 @@ func savePrivateKey(privateKey *ecdsa.PrivateKey) error {
}
func readPrivateKey() (*ecdsa.PrivateKey, error) {
file, err := ioutil.ReadDir(KEYPATH_PWD)
file, err := ioutil.ReadDir(KeypadPwd)
if err != nil {
return nil, err
}
for _, info := range file {
keystoreFile := fmt.Sprintf("%s%s%s", KEYPATH_PWD, "/", info.Name())
keystoreFile := fmt.Sprintf("%s%s%s", KeypadPwd, "/", info.Name())
jsonBytes, err := ioutil.ReadFile(keystoreFile)
if err != nil {
log.Error("import ecdsa keystore error: ", err)
continue
}
key, err := keystore.DecryptKey(jsonBytes, KEYPATH_PWD)
key, err := keystore.DecryptKey(jsonBytes, KeypadPwd)
if err != nil {
log.Error("keystore decrypt key failed:", err)
continue
......
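Note that the renamed constant does double duty: it is both the keystore directory and the passphrase handed to ImportECDSA and DecryptKey. A condensed sketch of the read path using go-ethereum's keystore package (a flat directory layout is assumed):

```go
package utils

import (
	"crypto/ecdsa"
	"fmt"
	"os"
	"path/filepath"

	"github.com/ethereum/go-ethereum/accounts/keystore"
)

const KeypadPwd = "keystore" // serves as directory name and passphrase

func readPrivateKey() (*ecdsa.PrivateKey, error) {
	entries, err := os.ReadDir(KeypadPwd) // os.ReadDir supersedes the deprecated ioutil.ReadDir
	if err != nil {
		return nil, err
	}
	for _, e := range entries {
		raw, err := os.ReadFile(filepath.Join(KeypadPwd, e.Name()))
		if err != nil {
			continue // unreadable entry: try the next file
		}
		key, err := keystore.DecryptKey(raw, KeypadPwd)
		if err != nil {
			continue // wrong passphrase or corrupt keyfile
		}
		return key.PrivateKey, nil
	}
	return nil, fmt.Errorf("no decryptable key found in %s", KeypadPwd)
}
```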
@@ -7,6 +7,7 @@ import (
"example.com/m/db"
"example.com/m/log"
"example.com/m/operate"
nodemanagerv1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
witnessV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/witness/v1"
"google.golang.org/grpc"
"math/rand"
@@ -27,17 +28,18 @@ func NewProofWorker() *ProofWorker {
}
}
func (p *ProofWorker) ProductProof(taskId string, workLoad, taskType uint64, reqHash []byte, respHash []byte, containerSign, minerSign, nmSign []byte) {
log.Info("ProductProof received workLoad:", workLoad)
func (p *ProofWorker) ProductProof(nmResultMsg *nodemanagerv1.ProofTaskResult, taskType uint64, reqHash []byte, respHash []byte, containerSign, minerSign []byte) {
log.Info("ProductProof received workLoad:", nmResultMsg.Workload)
p.productProofChan <- &witnessV1.Proof{
Workload: workLoad,
TaskId: taskId,
TaskType: taskType,
Workload: nmResultMsg.Workload,
TaskId: nmResultMsg.TaskId,
ReqHash: reqHash,
RespHash: respHash,
ManagerSignature: nmResultMsg.ManagerSignature,
ContainerSignature: containerSign,
MinerSignature: minerSign,
ManagerSignature: nmSign,
TaskType: taskType,
Timestamp: nmResultMsg.Timestamp,
}
}
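Passing the whole ProofTaskResult means TaskId, Workload, ManagerSignature, and Timestamp all come from one message instead of loose arguments. One consequence worth making explicit: the send on productProofChan blocks handlerMsg whenever ProofStorage is busy, so a buffered channel is a reasonable choice. A hedged sketch with stand-in types (the capacity is an assumption):

```go
package proof

// Proof stands in for witnessV1.Proof; only a few fields are shown.
type Proof struct {
	TaskId    string
	Workload  uint64
	Timestamp int64
}

type ProofWorker struct {
	productProofChan chan *Proof
}

// NewProofWorker buffers the channel so ProductProof rarely blocks the
// message-handling goroutine while ProofStorage is flushing.
func NewProofWorker() *ProofWorker {
	return &ProofWorker{productProofChan: make(chan *Proof, 256)}
}

func (p *ProofWorker) ProductProof(taskId string, workload uint64, ts int64) {
	p.productProofChan <- &Proof{TaskId: taskId, Workload: workload, Timestamp: ts}
}
```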
@@ -68,10 +70,14 @@ func (p *ProofWorker) ProofStorage() {
for {
select {
case <-timer.C:
min := time.Now().Minute()
nowTime := time.Now()
min := nowTime.Minute()
if min == 0 {
randomMinute = getRandInt()
}
if nowTime.Hour() == 23 {
randomMinute = 59
}
// Check whether we are inside the flush window (minutes 40-59)
if min >= 40 && min <= 59 && min == randomMinute {
proofs := make([]*witnessV1.Proof, 0)
......
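The flush scheduling above picks a random minute once per hour and pins it to 59 during hour 23 so the day's last batch always goes out at 23:59. A hedged distillation of that check (the [40,59] range of getRandInt is inferred from the window test, not shown in the diff):

```go
package proof

import "math/rand"

// shouldFlush reports whether proofs should be persisted on this tick.
// randomMinute is re-rolled at the top of each hour; during hour 23 it is
// forced to 59 so the final flush of the day is never skipped.
func shouldFlush(hour, min int, randomMinute *int) bool {
	if min == 0 {
		*randomMinute = 40 + rand.Intn(20) // assumed range of getRandInt: [40,59]
	}
	if hour == 23 {
		*randomMinute = 59
	}
	return min >= 40 && min <= 59 && min == *randomMinute
}
```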