Commit 8cef9de9 authored by 贾浩@五瓣科技's avatar 贾浩@五瓣科技

update

parent 1754c613
......@@ -9,8 +9,8 @@ RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
RUN apk add --update $PACKAGES
# Add source files
RUN mkdir -p ./witness
COPY ./ ./witness/
RUN mkdir -p ./validator
COPY ./ ./validator/
RUN go env -w GOPROXY="https://goproxy.cn,direct"
......@@ -18,20 +18,24 @@ RUN git clone https://code.wuban.net.cn/odysseus/odysseus-protocol.git
FROM base AS build
RUN cd witness && go mod tidy && go build -v -o /tmp/witness ./cmd/witness
RUN cd witness && go mod tidy && go build -v -o /tmp/validator ./cmd/validator
FROM alpine
WORKDIR /root
WORKDIR /app
COPY ./config.toml ./config.toml
COPY --from=build /tmp/witness /usr/bin/witness
VOLUME /app/config.toml
VOLUME /app/data
COPY --from=build /tmp/validator /usr/bin/validator
COPY ./entrypoint.sh /usr/local/bin/entrypoint.sh
RUN chmod u+x /usr/local/bin/entrypoint.sh
EXPOSE 20010 20011 20012
EXPOSE 20010 20012
ENTRYPOINT [ "/usr/local/bin/entrypoint.sh" ]
\ No newline at end of file
......@@ -25,6 +25,8 @@ validator:
dev:
go build $(BUILD_FLAGS) -v -o=${GOBIN}/$@ -gcflags "all=-N -l" ./cmd/validator
DOCKER:
docker build -t validator:latest .
clean:
rm -rf build
......
......@@ -115,11 +115,11 @@ func getWithdrawProofs(params []byte, resp *jsonrpcMessage) {
date = paramList[1]
}
workload, proofs := validator.GetMerkleProof(common.HexToAddress(paramList[0]), date)
amount, proofs := validator.GetMerkleProof(common.HexToAddress(paramList[0]), date)
temp := map[string]interface{}{
"workload": workload,
"proofs": proofs,
"amount": amount,
"proofs": proofs,
}
resp.Result, _ = json.Marshal(temp)
......@@ -171,7 +171,7 @@ func getDailyMerkleNodes(params []byte, resp *jsonrpcMessage) {
var depth = 1
var rootHash common.Hash
_, err = time.Parse("2006-01-02", paramList[1].(string))
_, err = time.Parse("2006-01-02", paramList[0].(string))
if err != nil {
resp.Error = &jsonError{
Code: -32602,
......@@ -179,10 +179,10 @@ func getDailyMerkleNodes(params []byte, resp *jsonrpcMessage) {
}
return
}
date = paramList[1].(string)
date = paramList[0].(string)
if len(paramList) >= 2 {
_depth, ok := paramList[2].(float64)
_depth, ok := paramList[1].(float64)
if !ok {
resp.Error = &jsonError{
Code: -32602,
......@@ -194,7 +194,7 @@ func getDailyMerkleNodes(params []byte, resp *jsonrpcMessage) {
}
if len(paramList) >= 3 {
rootHash = common.HexToHash(paramList[3].(string))
rootHash = common.HexToHash(paramList[2].(string))
}
nodes := validator.GetDailyMerkleNodes(date, depth, rootHash)
......@@ -217,7 +217,7 @@ func getDailyMerkleSumNodes(params []byte, resp *jsonrpcMessage) {
var depth = 1
var rootHash common.Hash
_, err = time.Parse("2006-01-02", paramList[1].(string))
_, err = time.Parse("2006-01-02", paramList[0].(string))
if err != nil {
resp.Error = &jsonError{
Code: -32602,
......@@ -225,10 +225,10 @@ func getDailyMerkleSumNodes(params []byte, resp *jsonrpcMessage) {
}
return
}
date = paramList[1].(string)
date = paramList[0].(string)
if len(paramList) >= 2 {
_depth, ok := paramList[2].(float64)
_depth, ok := paramList[1].(float64)
if !ok {
resp.Error = &jsonError{
Code: -32602,
......@@ -240,7 +240,7 @@ func getDailyMerkleSumNodes(params []byte, resp *jsonrpcMessage) {
}
if len(paramList) >= 3 {
rootHash = common.HexToHash(paramList[3].(string))
rootHash = common.HexToHash(paramList[2].(string))
}
nodes, vals := validator.GetDailyMerkleSumNodes(date, depth, rootHash)
......
......@@ -8,11 +8,11 @@ rpc-listen = "0.0.0.0:20012"
private-key = "529f4efb80ac534f17d873104c71881c0970dbd5a886f183f63c5c6bb7a1fcd9"
chain-rpc = "https://ethereum-holesky-rpc.publicnode.com"
chain-rpc = "https://dev.rpc.gaicoin.ai"
store-contract = "0x7Cd36Bc2a477f60A14f08442179B2f626bE026Ea"
store-contract = "0xfFb096e2B90324FFcCbaf987BdD724462c0aE18c"
validator-contract = "0x6c77B761DC30e7844D40C8CA681731db8Ee61Cbd"
validator-contract = "0x2A03bA42139860aF46263755f6e5CBAe8195bB92" # 测试版本 不会revert
commit-offset = 3600 # utc + n seconds
......
......@@ -77,7 +77,7 @@ func (r *ChainRPC) GetNMAddresses() (addrs []common.Address, err error) {
}
func (r *ChainRPC) GetWorkloadThreshold(totalWorkload uint64) (threshold *big.Int, err error) {
return r.validatorContract.GetWorkloadThreshold(nil, big.NewInt(int64(totalWorkload)))
return r.validatorContract.GetWorkloadDistribution(nil, big.NewInt(int64(totalWorkload)))
}
// SubmitProofs 调用合约提交root
......@@ -89,9 +89,10 @@ func (r *ChainRPC) SubmitProofs(dateTimestamp int64, merkleSumTreeRoot, merkleTr
return
}
opts.Context = ctx
for i := 0; i < 3; i++ {
for i := 0; i < 10; i++ {
signedTx, err := r.validatorContract.SubmitMerkleRoot(opts, big.NewInt(dateTimestamp), merkleSumTreeRoot, merkleTreeRoot)
if err != nil {
log.WithError(err).Error("submit root, call contract failed, retry after 3 seconds")
time.Sleep(time.Second * 3)
continue
}
......@@ -118,7 +119,7 @@ func (r *ChainRPC) WaitForReceipt(ctx context.Context, txHash common.Hash) (rece
return receipt, nil
}
if err == ethereum.NotFound {
time.Sleep(time.Second * 2)
time.Sleep(time.Second * 1)
continue
}
return nil, err
......
......@@ -16,7 +16,10 @@ func (v *Validator) GetPendingWorkload(address common.Address) (workload, global
log.WithError(err).Error("failed to get pending workload")
return
}
log.WithField("workload", wl).Debug("quest get pending workload")
log.WithFields(log.Fields{
"address": address.Hex(),
"workload": wl,
}).Debug("quest get pending workload")
return wl, v.pendingWorkload
}
......@@ -24,33 +27,28 @@ func (v *Validator) GetMerkleProof(address common.Address, date string) (balance
if date == "" {
date = v.date
}
v.Lock()
cacheTree, ok := v.mtTreeCache[date]
v.Unlock()
if !ok {
if ok = v.LoadMerkleTree(date); !ok {
log.WithFields(log.Fields{
"date": date,
}).Error("load merkle proof empty")
return "0", nil
}
}
v.Lock()
cacheTree = v.mtTreeCache[date]
v.Unlock()
dateStateRootKey := fmt.Sprintf("sroot:%s", date)
dateStateRoot, err := v.lvdb.Get([]byte(dateStateRootKey))
if err != nil {
if ok := v.LoadMerkleTree(date); !ok {
log.WithFields(log.Fields{
"key": dateStateRootKey,
"err": err.Error(),
}).Error("failed to get state root")
"date": date,
}).Error("load merkle proof empty")
return "0", nil
}
v.Lock()
cacheTree := v.mtTreeCache[date]
v.Unlock()
var sdb *StateDB
if date == v.date {
sdb = v.state
} else {
dateStateRootKey := fmt.Sprintf("sroot:%s", date)
dateStateRoot, err := v.lvdb.Get([]byte(dateStateRootKey))
if err != nil {
log.WithFields(log.Fields{
"key": dateStateRootKey,
"err": err.Error(),
}).Error("failed to get state root")
return "0", nil
}
sdb, err = NewStateDB(v.lvdb, common.BytesToHash(dateStateRoot))
if err != nil {
log.WithError(err).Error("failed to create state db")
......@@ -64,11 +62,15 @@ func (v *Validator) GetMerkleProof(address common.Address, date string) (balance
bigBalance, _ := new(big.Int).SetString(object.Balance, 10)
payload := append(common.HexToAddress(object.Miner).Bytes(), common.LeftPadBytes(bigBalance.Bytes(), 32)...)
leaf := crypto.Keccak256Hash(payload)
proofs, err = cacheTree.GetProof(leaf)
proofs, err := cacheTree.GetProof(leaf)
if err != nil {
log.WithError(err).Error("failed to get merkle proof")
return "0", make([]common.Hash, 0)
}
if len(proofs) == 0 {
return "0", nil
}
return object.Balance, proofs
}
......@@ -76,19 +78,14 @@ func (v *Validator) GetDailyMerkleNodes(date string, depth int, rootHash common.
if date == "" {
date = v.date
}
v.Lock()
cacheTree, ok := v.mtTreeCache[date]
v.Unlock()
if !ok {
if ok = v.LoadMerkleTree(date); !ok {
log.WithFields(log.Fields{
"date": date,
}).Error("load merkle proof empty")
return nil
}
if ok := v.LoadMerkleTree(date); !ok {
log.WithFields(log.Fields{
"date": date,
}).Error("load merkle proof empty")
return nil
}
v.Lock()
cacheTree = v.mtTreeCache[date]
cacheTree := v.mtTreeCache[date]
v.Unlock()
rootNode := cacheTree.GetRootNode()
......
......@@ -34,7 +34,7 @@ func (v *Validator) UpdateContractAddressJob() {
// UpdateGlobalWorkloadJob 定时从quest更新全局workload
func (v *Validator) UpdateGlobalWorkloadJob() {
ticker := time.NewTicker(time.Second * 30)
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
log.Info("start update global workload task")
for {
......
......@@ -58,9 +58,9 @@ func (v *Validator) verifyProof(dbProof *quest.ProofModel) (miner common.Address
log.WithFields(log.Fields{"taskid": dbProof.TaskId}).WithError(err).Error("failed to verify miner signature")
return
}
workerAddress := crypto.PubkeyToAddress(*minerPubKey)
if workerAddress.Hex() != common.HexToAddress(dbProof.TaskWorkerAccount).Hex() {
log.WithFields(log.Fields{"taskid": dbProof.TaskId, "miner": dbProof.TaskWorkerAccount, "signer": workerAddress.Hex()}).Error("invalid miner address")
minerAddress := crypto.PubkeyToAddress(*minerPubKey)
if minerAddress.Hex() != common.HexToAddress(dbProof.TaskWorkerAccount).Hex() {
log.WithFields(log.Fields{"taskid": dbProof.TaskId, "miner": dbProof.TaskWorkerAccount, "recovered miner": minerAddress.Hex()}).Error("invalid miner address")
return
}
......
......@@ -31,7 +31,9 @@ func NewStateDB(lvdb ethdb.KeyValueStore, root common.Hash) (statedb *StateDB, e
}
func (s *StateDB) GetMinerObject(miner common.Address) (object *validatorv1.MinerObject) {
object = &validatorv1.MinerObject{}
object = &validatorv1.MinerObject{
Balance: "0",
}
k := crypto.Keccak256(miner.Bytes())
v, err := s.trie.Get(k)
if err != nil {
......@@ -90,9 +92,10 @@ func (s *StateDB) IterAllObject() (objects []*validatorv1.MinerObject) {
object := &validatorv1.MinerObject{}
err := proto.Unmarshal(v, object)
if err != nil {
log.WithError(err).Error("failed to unmarshal object, iter")
continue
}
log.WithField("balance", object.Balance).Debug("miner object")
log.WithField("balance", object.Balance).Debug("iter miner object")
objects = append(objects, object)
}
return
......
......@@ -43,12 +43,12 @@ func (v *Validator) CommitMST(proofMap map[common.Address]*validatorv1.Validated
rootNode := mstTree.GetRoot()
err = v.lvdb.Put([]byte(fmt.Sprintf("mstroot:%s", v.date)), root.Bytes())
err = v.lvdb.Put([]byte(fmt.Sprintf("mstroot:%s", v.date)), rootNode.Value.Hash.Bytes())
if err != nil {
log.Error(err)
return
}
err = v.lvdb.Put([]byte(fmt.Sprintf("mstsum:%s", v.date)), sum.Bytes())
err = v.lvdb.Put([]byte(fmt.Sprintf("mstsum:%s", v.date)), rootNode.Value.BigValue.Bytes())
if err != nil {
log.Error(err)
return
......@@ -63,8 +63,8 @@ func (v *Validator) CommitMST(proofMap map[common.Address]*validatorv1.Validated
return
}
log.WithFields(log.Fields{
"root": root.Hex(),
"sum": sum.String(),
"root": rootNode.Value.Hash.Hex(),
"sum": rootNode.Value.BigValue.String(),
"cost": time.Since(st).String(),
}).Info("commit MST root")
......@@ -81,6 +81,7 @@ func (v *Validator) CommitMT(objects []*validatorv1.MinerObject) (root common.Ha
for _, object := range objects {
bigBalance, _ := new(big.Int).SetString(object.Balance, 10)
payload := append(common.HexToAddress(object.Miner).Bytes(), common.LeftPadBytes(bigBalance.Bytes(), 32)...)
log.Warnln(object.Miner)
_proof := crypto.Keccak256Hash(payload)
merkleProofs = append(merkleProofs, _proof)
dbProofs = append(dbProofs, _proof[:]...)
......@@ -125,6 +126,10 @@ func (v *Validator) LoadMerkleTree(date string) (ok bool) {
if date == "" {
return false
}
if _, ok = v.mtTreeCache[date]; ok {
return
}
merkleTreeKey := fmt.Sprintf("mtk:%s", date)
data, err := v.lvdb.Get([]byte(merkleTreeKey))
if err != nil {
......@@ -138,7 +143,7 @@ func (v *Validator) LoadMerkleTree(date string) (ok bool) {
log.WithFields(log.Fields{
"key": merkleTreeKey,
"length": len(data),
}).Info("diskdb load merkle proof")
}).Debug("diskdb load merkle proof")
proofs := make([]common.Hash, len(data)/32)
for i := 0; i < len(data)/32; i++ {
......@@ -153,7 +158,7 @@ func (v *Validator) LoadMerkleTree(date string) (ok bool) {
v.Lock()
v.mtTreeCache[date] = mTree
v.Unlock()
log.WithFields(log.Fields{"date": date, "root": mTree.GetRoot().Hex()}).Info("load merkle tree")
log.WithFields(log.Fields{"date": date, "root": mTree.GetRoot().Hex()}).Debug("load merkle tree")
return true
}
......@@ -162,6 +167,9 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) {
if date == "" {
return false
}
if _, ok = v.mstTreeCache[date]; ok {
return
}
merkleSumTreeKey := fmt.Sprintf("mstk:%s", date)
keyData, err := v.lvdb.Get([]byte(merkleSumTreeKey))
if err != nil {
......@@ -174,7 +182,7 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) {
log.WithFields(log.Fields{
"key": merkleSumTreeKey,
"length": len(keyData),
}).Info("diskdb load merkle sum proof key")
}).Debug("diskdb load merkle sum proof key")
// data = addr1:addr2
datas := make([][]byte, len(keyData)/20)
......@@ -195,7 +203,7 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) {
log.WithFields(log.Fields{
"key": merkleSumTreeVal,
"length": len(valData),
}).Info("diskdb load merkle sum proof val")
}).Debug("diskdb load merkle sum proof val")
vals := bytes.Split(valData, []byte(":"))
bigVals := make([]*big.Int, len(vals))
......@@ -207,6 +215,6 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) {
v.Lock()
v.mstTreeCache[date] = mstTree
v.Unlock()
log.WithFields(log.Fields{"date": date, "root": mstTree.GetRoot()}).Info("load merkle sum tree")
log.WithFields(log.Fields{"date": date, "root": mstTree.GetRoot()}).Debug("load merkle sum tree")
return true
}
......@@ -37,25 +37,6 @@ type Validator struct {
sync.Mutex
}
func getDBLastDayRoot(diskDB *leveldb.Database) (lastDay string, stateRoot common.Hash) {
ok, err := diskDB.Has([]byte("lastday"))
if err != nil {
panic(err)
}
if ok {
val, _ := diskDB.Get([]byte("lastday"))
log.WithField("last day", string(val)).Info("diskdb get last day")
lastDay = string(val)
root, _ := diskDB.Get([]byte(fmt.Sprintf("sroot:%s", string(val))))
stateRoot = common.BytesToHash(root)
log.WithFields(log.Fields{
"last day": lastDay,
"state root": stateRoot.Hex(),
}).Info("load state from diskdb")
}
return lastDay, stateRoot
}
func RunValidator(q *quest.Quest, cfg *conf.Config) *Validator {
_rpc := newChain(cfg.ChainRPC, cfg.PrivateKey, cfg.StoreContract, cfg.ValidatorContract)
diskDB, err := leveldb.New(fmt.Sprintf("%s/db", cfg.DataDir), 128, 1024, "", false)
......@@ -88,6 +69,25 @@ func RunValidator(q *quest.Quest, cfg *conf.Config) *Validator {
return v
}
func getDBLastDayRoot(diskDB *leveldb.Database) (lastDay string, stateRoot common.Hash) {
ok, err := diskDB.Has([]byte("lastday"))
if err != nil {
panic(err)
}
if ok {
val, _ := diskDB.Get([]byte("lastday"))
log.WithField("last day", string(val)).Info("diskdb get last day")
lastDay = string(val)
root, _ := diskDB.Get([]byte(fmt.Sprintf("sroot:%s", string(val))))
stateRoot = common.BytesToHash(root)
log.WithFields(log.Fields{
"last day": lastDay,
"state root": stateRoot.Hex(),
}).Info("load state from diskdb")
}
return lastDay, stateRoot
}
func (v *Validator) AddPendingProof(miner common.Address, proof *validatorv1.ValidatedProof) {
v.Lock()
defer v.Unlock()
......@@ -113,6 +113,11 @@ func (v *Validator) AddPendingProof(miner common.Address, proof *validatorv1.Val
// ProcessDayJob 每日任务
func (v *Validator) ProcessDayJob() {
dbLastDay, _ := getDBLastDayRoot(v.lvdb)
if dbLastDay == v.date && v.date != "" {
log.WithField("date", v.date).Warn("already process day job")
return
}
log.WithFields(log.Fields{
"today": v.todayString(),
"today timestamp": v.todayTimestamp(),
......@@ -133,11 +138,6 @@ func (v *Validator) ProcessDayJob() {
log.WithError(err).Error("failed to commit merkle tree")
return
}
err = v.lvdb.Put([]byte("lastday"), []byte(v.date))
if err != nil {
log.WithError(err).Error("db failed to update last day")
return
}
txHash, err := v.rpc.SubmitProofs(v.dateToTimestamp(v.date), mstRoot, mtRoot)
if err != nil {
log.WithError(err).Error("submit proofs")
......@@ -148,7 +148,12 @@ func (v *Validator) ProcessDayJob() {
log.WithError(err).Error("db failed to save txid")
return
}
log.WithField("tx_hash", txHash.Hex()).Info("submit proofs")
log.WithField("tx hash", txHash.Hex()).Info("submit proofs to contract")
err = v.lvdb.Put([]byte("lastday"), []byte(v.date))
if err != nil {
log.WithError(err).Error("db failed to update last day")
return
}
log.Info("process day done")
}
......@@ -161,11 +166,15 @@ func (v *Validator) Commit() (dayProofs map[common.Address]*validatorv1.Validate
log.WithError(err).Error("failed to get workload threshold")
return
}
log.WithField("balance_per_workload", balancePerWorkload.String()).Info("get workload threshold")
log.WithFields(log.Fields{
"workload": totalWorkload,
"amount per workload(wei)": balancePerWorkload.String(),
}).Info("get workload threshold from contract")
for miner, proof := range proof {
balance := big.NewInt(0).Mul(balancePerWorkload, big.NewInt(int64(proof.Workload)))
proof.Balance = balance.String()
log.Warnln("889888", miner.Hex())
err := v.SealProof(miner, proof)
if err != nil {
log.WithError(err).Error("failed to seal proof")
......@@ -182,6 +191,7 @@ func (v *Validator) Commit() (dayProofs map[common.Address]*validatorv1.Validate
log.WithError(err).Error("failed to update state root")
return
}
v.state, _ = NewStateDB(v.lvdb, root)
return proof
}
......@@ -219,11 +229,12 @@ func (v *Validator) RefreshPendingProof() (proof map[common.Address]*validatorv1
}
func (v *Validator) Ticker() {
executionTime := time.Now().Add(v.duration())
executionTime := time.Unix(v.todayTimestamp()+86400+int64(v.cfg.CommitOffset), 0)
// executionTime := time.Now().Add(time.Second * 3) for test
waitTime := executionTime.Sub(time.Now())
timer := time.NewTimer(waitTime)
log.WithFields(log.Fields{
"execution_time": executionTime.String(),
"execution_time": executionTime.UTC().String(),
"wait_time": waitTime.String(),
}).Info("prepare commit task")
for {
......@@ -233,7 +244,7 @@ func (v *Validator) Ticker() {
waitTime = executionTime.Sub(time.Now())
timer.Reset(waitTime)
log.WithFields(log.Fields{
"execution_time": executionTime.String(),
"execution_time": executionTime.UTC().String(),
"wait_time": waitTime.String(),
}).Info("prepare commit task")
}
......@@ -286,6 +297,7 @@ func (v *Validator) yesterdayString() string {
func (v *Validator) todayTimestamp() int64 {
now := time.Now().UTC()
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
// today := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.UTC) for test
return today.Unix()
}
......
......@@ -3,6 +3,6 @@
# exit script on any error
set -e
echo "running witness"
nohup /usr/bin/witness -c /root/config.toml >> /root/witness.log 2>&1 &
tail -f /root/witness.log
echo "running validator"
nohup /usr/bin/validator -c /app/config.toml >> /app/validator.log 2>&1 &
tail -f /app/validator.log
......@@ -2,6 +2,7 @@ package quest
import (
"fmt"
"strings"
"validator/conf"
log "github.com/sirupsen/logrus"
......@@ -49,7 +50,7 @@ func (q *Quest) GetPendingWorkload(startTimestamp int64, address string) (worklo
querySQL := "SELECT " +
"`TaskWorkload`, `TaskReqHash`, `TaskRespHash`, `TaskManagerSignature`, `TaskContainerSignature`, `TaskMinerSignature`, `TaskProfitAccount`, `TaskWorkerAccount` " +
"FROM `proof` " +
"WHERE `TaskFinishTimestamp` >= ?" +
"WHERE `TaskFinishTimestamp` >= ? " +
"AND `TaskProfitAccount` = ? ;"
err = q.db.Raw(querySQL, startTimestamp, address).First(&proof).Error
......@@ -68,5 +69,8 @@ func (q *Quest) GetGlobalWorkload(startTimestamp int64) (workload uint64, err er
"FROM `proof` " +
"WHERE `TaskFinishTimestamp` >= ? AND `TaskFinishTimestamp` < ?;"
err = q.db.Raw(querySQL, startTimestamp, startTimestamp+86400).Row().Scan(&workload)
if err != nil && strings.Contains(err.Error(), "converting NULL to uint64 is unsupported") {
return 0, nil
}
return workload, err
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment