Commit a2607c16 authored by vicotor's avatar vicotor

add signature to check worker registed.

parent 0ab7e685
......@@ -5,6 +5,7 @@ metrics_port = 28010
private_key = "E671C143A110C239B563F702E9F4017CA6B2B2912F675EED9AA4FED684EB30CC"
standard_task_file = "standardtask.json"
worker_multiple = 2
worker_signature_expired_time=1
[redis]
addr="127.0.0.1:6379"
......
......@@ -44,13 +44,14 @@ type Config struct {
Port int `json:"port" toml:"port"`
StandardTaskFile string `json:"standard_task_file" toml:"standard_task_file"`
//Endpoint string `json:"endpoint" toml:"endpoint"`
MetricPort int `json:"metrics_port" toml:"metrics_port"`
EnablePay bool `json:"enable_pay" toml:"enable_pay"`
WorkerMultiple int `json:"worker_multiple" toml:"worker_multiple"`
Redis RedisConfig `json:"redis" toml:"redis"`
DbConfig MysqlConfig `json:"mysql" toml:"mysql"`
Tickers TickerConfig `json:"ticker" toml:"ticker"`
Kafka KafkaConfig `json:"kafka" toml:"kafka"`
MetricPort int `json:"metrics_port" toml:"metrics_port"`
EnablePay bool `json:"enable_pay" toml:"enable_pay"`
WorkerMultiple int `json:"worker_multiple" toml:"worker_multiple"`
WorkerSignatureExpiredTime int64 `json:"worker_signature_expired_time" toml:"worker_signature_expired_time"`
Redis RedisConfig `json:"redis" toml:"redis"`
DbConfig MysqlConfig `json:"mysql" toml:"mysql"`
Tickers TickerConfig `json:"ticker" toml:"ticker"`
Kafka KafkaConfig `json:"kafka" toml:"kafka"`
}
var _cfg *Config = nil
......@@ -99,3 +100,10 @@ func (conf *Config) GetWorkerMultiple() int {
return 1
}
}
func (conf *Config) GetWorkerSignatureExpiredTime() int64 {
if conf.WorkerSignatureExpiredTime < 1 {
return 1
}
return conf.WorkerSignatureExpiredTime
}
......@@ -74,21 +74,8 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
{
// verify container_signature and miner_signature
// container_signature = sign(hash(task_id+hash(task_param)+hash(task_result)))
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
signature := result.ContainerSignature
if len(signature) == 65 {
signature = signature[:64]
}
pubLen := len(task.ContainerPubkey)
pubkey := []byte{}
if pubLen == 130 || pubLen == 132 {
pubkey = utils.FromHex(string(task.ContainerPubkey))
} else if pubLen == 65 {
pubkey = task.ContainerPubkey
}
verified := crypto.VerifySignature(pubkey, dataHash[:], signature)
verified := utils.VerifySignature(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]),
result.ContainerSignature, task.ContainerPubkey)
log.WithFields(log.Fields{
"task-id": task.TaskId,
"containerSignatureVerify": verified,
......@@ -101,12 +88,8 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
{
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
signature := result.MinerSignature
if len(signature) == 65 {
signature = signature[:64]
}
verified := crypto.VerifySignature(utils.FromHex(worker.info.nodeInfo.MinerPubkey), dataHash[:], signature)
verified := utils.VerifySignature(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]),
result.MinerSignature, utils.FromHex(worker.info.nodeInfo.MinerPubkey))
log.WithFields(log.Fields{
"task-id": task.TaskId,
"minerSignatureVerify": verified,
......@@ -183,21 +166,8 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
{
// verify container_signature and miner_signature
// container_signature = sign(hash(task_id+hash(task_param)+hash(task_result)))
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
signature := result.ContainerSignature
if len(signature) == 65 {
signature = signature[:64]
}
pubLen := len(task.ContainerPubkey)
pubkey := []byte{}
if pubLen == 130 || pubLen == 132 {
pubkey = utils.FromHex(string(task.ContainerPubkey))
} else if pubLen == 65 {
pubkey = task.ContainerPubkey
}
verified := crypto.VerifySignature(pubkey, dataHash[:], signature)
verified := utils.VerifySignature(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]),
result.ContainerSignature, task.ContainerPubkey)
log.WithFields(log.Fields{
"task-id": task.TaskId,
"containerSignatureVerify": verified,
......@@ -210,12 +180,8 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
{
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
signature := result.MinerSignature
if len(signature) == 65 {
signature = signature[:64]
}
verified := crypto.VerifySignature(utils.FromHex(worker.info.nodeInfo.MinerPubkey), dataHash[:], signature)
verified := utils.VerifySignature(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]),
result.MinerSignature, utils.FromHex(worker.info.nodeInfo.MinerPubkey))
log.WithFields(log.Fields{
"task-id": task.TaskId,
"minerSignatureVerify": verified,
......
......@@ -18,6 +18,7 @@ import (
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3"
"math/big"
"regexp"
"strconv"
"sync"
......@@ -30,6 +31,9 @@ var (
ErrHeartBeatExpired = errors.New("worker heartbeat expired")
ErrLongtimeNoTask = errors.New("worker long time no task")
ErrInvalidMessageValue = errors.New("invalid message value")
ErrInvalidMsgSignature = errors.New("invalid message signature")
ErrExpiredMsgSignature = errors.New("expired message signature")
ErrOldConnection = errors.New("old connection")
)
type TaskStatus int
......@@ -611,6 +615,27 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
}).Debug("receive registed message")
// todo: verify signature
{
sig := msg.RegisteMessage.DeviceSignature
data := utils.CombineBytes([]byte(msg.RegisteMessage.MinerPubkey), []byte(msg.RegisteMessage.BenefitAddress),
[]byte(msg.RegisteMessage.DeviceIp), big.NewInt(int64(msg.RegisteMessage.Timestamp)).Bytes())
if !utils.VerifySignature(data, sig, utils.FromHex(msg.RegisteMessage.MinerPubkey)) {
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
}).Error("verify device signature failed")
worker.quit <- ErrInvalidMessageValue
return
}
if time.Now().Unix()-int64(msg.RegisteMessage.Timestamp) > config.GetConfig().GetWorkerSignatureExpiredTime() {
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
}).Error("message signature expired")
worker.quit <- ErrExpiredMsgSignature
return
}
}
if pubkey, err := utils.HexToPubkey(msg.RegisteMessage.MinerPubkey); err != nil {
l.WithFields(log.Fields{
......@@ -619,8 +644,10 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
}).Error("parse pubkey failed")
} else {
addr := utils.PubkeyToAddress(pubkey)
if wm.GetWorkerByAddr(addr) != nil {
l.WithField("worker-addr", worker.workerAddr).Error("worker with the address is existed")
if old := wm.GetWorkerByAddr(addr); old != nil {
old.errCh <- ErrOldConnection
l.WithField("worker-addr", worker.workerAddr).Error("worker with the address is existed, and disconnect it")
worker.quit <- ErrWorkerExist
return
}
......
......@@ -41,3 +41,21 @@ func HexToPubkey(key string) (*ecdsa.PublicKey, error) {
}
return crypto.UnmarshalPubkey(pub)
}
func VerifySignature(data []byte, signature []byte, oripubkey []byte) bool {
dataHash := crypto.Keccak256Hash(data)
if len(signature) == 65 {
signature = signature[:64]
}
pubLen := len(oripubkey)
pubkey := []byte{}
if pubLen == 130 || pubLen == 132 {
pubkey = FromHex(string(oripubkey))
} else if pubLen == 65 {
pubkey = oripubkey
}
verified := crypto.VerifySignature(pubkey, dataHash[:], signature)
return verified
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment