Commit 13ed95a7 authored by 贾浩@五瓣科技's avatar 贾浩@五瓣科技

fix mst tree

parent 4da27f86
...@@ -33,7 +33,7 @@ func newChain(rpc, privateKey, storageCa, validatorCa string) *ChainRPC { ...@@ -33,7 +33,7 @@ func newChain(rpc, privateKey, storageCa, validatorCa string) *ChainRPC {
panic(err) panic(err)
} }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
defer cancel() defer cancel()
chainID, err := ethRpc.ChainID(ctx) chainID, err := ethRpc.ChainID(ctx)
...@@ -88,6 +88,7 @@ func (r *ChainRPC) SubmitProofs(dateTimestamp int64, merkleSumTreeRoot, merkleTr ...@@ -88,6 +88,7 @@ func (r *ChainRPC) SubmitProofs(dateTimestamp int64, merkleSumTreeRoot, merkleTr
if err != nil { if err != nil {
return return
} }
opts.GasLimit = 500000
opts.Context = ctx opts.Context = ctx
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
signedTx, err := r.validatorContract.SubmitMerkleRoot(opts, big.NewInt(dateTimestamp), merkleSumTreeRoot, merkleTreeRoot) signedTx, err := r.validatorContract.SubmitMerkleRoot(opts, big.NewInt(dateTimestamp), merkleSumTreeRoot, merkleTreeRoot)
......
...@@ -68,6 +68,10 @@ func (v *Validator) CommitMST(proofMap map[common.Address]*validatorv1.Validated ...@@ -68,6 +68,10 @@ func (v *Validator) CommitMST(proofMap map[common.Address]*validatorv1.Validated
"cost": time.Since(st).String(), "cost": time.Since(st).String(),
}).Info("commit MST root") }).Info("commit MST root")
v.Lock()
v.mstTreeCache[v.date] = mstTree
v.Unlock()
return rootNode.Value.Hash, rootNode.Value.BigValue, nil return rootNode.Value.Hash, rootNode.Value.BigValue, nil
} }
...@@ -81,7 +85,6 @@ func (v *Validator) CommitMT(objects []*validatorv1.MinerObject) (root common.Ha ...@@ -81,7 +85,6 @@ func (v *Validator) CommitMT(objects []*validatorv1.MinerObject) (root common.Ha
for _, object := range objects { for _, object := range objects {
bigBalance, _ := new(big.Int).SetString(object.Balance, 10) bigBalance, _ := new(big.Int).SetString(object.Balance, 10)
payload := append(common.HexToAddress(object.Miner).Bytes(), common.LeftPadBytes(bigBalance.Bytes(), 32)...) payload := append(common.HexToAddress(object.Miner).Bytes(), common.LeftPadBytes(bigBalance.Bytes(), 32)...)
log.Warnln(object.Miner)
_proof := crypto.Keccak256Hash(payload) _proof := crypto.Keccak256Hash(payload)
merkleProofs = append(merkleProofs, _proof) merkleProofs = append(merkleProofs, _proof)
dbProofs = append(dbProofs, _proof[:]...) dbProofs = append(dbProofs, _proof[:]...)
...@@ -111,7 +114,9 @@ func (v *Validator) CommitMT(objects []*validatorv1.MinerObject) (root common.Ha ...@@ -111,7 +114,9 @@ func (v *Validator) CommitMT(objects []*validatorv1.MinerObject) (root common.Ha
"k": fmt.Sprintf("mtk:%s", v.date), "k": fmt.Sprintf("mtk:%s", v.date),
"v_length": len(dbProofs), "v_length": len(dbProofs),
}).Debug() }).Debug()
v.Lock()
v.mtTreeCache[v.date] = mtTree v.mtTreeCache[v.date] = mtTree
v.Unlock()
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"root": root.Hex(), "root": root.Hex(),
...@@ -143,7 +148,7 @@ func (v *Validator) LoadMerkleTree(date string) (ok bool) { ...@@ -143,7 +148,7 @@ func (v *Validator) LoadMerkleTree(date string) (ok bool) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"key": merkleTreeKey, "key": merkleTreeKey,
"length": len(data), "length": len(data),
}).Debug("diskdb load merkle proof") }).Debug("diskdb load merkle tree")
proofs := make([]common.Hash, len(data)/32) proofs := make([]common.Hash, len(data)/32)
for i := 0; i < len(data)/32; i++ { for i := 0; i < len(data)/32; i++ {
...@@ -170,6 +175,7 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) { ...@@ -170,6 +175,7 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) {
if _, ok = v.mstTreeCache[date]; ok { if _, ok = v.mstTreeCache[date]; ok {
return return
} }
merkleSumTreeKey := fmt.Sprintf("mstk:%s", date) merkleSumTreeKey := fmt.Sprintf("mstk:%s", date)
keyData, err := v.lvdb.Get([]byte(merkleSumTreeKey)) keyData, err := v.lvdb.Get([]byte(merkleSumTreeKey))
if err != nil { if err != nil {
...@@ -185,11 +191,11 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) { ...@@ -185,11 +191,11 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) {
}).Debug("diskdb load merkle sum proof key") }).Debug("diskdb load merkle sum proof key")
// data = addr1:addr2 // data = addr1:addr2
datas := make([][]byte, len(keyData)/20) rawKeys := make([][]byte, len(keyData)/20)
for i := 0; i < len(keyData)/20; i++ { for i := 0; i < len(keyData)/20; i++ {
copy(datas[i], keyData[i*20:(i+1)*20]) rawKeys[i] = make([]byte, 20)
copy(rawKeys[i], keyData[i*20:(i+1)*20])
} }
merkleSumTreeVal := fmt.Sprintf("mstv:%s", date) merkleSumTreeVal := fmt.Sprintf("mstv:%s", date)
valData, err := v.lvdb.Get([]byte(merkleSumTreeVal)) valData, err := v.lvdb.Get([]byte(merkleSumTreeVal))
if err != nil { if err != nil {
...@@ -211,7 +217,7 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) { ...@@ -211,7 +217,7 @@ func (v *Validator) LoadMerkleSumTree(date string) (ok bool) {
bigVals[i] = big.NewInt(0).SetBytes(vals[i]) bigVals[i] = big.NewInt(0).SetBytes(vals[i])
} }
mstTree := tree.NewMerkleSumTree(datas, bigVals) mstTree := tree.NewMerkleSumTree(rawKeys, bigVals)
v.Lock() v.Lock()
v.mstTreeCache[date] = mstTree v.mstTreeCache[date] = mstTree
v.Unlock() v.Unlock()
......
package core package core
import ( import (
"errors"
"fmt" "fmt"
"math/big" "math/big"
"math/rand" "math/rand"
...@@ -61,28 +62,29 @@ func RunValidator(q *quest.Quest, cfg *conf.Config) *Validator { ...@@ -61,28 +62,29 @@ func RunValidator(q *quest.Quest, cfg *conf.Config) *Validator {
q: q, q: q,
} }
go v.UpdateContractAddressJob()
go v.UpdateGlobalWorkloadJob()
<-time.After(time.Second * 3)
if v.date != "" { if v.date != "" {
// 当前5号,假设db 1号 // 当前5号,假设db 1号
// v.date = 1, 2,需要处理的是 2,3号 // v.date = 1, 2,需要处理的是 2,3号
for v.dateToTimestamp(v.date) < v.yesterdayTimestamp()-86400 { for v.dateToTimestamp(v.date) < v.yesterdayTimestamp()-86400 {
if err = v.SyncDayJob(v.dateToTimestamp(v.date) + 86400); err != nil { if err = v.SyncDayJob(v.dateToTimestamp(v.date)+86400, true); err != nil {
return nil return nil
} }
} }
// v.date = 3, 如果当前时间超过提交昨天的时间,需要提交昨天的 // v.date = 3, 如果当前时间超过提交昨天的时间,需要提交昨天的
if v.dateToTimestamp(v.date) < v.yesterdayTimestamp() && if v.dateToTimestamp(v.date) < v.yesterdayTimestamp() &&
time.Now().After(time.Unix(v.todayTimestamp()+int64(cfg.CommitOffset), 0)) { time.Now().After(time.Unix(v.todayTimestamp()+int64(cfg.CommitOffset), 0)) {
if err = v.SyncDayJob(v.dateToTimestamp(v.date) + 86400); err != nil { if err = v.SyncDayJob(v.dateToTimestamp(v.date)+86400, true); err != nil {
return nil return nil
} }
} }
lastDay, _ = getDBLastDayRoot(diskDB)
} }
v.LoadMerkleTree(lastDay) v.LoadMerkleTree(lastDay)
v.LoadMerkleSumTree(lastDay) v.LoadMerkleSumTree(lastDay)
go v.UpdateContractAddressJob()
go v.UpdateGlobalWorkloadJob()
<-time.After(time.Second * 3)
go v.Ticker() go v.Ticker()
return v return v
} }
...@@ -101,7 +103,7 @@ func getDBLastDayRoot(diskDB *leveldb.Database) (lastDay string, stateRoot commo ...@@ -101,7 +103,7 @@ func getDBLastDayRoot(diskDB *leveldb.Database) (lastDay string, stateRoot commo
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"last day": lastDay, "last day": lastDay,
"state root": stateRoot.Hex(), "state root": stateRoot.Hex(),
}).Info("load state from diskdb") }).Info("load last day state from diskdb")
} }
return lastDay, stateRoot return lastDay, stateRoot
} }
...@@ -176,11 +178,11 @@ func (v *Validator) ProcessDayJob() { ...@@ -176,11 +178,11 @@ func (v *Validator) ProcessDayJob() {
} }
// SyncDayJob 同步任务 // SyncDayJob 同步任务
func (v *Validator) SyncDayJob(dateTimestamp int64) (err error) { func (v *Validator) SyncDayJob(dateTimestamp int64, commitToChain bool) (err error) {
dbLastDay, _ := getDBLastDayRoot(v.lvdb) dbLastDay, _ := getDBLastDayRoot(v.lvdb)
if dbLastDay == v.yesterdayString() && v.date != "" { if dbLastDay == v.yesterdayString() && v.date != "" {
log.WithField("date", v.date).Warn("already process day job") log.WithField("date", v.date).Warn("already process day job")
return return errors.New("already process day job")
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"date": v.timestampToDate(dateTimestamp), "date": v.timestampToDate(dateTimestamp),
...@@ -200,10 +202,13 @@ func (v *Validator) SyncDayJob(dateTimestamp int64) (err error) { ...@@ -200,10 +202,13 @@ func (v *Validator) SyncDayJob(dateTimestamp int64) (err error) {
log.WithError(err).Error("failed to commit merkle tree") log.WithError(err).Error("failed to commit merkle tree")
return return
} }
txHash, err := v.rpc.SubmitProofs(dateTimestamp, mstRoot, mtRoot) var txHash common.Hash
if err != nil { if commitToChain {
log.WithError(err).Error("submit proofs") txHash, err = v.rpc.SubmitProofs(dateTimestamp, mstRoot, mtRoot)
return if err != nil {
log.WithError(err).Error("submit proofs")
return
}
} }
err = v.lvdb.Put([]byte(fmt.Sprintf("txid:%s", v.date)), txHash.Bytes()) err = v.lvdb.Put([]byte(fmt.Sprintf("txid:%s", v.date)), txHash.Bytes())
if err != nil { if err != nil {
...@@ -237,7 +242,6 @@ func (v *Validator) Commit() (dayProofs map[common.Address]*validatorv1.Validate ...@@ -237,7 +242,6 @@ func (v *Validator) Commit() (dayProofs map[common.Address]*validatorv1.Validate
for miner, proof := range proof { for miner, proof := range proof {
balance := big.NewInt(0).Mul(balancePerWorkload, big.NewInt(int64(proof.Workload))) balance := big.NewInt(0).Mul(balancePerWorkload, big.NewInt(int64(proof.Workload)))
proof.Balance = balance.String() proof.Balance = balance.String()
log.Warnln("889888", miner.Hex())
err := v.SealProof(miner, proof) err := v.SealProof(miner, proof)
if err != nil { if err != nil {
log.WithError(err).Error("failed to seal proof") log.WithError(err).Error("failed to seal proof")
......
...@@ -28,6 +28,13 @@ func NewMerkleSumTree(datas [][]byte, vals []*big.Int) *MerkleSumTree { ...@@ -28,6 +28,13 @@ func NewMerkleSumTree(datas [][]byte, vals []*big.Int) *MerkleSumTree {
var nodes []MerkleSumNode var nodes []MerkleSumNode
var levels [][]MerkleSumNode var levels [][]MerkleSumNode
if len(datas) == 1 {
return &MerkleSumTree{
Levels: levels,
RootNode: buildToNode(datas[0], vals[0]),
}
}
if len(datas)%2 != 0 { if len(datas)%2 != 0 {
datas = append(datas, []byte{}) datas = append(datas, []byte{})
vals = append(vals, big.NewInt(0)) vals = append(vals, big.NewInt(0))
......
...@@ -5,32 +5,25 @@ import ( ...@@ -5,32 +5,25 @@ import (
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
) )
func TestMst(t *testing.T) { func TestMst(t *testing.T) {
datas := [][]byte{} datas := [][]byte{}
vals := []*big.Int{} vals := []*big.Int{}
for i := 1; i <= 7; i++ { for i := 1; i <= 1; i++ {
datas = append(datas, crypto.Keccak256(big.NewInt(int64(i)).Bytes())) datas = append(datas, common.HexToAddress("0x84A3175be614F5886f99Da506dF08682DF530739").Bytes())
vals = append(vals, big.NewInt(int64(i))) vals = append(vals, big.NewInt(int64(3)))
} }
tree := NewMerkleSumTree(datas, vals) tree := NewMerkleSumTree(datas, vals)
t.Log(tree.GetRoot()) t.Log(tree.GetRoot())
t.Log(tree.RootNode.Value.Hash, tree.RootNode.Value.BigValue) t.Log(tree.RootNode.Value.Hash, tree.RootNode.Value.BigValue)
t.Log("---") t.Log("---")
for i := len(tree.Levels) - 1; i >= 0; i-- {
t.Log(tree.Levels[i])
}
t.Log("---")
subNode := tree.FindMerkleSumNode(common.HexToHash("0x8a28d124351a95140ed4fad3468a9833523faf12e07c24eac13e1c1d468adb95"))
layers := MerkleSumTreeTraversal(subNode, 3) resp := MerkleSumTreeTraversal(tree.RootNode, 3)
for i := range resp {
for _, layer := range layers { t.Log(resp[i])
t.Log(layer)
} }
t.Log("---")
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment