Commit c28c604f authored by vicotor's avatar vicotor

add callback when dispatch failed

parent 8708d4c9
...@@ -5,6 +5,7 @@ go 1.20 ...@@ -5,6 +5,7 @@ go 1.20
require ( require (
github.com/BurntSushi/toml v1.3.2 github.com/BurntSushi/toml v1.3.2
github.com/IBM/sarama v1.42.1 github.com/IBM/sarama v1.42.1
github.com/astaxie/beego v1.12.3
github.com/ethereum/go-ethereum v1.13.10 github.com/ethereum/go-ethereum v1.13.10
github.com/gogo/protobuf v1.3.2 github.com/gogo/protobuf v1.3.2
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
...@@ -72,6 +73,7 @@ require ( ...@@ -72,6 +73,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/protobuf v1.32.0 // indirect google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )
......
This diff is collapsed.
...@@ -66,6 +66,17 @@ func (n *Node) Loop(idx int) { ...@@ -66,6 +66,17 @@ func (n *Node) Loop(idx int) {
log.WithError(err).Error("attach kafka consumer failed") log.WithError(err).Error("attach kafka consumer failed")
return return
} }
postResult := func(task *odysseus.TaskContent, result *odysseus.TaskResponse) error {
d, _ := proto.Marshal(result)
err := utils.Post(task.TaskCallback, d)
if err != nil {
log.WithError(err).Error("post task result failed")
} else {
log.WithField("taskid", task.TaskId).Debug("post task result")
}
return err
}
for { for {
select { select {
case <-n.quit: case <-n.quit:
...@@ -75,7 +86,23 @@ func (n *Node) Loop(idx int) { ...@@ -75,7 +86,23 @@ func (n *Node) Loop(idx int) {
case task := <-taskCh: case task := <-taskCh:
log.WithField("task", task).Info("get task") log.WithField("task", task).Info("get task")
for {
worker, err := PopWorker(n.rdb) worker, err := PopWorker(n.rdb)
if err == ErrNoWorker {
result := &odysseus.TaskResponse{
TaskId: task.TaskId,
TaskUid: task.TaskUid,
TaskFee: task.TaskFee,
TaskIsSucceed: false,
TaskError: err.Error(),
}
err = postResult(task, result)
if err != nil {
log.WithError(err).Error("post task result failed")
}
break
}
if err != nil { if err != nil {
log.WithError(err).Error("pop worker failed") log.WithError(err).Error("pop worker failed")
continue continue
...@@ -86,6 +113,8 @@ func (n *Node) Loop(idx int) { ...@@ -86,6 +113,8 @@ func (n *Node) Loop(idx int) {
continue continue
} }
} }
}
} }
} }
......
...@@ -17,6 +17,11 @@ var ( ...@@ -17,6 +17,11 @@ var (
maxPriority = 1 // total priority for worker queue maxPriority = 1 // total priority for worker queue
) )
var (
ErrNoWorker = errors.New("no worker")
ErrDispatchFailed = errors.New("dispatch to nodemanager failed")
)
type Worker struct { type Worker struct {
workerid string workerid string
priority int priority int
...@@ -43,7 +48,7 @@ func PopWorker(rdb *redis.Client) (Worker, error) { ...@@ -43,7 +48,7 @@ func PopWorker(rdb *redis.Client) (Worker, error) {
}, nil }, nil
} }
return Worker{}, errors.New("no worker") return Worker{}, ErrNoWorker
} }
func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error) { func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error) {
...@@ -85,5 +90,5 @@ func DispatchTask(worker Worker, task *odysseus.TaskContent) error { ...@@ -85,5 +90,5 @@ func DispatchTask(worker Worker, task *odysseus.TaskContent) error {
return nil return nil
} }
return errors.New("dispatch to manager all failed") return ErrDispatchFailed
} }
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"encoding/hex" "encoding/hex"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"strings"
) )
func HexToPrivatekey(key string) (*ecdsa.PrivateKey, error) { func HexToPrivatekey(key string) (*ecdsa.PrivateKey, error) {
...@@ -28,6 +29,9 @@ func PubkeyToHex(key *ecdsa.PublicKey) string { ...@@ -28,6 +29,9 @@ func PubkeyToHex(key *ecdsa.PublicKey) string {
} }
func HexToPubkey(key string) (*ecdsa.PublicKey, error) { func HexToPubkey(key string) (*ecdsa.PublicKey, error) {
if strings.HasPrefix(key, "0x") {
key = key[2:]
}
pub, err := hex.DecodeString(key) pub, err := hex.DecodeString(key)
if err != nil { if err != nil {
return nil, err return nil, err
......
package utils
import (
"github.com/astaxie/beego/httplib"
)
func Post(url string, data []byte) error {
_, err := httplib.Post(url).Body(data).Response()
if err != nil {
return err
}
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment