Commit 6ea89bcb authored by vicotor's avatar vicotor

implement node manager

parent d92a29ca
...@@ -5,6 +5,7 @@ go 1.18 ...@@ -5,6 +5,7 @@ go 1.18
require ( require (
github.com/BurntSushi/toml v1.3.2 github.com/BurntSushi/toml v1.3.2
github.com/ethereum/go-ethereum v1.13.10 github.com/ethereum/go-ethereum v1.13.10
github.com/hashicorp/golang-lru v0.5.4
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000 github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
......
...@@ -31,6 +31,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg ...@@ -31,6 +31,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU= github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
......
...@@ -15,6 +15,7 @@ type Node struct { ...@@ -15,6 +15,7 @@ type Node struct {
registry *nmregistry.RegistryService registry *nmregistry.RegistryService
apiServer *grpc.Server apiServer *grpc.Server
rdb *redis.Client rdb *redis.Client
wm *WorkerManager
} }
func NewNode() *Node { func NewNode() *Node {
...@@ -26,6 +27,7 @@ func NewNode() *Node { ...@@ -26,6 +27,7 @@ func NewNode() *Node {
}) })
node := &Node{ node := &Node{
rdb: rdb, rdb: rdb,
wm: NewWorkerManager(rdb),
apiServer: grpc.NewServer(grpc.MaxSendMsgSize(1024*1024*20), grpc.MaxRecvMsgSize(1024*1024*20)), apiServer: grpc.NewServer(grpc.MaxSendMsgSize(1024*1024*20), grpc.MaxRecvMsgSize(1024*1024*20)),
registry: nmregistry.NewRegistryService(config.GetConfig(), rdb), registry: nmregistry.NewRegistryService(config.GetConfig(), rdb),
} }
......
...@@ -3,7 +3,9 @@ package server ...@@ -3,7 +3,9 @@ package server
import ( import (
"context" "context"
"errors" "errors"
"github.com/odysseus/nodemanager/utils"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"strconv"
) )
var ( var (
...@@ -30,26 +32,38 @@ func (n *NodeManagerService) ManagerList(ctx context.Context, request *omanager. ...@@ -30,26 +32,38 @@ func (n *NodeManagerService) ManagerList(ctx context.Context, request *omanager.
return res, nil return res, nil
} }
func (n *NodeManagerService) RegisterWorker(worker omanager.NodeManagerService_RegisterWorkerServer) error { func (n *NodeManagerService) RegisterWorker(client omanager.NodeManagerService_RegisterWorkerServer) error {
//workerHandler := func() { uuid := utils.GetSnowflakeId()
// for { worker, err := n.node.wm.AddNewWorker(uuid, client)
// select { if err != nil {
// case <-n.quit: return err
// return }
// default: return n.node.wm.manageWorker(worker)
// msg, err := worker.Recv()
// if err != nil {
// log.WithError(err).WithField("worker", "wwww").Error("recv msg failed")
// return
// }
// }
// }
//}
return nil
} }
func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager.DispatchTaskRequest) (*omanager.DispatchTaskResponse, error) { func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager.DispatchTaskRequest) (*omanager.DispatchTaskResponse, error) {
//TODO implement me if request == nil {
panic("implement me") return nil, errors.New("invalid request")
}
uuid, err := strconv.ParseInt(request.Miner, 10, 64)
if err != nil {
return nil, errors.New("not found worker")
}
worker := n.node.wm.GetWorker(uuid)
dtask := &dispatchTask{
task: request.TaskData,
errCh: make(chan error, 1),
}
worker.taskCh <- dtask
// wait task send to worker.
if err, _ := <-dtask.errCh; err != nil {
return nil, err
}
res := new(omanager.DispatchTaskResponse)
res.TaskId = request.TaskData.TaskId
res.Miner = request.Miner
return res, nil
} }
package server
import (
"errors"
"fmt"
lru "github.com/hashicorp/golang-lru"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
var (
	// ErrWorkerExist is returned by AddNewWorker when a worker with the
	// same uuid is already registered.
	ErrWorkerExist = errors.New("worker exist")
)
// dispatchTask pairs a task payload with a channel on which the worker
// loop reports whether the task was successfully sent on the stream.
type dispatchTask struct {
	task  *odysseus.TaskContent
	errCh chan error // receives the stream-send result (nil on success)
}
// Worker represents one connected worker node and the channels used to
// coordinate the goroutines serving its registration stream.
type Worker struct {
	quit       chan interface{}                // shutdown signal for this worker's loops
	taskCh     chan *dispatchTask              // tasks pending delivery to the worker
	resultCh   chan *omanager.SubmitTaskResult // NOTE(review): never written in this file — confirm intended use
	uuid       int64                           // snowflake id assigned at registration
	publicKey  string                          // NOTE(review): never set in this file — confirm who populates it
	recentTask *lru.Cache                      // recently dispatched tasks keyed by task id
	stream     omanager.NodeManagerService_RegisterWorkerServer
}
// WorkerManager tracks all connected workers and their last heartbeat
// timestamps.
type WorkerManager struct {
	rdb       *redis.Client     // redis client; not used by the methods visible in this file
	heartBeat map[int64]int64   // worker uuid -> unix seconds of last heartbeat
	hbRwLock  sync.RWMutex      // guards heartBeat
	workers   map[int64]*Worker // worker uuid -> live worker
	wkRwLock  sync.RWMutex      // guards workers
	quit      chan struct{}     // manager-wide stop signal; presumably closed at shutdown (not in this file) — confirm
}
// NewWorkerManager builds a WorkerManager backed by the given redis
// client, with empty worker and heartbeat tables.
func NewWorkerManager(rdb *redis.Client) *WorkerManager {
	wm := &WorkerManager{
		rdb:       rdb,
		quit:      make(chan struct{}),
		heartBeat: make(map[int64]int64),
		workers:   make(map[int64]*Worker),
	}
	return wm
}
// UpdateHeartBeat records the current time as uuid's last heartbeat.
func (wm *WorkerManager) UpdateHeartBeat(uuid int64) {
	wm.hbRwLock.Lock()
	wm.heartBeat[uuid] = time.Now().Unix()
	wm.hbRwLock.Unlock()
}
// UpdateStatus is a placeholder for persisting a worker's reported
// status; currently a no-op.
func (wm *WorkerManager) UpdateStatus(worker *Worker) {

}
// GetHeartBeat returns uuid's last recorded heartbeat (unix seconds),
// or 0 if none has been recorded.
func (wm *WorkerManager) GetHeartBeat(uuid int64) int64 {
	wm.hbRwLock.RLock()
	last := wm.heartBeat[uuid]
	wm.hbRwLock.RUnlock()
	return last
}
// GetWorker looks up a live worker by uuid; nil when unknown.
func (wm *WorkerManager) GetWorker(uuid int64) *Worker {
	wm.wkRwLock.RLock()
	w := wm.workers[uuid]
	wm.wkRwLock.RUnlock()
	return w
}
// AddNewWorker registers a new worker under uuid and starts its receive
// loop. Returns ErrWorkerExist if the uuid is already registered.
func (wm *WorkerManager) AddNewWorker(uuid int64, worker omanager.NodeManagerService_RegisterWorkerServer) (*Worker, error) {
	wm.wkRwLock.Lock()
	defer wm.wkRwLock.Unlock()
	if _, exist := wm.workers[uuid]; exist {
		return nil, ErrWorkerExist
	}
	// Build the cache first so a half-initialized worker is never
	// published in the map.
	taskCache, err := lru.New(100)
	if err != nil {
		return nil, err
	}
	w := &Worker{
		// BUG FIX: quit was never initialized; close(nil) panics and
		// send/receive on a nil channel block forever, so every
		// shutdown path involving worker.quit was broken.
		quit:       make(chan interface{}),
		taskCh:     make(chan *dispatchTask),
		resultCh:   make(chan *omanager.SubmitTaskResult),
		uuid:       uuid,
		stream:     worker,
		recentTask: taskCache,
	}
	wm.workers[uuid] = w
	go wm.handleWorkerMsg(w)
	return w, nil
}
// Callback is invoked by manageWorker with the result of each stream
// Send; its boolean return value is currently ignored.
type Callback func(err error) bool
// manageWorker drives the manager→worker side of one registration
// stream: it periodically requests heartbeat/status/device reports and
// forwards dispatched tasks, until the manager shuts down, the worker
// disconnects, or its heartbeat goes stale.
func (wm *WorkerManager) manageWorker(worker *Worker) error {
	heartBeatDuration := time.Second * 10
	workerCheckDuration := heartBeatDuration * 3

	heartBeatTicker := time.NewTicker(heartBeatDuration)
	defer heartBeatTicker.Stop()
	workerCheckTicker := time.NewTicker(workerCheckDuration)
	defer workerCheckTicker.Stop()
	statusTicker := time.NewTicker(time.Second * 10)
	defer statusTicker.Stop()
	deviceInfoTicker := time.NewTicker(time.Second * 10)
	defer deviceInfoTicker.Stop()
	deviceUsageTicker := time.NewTicker(time.Second * 10)
	defer deviceUsageTicker.Stop()

	for {
		var msg = new(omanager.ManagerMessage)
		// callback runs after each Send with the Send error; cases
		// below override it, e.g. to stretch a ticker on success.
		var callback = Callback(func(err error) bool {
			return true
		})
		select {
		case <-wm.quit:
			// Manager shutting down: say goodbye and end the stream.
			// BUG FIX: the old code fell through and looped — a closed
			// channel is always ready, so it busy-looped re-sending
			// goodbye forever. Return instead.
			gb := new(omanager.ManagerMessage_GoodbyeMessage)
			gb.GoodbyeMessage = &omanager.GoodbyeMessage{}
			msg.Message = gb
			if err := worker.stream.Send(msg); err != nil {
				log.WithError(err).Error("send goodbye to worker failed")
			}
			return nil

		case <-worker.quit:
			// The receive loop signalled that the worker is gone.
			return nil

		case <-workerCheckTicker.C:
			if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) {
				// Ending the RPC tears the stream down; the receive
				// loop exits once Recv fails. BUG FIX: the old
				// close(worker.quit) here could double-close (panic)
				// against the receive loop's own close.
				return errors.New("worker heartbeat timeout")
			}

		case <-heartBeatTicker.C:
			hb := new(omanager.ManagerMessage_HeartbeatRequest)
			hb.HeartbeatRequest = &omanager.HeartbeatRequest{
				Timestamp: uint64(time.Now().Unix()),
			}
			msg.Message = hb

		case <-deviceInfoTicker.C:
			deviceInfo := new(omanager.ManagerMessage_DeviceRequest)
			deviceInfo.DeviceRequest = &omanager.DeviceInfoRequest{}
			msg.Message = deviceInfo
			callback = func(err error) bool {
				if err == nil {
					// After the first successful request, poll slowly.
					deviceInfoTicker.Reset(time.Second * 180)
				}
				return true
			}

		case <-deviceUsageTicker.C:
			deviceUsage := new(omanager.ManagerMessage_DeviceUsage)
			deviceUsage.DeviceUsage = &omanager.DeviceUsageRequest{}
			msg.Message = deviceUsage
			callback = func(err error) bool {
				if err == nil {
					deviceUsageTicker.Reset(time.Second * 180)
				}
				return true
			}

		case <-statusTicker.C:
			status := new(omanager.ManagerMessage_StatusRequest)
			status.StatusRequest = &omanager.StatusRequest{}
			msg.Message = status
			callback = func(err error) bool {
				if err == nil {
					statusTicker.Reset(time.Second * 120)
				}
				return true
			}

		case dtask, ok := <-worker.taskCh:
			if !ok {
				// Task channel closed: worker unregistered.
				return nil
			}
			task := dtask.task
			taskMsg := new(omanager.ManagerMessage_PushTaskMessage)
			taskMsg.PushTaskMessage = &omanager.PushTaskMessage{
				TaskId:    task.TaskId,
				TaskType:  task.TaskType,
				Workload:  0,
				TaskCmd:   task.TaskCmd,
				TaskParam: task.TaskParam,
			}
			msg.Message = taskMsg
			callback = func(err error) bool {
				if err == nil {
					worker.recentTask.Add(task.TaskId, task)
				}
				// Report the send result to the dispatcher without
				// blocking if it stopped waiting.
				select {
				case dtask.errCh <- err:
				default:
				}
				return true
			}
		}

		// Only send when a case produced a payload. BUG FIX: the old
		// `msg != nil` guard was always true (msg is new()'d every
		// iteration), so empty messages were sent after the
		// worker-check tick.
		if msg.Message != nil {
			err := worker.stream.Send(msg)
			if err != nil {
				log.WithError(err).Error("send message to worker failed")
			}
			callback(err)
		}
	}
}
// handleWorkerMsg is the receive loop for one worker's registration
// stream. It exits — and unregisters the worker — when the manager or
// the worker shuts down, or when Recv fails.
func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
	// Always drop the worker from the registry on exit so GetWorker
	// stops handing out a dead worker (the old code leaked the entry).
	defer func() {
		wm.wkRwLock.Lock()
		delete(wm.workers, worker.uuid)
		wm.wkRwLock.Unlock()
	}()
	for {
		select {
		case <-wm.quit:
			return
		case <-worker.quit:
			return
		default:
			wmsg, err := worker.stream.Recv()
			if err != nil {
				log.WithError(err).WithField("worker", worker.uuid).Error("recv msg failed")
				close(worker.quit)
				return
			}
			switch msg := wmsg.Message.(type) {
			case *omanager.WorkerMessage_GoodbyeMessage:
				// BUG FIX: the old code sent the reason into
				// worker.quit, which blocked forever (no receiver on
				// that channel). Log it and signal by closing instead.
				log.WithField("worker", worker.uuid).WithField("reason", msg.GoodbyeMessage.Reason).Info("worker said goodbye")
				close(worker.quit)
				// NOTE(review): closing taskCh from the consumer side
				// races with DispatchTask's send — confirm dispatchers
				// tolerate this or route shutdown through quit only.
				close(worker.taskCh)
				return
			case *omanager.WorkerMessage_SubmitTaskResult:
				// TODO: task results are currently ignored.
			case *omanager.WorkerMessage_HeartbeatResponse:
				// BUG FIX: record liveness; previously nothing ever
				// called UpdateHeartBeat, so the staleness check in
				// manageWorker always saw a zero timestamp.
				wm.UpdateHeartBeat(worker.uuid)
			case *omanager.WorkerMessage_Status:
			case *omanager.WorkerMessage_DeviceInfo:
			case *omanager.WorkerMessage_DeviceUsage:
			default:
				log.WithField("worker", worker.uuid).Error(fmt.Sprintf("unsupport msg type %T", msg))
			}
		}
	}
}
package utils
import (
"sync"
"time"
)
var (
	machineID     int64 // machine id, pre-shifted into bits 12-21 (10 bits, decimal range [0, 1023])
	sn            int64 // per-timestamp sequence number, bits 0-11 (12 bits, decimal range [0, 4095])
	lastTimeStamp int64 // timestamp (microseconds, from UnixNano/1000) of the last generated id
	mu            sync.Mutex // guards all three variables above
)

func init() {
	// Machine id 101, pre-shifted into its 10-bit slot above the
	// 12-bit sequence field.
	machineID = 101 << 12
	lastTimeStamp = time.Now().UnixNano() / 1000
}

// GetSnowflakeIdProcess generates one snowflake-style id without
// locking. Layout, low to high: 12-bit sequence | 10-bit machine id |
// 41-bit timestamp (low 41 bits of the microsecond clock).
// Callers must hold mu; use GetSnowflakeId as the safe entry point.
func GetSnowflakeIdProcess() int64 {
	curTimeStamp := time.Now().UnixNano() / 1000 // microsecond resolution
	if curTimeStamp == lastTimeStamp {
		if sn >= 4095 {
			// Sequence exhausted for this microsecond: spin until the
			// clock advances. BUG FIX: the old `sn > 4095` check let
			// sn reach 4096, overflowing into the machine-id bits, and
			// its single 1µs sleep did not guarantee a new timestamp.
			for curTimeStamp <= lastTimeStamp {
				curTimeStamp = time.Now().UnixNano() / 1000
			}
			sn = 0
		}
	} else {
		sn = 0
	}
	sn++
	lastTimeStamp = curTimeStamp

	// Keep the low 41 bits of the timestamp and shift them above the
	// 10-bit machine id and 12-bit sequence fields.
	ts := curTimeStamp & 0x1FFFFFFFFFF
	return ts<<22 | machineID | sn
}

// GetSnowflakeId returns a process-unique snowflake id; safe for
// concurrent use.
func GetSnowflakeId() int64 {
	mu.Lock()
	defer mu.Unlock()
	return GetSnowflakeIdProcess()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment