Commit 52cd0159 authored by vicotor's avatar vicotor

update nodemanager

parent 802ac706
...@@ -2,7 +2,11 @@ package main ...@@ -2,7 +2,11 @@ package main
import ( import (
"context" "context"
"crypto/ecdsa"
"crypto/rand"
"flag" "flag"
"github.com/ethereum/go-ethereum/crypto"
"github.com/odysseus/nodemanager/utils"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
...@@ -52,6 +56,7 @@ func main() { ...@@ -52,6 +56,7 @@ func main() {
log.WithField("endpoint", endpoint).Fatal("register worker failed", "err", err) log.WithField("endpoint", endpoint).Fatal("register worker failed", "err", err)
return return
} }
sk, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
for { for {
data, err := managerClient.Recv() data, err := managerClient.Recv()
...@@ -84,6 +89,7 @@ func main() { ...@@ -84,6 +89,7 @@ func main() {
msg := &omanager.WorkerMessage{ msg := &omanager.WorkerMessage{
Message: &omanager.WorkerMessage_DeviceInfo{ Message: &omanager.WorkerMessage_DeviceInfo{
DeviceInfo: &omanager.DeviceInfoResponse{ DeviceInfo: &omanager.DeviceInfoResponse{
MinerPubkey: utils.PubkeyToHex(&sk.PublicKey),
Devices: []*omanager.DeviceInfo{deviceInfo}, Devices: []*omanager.DeviceInfo{deviceInfo},
}, },
}, },
......
...@@ -43,7 +43,7 @@ services: ...@@ -43,7 +43,7 @@ services:
KAFKA_NUM_PARTITIONS: 3 KAFKA_NUM_PARTITIONS: 3
KAFKA_DELETE_RETENTION_MS: 1000 KAFKA_DELETE_RETENTION_MS: 1000
KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.108:9092 ## 这里填写服务器的公网ip KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092 ## 这里填写服务器的公网ip
KAFKA_BROKER_ID: 1 KAFKA_BROKER_ID: 1
kafka-ui: kafka-ui:
container_name: kafka-ui container_name: kafka-ui
......
package model package model
import (
"fmt"
"github.com/astaxie/beego/orm"
"github.com/odysseus/nodemanager/config"
log "github.com/sirupsen/logrus"
)
func DbInit() { func DbInit() {
if config.GetConfig().EnablePay == false { //if config.GetConfig().EnablePay == false {
return // return
} //}
// Set up database //// Set up database
dbconf := config.GetConfig().DbConfig //dbconf := config.GetConfig().DbConfig
datasource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", dbconf.User, dbconf.Passwd, dbconf.Host, dbconf.Port, dbconf.DbName) //datasource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", dbconf.User, dbconf.Passwd, dbconf.Host, dbconf.Port, dbconf.DbName)
orm.RegisterDriver("mysql", orm.DRMySQL) //orm.RegisterDriver("mysql", orm.DRMySQL)
err := orm.RegisterDataBase("default", "mysql", datasource) //err := orm.RegisterDataBase("default", "mysql", datasource)
if err != nil { //if err != nil {
log.WithError(err).Fatal("failed to connect to database") // log.WithError(err).Fatal("failed to connect to database")
} //}
orm.RegisterModel(new(User)) //orm.RegisterModel(new(User))
} }
package model package model
import ( import (
"github.com/astaxie/beego/orm"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
) )
...@@ -28,42 +27,43 @@ func (User) TableName() string { ...@@ -28,42 +27,43 @@ func (User) TableName() string {
return "user" return "user"
} }
type UserRepository interface { //
Create(user *User) error //type UserRepository interface {
GetByUserId(uid int64) (*User, error) // Create(user *User) error
Update(user *User) error // GetByUserId(uid int64) (*User, error)
Delete(user *User) error // Update(user *User) error
} // Delete(user *User) error
//}
type UserRepositoryImpl struct { //
o orm.Ormer //type UserRepositoryImpl struct {
} // o orm.Ormer
//}
func NewUserRepository() UserRepository { //
o := orm.NewOrm() //func NewUserRepository() UserRepository {
return &UserRepositoryImpl{o} // o := orm.NewOrm()
} // return &UserRepositoryImpl{o}
//}
func (repo *UserRepositoryImpl) Create(user *User) error { //
_, err := repo.o.Insert(user) //func (repo *UserRepositoryImpl) Create(user *User) error {
return err // _, err := repo.o.Insert(user)
} // return err
//}
func (repo *UserRepositoryImpl) GetByUserId(uid int64) (*User, error) { //
user := User{ID: uid} //func (repo *UserRepositoryImpl) GetByUserId(uid int64) (*User, error) {
err := repo.o.Read(&user) // user := User{ID: uid}
if err != nil { // err := repo.o.Read(&user)
return nil, err // if err != nil {
} // return nil, err
return &user, nil // }
} // return &user, nil
//}
func (repo *UserRepositoryImpl) Update(user *User) error { //
_, err := repo.o.Update(user) //func (repo *UserRepositoryImpl) Update(user *User) error {
return err // _, err := repo.o.Update(user)
} // return err
//}
func (repo *UserRepositoryImpl) Delete(user *User) error { //
_, err := repo.o.Delete(user) //func (repo *UserRepositoryImpl) Delete(user *User) error {
return err // _, err := repo.o.Delete(user)
} // return err
//}
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
"github.com/odysseus/nodemanager/config" "github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/model"
"github.com/odysseus/nodemanager/utils" "github.com/odysseus/nodemanager/utils"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
...@@ -14,7 +13,6 @@ import ( ...@@ -14,7 +13,6 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3" "golang.org/x/crypto/sha3"
"math/big" "math/big"
"strconv"
"sync" "sync"
"time" "time"
) )
...@@ -53,8 +51,6 @@ type WorkerManager struct { ...@@ -53,8 +51,6 @@ type WorkerManager struct {
wkRwLock sync.RWMutex wkRwLock sync.RWMutex
quit chan struct{} quit chan struct{}
userRepo model.UserRepository
node *Node node *Node
} }
...@@ -66,7 +62,6 @@ func NewWorkerManager(rdb *redis.Client, node *Node) *WorkerManager { ...@@ -66,7 +62,6 @@ func NewWorkerManager(rdb *redis.Client, node *Node) *WorkerManager {
quit: make(chan struct{}), quit: make(chan struct{}),
rdb: rdb, rdb: rdb,
node: node, node: node,
userRepo: model.NewUserRepository(),
} }
} }
...@@ -404,40 +399,40 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -404,40 +399,40 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
func (wm *WorkerManager) Payment(task *odysseus.TaskContent) error { func (wm *WorkerManager) Payment(task *odysseus.TaskContent) error {
if config.GetConfig().EnablePay == true { if config.GetConfig().EnablePay == true {
// pay for task. // pay for task.
uid, err := strconv.ParseInt(task.TaskUid, 10, 64) //uid, err := strconv.ParseInt(task.TaskUid, 10, 64)
if err != nil { //if err != nil {
log.WithFields(log.Fields{ // log.WithFields(log.Fields{
"taskid": task.TaskId, // "taskid": task.TaskId,
"uid": task.TaskUid, // "uid": task.TaskUid,
"error": err, // "error": err,
}).Error("parse task uid failed") // }).Error("parse task uid failed")
} //}
user, err := wm.userRepo.GetByUserId(uid) //user, err := wm.userRepo.GetByUserId(uid)
if err != nil { //if err != nil {
log.WithFields(log.Fields{ // log.WithFields(log.Fields{
"taskid": task.TaskId, // "taskid": task.TaskId,
"uid": task.TaskUid, // "uid": task.TaskUid,
"error": err, // "error": err,
}).Error("get user by uid failed") // }).Error("get user by uid failed")
} //}
if user != nil { //if user != nil {
fee, _ := strconv.ParseInt(task.TaskFee, 10, 64) // fee, _ := strconv.ParseInt(task.TaskFee, 10, 64)
user.Balance = user.Balance - fee // user.Balance = user.Balance - fee
err = wm.userRepo.Update(user) // err = wm.userRepo.Update(user)
if err != nil { // if err != nil {
log.WithFields(log.Fields{ // log.WithFields(log.Fields{
"taskid": task.TaskId, // "taskid": task.TaskId,
"uid": task.TaskUid, // "uid": task.TaskUid,
"error": err, // "error": err,
}).Error("update user failed") // }).Error("update user failed")
} else { // } else {
log.WithFields(log.Fields{ // log.WithFields(log.Fields{
"taskid": task.TaskId, // "taskid": task.TaskId,
"uid": task.TaskUid, // "uid": task.TaskUid,
"fee": fee, // "fee": fee,
}).Info("pay task fee finished") // }).Info("pay task fee finished")
} // }
} //}
} }
return nil return nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment