Commit bba4e8d3 authored by vicotor's avatar vicotor

update worker.

parent 86f12881
package server
import (
"github.com/ethereum/go-ethereum/common"
lru "github.com/hashicorp/golang-lru"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
log "github.com/sirupsen/logrus"
"time"
)
type workerInfo struct {
nodeInfo *omanager.NodeInfoResponse
deviceUsageInfo []*omanager.DeviceUsage
deviceInfo *omanager.DeviceInfoMessage
deviceStatusInfo *omanager.StatusResponse
resourceInfo *omanager.SubmitResourceMap
}
type Worker struct {
quit chan interface{}
taskCh chan *dispatchTask
msgCh chan *omanager.WorkerMessage
resultCh chan *omanager.SubmitTaskResult
uuid int64 // worker uuid in the local.
registed bool // worker is registed to this nm.
online bool
disconnect bool
nonce int
latestNmValue string
addFirstSucceed bool
info workerInfo
workerAddr string // worker address from public-key
deviceInfoHash []byte
recentTask *lru.ARCCache
status string
errCh chan error
stream omanager.NodeManagerService_RegisterWorkerServer
}
func (w *Worker) ProfitAccount() common.Address {
if w.info.nodeInfo != nil {
return common.HexToAddress(w.info.nodeInfo.BenefitAddress)
}
return common.Address{}
}
func (w *Worker) WorkerAccount() common.Address {
return common.HexToAddress(w.workerAddr)
}
func (w *Worker) Disconnect() {
w.disconnect = true
}
func (w *Worker) DisConnected() bool {
return w.disconnect
}
func (w *Worker) SendMessage(msg *omanager.ManagerMessage, callback func(err error) bool) {
if msg.Message != nil {
err := w.stream.Send(msg)
if err != nil {
log.WithError(err).Error("send message to worker failed")
}
callback(err)
}
}
func (w *Worker) RecvMessage() {
l := log.WithField("worker-uuid", w.uuid)
defer func() {
if e := recover(); e != nil {
l.WithField("worker-addr", w.workerAddr).Error("worker handle message panic")
}
}()
for {
start := time.Now()
wmsg, err := w.stream.Recv()
if err != nil {
l.WithError(err).WithField("worker-addr", w.workerAddr).WithField("recv duration", time.Now().Sub(start).String()).Error("recv msg failed")
w.quit <- "recv msg failed"
return
}
if w.DisConnected() {
return
}
w.msgCh <- wmsg
}
}
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/google/uuid" "github.com/google/uuid"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
...@@ -43,46 +42,6 @@ const ( ...@@ -43,46 +42,6 @@ const (
TASK_TIMEOUT TASK_TIMEOUT
) )
type workerInfo struct {
nodeInfo *omanager.NodeInfoResponse
deviceUsageInfo []*omanager.DeviceUsage
deviceInfo *omanager.DeviceInfoMessage
deviceStatusInfo *omanager.StatusResponse
resourceInfo *omanager.SubmitResourceMap
}
type Worker struct {
quit chan interface{}
taskCh chan *dispatchTask
resultCh chan *omanager.SubmitTaskResult
uuid int64 // worker uuid in the local.
registed bool // worker is registed to this nm.
online bool
nonce int
latestNmValue string
addFirstSucceed bool
info workerInfo
workerAddr string // worker address from public-key
deviceInfoHash []byte
recentTask *lru.ARCCache
status string
stream omanager.NodeManagerService_RegisterWorkerServer
}
func (w *Worker) ProfitAccount() common.Address {
if w.info.nodeInfo != nil {
return common.HexToAddress(w.info.nodeInfo.BenefitAddress)
}
return common.Address{}
}
func (w *Worker) WorkerAccount() common.Address {
return common.HexToAddress(w.workerAddr)
}
type WorkerManager struct { type WorkerManager struct {
rdb *redis.Client rdb *redis.Client
heartBeat map[int64]int64 heartBeat map[int64]int64
...@@ -181,7 +140,9 @@ func (wm *WorkerManager) AddNewWorker(id int64, worker omanager.NodeManagerServi ...@@ -181,7 +140,9 @@ func (wm *WorkerManager) AddNewWorker(id int64, worker omanager.NodeManagerServi
} }
w := &Worker{ w := &Worker{
quit: make(chan interface{}), quit: make(chan interface{}),
errCh: make(chan error, 1),
taskCh: make(chan *dispatchTask), taskCh: make(chan *dispatchTask),
msgCh: make(chan *omanager.WorkerMessage, 30),
resultCh: make(chan *omanager.SubmitTaskResult, 30), resultCh: make(chan *omanager.SubmitTaskResult, 30),
uuid: id, uuid: id,
...@@ -231,7 +192,7 @@ func (wm *WorkerManager) doCallback(hook string, response *odysseus.TaskResponse ...@@ -231,7 +192,7 @@ func (wm *WorkerManager) doCallback(hook string, response *odysseus.TaskResponse
func (wm *WorkerManager) disconnect(worker *Worker) { func (wm *WorkerManager) disconnect(worker *Worker) {
worker.online = false worker.online = false
worker.status = "disconnected" worker.status = "disconnected"
worker.Disconnect()
wm.InActiveWorker(worker) wm.InActiveWorker(worker)
if worker.registed { if worker.registed {
wm.StopRegistry(worker.uuid) wm.StopRegistry(worker.uuid)
...@@ -427,14 +388,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -427,14 +388,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
}).Warn("task is timeout") }).Warn("task is timeout")
} }
} }
worker.SendMessage(msg, callback)
if msg.Message != nil {
err := worker.stream.Send(msg)
if err != nil {
log.WithError(err).Error("send message to worker failed")
}
callback(err)
}
} }
return nil return nil
...@@ -444,16 +398,23 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -444,16 +398,23 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
l.WithField("worker-addr", worker.workerAddr).Info("start handle worker message") l.WithField("worker-addr", worker.workerAddr).Info("start handle worker message")
defer l.WithField("worker-addr", worker.workerAddr).Info("exit handle worker message") defer l.WithField("worker-addr", worker.workerAddr).Info("exit handle worker message")
defer close(worker.quit) defer close(worker.quit)
defer worker.Disconnect()
checkDuration := config.GetConfig().Tickers.HeartBeat * 3 checkDuration := config.GetConfig().Tickers.HeartBeat * 3
workerCheckTicker := time.NewTicker(time.Second * time.Duration(checkDuration)) workerCheckTicker := time.NewTicker(time.Second * time.Duration(checkDuration))
defer workerCheckTicker.Stop() defer workerCheckTicker.Stop()
go worker.RecvMessage()
for { for {
select { select {
case <-wm.quit: case <-wm.quit:
return return
case workerErr := <-worker.errCh:
log.WithError(workerErr).WithField("worker-uuid", worker.uuid).Error("worker error")
worker.quit <- workerErr
return
case <-workerCheckTicker.C: case <-workerCheckTicker.C:
if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(checkDuration) { if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(checkDuration) {
log.WithField("worker-uuid", worker.uuid).Error("worker heartbeat expired") log.WithField("worker-uuid", worker.uuid).Error("worker heartbeat expired")
...@@ -473,17 +434,9 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -473,17 +434,9 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
return return
} }
} }
default: case wmsg := <-worker.msgCh:
wmsg, err := worker.stream.Recv()
if err != nil {
l.WithError(err).WithField("worker-addr", worker.workerAddr).Error("recv msg failed")
worker.quit <- "recv msg failed"
return
}
worker.online = true worker.online = true
switch msg := wmsg.Message.(type) { switch msg := wmsg.Message.(type) {
case *omanager.WorkerMessage_GoodbyeMessage: case *omanager.WorkerMessage_GoodbyeMessage:
worker.online = false worker.online = false
worker.quit <- msg.GoodbyeMessage.Reason worker.quit <- msg.GoodbyeMessage.Reason
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment