Commit 0c7cbbc0 authored by vicotor's avatar vicotor

add post receipt

parent e1b452e2
......@@ -15,6 +15,10 @@ user="root"
password="12345678"
database="liuxuzhong"
[kafka]
brokers="127.0.0.1:9092"
receipt_topic="taskreceipt"
[ticker]
heart_beat = 10
status_ticker = 10
......
......@@ -14,6 +14,12 @@ type MysqlConfig struct {
DbName string `json:"database" toml:"database"`
}
type KafkaConfig struct {
Brokers string `json:"brokers" toml:"brokers"`
ReceiptTopic string `json:"receipt_topic" toml:"receipt_topic"`
TaskTopic string `json:"task_topic" toml:"task_topic"`
}
type RedisConfig struct {
Addr string `json:"addr" toml:"addr"`
Password string `json:"password" toml:"password"`
......@@ -35,6 +41,7 @@ type Config struct {
Redis RedisConfig `json:"redis" toml:"redis"`
DbConfig MysqlConfig `json:"mysql" toml:"mysql"`
Tickers TickerConfig `json:"ticker" toml:"ticker"`
Kafka KafkaConfig `json:"kafka" toml:"kafka"`
}
var _cfg *Config = nil
......
......@@ -4,8 +4,10 @@ go 1.18
require (
github.com/BurntSushi/toml v1.3.2
github.com/IBM/sarama v1.42.1
github.com/astaxie/beego v1.12.3
github.com/ethereum/go-ethereum v1.13.10
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/hashicorp/golang-lru v0.5.4
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
......@@ -25,26 +27,41 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/gomodule/redigo v2.0.0+incompatible // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
......
This diff is collapsed.
......@@ -3,10 +3,12 @@ package server
import (
"context"
"crypto/ecdsa"
"github.com/IBM/sarama"
"github.com/ethereum/go-ethereum/crypto"
"github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/nmregistry"
"github.com/odysseus/nodemanager/utils"
basev1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/odysseus/payment/cachedata"
"github.com/odysseus/payment/model"
......@@ -14,6 +16,7 @@ import (
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"net"
"strings"
)
type Node struct {
......@@ -23,6 +26,8 @@ type Node struct {
wm *WorkerManager
privk *ecdsa.PrivateKey
cache *cachedata.CacheData
kafkaProducer sarama.AsyncProducer
taskResultCh chan *basev1.TaskReceipt
}
func NewNode() *Node {
......@@ -50,12 +55,16 @@ func NewNode() *Node {
log.WithError(err).Error("failed to parse node manager private key")
return nil
}
brokers := strings.Split(config.GetConfig().Kafka.Brokers, ";")
producer, _ := utils.NewKafkaProducer(brokers)
node := &Node{
rdb: rdb,
privk: privk,
cache: pay,
apiServer: grpc.NewServer(grpc.MaxSendMsgSize(1024*1024*20), grpc.MaxRecvMsgSize(1024*1024*20)),
registry: nmregistry.NewRegistryService(config.GetConfig(), rdb, privk.PublicKey),
kafkaProducer: producer,
taskResultCh: make(chan *basev1.TaskReceipt, 100000),
}
node.wm = NewWorkerManager(rdb, node)
......@@ -69,6 +78,8 @@ func (n *Node) Sign(hash []byte) ([]byte, error) {
func (n *Node) Start() error {
go n.registry.Start()
go n.postLoop()
if err := n.apiStart(); err != nil {
return err
}
......@@ -96,7 +107,32 @@ func (n *Node) apiStart() error {
return nil
}
func (n *Node) PostResult(result *basev1.TaskReceipt) {
defer func() {
// handler recover
if err := recover(); err != nil {
log.WithError(err.(error)).Error("post result panic")
}
}()
n.taskResultCh <- result
}
func (n *Node) postLoop() {
for {
select {
case receipt, ok := <-n.taskResultCh:
if !ok {
return
}
if err := utils.FireTaskReceipt(n.kafkaProducer, receipt, config.GetConfig().Kafka.ReceiptTopic); err != nil {
log.WithError(err).Error("fire task receipt to kafka failed")
}
}
}
}
func (n *Node) Stop() {
n.registry.Stop()
n.apiServer.Stop()
close(n.taskResultCh)
}
......@@ -24,6 +24,7 @@ import (
)
var (
Succeed = errors.New("succeed")
ErrWorkerExist = errors.New("worker exist")
ErrHeartBeatExpired = errors.New("worker heartbeat expired")
)
......@@ -296,6 +297,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
log.WithField("worker", worker.uuid).Error("task id not match")
continue
}
if result.IsSuccessed == false {
taskResponse := &odysseus.TaskResponse{
TaskUuid: task.TaskUuid,
......@@ -305,6 +307,8 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
TaskIsSucceed: false,
TaskError: "worker failed",
}
receipt := wm.makeReceipt(worker, task, result, errors.New("worker failed"))
wm.node.PostResult(receipt)
go wm.doCallback(task.TaskCallback, taskResponse)
continue
}
......@@ -335,6 +339,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
}
}
receipt := wm.makeReceipt(worker, task, result, Succeed)
wm.node.PostResult(receipt)
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload))
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResult)
......@@ -423,6 +430,11 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
log.WithFields(log.Fields{
"worker": worker.uuid,
}).Debugf("receive worker status:0x%x", msg.Status.DeviceStatus)
case *omanager.WorkerMessage_ResourceMap:
// todo: store worker resource map.
log.WithFields(log.Fields{
"worker": worker.uuid,
}).Debugf("receive worker resource map:%v", msg.ResourceMap)
case *omanager.WorkerMessage_DeviceInfo:
// todo: handler worker device info
log.WithFields(log.Fields{
......@@ -488,3 +500,27 @@ func (wm *WorkerManager) Payment(task *odysseus.TaskContent) error {
}
return nil
}
func (wm *WorkerManager) makeReceipt(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult, err error) *odysseus.TaskReceipt {
now := uint64(time.Now().Unix())
receipt := &odysseus.TaskReceipt{
TaskUuid: task.TaskUuid,
TaskFinishTime: now,
TaskId: task.TaskId,
TaskUid: task.TaskUid,
TaskWorkload: task.TaskWorkload,
TaskDuration: int64(now - task.TaskTimestamp),
TaskFee: 0,
TaskOutLen: int64(len(result.TaskResult)),
TaskProfitAccount: worker.ProfitAccount().Hex(),
TaskWorkerAccount: worker.WorkerAccount().Hex(),
}
if result.IsSuccessed {
fee, _ := strconv.ParseInt(task.TaskFee, 10, 64)
receipt.TaskFee = fee
receipt.TaskResult = Succeed.Error()
} else {
receipt.TaskResult = err.Error()
}
return receipt
}
package utils
import (
"github.com/IBM/sarama"
"github.com/gogo/protobuf/proto"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"time"
)
// NewKafkaProducer Create a new KafkaProducer.
func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
kafkaConfig.Producer.Return.Errors = false
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
kafkaConfig.Net.ResolveCanonicalBootstrapServers = false
producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig)
if err != nil {
return nil, err
}
go func() {
for _ = range producer.Errors() {
//log.Printf("Failed to send log entry to kafka: %v\n", err)
}
}()
return producer, nil
}
func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceipt, topic string) error {
// Check time for partition key
var partitionKey sarama.StringEncoder
partitionKey = sarama.StringEncoder(time.Now().Format("2006-01-02"))
b, err := proto.Marshal(receipt)
if err != nil {
return err
}
value := sarama.ByteEncoder(b)
producer.Input() <- &sarama.ProducerMessage{
Key: partitionKey,
Topic: topic,
Value: value,
}
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment