Commit 6a1ef51d authored by vicotor's avatar vicotor

update sendmsg

parent bba4e8d3
......@@ -16,10 +16,15 @@ type workerInfo struct {
resourceInfo *omanager.SubmitResourceMap
}
type sendMsgCallback struct {
msg *omanager.ManagerMessage
callback func(err error) bool
}
type Worker struct {
quit chan interface{}
taskCh chan *dispatchTask
msgCh chan *omanager.WorkerMessage
sendCh chan sendMsgCallback
resultCh chan *omanager.SubmitTaskResult
uuid int64 // worker uuid in the local.
......@@ -59,14 +64,39 @@ func (w *Worker) DisConnected() bool {
return w.disconnect
}
func (w *Worker) SendMessage(msg *omanager.ManagerMessage, callback func(err error) bool) {
if msg.Message != nil {
err := w.stream.Send(msg)
if err != nil {
log.WithError(err).Error("send message to worker failed")
func (w *Worker) SendToWorker(msg *omanager.ManagerMessage, callback func(err error) bool) {
w.sendCh <- sendMsgCallback{msg, callback}
}
func (w *Worker) SendMessage() {
l := log.WithField("worker-uuid", w.uuid)
defer func() {
if e := recover(); e != nil {
l.WithField("worker-addr", w.workerAddr).Error("worker handle message panic")
}
}()
for w.DisConnected() == false {
select {
case <-w.quit:
return
case m := <-w.sendCh:
if w.DisConnected() {
return
}
start := time.Now()
if m.msg.Message != nil {
err := w.stream.Send(m.msg)
if err != nil {
log.WithError(err).Error("send message to worker failed")
}
if m.callback != nil {
m.callback(err)
}
}
log.WithField("worker-addr", w.workerAddr).WithField("send duration", time.Now().Sub(start).String()).Debug("send msg to worker")
}
callback(err)
}
}
func (w *Worker) RecvMessage() {
......
......@@ -143,6 +143,7 @@ func (wm *WorkerManager) AddNewWorker(id int64, worker omanager.NodeManagerServi
errCh: make(chan error, 1),
taskCh: make(chan *dispatchTask),
msgCh: make(chan *omanager.WorkerMessage, 30),
sendCh: make(chan sendMsgCallback, 30),
resultCh: make(chan *omanager.SubmitTaskResult, 30),
uuid: id,
......@@ -239,6 +240,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
wm.disconnect(worker)
}()
go worker.SendMessage()
for {
var msg = new(omanager.ManagerMessage)
......@@ -388,7 +390,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
}).Warn("task is timeout")
}
}
worker.SendMessage(msg, callback)
worker.SendToWorker(msg, callback)
}
return nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment