Commit a257556c authored by 贾浩@五瓣科技

update

parent e94c6651
package api
import (
"bytes"
"context"
"errors"
"math/big"
"net"
"witness/core"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
witnessv1 "github.com/odysseus/odysseus-protocol/gen/proto/go/witness/v1"
log "github.com/sirupsen/logrus"
......@@ -26,74 +21,6 @@ func (s *Server) WitnessStatus(ctx context.Context, req *witnessv1.WitnessStatus
}
// PushProof validates a batch of proofs pushed by a miner and accumulates the
// valid ones as pending workload for the reward address.
//
// For every proof three signatures are recovered and checked:
//   - the container signature must come from a known container address,
//   - the miner signature must match the miner address in the request,
//   - the manager signature must come from a known node-manager address.
//
// Proofs with zero workload are skipped; any signature failure aborts the
// whole request with an error.
func (s *Server) PushProof(ctx context.Context, req *witnessv1.PushProofRequest) (resp *witnessv1.PushProofResponse, err error) {
	resp = &witnessv1.PushProofResponse{}
	proofs := req.GetProofs()
	if len(proofs) == 0 {
		return nil, errors.New("no proofs provided")
	}
	miner := common.HexToAddress(req.MinerAddress)
	reward := common.HexToAddress(req.RewardAddress)
	validatedProofs := make([]*witnessv1.ValidatedProof, 0, len(proofs))
	log.WithFields(log.Fields{"miner": miner, "reward": reward, "proofs": len(proofs)}).Debug("push proof")
	for _, proof := range proofs {
		if proof.Workload == 0 {
			continue
		}
		// Container and miner both sign keccak256(taskID || reqHash || respHash).
		// The original built this payload and hashed it twice; compute the
		// digest once and reuse it for both recoveries.
		hashPayload := append(append([]byte(proof.TaskId), proof.ReqHash...), proof.RespHash...)
		digest := crypto.Keccak256(hashPayload)
		containerPubKey, err := crypto.SigToPub(digest, proof.ContainerSignature)
		if err != nil {
			log.WithError(err).Error("failed to verify container signature")
			return nil, err
		}
		containerSigner := crypto.PubkeyToAddress(*containerPubKey)
		if !s.w.IsContainerAddress(containerSigner) {
			log.WithFields(log.Fields{"signer": containerSigner}).Error("invalid container signature")
			return nil, errors.New("invalid container signature")
		}
		minerPubKey, err := crypto.SigToPub(digest, proof.MinerSignature)
		if err != nil {
			log.WithError(err).Error("failed to verify miner signature")
			return nil, err
		}
		minerAddress := crypto.PubkeyToAddress(*minerPubKey)
		// common.Address is a comparable value type.
		if minerAddress != miner {
			log.WithFields(log.Fields{"miner": miner, "signer": minerAddress}).Error("invalid miner address")
			return nil, errors.New("invalid miner address")
		}
		// The manager signs over the task payload plus addresses, both inner
		// signatures, workload and timestamp. hashPayload is not used again,
		// so handing it to the buffer is safe.
		managerHashPayload := bytes.NewBuffer(hashPayload)
		managerHashPayload.Write(reward.Bytes())
		managerHashPayload.Write(miner.Bytes())
		managerHashPayload.Write(proof.ContainerSignature)
		managerHashPayload.Write(proof.MinerSignature)
		managerHashPayload.Write(big.NewInt(int64(proof.Workload)).Bytes())
		managerHashPayload.Write(big.NewInt(int64(proof.Timestamp)).Bytes())
		managerPubKey, err := crypto.SigToPub(crypto.Keccak256(managerHashPayload.Bytes()), proof.ManagerSignature)
		if err != nil {
			log.WithError(err).Error("failed to verify manager signature")
			return nil, err
		}
		managerSigner := crypto.PubkeyToAddress(*managerPubKey)
		if !s.w.IsNMAddress(managerSigner) {
			log.WithFields(log.Fields{"signer": managerSigner}).Error("invalid manager signature")
			return nil, errors.New("invalid manager signature")
		}
		resp.Workload += proof.Workload
		validatedProofs = append(validatedProofs, &witnessv1.ValidatedProof{
			Workload: proof.Workload,
		})
	}
	if len(validatedProofs) > 0 {
		s.w.AddPendingProof(reward, validatedProofs)
	}
	return resp, nil
}
......
......@@ -150,6 +150,10 @@ func getPendingWorkload(params []byte, resp *jsonrpcMessage) {
return
}
// findNode is the JSON-RPC handler for the "findNode" method. It is currently
// an empty stub and leaves resp untouched — TODO(review): confirm whether this
// is intentional or pending implementation.
func findNode(params []byte, resp *jsonrpcMessage) {
}
func StartJSONRPC(listenAddress string, w *core.Witness) {
witness = w
http.HandleFunc("/", rpcHandle)
......
......@@ -71,83 +71,32 @@ var (
Name: "commit-time",
Usage: "The time to commit the proof",
}
commitExpireFlag = &cli.IntFlag{
Name: "commit-expire",
Usage: "The time to expire the proof",
}
)
// p2p flags
var (
p2pUseDiscoveryFlag = &cli.BoolFlag{
Name: "p2p-use-discovery",
Usage: "Whether to use discovery",
Value: true,
}
p2pStaticPeersFlag = &cli.StringSliceFlag{
Name: "p2p-static-peers",
Usage: "Static peers",
}
p2pBootstrapNodeAddrFlag = &cli.StringSliceFlag{
Name: "p2p-bootstrap-node-addr",
Usage: "Bootstrap node address",
}
p2pLocalIPFlag = &cli.StringFlag{
Name: "p2p-local-ip",
Usage: "The local ip address to listen for incoming data.",
Value: "",
}
p2pHostFlag = &cli.StringFlag{
Name: "p2p-host-ip",
Usage: "The IP address advertised by libp2p. This may be used to advertise an external IP.",
Value: "",
}
p2pPrivKeyFlag = &cli.StringFlag{
Name: "p2p-private-key",
Usage: "The file path of the private key to use in communications with other peers.",
EnvVars: []string{"P2P_PRIVATE_KEY"},
}
p2pTCPPortFlag = &cli.IntFlag{
Name: "p2p-tcp-port",
Usage: "The port used by libp2p.",
Value: 30333,
}
p2pUDPPortFlag = &cli.IntFlag{
Name: "p2p-udp-port",
Usage: "The port used by discv5.",
Value: 30334,
questHostFlag = &cli.StringFlag{
Name: "quest-host",
Usage: "The host of the quest server",
}
p2pMaxPeersFlag = &cli.IntFlag{
Name: "p2p-max-peers",
Usage: "The max number of p2p peers to maintain.",
Value: 16,
questPortFlag = &cli.IntFlag{
Name: "quest-port",
Usage: "The port of the quest server",
}
p2pMaxInboundPeersFlag = &cli.IntFlag{
Name: "p2p-max-inbound-peers",
Usage: "The max number of inbound p2p peers to maintain.",
Value: 10,
questUserFlag = &cli.StringFlag{
Name: "quest-user",
Usage: "The user of the quest server",
}
p2pMaxOutboundPeersFlag = &cli.IntFlag{
Name: "p2p-max-outbound-peers",
Usage: "The max number of outbound p2p peers to maintain.",
Value: 10,
questPassFlag = &cli.StringFlag{
Name: "quest-pass",
Usage: "The password of the quest server",
}
ignoreLocalIPFlag = &cli.BoolFlag{
Name: "ignore-local-ip",
Usage: "The node is not connected to the local IP address",
Value: false,
questDBFlag = &cli.StringFlag{
Name: "quest-db",
Usage: "The database of the quest server",
}
)
......
package main
import (
"context"
"net/http"
"os"
"witness/api"
"witness/conf"
"witness/core"
"witness/p2p"
"witness/quest"
"witness/version"
"github.com/prometheus/client_golang/prometheus/promhttp"
......@@ -28,28 +27,17 @@ var (
storeContractFlag,
witnessContractFlag,
commitTimeFlag,
commitExpireFlag,
}
p2pFlags = []cli.Flag{
p2pUseDiscoveryFlag,
p2pStaticPeersFlag,
p2pBootstrapNodeAddrFlag,
p2pLocalIPFlag,
p2pPrivKeyFlag,
p2pHostFlag,
p2pTCPPortFlag,
p2pUDPPortFlag,
p2pMaxPeersFlag,
p2pMaxInboundPeersFlag,
p2pMaxOutboundPeersFlag,
ignoreLocalIPFlag,
questHostFlag,
questPortFlag,
questUserFlag,
questPassFlag,
questDBFlag,
}
)
func main() {
app := cli.App{}
app.Flags = wrapFlags(append(appFlags, p2pFlags...))
app.Flags = wrapFlags(append(appFlags, []cli.Flag{}...))
app.Name = "witness"
app.Usage = "this is witness"
app.Version = version.Version
......@@ -77,13 +65,22 @@ func run(ctx *cli.Context) {
WitnessContract: ctx.String(witnessContractFlag.Name),
DataDir: ctx.String(dataDirFlag.Name),
CommitTime: ctx.Int(commitTimeFlag.Name),
CommitExpire: ctx.Int(commitExpireFlag.Name),
}
p2pSrv := runP2PServer(ctx)
setLogLevel(cfg.LogLevel)
runMetrics(cfg.MetricsListenAddr)
w := core.RunWitness(p2pSrv, cfg)
runGrpcServer(cfg.GRPCListenAddr, w)
qCfg := &conf.QuestConfig{
Host: ctx.String(questHostFlag.Name),
Port: ctx.Int(questPortFlag.Name),
User: ctx.String(questUserFlag.Name),
Password: ctx.String(questPassFlag.Name),
Database: ctx.String(questDBFlag.Name),
}
q := quest.NewQuest(qCfg)
w := core.RunWitness(q, cfg)
// runGrpcServer(cfg.GRPCListenAddr, w)
runJSONRPCServer(cfg.RPCListenAddr, w)
select {}
}
......@@ -115,27 +112,3 @@ func runGrpcServer(listen string, w *core.Witness) {
// runJSONRPCServer starts the JSON-RPC API on the given listen address in a
// background goroutine; it returns immediately.
func runJSONRPCServer(listen string, w *core.Witness) {
	go api.StartJSONRPC(listen, w)
}
// runP2PServer assembles the p2p configuration from CLI flags, creates the
// service and starts it. A creation failure terminates the process.
func runP2PServer(ctx *cli.Context) *p2p.Service {
	cfg := &p2p.Config{
		UseDiscovery:      ctx.Bool(p2pUseDiscoveryFlag.Name),
		StaticPeers:       ctx.StringSlice(p2pStaticPeersFlag.Name),
		BootstrapNodeAddr: ctx.StringSlice(p2pBootstrapNodeAddrFlag.Name),
		LocalIP:           ctx.String(p2pLocalIPFlag.Name),
		HostAddress:       ctx.String(p2pHostFlag.Name),
		PrivateKeyPath:    ctx.String(p2pPrivKeyFlag.Name),
		TCPPort:           ctx.Uint(p2pTCPPortFlag.Name),
		UDPPort:           ctx.Uint(p2pUDPPortFlag.Name),
		MaxPeers:          ctx.Uint(p2pMaxPeersFlag.Name),
		MaxInboundPeers:   ctx.Uint(p2pMaxInboundPeersFlag.Name),
		MaxOutboundPeers:  ctx.Uint(p2pMaxOutboundPeersFlag.Name),
		IgnoreLocalIP:     ctx.Bool(ignoreLocalIPFlag.Name),
	}
	svc, err := p2p.NewService(context.Background(), cfg)
	if err != nil {
		log.WithError(err).Fatal("failed to create p2p service")
	}
	svc.Start()
	return svc
}
......@@ -12,5 +12,12 @@ type Config struct {
RewardContract string
DataDir string
CommitTime int
CommitExpire int
}
// QuestConfig holds the connection settings for the quest database server
// (presumably spoken to over the Postgres wire protocol, judging by the
// gorm/postgres usage elsewhere — confirm against quest.NewQuest).
type QuestConfig struct {
	Host     string // server host name or IP
	Port     int    // server port
	User     string // login user
	Password string // login password
	Database string // database name
}
......@@ -16,4 +16,12 @@ witness-contract = "0xf49133dD7B7ed75fA0f877413D293c05Bff0D8F0"
commit-time = 3600 # utc + n seconds
commit-expire = 3600
\ No newline at end of file
quest-host = "43.198.252.255"
quest-port = 8812
quest-user = "admin"
quest-pass = "quest"
quest-db = "qdb"
\ No newline at end of file
package core
import (
"witness/quest"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
witnessv1 "github.com/odysseus/odysseus-protocol/gen/proto/go/witness/v1"
log "github.com/sirupsen/logrus"
)
// LoadPendingProofs pages through the proof store for the given time window,
// verifying every record. A task-id cursor advances between pages; the loop
// stops on a query error or an empty page.
func (w *Witness) LoadPendingProofs(startTimestamp, endTimestamp int64) {
	const pageSize = 100
	cursor := ""
	for {
		page, err := w.q.GetProofs(startTimestamp, endTimestamp, cursor, pageSize)
		if err != nil {
			log.WithError(err).Error("failed to get proofs")
			return
		}
		log.WithField("proof count", len(page)).Debug("load pending proofs")
		if len(page) == 0 {
			return
		}
		for _, record := range page {
			miner, proof := w.VerifyProof(record)
			if proof != nil {
				// w.AddPendingProof(miner, proof)
			}
			_ = miner
			cursor = record.TaskId
		}
	}
}
// VerifyProof checks the container, miner and manager signatures on a proof
// record loaded from the database. On success it returns the profit account
// and a validated proof; on any failure (or zero workload) it returns the
// zero address and a nil proof.
func (w *Witness) VerifyProof(dbProof *quest.ProofModel) (miner common.Address, proof *witnessv1.ValidatedProof) {
	if dbProof.TaskWorkload == 0 {
		return
	}
	// Recover and check the container signature.
	containerPub, err := crypto.SigToPub(dbProof.GenerateHashPayload(), common.Hex2Bytes(dbProof.TaskContainerSignature))
	if err != nil {
		log.WithFields(log.Fields{"taskid": dbProof.TaskId}).WithError(err).Error("failed to verify container signature")
		return
	}
	if signer := crypto.PubkeyToAddress(*containerPub); !w.IsContainerAddress(signer) {
		log.WithFields(log.Fields{"taskid": dbProof.TaskId, "signer": signer.Hex()}).Error("invalid container signature")
		return
	}
	// The miner signs the same payload; its address must match the record.
	minerPub, err := crypto.SigToPub(dbProof.GenerateHashPayload(), common.Hex2Bytes(dbProof.TaskMinerSignature))
	if err != nil {
		log.WithFields(log.Fields{"taskid": dbProof.TaskId}).WithError(err).Error("failed to verify miner signature")
		return
	}
	workerSigner := crypto.PubkeyToAddress(*minerPub)
	if workerSigner.Hex() != common.HexToAddress(dbProof.TaskWorkerAccount).Hex() {
		log.WithFields(log.Fields{"taskid": dbProof.TaskId, "miner": dbProof.TaskWorkerAccount, "signer": workerSigner.Hex()}).Error("invalid miner address")
		return
	}
	// The node manager signs its own payload and must be a known NM address.
	nmPub, err := crypto.SigToPub(dbProof.GenerateNMHashPayload(), common.Hex2Bytes(dbProof.TaskManagerSignature))
	if err != nil {
		log.WithFields(log.Fields{"taskid": dbProof.TaskId}).WithError(err).Error("failed to verify manager signature")
		return
	}
	if nmSigner := crypto.PubkeyToAddress(*nmPub); !w.IsNMAddress(nmSigner) {
		log.WithFields(log.Fields{"taskid": dbProof.TaskId, "signer": nmSigner.Hex()}).Error("invalid manager signature")
		return
	}
	proof = &witnessv1.ValidatedProof{
		Workload:  dbProof.TaskWorkload,
		Timestamp: uint64(dbProof.TaskFinishTimestamp),
	}
	return common.HexToAddress(dbProof.TaskProfitAccount), proof
}
package core
import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
var (
	// msgTopic is the pubsub topic handle obtained via JoinTopic.
	msgTopic *pubsub.Topic
)

// JoinTopic joins the named gossip topic and stores the handle in msgTopic.
func (w *Witness) JoinTopic(topic string) (err error) {
	// Bug fix: the original joined the literal string "topic" instead of the
	// topic name passed by the caller.
	msgTopic, err = w.p2pSrv.JoinTopic(topic)
	return err
}
......@@ -10,7 +10,7 @@ import (
"sync"
"time"
"witness/conf"
"witness/p2p"
"witness/quest"
"witness/tree"
"witness/util"
......@@ -34,15 +34,14 @@ type Witness struct {
rpc *ChainRPC
containerAddresses []common.Address
nmAddresses []common.Address
p2pSrv *p2p.Service
q *quest.Quest
cfg *conf.Config
sync.Mutex
}
func RunWitness(_p2pSrv *p2p.Service, cfg *conf.Config) *Witness {
func RunWitness(q *quest.Quest, cfg *conf.Config) *Witness {
_rpc := newChain(cfg.ChainRPC, cfg.PrivateKey, cfg.StoreContract, cfg.WitnessContract)
diskDB, err := leveldb.New(fmt.Sprintf("%s/db", cfg.DataDir), 128, 1024, "", false)
if err != nil {
......@@ -76,8 +75,8 @@ func RunWitness(_p2pSrv *p2p.Service, cfg *conf.Config) *Witness {
mtTreeCache: make(map[string]*tree.MerkleTree),
rpc: _rpc,
date: lastDay,
p2pSrv: _p2pSrv,
cfg: cfg,
q: q,
}
w.LoadMerkleProof(lastDay)
w.LoadPendingProof()
......@@ -85,72 +84,39 @@ func RunWitness(_p2pSrv *p2p.Service, cfg *conf.Config) *Witness {
go w.UpdateAddress()
<-time.After(time.Second * 3)
go w.Ticker()
go w.Mock()
// go w.Mock()
// go w.ProcessDay()
return w
}
func (w *Witness) AddPendingProof(miner common.Address, proofs []*witnessv1.ValidatedProof) {
func (w *Witness) AddPendingProof(miner common.Address, proof *witnessv1.ValidatedProof) {
w.Lock()
defer w.Unlock()
yesterdayProofs := make([]*witnessv1.ValidatedProof, 0)
todayProofs := make([]*witnessv1.ValidatedProof, 0)
yesterdayWorkload := uint64(0)
yesterdayPendingWorkload := uint64(0)
todayWorkload := uint64(0)
todayPendingWorkload := uint64(0)
for _, proof := range proofs {
if int64(proof.Timestamp) < time.Now().Unix()-int64(w.cfg.CommitExpire) {
continue
}
if int64(proof.Timestamp) < w.todayTimestamp() {
yesterdayProofs = append(yesterdayProofs, proof)
} else {
todayProofs = append(todayProofs, proof)
}
}
if len(yesterdayProofs) > 0 {
// 保存到昨天的里面
yesterdayProof, ok := w.pendingProof[w.yesterdayTimestamp()][miner]
if ok {
yesterdayWorkload = yesterdayProof.Workload
yesterdayPendingWorkload = yesterdayProof.Workload
}
for _, proof := range yesterdayProofs {
yesterdayWorkload += proof.Workload
w.pendingWorkload += proof.Workload // todo change
}
w.pendingProof[w.yesterdayTimestamp()][miner] = &witnessv1.ValidatedProof{Workload: yesterdayWorkload}
var userTodayWorkload uint64
currentPendingProof, ok := w.pendingProof[w.todayTimestamp()][miner]
if ok {
userTodayWorkload = currentPendingProof.Workload
}
_ = yesterdayPendingWorkload
if len(todayProofs) > 0 {
currentPendingProof, ok := w.pendingProof[w.todayTimestamp()][miner]
if ok {
todayWorkload = currentPendingProof.Workload
todayPendingWorkload = currentPendingProof.Workload
}
for _, proof := range todayProofs {
todayWorkload += proof.Workload
w.pendingWorkload += proof.Workload
}
if w.pendingProof[w.todayTimestamp()] == nil {
w.pendingProof[w.todayTimestamp()] = make(map[common.Address]*witnessv1.ValidatedProof)
}
w.pendingProof[w.todayTimestamp()][miner] = &witnessv1.ValidatedProof{Workload: todayWorkload}
w.pendingWorkload += proof.Workload
if w.pendingProof[w.todayTimestamp()] == nil {
w.pendingProof[w.todayTimestamp()] = make(map[common.Address]*witnessv1.ValidatedProof)
}
w.pendingProof[w.todayTimestamp()][miner] = &witnessv1.ValidatedProof{Workload: userTodayWorkload + proof.Workload}
log.WithFields(log.Fields{
"miner": miner.Hex(),
"proof_count": len(proofs),
"current_workload": todayWorkload,
"pending_workload": todayPendingWorkload,
"workload_diff": todayWorkload - todayPendingWorkload,
"current_workload": userTodayWorkload + proof.Workload,
"workload": proof.Workload,
"global_workload": w.pendingWorkload,
}).Debug("add validated proof")
}
func (w *Witness) ProcessDay() {
log.Debugln("process day", w.yesterdayTimestamp(), w.todayTimestamp())
w.LoadPendingProofs(w.yesterdayTimestamp(), w.todayTimestamp())
w.date = w.yesterdayString()
dayProof := w.Commit()
mstRoot, _, err := w.CommitMST(dayProof)
......@@ -158,21 +124,17 @@ func (w *Witness) ProcessDay() {
log.WithError(err).Error("failed to commit merkle sum tree")
return
}
log.Warnln("a2")
objects := w.state.IterAllObject()
log.Warnln("a3")
mtRoot, err := w.CommitMT(objects)
if err != nil {
log.WithError(err).Error("failed to commit merkle tree")
return
}
log.Warnln("a4")
err = w.lvdb.Put([]byte("lastday"), []byte(w.date))
if err != nil {
log.WithError(err).Error("db failed to update last day")
return
}
log.Warnln("a5")
txHash, err := w.rpc.SubmitProofs(w.date, mstRoot, mtRoot)
if err != nil {
log.WithError(err).Error("submit proofs")
......@@ -354,8 +316,8 @@ func (w *Witness) CommitMT(objects []*witnessv1.MinerObject) (root common.Hash,
}
func (w *Witness) Ticker() {
// executionTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, w.cfg.CommitTime, 0, time.UTC).Add(w.duration())
executionTime := time.Now().Add(w.duration())
executionTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, w.cfg.CommitTime, 0, time.UTC).Add(w.duration())
// executionTime := time.Now().Add(w.duration())
waitTime := executionTime.Sub(time.Now())
timer := time.NewTimer(waitTime)
log.WithField("wait_time", waitTime.String()).Info("prepare commit task")
......@@ -380,11 +342,9 @@ func (w *Witness) Mock() {
for {
time.Sleep(time.Second)
w.AddPendingProof(addresses[rand.Intn(10)], []*witnessv1.ValidatedProof{
{
Workload: uint64((rand.Int63n(9) + 1) * 100),
Timestamp: uint64(time.Now().Unix() - 1),
},
w.AddPendingProof(addresses[rand.Intn(10)], &witnessv1.ValidatedProof{
Workload: uint64((rand.Int63n(9) + 1) * 100),
Timestamp: uint64(time.Now().Unix() - 1),
})
}
}
......@@ -447,6 +407,7 @@ func (w *Witness) GetMerkleProof(address common.Address, date string) (balance s
return "0", nil
}
}
cacheTree = w.mtTreeCache[date]
dateStateRootKey := fmt.Sprintf("sroot:%s", date)
dateStateRoot, err := w.lvdb.Get([]byte(dateStateRootKey))
if err != nil {
......@@ -481,6 +442,30 @@ func (w *Witness) GetMerkleProof(address common.Address, date string) (balance s
return object.Balance, proofs
}
// GetDailyMerkleProofs returns the merkle-tree layers for the given date,
// traversed from rootHash down to the requested depth. An empty date means
// the witness' current date. Returns nil when no tree can be loaded for the
// date.
func (w *Witness) GetDailyMerkleProofs(date string, depth int, rootHash common.Hash) (proofs [][]common.Hash) {
	if date == "" {
		date = w.date
	}
	w.Lock()
	cacheTree, ok := w.mtTreeCache[date]
	w.Unlock()
	if !ok {
		if ok = w.LoadMerkleProof(date); !ok {
			log.WithFields(log.Fields{
				"date": date,
			}).Error("load merkle proof empty")
			return nil
		}
		// Bug fix: the original re-read w.mtTreeCache[date] after releasing
		// the mutex (an unsynchronized map read racing concurrent writers),
		// and did so even when the first locked lookup had already succeeded.
		w.Lock()
		cacheTree = w.mtTreeCache[date]
		w.Unlock()
	}
	rootNode := cacheTree.GetRootNode()
	// When the caller asks for a non-root hash, traverse from that subtree.
	if rootHash.Hex() != rootNode.Hash.Hex() {
		rootNode = cacheTree.FindNode(rootHash)
	}
	return tree.Traversal(rootNode, depth)
}
func (w *Witness) GetPendingWorkload(address common.Address) (workload, globalWorkload uint64) {
w.Lock()
proof, ok := w.pendingProof[w.todayTimestamp()][address]
......@@ -492,6 +477,9 @@ func (w *Witness) GetPendingWorkload(address common.Address) (workload, globalWo
}
func (w *Witness) LoadMerkleProof(date string) (ok bool) {
if date == "" {
return false
}
merkleTreeKey := fmt.Sprintf("mtk:%s", date)
log.Warnln("load merkle proof", merkleTreeKey)
data, err := w.lvdb.Get([]byte(merkleTreeKey))
......@@ -571,7 +559,7 @@ func (w *Witness) LoadPendingProof() {
_, err := os.Stat(filename)
if err != nil {
if os.IsNotExist(err) {
log.WithError(err).Debug("failed to load pending proof")
log.WithError(err).Debug("no pending proof")
return
}
log.WithError(err).Error("failed to load pending proof, file status")
......@@ -601,16 +589,17 @@ func (w *Witness) LoadPendingProof() {
}
// todayString renders todayTimestamp as a yyyy-mm-dd date string.
// The diff residue left two return statements here; the surviving version is
// kept, with .UTC() added so formatting agrees with the UTC-midnight
// timestamp (time.Unix formats in the local zone otherwise).
func (w *Witness) todayString() string {
	return time.Unix(w.todayTimestamp(), 0).UTC().Format("2006-01-02")
}
// yesterdayString renders yesterdayTimestamp as a yyyy-mm-dd date string.
// The diff residue left two return statements here; the surviving version is
// kept, with .UTC() added so formatting agrees with the UTC-midnight
// timestamp (time.Unix formats in the local zone otherwise).
func (w *Witness) yesterdayString() string {
	return time.Unix(w.yesterdayTimestamp(), 0).UTC().Format("2006-01-02")
}
// todayTimestamp returns the Unix timestamp of today's midnight, UTC.
// The committed version hard-coded day-of-month 6 (the proper line was left
// commented out right above it) — clearly debug residue; restore the real
// current day.
func (w *Witness) todayTimestamp() int64 {
	now := time.Now().UTC()
	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
	return today.Unix()
}
......@@ -624,5 +613,5 @@ func (w *Witness) dateToTimestamp(date string) int64 {
}
// duration is the interval between scheduled commit runs: one day.
// The diff residue left two return statements (an old 10s debug value and
// the final daily value); keep the final behavior, expressed as 24h.
func (w *Witness) duration() time.Duration {
	return 24 * time.Hour
}
package main
import (
"fmt"
"testing"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// TestPG is an integration smoke test: it connects to a QuestDB instance
// over the Postgres wire protocol and reads a page of proof rows. It requires
// the hard-coded database to be reachable.
func TestPG(t *testing.T) {
	host := "43.198.252.255"
	port := 8812
	user := "admin"
	password := "quest"
	database := "qdb"
	dsn := fmt.Sprintf("host=%s port=%d user=%s dbname=%s password=%s sslmode=disable", host, port, user, database, password)
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		// Bug fix: the original panicked with a fixed message, discarding err.
		t.Fatalf("failed to connect database: %v", err)
	}
	// show tables
	querySQL := "SELECT " +
		"`TaskType`, `TaskId`, `TaskFinishTimestamp`, `TaskWorkload`, `TaskReqHash`, `TaskRespHash`, `TaskManagerSignature`, `TaskContainerSignature`, `TaskMinerSignature`, `TaskProfitAccount`, `TaskWorkerAccount` " +
		"FROM `proof` " +
		"WHERE `TaskFinishTimestamp` > ? AND `TaskFinishTimestamp` < ? " +
		"AND `TaskId` > ? " +
		"ORDER BY `TaskFinishTimestamp` DESC, `TaskId` DESC " +
		"LIMIT ?;"
	type Temp struct {
		TaskType               int
		TaskId                 string
		TaskFinishTimestamp    int
		TaskWorkload           int
		TaskReqHash            string
		TaskRespHash           string
		TaskManagerSignature   string
		TaskContainerSignature string
		TaskMinerSignature     string
		TaskProfitAccount      string
		TaskWorkerAccount      string
	}
	// Bug fix: the query is LIMIT 5 but the original scanned into a single
	// struct (via a **Temp, no less), keeping only one row. Scan into a slice.
	var rows []Temp
	err = db.Raw(querySQL, 1709610200, 1709610300, "", 5).Scan(&rows).Error
	t.Log(err)
	t.Log(rows)
}
......@@ -69,8 +69,14 @@ require (
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/pgx/v5 v5.5.3 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
......@@ -148,6 +154,8 @@ require (
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/postgres v1.5.6 // indirect
gorm.io/gorm v1.25.7 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
......
......@@ -90,6 +90,7 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/cp v1.1.1 h1:nCb6ZLdB7NRaqsm91JtQTAme2SKJzXVsdPIPkyJr1MU=
github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
......@@ -112,6 +113,7 @@ github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593 h1:aPEJyR4rPBvD
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593/go.mod h1:6hk1eMY/u5t+Cf18q5lFMUA1Rc+Sm5I6Ra1QuPyxXCo=
github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ=
github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM=
......@@ -211,6 +213,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
......@@ -367,18 +370,33 @@ github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/
github.com/iris-contrib/jade v1.1.3/go.mod h1:H/geBymxJhShH5kecoiOCSssPX7QWYH7UaeZTSWddIk=
github.com/iris-contrib/pongo2 v0.0.1/go.mod h1:Ssh+00+3GAZqSQb30AvBRNxBx7rf0GqwkjqxNd0u65g=
github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.3 h1:Ces6/M3wbDXYpM8JyyPD57ivTtJACFZJd885pdIaV2s=
github.com/jackc/pgx/v5 v5.5.3/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk=
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/kataras/golog v0.0.10/go.mod h1:yJ8YKCmyL+nWjERB90Qwn+bdyBZsaQwU3bTVFgkFIp8=
github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYbq3UhfoFmE=
......@@ -495,6 +513,7 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
......@@ -526,6 +545,7 @@ github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
......@@ -1184,6 +1204,10 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.5.6 h1:ydr9xEd5YAM0vxVDY0X139dyzNz10spDiDlC7+ibLeU=
gorm.io/driver/postgres v1.5.6/go.mod h1:3e019WlBaYI5o5LIdNV+LyxCMNtLOQETBXL2h4chKpA=
gorm.io/gorm v1.25.7 h1:VsD6acwRjz2zFxGO50gPO6AkNs7KKnvfzUjHQhZDz/A=
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
......
package p2p
import (
"encoding/json"
"io"
"os"
)
const bootnodefile = ".bootnodes.json"
var bootnodes []string
func loadBootnodesFile() {
file, err := os.Open(bootnodefile)
if err != nil {
return
}
defer file.Close()
b, err := io.ReadAll(file)
if err != nil {
return
}
_ = json.Unmarshal(b, &bootnodes)
return
}
func attachBootnodeToFile(bootnode string) error {
for _, node := range bootnodes {
if node == bootnode {
return nil
}
}
bootnodes = append(bootnodes, bootnode)
b, err := json.Marshal(bootnodes)
if err != nil {
return err
}
file, err := os.OpenFile(bootnodefile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer file.Close()
_, err = file.Write(b)
return err
}
package p2p
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)
// Broadcast serializes a protobuf message and publishes it to the given
// gossip topic on the p2p network.
func (s *Service) Broadcast(ctx context.Context, msg proto.Message, topic string) error {
	data, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	return s.broadcastObject(ctx, data, topic)
}
// broadcastObject publishes raw bytes to a topic in our gossip mesh,
// wrapping any publish failure with context.
func (s *Service) broadcastObject(ctx context.Context, obj []byte, topic string) error {
	err := s.PublishToTopic(ctx, topic, obj)
	if err != nil {
		return errors.Wrap(err, "could not publish message")
	}
	return nil
}
package p2p
// Config for the p2p service. These parameters are set from application level flags
// to initialize the p2p service.
type Config struct {
	UseDiscovery bool // enable peer discovery (discv5)
	StaticPeers []string // peers to always connect to
	BootstrapNodeAddr []string // bootstrap node addresses
	Discv5BootStrapAddr []string // discv5 bootstrap ENR/enode addresses
	LocalIP string // local IP to bind the listener to (overrides detected IP)
	HostAddress string // externally reachable IP advertised to other peers
	PrivateKeyPath string // path to the node's private key file
	TCPPort uint // libp2p TCP listen port
	UDPPort uint // discovery UDP port
	MaxPeers uint // total peer limit
	MaxInboundPeers uint // inbound peer limit
	MaxOutboundPeers uint // outbound peer limit
	IgnoreLocalIP bool // skip local-IP handling — TODO confirm exact effect at usage site
	SingleNode bool // run without waiting for peers (publish immediately)
}
package p2p
import (
"context"
"runtime"
"github.com/libp2p/go-libp2p/core/control"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/sirupsen/logrus"
)
// InterceptPeerDial tests whether we're permitted to Dial the specified peer.
// Always allowed here; per-address filtering happens in InterceptAddrDial.
func (_ *Service) InterceptPeerDial(_ peer.ID) (allow bool) {
	allow = true
	return
}
// InterceptAddrDial tests whether we're permitted to dial the specified
// multiaddr for the given peer.
func (s *Service) InterceptAddrDial(pid peer.ID, m multiaddr.Multiaddr) (allow bool) {
	// Refuse to dial peers currently scored as bad by the peer store
	// (bad IP or too many bad responses); see peers.Status.IsBad.
	return !s.peers.IsBad(pid)
}
// InterceptAccept checks whether the incidental inbound connection is allowed.
func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool) {
	// Refuse everything until the service has fully started.
	if !s.started {
		return false
	}
	remote := n.RemoteMultiaddr()
	if !s.validateDial(remote) {
		// Yield so other goroutines can run in case we are being
		// flooded with junk connections.
		runtime.Gosched()
		log.WithFields(logrus.Fields{"peer": remote,
			"reason": "exceeded dial limit"}).Trace("Not accepting inbound dial from ip address")
		return false
	}
	if s.isPeerAtLimit(true /* inbound */) {
		log.WithFields(logrus.Fields{"peer": remote,
			"reason": "at peer limit"}).Trace("Not accepting inbound dial")
		return false
	}
	return true
}
// InterceptSecured tests whether a given connection, now authenticated,
// is allowed. Authentication alone never causes a rejection here.
func (_ *Service) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) (allow bool) {
	allow = true
	return
}
// InterceptUpgraded tests whether a fully capable connection is allowed.
// Always permitted, with a zero disconnect reason.
func (_ *Service) InterceptUpgraded(_ network.Conn) (allow bool, reason control.DisconnectReason) {
	allow, reason = true, 0
	return
}
// validateDial reports whether addr resolves to a usable IP component.
func (s *Service) validateDial(addr multiaddr.Multiaddr) bool {
	if _, err := manet.ToIP(addr); err != nil {
		return false
	}
	return true
}
// onConnectSuccess logs a successful connection to the given peer.
// Always returns nil; it exists to satisfy a callback signature.
func onConnectSuccess(ctx context.Context, peerID peer.ID) error {
	log.WithField("peerID", peerID).Info("on connect success")
	return nil
}

// onConnectFail logs a failed connection attempt to the given peer.
// Always returns nil; it exists to satisfy a callback signature.
func onConnectFail(ctx context.Context, peerID peer.ID) error {
	log.WithField("peerID", peerID).Info("on connect fail")
	return nil
}

// onDisconnect logs a disconnection from the given peer.
// Always returns nil; it exists to satisfy a callback signature.
func onDisconnect(ctx context.Context, peerID peer.ID) error {
	log.WithField("peerID", peerID).Info("on disconnect")
	return nil
}
This diff is collapsed.
package p2p
import (
"testing"
"github.com/ethereum/go-ethereum/p2p/enode"
)
// TestDecodeV5 parses a set of known ENR strings, then round-trips each
// parsed record through convertToSingleMultiAddr, failing on any error.
func TestDecodeV5(t *testing.T) {
	nodes := []string{
		"enr:-JG4QL9QdmQjI-AOv-zFL1oaUAcWP6dPXBBOUAEXE_z4KZNfNTYDfmtAvxq-E0Xl1cqWGO_WmPldHoZIaVKzA0moWQ-GAYupV_MKgmlkgnY0gmlwhMCoAdOJc2VjcDI1NmsxoQLvx1jTRx75jpj8jD3KJ3FCZ_6UiICRIQJ_1MWmVx0IbYN0Y3CCdn6DdWRwgnZ-",
		"enr:-JG4QPGTzcn6tp8UqoSmEBBj5f0_42wonWmoedFgvncdZL3Qf0DCEQVzDSHGi9jM35qdD76rvpWFuGpQ_rmoX14ZnACGAYupV_plgmlkgnY0gmlwhMCoAdOJc2VjcDI1NmsxoQLvx1jTRx75jpj8jD3KJ3FCZ_6UiICRIQJ_1MWmVx0IbYN0Y3CCdn6DdWRwgnZ-",
		"enr:-JG4QP1aYRCaCknvIa3hSqE1zAUrm203DNFcvS8E1lQkqHq-PR0sQhIkzK7L4ZLLLPOijD9BTeCLWi8-0LVORt63dimGAYutv-WkgmlkgnY0gmlwhMCoAdyJc2VjcDI1NmsxoQJ81pyuyqa2QoPztOPsCqpkcW1gJwUxOcSJuWr9VUNavIN0Y3CCdn2DdWRwgnZ-",
	}
	r1, r2, r3 := parseGenericAddrs(nodes)
	t.Log(r1)
	t.Log(r2)
	t.Log(r3)
	for _, addr := range r1 {
		bootnode, err := enode.Parse(enode.ValidSchemes, addr)
		if err != nil {
			t.Fatal(err)
		}
		t.Log(bootnode.String())
		// Previously the error here was silently discarded; a conversion
		// failure must fail the test.
		maddr, err := convertToSingleMultiAddr(bootnode)
		if err != nil {
			t.Fatal(err)
		}
		t.Log(maddr.String())
	}
}
package p2p
import (
"context"
"fmt"
"io"
"sync"
"time"
"witness/p2p/peers"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"
)
const (
// The time to wait for a status request.
timeForStatus = 10 * time.Second
)
// peerMultiaddrString renders conn's remote endpoint as
// "<multiaddr>/p2p/<peerID>".
func peerMultiaddrString(conn network.Conn) string {
	addr := conn.RemoteMultiaddr().String()
	pid := conn.RemotePeer().String()
	return fmt.Sprintf("%s/p2p/%s", addr, pid)
}
// AddConnectionHandler adds a callback function which handles the connection with a
// newly added peer. It performs a handshake with that peer by sending a hello request
// and validating the response from the peer.
//
// reqFunc initiates the handshake on outbound dials; goodByeFunc is invoked
// when we decide to disconnect a still-connected peer.
func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Context, id peer.ID) error) {
	// Peer map and lock to keep track of current connection attempts.
	peerMap := make(map[peer.ID]bool)
	peerLock := new(sync.Mutex)
	// This is run at the start of each connection attempt, to ensure
	// that there aren't multiple inflight connection requests for the
	// same peer at once.
	peerHandshaking := func(id peer.ID) bool {
		peerLock.Lock()
		defer peerLock.Unlock()
		if peerMap[id] {
			return true
		}
		peerMap[id] = true
		return false
	}
	// peerFinished clears the inflight marker once an attempt completes.
	peerFinished := func(id peer.ID) {
		peerLock.Lock()
		defer peerLock.Unlock()
		delete(peerMap, id)
	}
	s.host.Network().Notify(&network.NotifyBundle{
		ConnectedF: func(net network.Network, conn network.Conn) {
			remotePeer := conn.RemotePeer()
			// disconnectFromPeer records Disconnecting -> Disconnected and,
			// while still connected, attempts a polite goodbye first.
			disconnectFromPeer := func() {
				s.peers.SetConnectionState(remotePeer, peers.PeerDisconnecting)
				// Only attempt a goodbye if we are still connected to the peer.
				if s.host.Network().Connectedness(remotePeer) == network.Connected {
					if err := goodByeFunc(context.TODO(), remotePeer); err != nil {
						log.WithError(err).Error("Unable to disconnect from peer")
					}
				}
				s.peers.SetConnectionState(remotePeer, peers.PeerDisconnected)
			}
			// Connection handler must be non-blocking as part of libp2p design.
			go func() {
				if peerHandshaking(remotePeer) {
					// Exit this if there is already another connection
					// request in flight.
					return
				}
				defer peerFinished(remotePeer)
				// Handle the various pre-existing conditions that will result in us not handshaking.
				peerConnectionState, err := s.peers.ConnectionState(remotePeer)
				if err == nil && (peerConnectionState == peers.PeerConnected || peerConnectionState == peers.PeerConnecting) {
					log.WithField("currentState", peerConnectionState).WithField("reason", "already active").Trace("Ignoring connection request")
					return
				}
				s.peers.Add(nil /* ENR */, remotePeer, conn.RemoteMultiaddr(), conn.Stat().Direction)
				// Defensive check in the event we still get a bad peer.
				if s.peers.IsBad(remotePeer) {
					log.WithField("reason", "bad peer").Trace("Ignoring connection request")
					disconnectFromPeer()
					return
				}
				// validPeerConnection marks the peer fully connected and logs it.
				validPeerConnection := func() {
					s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
					// Go through the handshake process.
					log.WithFields(logrus.Fields{
						"direction": conn.Stat().Direction,
						"multiAddr": peerMultiaddrString(conn),
						"activePeers": len(s.peers.Active()),
					}).Info("Peer connected")
				}
				// Do not perform handshake on inbound dials.
				if conn.Stat().Direction == network.DirInbound {
					// Wait for peer to initiate handshake
					time.Sleep(timeForStatus)
					// Exit if we are disconnected with the peer.
					if s.host.Network().Connectedness(remotePeer) != network.Connected {
						return
					}
					validPeerConnection()
					return
				}
				// Outbound dial: we initiate the handshake ourselves.
				// io.EOF from the request is tolerated as a successful exchange.
				s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
				if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && err != io.EOF {
					log.WithError(err).Trace("Handshake failed")
					disconnectFromPeer()
					return
				}
				validPeerConnection()
			}()
		},
	})
}
// AddDisconnectionHandler disconnects from peers. It handles updating the peer status.
// This also calls the handler responsible for maintaining other parts of the sync or p2p system.
func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error) {
	s.host.Network().Notify(&network.NotifyBundle{
		DisconnectedF: func(net network.Network, conn network.Conn) {
			log := log.WithField("multiAddr", peerMultiaddrString(conn))
			// Must be handled in a goroutine as this callback cannot be blocking.
			go func() {
				// Exit early if we are still connected to the peer.
				if net.Connectedness(conn.RemotePeer()) == network.Connected {
					return
				}
				priorState, err := s.peers.ConnectionState(conn.RemotePeer())
				if err != nil {
					// Can happen if the peer has already disconnected, so...
					priorState = peers.PeerDisconnected
				}
				// Transition Disconnecting -> handler -> Disconnected so the
				// peer store always reflects the in-progress teardown.
				s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)
				if err := handler(context.TODO(), conn.RemotePeer()); err != nil {
					log.WithError(err).Error("Disconnect handler failed")
				}
				s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
				// Only log disconnections if we were fully connected.
				if priorState == peers.PeerConnected {
					log.WithField("activePeers", len(s.peers.Active())).Info("Peer disconnected")
				}
			}()
		},
	})
}
package p2p
import (
"strings"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("module", "p2p")
// logIPAddr logs the node's listening multiaddress, using the first address
// that contains an ip4 or ip6 component. Logs nothing if none match.
func logIPAddr(id peer.ID, addrs ...ma.Multiaddr) {
	for _, addr := range addrs {
		text := addr.String()
		if !strings.Contains(text, "/ip4/") && !strings.Contains(text, "/ip6/") {
			continue
		}
		log.WithField(
			"multiAddr",
			text+"/p2p/"+id.String(),
		).Info("p2p node started")
		return
	}
}
// logExternalIPAddr logs the externally reachable multiaddress when an
// external address is configured; a blank addr is a no-op.
func logExternalIPAddr(id peer.ID, addr string, port uint) {
	if addr == "" {
		return
	}
	multiAddr, err := MultiAddressBuilder(addr, port)
	if err != nil {
		log.WithError(err).Error("Could not create multiaddress")
		return
	}
	log.WithField(
		"multiAddr",
		multiAddr.String()+"/p2p/"+id.String(),
	).Info("Node started external p2p server")
}
package p2p
import (
"fmt"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/v3/crypto/hash"
)
// msgID derives a deterministic pubsub message ID: the first 20 bytes of
// the hash of the message payload, hex-encoded.
func msgID(pmsg *pubsubpb.Message) string {
	digest := hash.Hash(pmsg.Data)
	return fmt.Sprintf("%x", digest[:20])
}
package p2p
import (
"testing"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
)
// TestMessageID checks msgID against known digests for fixed payloads.
func TestMessageID(t *testing.T) {
	cases := []struct {
		data []byte
		want string
	}{
		{data: []byte("hello"), want: "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c"},
		{data: []byte("world"), want: "486ea46224d1bb4fb680f34f7c9ad96a8f24ec88"},
	}
	for _, tc := range cases {
		tc := tc
		t.Run(string(tc.data), func(t *testing.T) {
			msg := pubsubpb.Message{Data: tc.data}
			if got := msgID(&msg); got != tc.want {
				t.Errorf("Wanted %s, got %s", tc.want, got)
			}
		})
	}
}
package p2p
import (
"crypto/ecdsa"
"fmt"
"net"
"witness/version"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
ecdsaprysm "github.com/prysmaticlabs/prysm/v3/crypto/ecdsa"
)
// MultiAddressBuilder takes in an ip address string and port to produce a go multiaddr format.
// IPv4 addresses produce /ip4/.../tcp/... and everything else /ip6/.../tcp/....
func MultiAddressBuilder(ipAddr string, port uint) (ma.Multiaddr, error) {
	parsed := net.ParseIP(ipAddr)
	if parsed == nil {
		return nil, errors.Errorf("invalid ip address provided: %s", ipAddr)
	}
	if v4 := parsed.To4(); v4 != nil {
		return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port))
	}
	return ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/%d", ipAddr, port))
}
// buildOptions for the libp2p host.
// Any failure to construct the listen address or derive the peer identity is
// fatal and terminates the process via log.Fatal.
func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Option {
	cfg := s.cfg
	listen, err := MultiAddressBuilder(ip.String(), cfg.TCPPort)
	if err != nil {
		log.WithError(err).Fatal("Failed to p2p listen")
	}
	// An explicitly configured LocalIP overrides the detected listen address.
	if cfg.LocalIP != "" {
		if net.ParseIP(cfg.LocalIP) == nil {
			log.Fatalf("Invalid local ip provided: %s", cfg.LocalIP)
		}
		listen, err = MultiAddressBuilder(cfg.LocalIP, cfg.TCPPort)
		if err != nil {
			log.WithError(err).Fatal("Failed to p2p listen")
		}
	}
	// Derive and persist the libp2p peer ID from the node's private key.
	ifaceKey, err := ecdsaprysm.ConvertToInterfacePrivkey(priKey)
	if err != nil {
		log.WithError(err).Fatal("Failed to retrieve private key")
	}
	id, err := peer.IDFromPublicKey(ifaceKey.GetPublic())
	if err != nil {
		log.WithError(err).Fatal("Failed to retrieve peer id")
	}
	log.Infof("Running node with peer id of %s ", id.String())
	myPeerIDToFile(id.String())
	log.Debugf("listen %s", listen.String())
	options := []libp2p.Option{
		privKeyOption(priKey),
		libp2p.ListenAddrs(listen),
		libp2p.UserAgent(version.Version),
		libp2p.ConnectionGater(s),
		libp2p.Transport(tcp.NewTCPTransport),
		libp2p.Muxer("/yamux/6.7.0", yamux.DefaultTransport),
		libp2p.DefaultMuxers,
	}
	options = append(options, libp2p.Security(noise.ID, noise.New))
	options = append(options, libp2p.DisableRelay())
	// If an external host address is configured, also advertise it to peers.
	if cfg.HostAddress != "" {
		options = append(options, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
			external, err := MultiAddressBuilder(cfg.HostAddress, cfg.TCPPort)
			if err != nil {
				log.WithError(err).Error("Unable to create external multiaddress")
			} else {
				addrs = append(addrs, external)
			}
			return addrs
		}))
	}
	return options
}
// multiAddressBuilderWithID builds an /ip4 or /ip6 multiaddr that also
// embeds the transport protocol, port and peer ID.
func multiAddressBuilderWithID(ipAddr, protocol string, port uint, id peer.ID) (ma.Multiaddr, error) {
	parsed := net.ParseIP(ipAddr)
	if parsed == nil {
		return nil, errors.Errorf("invalid ip address provided: %s", ipAddr)
	}
	if id.String() == "" {
		return nil, errors.New("empty peer id given")
	}
	family := "/ip6/%s/%s/%d/p2p/%s"
	if parsed.To4() != nil {
		family = "/ip4/%s/%s/%d/p2p/%s"
	}
	return ma.NewMultiaddr(fmt.Sprintf(family, ipAddr, protocol, port, id.String()))
}
// privKeyOption returns a libp2p option that sets the host identity from the
// given ECDSA private key. If the key cannot be converted to the libp2p
// interface key type, the error is returned when libp2p applies the option.
func privKeyOption(privkey *ecdsa.PrivateKey) libp2p.Option {
	return func(cfg *libp2p.Config) error {
		ifaceKey, err := ecdsaprysm.ConvertToInterfacePrivkey(privkey)
		if err != nil {
			return err
		}
		log.Debug("ECDSA private key generated")
		return cfg.Apply(libp2p.Identity(ifaceKey))
	}
}
package p2p
import (
"crypto/rand"
"encoding/hex"
"net"
"os"
"reflect"
"testing"
gethCrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
ecdsaprysm "github.com/prysmaticlabs/prysm/v3/crypto/ecdsa"
"github.com/prysmaticlabs/prysm/v3/network"
)
// TestPrivateKeyLoading writes a freshly generated secp256k1 key to a temp
// file, loads it back through privKey, and verifies the raw bytes match.
// All failures now use t.Fatal instead of panic so the test reports cleanly.
func TestPrivateKeyLoading(t *testing.T) {
	file, err := os.CreateTemp(t.TempDir(), "key")
	if err != nil {
		t.Fatal(err)
	}
	key, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
	if err != nil {
		t.Fatal(err, "Could not generate key")
	}
	raw, err := key.Raw()
	if err != nil {
		t.Fatal(err)
	}
	out := hex.EncodeToString(raw)
	err = os.WriteFile(file.Name(), []byte(out), 0666)
	if err != nil {
		t.Fatal(err, "Could not write key to file")
	}
	log.WithField("file", file.Name()).WithField("key", out).Info("Wrote key to file")
	cfg := &Config{
		PrivateKeyPath: file.Name(),
	}
	pKey, err := privKey(cfg)
	if err != nil {
		t.Fatal(err, "Could not apply option")
	}
	newPkey, err := ecdsaprysm.ConvertToInterfacePrivkey(pKey)
	if err != nil {
		t.Fatal(err)
	}
	rawBytes, err := key.Raw()
	if err != nil {
		t.Fatal(err)
	}
	newRaw, err := newPkey.Raw()
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(rawBytes, newRaw) {
		t.Fatal("Private keys do not match")
	}
}
// TestIPV6Support ensures an ENR carrying an IPv6 address converts to a
// multiaddress using the ip6 protocol, never ip4.
func TestIPV6Support(t *testing.T) {
	key, err := gethCrypto.GenerateKey()
	if err != nil {
		t.Fatal(err)
	}
	db, err := enode.OpenDB("")
	if err != nil {
		// Previously only logged, which let the test continue with a nil
		// database handle; fail fast instead.
		t.Fatal("could not open node's peer database:", err)
	}
	lNode := enode.NewLocalNode(db, key)
	mockIPV6 := net.IP{0xff, 0x02, 0xAA, 0, 0x1F, 0, 0x2E, 0, 0, 0x36, 0x45, 0, 0, 0, 0, 0x02}
	lNode.Set(enr.IP(mockIPV6))
	ma, err := convertToSingleMultiAddr(lNode.Node())
	if err != nil {
		t.Fatal(err)
	}
	ipv6Exists := false
	for _, p := range ma.Protocols() {
		if p.Name == "ip4" {
			t.Error("Got ip4 address instead of ip6")
		}
		if p.Name == "ip6" {
			ipv6Exists = true
		}
	}
	if !ipv6Exists {
		t.Error("Multiaddress did not have ipv6 protocol")
	}
}
// TestDefaultMultiplexers applies the service's libp2p options plus
// FallbackDefaults and asserts the resulting muxer ordering.
// NOTE(review): buildOptions configures libp2p.Muxer("/yamux/6.7.0", ...)
// followed by libp2p.DefaultMuxers, yet this test expects "/mplex/6.7.0"
// at index 0 and "/yamux/1.0.0" at index 1 — the expected IDs appear to
// depend on libp2p's default muxer set and option ordering; confirm they
// still match the libp2p version in go.mod.
func TestDefaultMultiplexers(t *testing.T) {
	var cfg libp2p.Config
	_ = cfg
	p2pCfg := &Config{
		TCPPort: 2000,
		UDPPort: 2000,
	}
	svc := &Service{cfg: p2pCfg}
	var err error
	svc.privKey, err = privKey(svc.cfg)
	if err != nil {
		t.Fatal(err)
	}
	ipAddr := network.IPAddr()
	opts := svc.buildOptions(ipAddr, svc.privKey)
	err = cfg.Apply(append(opts, libp2p.FallbackDefaults)...)
	if err != nil {
		t.Fatal(err)
	}
	if "/mplex/6.7.0" != cfg.Muxers[0].ID {
		t.Fatal("Mplex not set as default")
	}
	if "/yamux/1.0.0" != cfg.Muxers[1].ID {
		t.Fatal("Yamux not set as default")
	}
}
package peers
import (
"context"
"math"
"math/rand"
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/peerdata"
)
const (
	// MinBackOffDuration minimum amount (in milliseconds) to wait before peer is re-dialed.
	// When node and peer are dialing each other simultaneously connection may fail. In order, to break
	// of constant dialing, peer is assigned some backoff period, and only dialed again once that backoff is up.
	MinBackOffDuration = 100
	// MaxBackOffDuration maximum amount (in milliseconds) to wait before peer is re-dialed.
	MaxBackOffDuration = 5000
	// DefaultBadNodeReleaseDuration is how long a peer that exceeded the bad
	// response limit stays frozen before its counters are released; used when
	// StatusConfig.BadNodeReleaseDuration is left zero.
	DefaultBadNodeReleaseDuration = time.Minute
)
// Status tracks connection state, backoff windows and bad-response scoring
// for every known peer, backed by a locked Store.
type Status struct {
	ctx context.Context
	store *Store // locked peer-data store; all accessors go through it
	ipTracker map[string]uint64 // connection count per remote IP string
	rand *rand.Rand // source for randomized redial backoff
	config *StatusConfig
}
// StatusConfig carries the peer limits and scoring thresholds for a Status.
type StatusConfig struct {
	MaxInboundPeers int // inbound connection limit
	MaxOutboundPeers int // outbound connection limit
	MaxPeers int // total connection limit
	MaxBadResponses int // bad responses tolerated before a peer is considered bad
	BadNodeReleaseDuration time.Duration // freeze window for bad peers; zero means DefaultBadNodeReleaseDuration
}
// NewStatus creates the peer-status tracker backed by a fresh store.
// A zero BadNodeReleaseDuration falls back to DefaultBadNodeReleaseDuration.
func NewStatus(ctx context.Context, cfg *StatusConfig) *Status {
	// Apply the fallback before anything reads the config.
	if cfg.BadNodeReleaseDuration == 0 {
		cfg.BadNodeReleaseDuration = DefaultBadNodeReleaseDuration
	}
	backing := NewStore(ctx, &storeConfig{
		MaxInboundPeers: cfg.MaxInboundPeers,
		MaxOutboundPeers: cfg.MaxOutboundPeers,
		MaxPeers: cfg.MaxPeers,
		MaxBadResponses: cfg.MaxBadResponses,
	})
	return &Status{
		ctx: ctx,
		store: backing,
		ipTracker: make(map[string]uint64),
		rand: rand.New(rand.NewSource(rand.Int63())),
		config: cfg,
	}
}
// MaxPeerLimit returns the max peer limit stored in the current peer store.
func (p *Status) MaxPeerLimit() int {
	return p.store.Config().MaxPeers
}
// MaxInboundPeerLimit returns the max inbound peer limit stored in the current peer store.
func (p *Status) MaxInboundPeerLimit() int {
	return p.store.Config().MaxInboundPeers
}
// MaxOutboundPeerLimit returns the max outbound peer limit stored in the current peer store.
func (p *Status) MaxOutboundPeerLimit() int {
	return p.store.Config().MaxOutboundPeers
}
// MaxBadResponses returns the max bad responses stored in the current peer store.
func (p *Status) MaxBadResponses() int {
	return p.store.Config().MaxBadResponses
}
// Add adds a peer.
// If a peer already exists with this ID its address and direction are updated with the supplied data.
// The IP tracker is bumped for new peers and for existing peers whose IP changed.
func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, direction network.Direction) {
	p.store.Lock()
	defer p.store.Unlock()
	if peerData, ok := p.store.PeerData(pid); ok {
		// Peer already exists, just update its address info.
		prevAddress := peerData.Address
		peerData.Address = address
		peerData.Direction = direction
		if record != nil {
			peerData.Enr = record
		}
		if !sameIP(prevAddress, address) {
			p.addIpToTracker(pid)
		}
		return
	}
	peerData := &PeerData{
		Address: address,
		Direction: direction,
		// Peers start disconnected; state will be updated when the handshake process begins.
		ConnState: PeerDisconnected,
	}
	if record != nil {
		peerData.Enr = record
	}
	p.store.SetPeerData(pid, peerData)
	p.addIpToTracker(pid)
}
// SetConnectionState sets the connection state of the given remote peer,
// creating an empty record for it if none exists yet.
func (p *Status) SetConnectionState(pid peer.ID, state PeerConnectionState) {
	p.store.Lock()
	defer p.store.Unlock()
	p.store.PeerDataGetOrCreate(pid).ConnState = state
}
// ConnectionState gets the connection state of the given remote peer.
// Unknown peers yield PeerDisconnected and peerdata.ErrPeerUnknown.
func (p *Status) ConnectionState(pid peer.ID) (PeerConnectionState, error) {
	p.store.RLock()
	defer p.store.RUnlock()
	peerData, ok := p.store.PeerData(pid)
	if !ok {
		return PeerDisconnected, peerdata.ErrPeerUnknown
	}
	return peerData.ConnState, nil
}
// InboundConnected returns the current batch of inbound peers that are connected.
func (p *Status) InboundConnected() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()
	connected := make([]peer.ID, 0)
	for pid, d := range p.store.Peers() {
		if d.Direction != network.DirInbound {
			continue
		}
		if d.ConnState == PeerConnected {
			connected = append(connected, pid)
		}
	}
	return connected
}
// OutboundConnected returns the current batch of outbound peers that are connected.
func (p *Status) OutboundConnected() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()
	connected := make([]peer.ID, 0)
	for pid, d := range p.store.Peers() {
		if d.Direction != network.DirOutbound {
			continue
		}
		if d.ConnState == PeerConnected {
			connected = append(connected, pid)
		}
	}
	return connected
}
// Active returns the peers that are connecting or connected, in either direction.
func (p *Status) Active() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()
	active := make([]peer.ID, 0)
	for pid, d := range p.store.Peers() {
		switch d.ConnState {
		case PeerConnecting, PeerConnected:
			active = append(active, pid)
		}
	}
	return active
}
// RandomizeBackOff adds extra backoff period during which peer will not be
// dialed, unless a previously assigned backoff window is still in effect.
func (p *Status) RandomizeBackOff(pid peer.ID) {
	p.store.Lock()
	defer p.store.Unlock()
	peerData := p.store.PeerDataGetOrCreate(pid)
	now := time.Now()
	// Leave an unexpired backoff window untouched.
	if !now.After(peerData.NextValidTime) {
		return
	}
	ms := math.Max(MinBackOffDuration, float64(p.rand.Intn(MaxBackOffDuration)))
	peerData.NextValidTime = now.Add(time.Duration(ms) * time.Millisecond)
}
// IsReadyToDial checks whether the given peer may be dialed again: either no
// backoff was ever recorded, or its backoff window has passed. Peers without
// any record are never restricted.
func (p *Status) IsReadyToDial(pid peer.ID) bool {
	p.store.RLock()
	defer p.store.RUnlock()
	peerData, ok := p.store.PeerData(pid)
	if !ok {
		// No record: do not restrict dials to the peer.
		return true
	}
	next := peerData.NextValidTime
	return next.IsZero() || !next.After(time.Now())
}
// IsActive reports whether the peer is currently connected or connecting.
func (p *Status) IsActive(pid peer.ID) bool {
	p.store.RLock()
	defer p.store.RUnlock()
	if peerData, ok := p.store.PeerData(pid); ok {
		switch peerData.ConnState {
		case PeerConnected, PeerConnecting:
			return true
		}
	}
	return false
}
// IncBadResponses increments the number of bad responses received from the given peer.
// Unknown peers get a fresh record starting at 1. While a peer is inside its
// freeze window (NextBadNodeReleaseTime in the future) further increments are
// skipped; crossing MaxBadResponses starts a new freeze window.
func (p *Status) IncBadResponses(pid peer.ID) {
	p.store.Lock()
	defer p.store.Unlock()
	peerData, ok := p.store.PeerData(pid)
	if !ok {
		p.store.SetPeerData(pid, &PeerData{
			BadResponses: 1,
		})
		return
	}
	// Already frozen: do not pile on additional counts.
	if time.Now().Before(peerData.NextBadNodeReleaseTime) {
		return
	}
	peerData.BadResponses++
	if peerData.BadResponses >= p.MaxBadResponses() {
		// freeze for a while
		peerData.NextBadNodeReleaseTime = time.Now().Add(p.config.BadNodeReleaseDuration)
	}
}
// IsBad states if the peer is to be considered bad (by *any* of the registered scorers).
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
// NOTE(review): this takes only the read lock, but isFromBadResponses (via
// isBad) may mutate peer data when releasing an expired freeze — confirm
// whether a write lock is required here.
func (p *Status) IsBad(pid peer.ID) bool {
	p.store.RLock()
	defer p.store.RUnlock()
	return p.isBad(pid)
}
// isBad is the lock-free version of IsBad: bad IP or too many bad responses.
func (p *Status) isBad(pid peer.ID) bool {
	if p.isFromBadIP(pid) {
		return true
	}
	return p.isFromBadResponses(pid)
}
// isFromBadIP reports whether the peer's recorded address fails to resolve
// to an IP. Unknown peers and peers without an address are not bad.
// This method assumes the store lock is acquired before executing it.
func (p *Status) isFromBadIP(pid peer.ID) bool {
	peerData, ok := p.store.PeerData(pid)
	if !ok || peerData.Address == nil {
		return false
	}
	_, err := manet.ToIP(peerData.Address)
	return err != nil
}
// isFromBadResponses reports whether the peer has accumulated at least
// MaxBadResponses bad responses. As a side effect, an expired freeze window
// resets the peer's counters before the comparison.
// NOTE(review): this mutates peerData while callers (IsBad) hold only the
// read lock — verify this is safe or promote to a write lock.
func (p *Status) isFromBadResponses(pid peer.ID) bool {
	peerData, ok := p.store.PeerData(pid)
	if !ok {
		return false
	}
	// release bad node
	if !peerData.NextBadNodeReleaseTime.IsZero() && time.Now().After(peerData.NextBadNodeReleaseTime) {
		peerData.BadResponses = 0
		peerData.NextBadNodeReleaseTime = time.Time{}
	}
	return peerData.BadResponses >= p.MaxBadResponses()
}
// addIpToTracker bumps the per-IP connection counter for pid's recorded
// address. Unknown peers, missing/unparsable addresses and loopback IPs are
// all ignored.
func (p *Status) addIpToTracker(pid peer.ID) {
	data, ok := p.store.PeerData(pid)
	if !ok || data.Address == nil {
		return
	}
	ip, err := manet.ToIP(data.Address)
	if err != nil {
		// Should never happen: every stored address is assumed to carry a
		// valid IP.
		return
	}
	// Ignore loopback addresses.
	if ip.IsLoopback() {
		return
	}
	p.ipTracker[ip.String()]++
}
// tallyIPTracker rebuilds the per-IP counter map from the current peer set,
// replacing the previous tally wholesale.
func (p *Status) tallyIPTracker() {
	fresh := map[string]uint64{}
	for _, peerData := range p.store.Peers() {
		addr := peerData.Address
		if addr == nil {
			continue
		}
		ip, err := manet.ToIP(addr)
		if err != nil {
			// Should never happen: every stored address is assumed to
			// carry a valid IP.
			continue
		}
		fresh[ip.String()]++
	}
	p.ipTracker = fresh
}
// sameIP reports whether both multiaddresses are non-nil and resolve to
// equal IPs; any nil input or resolution failure yields false.
func sameIP(firstAddr, secondAddr ma.Multiaddr) bool {
	// Exit early if we do get nil multiaddresses
	if firstAddr == nil || secondAddr == nil {
		return false
	}
	first, errFirst := manet.ToIP(firstAddr)
	second, errSecond := manet.ToIP(secondAddr)
	if errFirst != nil || errSecond != nil {
		return false
	}
	return first.Equal(second)
}
package peers
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
)
// PeerConnectionState tracks where a peer is in its connection lifecycle.
type PeerConnectionState int
const (
	// PeerDisconnected means there is no connection to the peer.
	PeerDisconnected PeerConnectionState = iota
	// PeerDisconnecting means there is an on-going attempt to disconnect from the peer.
	PeerDisconnecting
	// PeerConnected means the peer has an active connection.
	PeerConnected
	// PeerConnecting means there is an on-going attempt to connect to the peer.
	PeerConnecting
)
// PeerData aggregates everything the peer store tracks about one peer.
type PeerData struct {
	Address ma.Multiaddr // last known multiaddress
	Direction network.Direction // inbound or outbound
	ConnState PeerConnectionState // current lifecycle state
	Enr *enr.Record // node record, if discovered via ENR
	NextValidTime time.Time // earliest time the peer may be re-dialed (backoff)
	BadResponses int // bad-response count toward the bad-peer threshold
	NextBadNodeReleaseTime time.Time // when a frozen bad peer's counters are released
}
// Store is a mutex-guarded map of per-peer data. Callers must hold the
// embedded RWMutex around every accessor (see the method comments below).
type Store struct {
	ctx context.Context
	peers map[peer.ID]*PeerData
	config *storeConfig
	sync.RWMutex
}
// storeConfig carries the peer limits and scoring threshold the store exposes
// through Config().
type storeConfig struct {
	MaxInboundPeers int
	MaxOutboundPeers int
	MaxPeers int
	MaxBadResponses int
}
// NewStore constructs an empty peer-data store with the given config.
func NewStore(ctx context.Context, cfg *storeConfig) *Store {
	s := &Store{ctx: ctx, config: cfg}
	s.peers = make(map[peer.ID]*PeerData)
	return s
}
// PeerData returns data associated with a given peer, if any.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) PeerData(pid peer.ID) (*PeerData, bool) {
	d, ok := s.peers[pid]
	return d, ok
}
// PeerDataGetOrCreate returns data associated with a given peer, creating and
// storing a zero-valued record first when none exists.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) PeerDataGetOrCreate(pid peer.ID) *PeerData {
	d, ok := s.peers[pid]
	if !ok {
		d = &PeerData{}
		s.peers[pid] = d
	}
	return d
}
// SetPeerData updates data associated with a given peer.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) SetPeerData(pid peer.ID, data *PeerData) {
	s.peers[pid] = data
}
// DeletePeerData removes data associated with a given peer.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) DeletePeerData(pid peer.ID) {
	delete(s.peers, pid)
}
// Peers returns map of peer data objects. The returned map is the store's
// internal map, not a copy.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) Peers() map[peer.ID]*PeerData {
	return s.peers
}
// Config returns the store's limits/threshold configuration.
func (s *Store) Config() *storeConfig {
	return s.config
}
package p2p
import (
"context"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/pkg/errors"
)
const (
	// overlay parameters
	gossipSubD = 8 // topic stable mesh target count
	gossipSubDlo = 6 // topic stable mesh low watermark
	gossipSubDhi = 12 // topic stable mesh high watermark
	// gossip parameters
	gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
	gossipSubMcacheGossip = 3 // number of windows to gossip about
	// gossipSubSeenTTL is multiplied by the heartbeat interval in
	// setPubSubParameters to derive the seen-message cache duration.
	gossipSubSeenTTL = 60 // number of heartbeat intervals to retain message IDs
	// heartbeat interval
	gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds
)
// JoinTopic will join PubSub topic, if not already joined, and returns the
// cached topic handle either way.
func (s *Service) JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) {
	s.joinedTopicsLock.Lock()
	defer s.joinedTopicsLock.Unlock()
	if handle, ok := s.joinedTopics[topic]; ok {
		return handle, nil
	}
	handle, err := s.pubsub.Join(topic, opts...)
	if err != nil {
		return nil, err
	}
	s.joinedTopics[topic] = handle
	return handle, nil
}
// LeaveTopic closes topic and removes corresponding handler from list of joined topics.
// This method will return error if there are outstanding event handlers or subscriptions.
// Leaving a topic that was never joined is a no-op.
func (s *Service) LeaveTopic(topic string) error {
	s.joinedTopicsLock.Lock()
	defer s.joinedTopicsLock.Unlock()
	handle, ok := s.joinedTopics[topic]
	if !ok {
		return nil
	}
	if err := handle.Close(); err != nil {
		return err
	}
	delete(s.joinedTopics, topic)
	return nil
}
// PublishToTopic joins (if necessary) and publishes a message to a PubSub topic.
// In SingleNode mode the message is published immediately; otherwise this
// blocks (polling every 100ms) until at least one peer is subscribed to the
// topic or ctx is cancelled. With a context that never expires this can
// block indefinitely.
func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error {
	topicHandle, err := s.JoinTopic(topic)
	if err != nil {
		return err
	}
	if s.cfg.SingleNode {
		return topicHandle.Publish(ctx, data, opts...)
	}
	// Wait for at least 1 peer to be available to receive the published message.
	for {
		if len(topicHandle.ListPeers()) > 0 {
			return topicHandle.Publish(ctx, data, opts...)
		}
		select {
		case <-ctx.Done():
			return errors.Wrapf(ctx.Err(), "unable to find requisite number of peers for topic %s, 0 peers found to publish to", topic)
		default:
			time.Sleep(100 * time.Millisecond)
		}
	}
}
// SubscribeToTopic joins (if necessary) and subscribes to PubSub topic.
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) {
	handle, err := s.JoinTopic(topic)
	if err != nil {
		return nil, err
	}
	return handle.Subscribe(opts...)
}
// pubsubOptions creates the list of pubsub options to configure our router with.
func (s *Service) pubsubOptions() []pubsub.Option {
	return []pubsub.Option{
		pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
		pubsub.WithNoAuthor(),
		// msgID already matches the MessageIdFn signature; no wrapper needed.
		pubsub.WithMessageIdFn(msgID),
		pubsub.WithPeerOutboundQueueSize(pubsubQueueSize),
		pubsub.WithMaxMessageSize(GossipMaxSize),
		pubsub.WithValidateQueueSize(pubsubQueueSize),
		pubsub.WithGossipSubParams(pubsubGossipParam()),
	}
}
// pubsubGossipParam creates a custom gossipsub parameter set on top of the
// library defaults, overriding mesh degree, heartbeat and history settings.
func pubsubGossipParam() pubsub.GossipSubParams {
	params := pubsub.DefaultGossipSubParams()
	params.D = gossipSubD
	params.Dlo = gossipSubDlo
	params.HeartbeatInterval = gossipSubHeartbeatInterval
	params.HistoryLength = gossipSubMcacheLen
	params.HistoryGossip = gossipSubMcacheGossip
	return params
}
// setPubSubParameters mutates the pubsub package's global TimeCacheDuration.
// We have to unfortunately set this globally in order to configure our
// message id time-cache rather than instantiating it with a router instance.
func setPubSubParameters() {
	pubsub.TimeCacheDuration = gossipSubSeenTTL * gossipSubHeartbeatInterval
}
package p2p
import (
"context"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/sirupsen/logrus"
)
// Send a message to a specific peer. The returned stream may be used for reading, but has been
// closed for writing.
//
// When done, the caller must Close or Reset on the stream.
func (s *Service) Send(ctx context.Context, message []byte, baseTopic string, pid peer.ID) (network.Stream, error) {
	log.WithFields(logrus.Fields{
		"topic": baseTopic,
		"request length": len(message),
	}).Tracef("Sending RPC request to peer %s", pid.String())
	// Apply max dial timeout when opening a new stream.
	ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
	defer cancel()
	stream, err := s.host.NewStream(ctx, pid, protocol.ID(baseTopic))
	if err != nil {
		return nil, err
	}
	if _, err := stream.Write(message); err != nil {
		return stream, err
	}
	// Close our side for writing; the peer may still respond on the stream.
	if err := stream.CloseWrite(); err != nil {
		// Best effort teardown; the CloseWrite error takes priority.
		_ = stream.Reset()
		return nil, err
	}
	return stream, nil
}
// Package p2p including peer discovery using discv5, gossip-sub
// using libp2p, and handing peer lifecycles + handshakes.
package p2p
import (
"context"
"crypto/ecdsa"
"fmt"
"net"
"strings"
"sync"
"time"
"witness/p2p/peers"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/async/event"
prysmnetwork "github.com/prysmaticlabs/prysm/v3/network"
)
// In the event that we are at our peer limit, we
// stop looking for new peers and instead poll
// for the current peer limit status for the time period
// defined below.
var pollingPeriod = time.Second * 12
// maxDialTimeout is the timeout for a single peer dial.
var maxDialTimeout = time.Second * 10
// pubsubQueueSize is the size that we assign to our validation queue and outbound message queue for
// gossipsub.
const pubsubQueueSize = 80
// GossipMaxSize is the maximum allowed gossip message size.
const GossipMaxSize = 100 * 1 << 20 // 100MB (100 * 2^20 bytes; the previous "10MB" comment was stale)
// MaxChunkSize is the maximum allowed chunk size.
const MaxChunkSize = 10 * 1 << 20 // 10MB (10 * 2^20 bytes; the previous "1MB" comment was stale)
// subSession tracks one event subscription together with the number of
// listeners currently sharing it.
type subSession struct {
	sub event.Subscription
	listeners int // reference count — presumably the session is released at zero; verify against users of subManage
}
// Service for managing peer to peer (p2p) networking.
type Service struct {
	started bool // set in Start, cleared in Stop
	cancel context.CancelFunc // cancels ctx; invoked from Stop
	cfg *Config
	privKey *ecdsa.PrivateKey // node identity key (see privKey())
	pubsub *pubsub.PubSub
	joinedTopics map[string]*pubsub.Topic // topics already joined, keyed by topic name
	joinedTopicsLock sync.Mutex // guards joinedTopics
	dv5Listener Listener // discv5 discovery listener; nil when discovery is disabled
	startupErr error // first error recorded during Start; surfaced via Status()
	ctx context.Context
	host host.Host // underlying libp2p host
	peers *peers.Status // peer limit / bad-response bookkeeping
	subManage map[string]*subSession // active event subscriptions, keyed by topic
	subManageLock sync.Mutex // guards subManage
}
// NewService initializes a new p2p service compatible with shared.Service interface. No
// connections are made until the Start function is called during the service registry startup.
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
	var err error
	ctx, cancel := context.WithCancel(ctx)
	_ = cancel // govet fix for lost cancel. Cancel is handled in service.Stop().
	s := &Service{
		ctx: ctx,
		cancel: cancel,
		cfg: cfg,
		subManage: make(map[string]*subSession),
		joinedTopics: make(map[string]*pubsub.Topic),
	}
	// NOTE(review): the error from parseBootStrapAddrs is discarded, so
	// malformed bootstrap addresses are silently dropped — confirm intended.
	dv5Nodes, _ := parseBootStrapAddrs(s.cfg.BootstrapNodeAddr)
	cfg.Discv5BootStrapAddr = dv5Nodes
	s.privKey, err = privKey(s.cfg)
	if err != nil {
		log.WithError(err).Error("Failed to generate p2p private key")
		return nil, err
	}
	// get external ip address
	ipAddr := prysmnetwork.IPAddr()
	if cfg.HostAddress != "" {
		// NOTE(review): when a host address is configured this binds to
		// 0.0.0.0 (all interfaces) rather than parsing cfg.HostAddress, so
		// the validity check below can never fail — confirm intentional.
		ipAddr = net.ParseIP("0.0.0.0")
		if ipAddr.To4() == nil && ipAddr.To16() == nil {
			log.Errorf("Invalid host address given: %s", ipAddr.String())
			return nil, fmt.Errorf("invalid host address given: %s", ipAddr.String())
		}
	}
	opts := s.buildOptions(ipAddr, s.privKey)
	h, err := libp2p.New(opts...)
	if err != nil {
		log.WithError(err).Error("Failed to create p2p host")
		return nil, err
	}
	s.host = h
	// Gossipsub registration is done before we add in any new peers
	// due to libp2p's gossipsub implementation not taking into
	// account previously added peers when creating the gossipsub
	// object.
	psOpts := s.pubsubOptions()
	// Set the pubsub global parameters that we require.
	setPubSubParameters()
	// Reinitialize them in the event we are running a custom config.
	gs, err := pubsub.NewGossipSub(s.ctx, s.host, psOpts...)
	if err != nil {
		log.WithError(err).Error("Failed to start pubsub")
		return nil, err
	}
	s.pubsub = gs
	s.peers = peers.NewStatus(s.ctx, &peers.StatusConfig{
		MaxInboundPeers: int(cfg.MaxInboundPeers),
		MaxOutboundPeers: int(cfg.MaxOutboundPeers),
		MaxPeers: int(cfg.MaxPeers),
		MaxBadResponses: 3,
	})
	return s, nil
}
// StartP2PNode constructs a fresh p2p Service from cfg, wires up the default
// connection/disconnection handlers, and starts it. Note the receiver itself
// is not used; a brand-new Service is created and started instead.
func (s *Service) StartP2PNode(cfg *Config) (err error) {
	node, newErr := NewService(context.Background(), cfg)
	if newErr != nil {
		return newErr
	}
	node.AddConnectionHandler(onConnectSuccess, onConnectFail)
	node.AddDisconnectionHandler(onDisconnect)
	node.Start()
	return nil
}
// Start the p2p service.
func (s *Service) Start() {
	if s.started {
		log.Error("Attempted to start p2p service when it was already started")
		return
	}
	if s.cfg.UseDiscovery {
		ipAddr := prysmnetwork.IPAddr()
		listener, err := s.startDiscoveryV5(
			ipAddr,
			s.privKey,
		)
		if err != nil {
			// NOTE(review): log.Fatal exits the process, making the
			// startupErr assignment below unreachable — confirm Fatal
			// (rather than Error) is intended here.
			log.WithError(err).Fatal("Failed to start discovery")
			s.startupErr = err
			return
		}
		log.Debug("debug: connecting to bootnode ", len(s.cfg.Discv5BootStrapAddr))
		err = s.connectToBootnodes()
		if err != nil {
			log.WithError(err).Error("Could not add bootnode to the exclusion list")
			s.startupErr = err
			return
		}
		s.dv5Listener = listener
		// Keep discovering and dialing new peers in the background.
		go s.listenForNewNodes()
	}
	s.started = true
	if len(s.cfg.StaticPeers) > 0 {
		addrs, err := PeersFromStringAddrs(s.cfg.StaticPeers)
		if err != nil {
			log.WithError(err).Error("Could not connect to static peer")
		}
		s.connectWithAllPeers(addrs, false)
	}
	multiAddrs := s.host.Network().ListenAddresses()
	logIPAddr(s.host.ID(), multiAddrs...)
	p2pHostAddress := s.cfg.HostAddress
	p2pTCPPort := s.cfg.TCPPort
	if p2pHostAddress != "" {
		logExternalIPAddr(s.host.ID(), p2pHostAddress, p2pTCPPort)
		// Best-effort probe that the advertised address is reachable.
		verifyConnectivity(p2pHostAddress, p2pTCPPort, "tcp")
	}
}
// Stop the p2p service and terminate all peer connections.
func (s *Service) Stop() error {
	// Cancel the service context last so in-flight work sees started=false first.
	defer s.cancel()
	s.started = false
	if listener := s.dv5Listener; listener != nil {
		listener.Close()
	}
	return nil
}
// Status of the p2p service. Will return an error if the service is considered unhealthy to
// indicate that this node should not serve traffic until the issue has been resolved.
func (s *Service) Status() error {
	switch {
	case !s.started:
		return errors.New("not running")
	case s.startupErr != nil:
		return s.startupErr
	default:
		return nil
	}
}
// Started returns true if the p2p service has successfully started.
func (s *Service) Started() bool {
	return s.started
}
// PubSub returns the p2p pubsub framework.
// The returned router is the gossipsub instance created in NewService.
func (s *Service) PubSub() *pubsub.PubSub {
	return s.pubsub
}
// Host returns the currently running libp2p
// host of the service.
func (s *Service) Host() host.Host {
	return s.host
}
// SetStreamHandler sets the protocol handler on the p2p host multiplexer.
// This method is a pass through to libp2pcore.Host.SetStreamHandler.
func (s *Service) SetStreamHandler(topic string, handler network.StreamHandler) {
	// The topic string doubles as the libp2p protocol ID.
	s.host.SetStreamHandler(protocol.ID(topic), handler)
}
// PeerID returns the Peer ID of the local peer.
func (s *Service) PeerID() peer.ID {
	return s.host.ID()
}
// Disconnect from a peer, closing every connection to it.
func (s *Service) Disconnect(pid peer.ID) error {
	return s.host.Network().ClosePeer(pid)
}
// Connect to a specific peer using the service's own context.
func (s *Service) Connect(pi peer.AddrInfo) error {
	return s.host.Connect(s.ctx, pi)
}
// connectWithAllPeers dials every peer described by multiAddrs concurrently.
// When IgnoreLocalIP is set, peers with a local address are skipped entirely.
// isBootnode marks bootnode dials (currently only a hook for persisting
// successful bootnode connections).
func (s *Service) connectWithAllPeers(multiAddrs []multiaddr.Multiaddr, isBootnode bool) {
	addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
	if err != nil {
		log.WithError(err).Error("Could not convert to peer address info's from multiaddresses")
		return
	}
infoLoop:
	for _, info := range addrInfos {
		for _, addr := range info.Addrs {
			// The host is the third "/"-separated component of a multiaddr
			// (e.g. /ip4/1.2.3.4/tcp/9000). Guard against short addresses
			// instead of indexing unconditionally.
			parts := strings.Split(addr.String(), "/")
			if len(parts) > 2 && s.cfg.IgnoreLocalIP && s.isLocalIP(parts[2]) {
				// Bug fix: the previous plain `continue` only advanced the
				// inner address loop, so the local-IP filter never actually
				// skipped a peer. Skip the whole peer here.
				continue infoLoop
			}
		}
		// make each dial non-blocking
		go func(info peer.AddrInfo) {
			log.Debugf("connecting to peer %s", info.String())
			if err := s.connectWithPeer(s.ctx, info); err != nil {
				log.WithError(err).Errorf("Could not connect with peer %s, err %s", info.String(), err.Error())
			} else if isBootnode {
				// save to file
			}
		}(info)
	}
}
// connectWithPeer dials a single peer with a bounded dial timeout, honoring
// the local-IP ignore setting and the bad-peer score. Self-dials and ignored
// local peers return nil without connecting; failed dials increment the
// peer's bad-response counter.
func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error {
	if info.ID == s.host.ID() {
		// Never dial ourselves.
		return nil
	}
	if s.peers.IsBad(info.ID) {
		return errors.New("refused to connect to bad peer")
	}
	for _, addr := range info.Addrs {
		// The host is the third "/"-separated component of a multiaddr.
		// Bug fix: guard against short/malformed addresses instead of
		// indexing [2] unconditionally, which could panic.
		parts := strings.Split(addr.String(), "/")
		if len(parts) > 2 && s.cfg.IgnoreLocalIP && s.isLocalIP(parts[2]) {
			return nil
		}
	}
	ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
	defer cancel()
	if err := s.host.Connect(ctx, info); err != nil {
		s.peers.IncBadResponses(info.ID)
		return err
	}
	log.WithField("peer", info.ID).Debug("Connected to new peer")
	return nil
}
// connectToBootnodes parses the configured discv5 bootstrap ENRs, filters out
// nodes without a TCP entry (and local IPs when IgnoreLocalIP is set), then
// dials the remainder concurrently via connectWithAllPeers. Returns an error
// only when an ENR fails to parse.
func (s *Service) connectToBootnodes() error {
	nodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddr))
	for _, addr := range s.cfg.Discv5BootStrapAddr {
		bootNode, err := enode.Parse(enode.ValidSchemes, addr)
		if err != nil {
			return err
		}
		log.Debug("Adding bootnode to the list", bootNode.String())
		// do not dial bootnodes with their tcp ports not set
		if err := bootNode.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
			if !enr.IsNotFound(err) {
				log.WithError(err).Error("Could not retrieve tcp port")
			}
			continue
		}
		if s.cfg.IgnoreLocalIP && s.isLocalIP(bootNode.IP().String()) {
			continue
		}
		nodes = append(nodes, bootNode)
	}
	multiAddresses := convertToMultiAddr(nodes)
	s.connectWithAllPeers(multiAddresses, true)
	return nil
}
// isLocalIP reports whether ipAddr is one of this node's own configured
// addresses or the IPv4 loopback address.
func (s *Service) isLocalIP(ipAddr string) bool {
	switch ipAddr {
	case s.cfg.LocalIP, s.cfg.HostAddress, "127.0.0.1":
		return true
	}
	return false
}
package p2p
import (
"crypto/ecdsa"
"crypto/rand"
"encoding/hex"
"fmt"
"net"
"os"
"time"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/pkg/errors"
ecdsaprysm "github.com/prysmaticlabs/prysm/v3/crypto/ecdsa"
"github.com/sirupsen/logrus"
)
// dialTimeout bounds the connectivity probe in verifyConnectivity.
const dialTimeout = 1 * time.Second
// enrRecordFile is the file this node's ENR record is persisted to.
const enrRecordFile = "discovery-enr"
// peerIDFile is the file the local peer ID is persisted to.
const peerIDFile = "peer-id"
// privKey determines the p2p networking private key from the service
// configuration. When no key path is configured, or no key file exists at
// the configured path, a fresh secp256k1 key is generated instead.
func privKey(cfg *Config) (*ecdsa.PrivateKey, error) {
	keyPath := cfg.PrivateKeyPath
	_, statErr := os.Stat(keyPath)
	exists := !os.IsNotExist(statErr)
	// Any stat failure other than "does not exist" is fatal.
	if statErr != nil && exists {
		return nil, statErr
	}
	if keyPath != "" && exists {
		return privKeyFromFile(keyPath)
	}
	// No usable key on disk: generate a new secp256k1 key.
	generated, _, genErr := crypto.GenerateSecp256k1Key(rand.Reader)
	if genErr != nil {
		return nil, genErr
	}
	return ecdsaprysm.ConvertFromInterfacePrivKey(generated)
}
// privKeyFromFile loads a hex-encoded secp256k1 private key from path and
// converts it to the ECDSA representation used by the p2p stack.
func privKeyFromFile(path string) (*ecdsa.PrivateKey, error) {
	raw, err := os.ReadFile(path) // #nosec G304
	if err != nil {
		log.WithError(err).Error("Error reading private key from file")
		return nil, err
	}
	// The file content is hex text; decode it to raw key bytes.
	decoded := make([]byte, hex.DecodedLen(len(raw)))
	if _, err = hex.Decode(decoded, raw); err != nil {
		return nil, errors.Wrap(err, "failed to decode hex string")
	}
	key, err := crypto.UnmarshalSecp256k1PrivateKey(decoded)
	if err != nil {
		return nil, err
	}
	return ecdsaprysm.ConvertFromInterfacePrivKey(key)
}
// verifyConnectivity attempts to dial addr:port over the given protocol and
// logs a warning when the address is unreachable. It is purely diagnostic:
// nothing is returned and failures are not fatal.
func verifyConnectivity(addr string, port uint, protocol string) {
	if addr == "" {
		return
	}
	target := net.JoinHostPort(addr, fmt.Sprintf("%d", port))
	fields := logrus.Fields{
		"protocol": protocol,
		"address":  target,
	}
	conn, err := net.DialTimeout(protocol, target, dialTimeout)
	if err != nil {
		log.WithError(err).WithFields(fields).Warn("IP address is not accessible")
		return
	}
	if closeErr := conn.Close(); closeErr != nil {
		log.WithError(closeErr).Debug("Could not close connection")
	}
}
// enrRecordToFile persists the node's ENR record to enrRecordFile, creating
// the file or overwriting any previous content. Failures are logged and
// swallowed: persisting the record is best-effort.
func enrRecordToFile(enr string) {
	// os.WriteFile creates-or-truncates and writes in one call; unlike the
	// previous Create + WriteString + deferred Close sequence it cannot leak
	// the handle or silently drop a Close error.
	if err := os.WriteFile(enrRecordFile, []byte(enr), 0666); err != nil {
		log.WithError(err).Warn("Error writing enrRecordFile")
	}
}
// myPeerIDToFile persists the local peer ID to peerIDFile, creating the file
// or overwriting any previous content. Failures are logged and swallowed
// (best-effort, matching enrRecordToFile).
func myPeerIDToFile(peerID string) {
	// Single-call write avoids the handle/Close-error pitfalls of the
	// previous Create + WriteString + deferred Close sequence.
	if err := os.WriteFile(peerIDFile, []byte(peerID), 0666); err != nil {
		log.WithError(err).Warn("Error writing peerIDFile")
	}
}
package quest
import (
"fmt"
"witness/conf"
log "github.com/sirupsen/logrus"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// Quest wraps the GORM handle used to read proof rows from the proof store.
type Quest struct {
	db *gorm.DB
}
// NewQuest opens a GORM connection to the Postgres-compatible database
// described by cfg, verifies it with a trivial round-trip query, and returns
// the wrapped handle. It panics when the database is unreachable, since the
// service cannot operate without it.
func NewQuest(cfg *conf.QuestConfig) (q *Quest) {
	dsn := fmt.Sprintf("host=%s port=%d user=%s dbname=%s password=%s sslmode=disable", cfg.Host, cfg.Port, cfg.User, cfg.Database, cfg.Password)
	db, err := gorm.Open(postgres.Open(dsn))
	if err != nil {
		panic(err)
	}
	// Round-trip a trivial statement to confirm the connection is live.
	if pingErr := db.Raw("SELECT 1;").Error; pingErr != nil {
		panic(pingErr)
	}
	log.Info("connect quest success")
	return &Quest{db: db}
}
// GetProofs returns up to limit proof rows whose TaskFinishTimestamp lies in
// [startTimestamp, endTimestamp) and whose TaskId sorts strictly after
// lastTaskID, ordered by TaskId ascending. Callers paginate by passing the
// last TaskId of the previous page (keyset pagination).
func (q *Quest) GetProofs(startTimestamp, endTimestamp int64, lastTaskID string, limit int) (proofs []*ProofModel, err error) {
	const querySQL = "SELECT " +
		"`TaskType`, `TaskId`, `TaskFinishTimestamp`, `TaskWorkload`, `TaskReqHash`, `TaskRespHash`, `TaskManagerSignature`, `TaskContainerSignature`, `TaskMinerSignature`, `TaskProfitAccount`, `TaskWorkerAccount` " +
		"FROM `proof` " +
		"WHERE `TaskFinishTimestamp` >= ? AND `TaskFinishTimestamp` < ? " +
		"AND `TaskId` > ? " +
		"ORDER BY `TaskId` ASC " +
		"LIMIT ?;"
	err = q.db.Debug().Raw(querySQL, startTimestamp, endTimestamp, lastTaskID, limit).Scan(&proofs).Error
	return proofs, err
}
package quest
import (
"math/big"
"witness/util"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
// ProofModel mirrors one row of the `proof` table as selected by GetProofs.
// Hash and signature fields hold hex-encoded bytes; account fields hold hex
// addresses.
type ProofModel struct {
	TaskType int
	TaskId string
	TaskFinishTimestamp int // completion timestamp — presumably unix seconds; TODO confirm units
	TaskWorkload uint64
	TaskReqHash string // hex-encoded request hash
	TaskRespHash string // hex-encoded response hash
	TaskManagerSignature string
	TaskContainerSignature string
	TaskMinerSignature string
	TaskProfitAccount string // hex address credited with the reward
	TaskWorkerAccount string // hex address of the worker/miner
}
// GenerateHashPayload returns Keccak256(TaskId || ReqHash || RespHash),
// decoding the hex-encoded hash fields to raw bytes before concatenation.
func (p *ProofModel) GenerateHashPayload() []byte {
	payload := append([]byte(p.TaskId), common.Hex2Bytes(p.TaskReqHash)...)
	payload = append(payload, common.Hex2Bytes(p.TaskRespHash)...)
	return crypto.Keccak256(payload)
}
// GenerateNMHashPayload returns the Keccak256 hash over the full proof record:
// TaskId || ReqHash || RespHash || profit address || worker address ||
// container signature || miner signature || workload || finish timestamp.
func (p *ProofModel) GenerateNMHashPayload() []byte {
	return crypto.Keccak256(util.CombineBytes(
		[]byte(p.TaskId),
		common.Hex2Bytes(p.TaskReqHash),
		common.Hex2Bytes(p.TaskRespHash),
		common.HexToAddress(p.TaskProfitAccount).Bytes(),
		common.HexToAddress(p.TaskWorkerAccount).Bytes(),
		common.Hex2Bytes(p.TaskContainerSignature),
		common.Hex2Bytes(p.TaskMinerSignature),
		// NOTE(review): int64(p.TaskWorkload) overflows for values above
		// MaxInt64, and big.Int.Bytes() is variable-length (no fixed-width
		// padding) — confirm the verifying side builds this payload the
		// same way.
		big.NewInt(int64(p.TaskWorkload)).Bytes(),
		big.NewInt(int64(p.TaskFinishTimestamp)).Bytes(),
	))
}
package tree
import (
"encoding/hex"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
......@@ -11,33 +12,68 @@ import (
func TestMT(t *testing.T) {
proofs := make([]common.Hash, 0)
proofs = append(proofs, crypto.Keccak256Hash(common.HexToHash("0x1").Bytes()))
proofs = append(proofs, crypto.Keccak256Hash(common.HexToHash("0x2").Bytes()))
proofs = append(proofs, crypto.Keccak256Hash(common.HexToHash("0x3").Bytes()))
proofs = append(proofs, crypto.Keccak256Hash(common.HexToHash("0x4").Bytes()))
proofs = append(proofs, crypto.Keccak256Hash(common.HexToHash("0x5").Bytes()))
proofs = append(proofs, crypto.Keccak256Hash(common.HexToHash("0x6").Bytes()))
address := common.HexToAddress("0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326")
amount := big.NewInt(0).SetUint64(10)
payload := append(address.Bytes(), common.LeftPadBytes(amount.Bytes(), 32)...)
data := crypto.Keccak256Hash(payload)
proofs = append(proofs, data)
for i := 100; i < 200000; i++ {
_proof := crypto.Keccak256Hash(big.NewInt(0).SetInt64(int64(i)).Bytes())
proofs = append(proofs, _proof)
}
// address := common.HexToAddress("0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326")
// amount := big.NewInt(0).SetUint64(10)
// payload := append(address.Bytes(), common.LeftPadBytes(amount.Bytes(), 32)...)
// data := crypto.Keccak256Hash(payload)
// proofs = append(proofs, data)
st := time.Now()
tree, err := NewMerkleTree(proofs)
if err != nil {
t.Fatal(err)
}
mproofs, err := tree.GetProof(data)
if err != nil {
t.Fatal(err)
// mproofs, err := tree.GetProof(data)
// if err != nil {
// t.Fatal(err)
// }
// _ = mproofs
t.Log("root:", tree.GetRoot().Hex())
// for _, el := range mproofs {
// t.Log(hex.EncodeToString(el[:]))
// }
t.Log("---")
for i := len(tree.layers) - 1; i >= 0; i-- {
fmt.Println(len(tree.layers[i]), tree.layers[i])
if i < len(tree.layers)-5 {
break
}
}
t.Log("---")
t.Log(time.Since(st))
t.Log(tree.GetRoot().Hex())
tnode := tree.FindNode(common.HexToHash("0xa05a2b09dda15efbc0e7a2f9cd779d902d765c5ecb1abb3ef06a450f55a9fce7"))
t.Log("tnode", tnode)
t.Log(time.Since(st))
for _, el := range mproofs {
t.Log(hex.EncodeToString(el[:]))
layers := Traversal(tnode, 5)
t.Log(time.Since(st))
for _, layer := range layers {
fmt.Println(len(layer), layer)
}
// nodes := [][]common.Hash{}
// for i := len(tree.layers) - 1; i >= 0; i-- {
// layer := tree.layers[i]
// if len(layer)%2 == 1 && i != len(tree.layers)-1 {
// layer = append(layer, common.Hash{})
// }
// nodes = append(nodes, layer)
// t.Log(layer)
// }
// t.Log("==")
// t.Log(nodes)
// treeNode := buildTree(nodes)
// t.Log("---")
// levelOrderTraversal(treeNode, common.HexToHash("0xad793b7e9f57201b9bfa52639a3a5ca6fe86cca2b69adf5295cf5f9fd2a5d12a"), 3)
}
......@@ -3,6 +3,7 @@ package tree
import (
"bytes"
"errors"
"fmt"
"math"
"sort"
......@@ -28,6 +29,13 @@ type MerkleTree struct {
layers [][]common.Hash
proofs Proofs
bufferElementPositionIndex map[common.Hash]int
treeNode *TreeNode
}
type TreeNode struct {
Hash common.Hash
Left *TreeNode
Right *TreeNode
}
func NewMerkleTree(proofs Proofs) (*MerkleTree, error) {
......@@ -48,6 +56,7 @@ func NewMerkleTree(proofs Proofs) (*MerkleTree, error) {
if err != nil {
return nil, err
}
tree.buildTree()
return &tree, nil
}
......@@ -114,3 +123,111 @@ func (m *MerkleTree) getPairElement(idx int, layer Proofs) (common.Hash, bool) {
return layer[pairIdx], true
}
// buildTree converts the flat layer representation (m.layers, root stored
// last) into a linked TreeNode structure rooted at m.treeNode. Odd-length
// non-root layers are padded with a zero hash so every parent has two
// children.
func (m *MerkleTree) buildTree() {
	rootIdx := len(m.layers) - 1
	levels := make([][]common.Hash, 0, len(m.layers))
	for i := rootIdx; i >= 0; i-- {
		layer := m.layers[i]
		if i != rootIdx && len(layer)%2 == 1 {
			// NOTE(review): append may write into the layer's backing array
			// when spare capacity exists, mutating m.layers — confirm intended.
			layer = append(layer, common.Hash{})
		}
		levels = append(levels, layer)
	}
	m.treeNode = buildTree(levels)
}
// GetRootNode returns the root of the linked-node representation built by
// buildTree (nil until buildTree has run).
func (m *MerkleTree) GetRootNode() *TreeNode {
	return m.treeNode
}
// FindNode breadth-first searches the linked tree for a node whose hash
// equals the given hash. It returns nil when the tree is empty or no node
// matches.
func (m *MerkleTree) FindNode(hash common.Hash) *TreeNode {
	if m.treeNode == nil {
		return nil
	}
	queue := []*TreeNode{m.treeNode}
	for len(queue) > 0 {
		node := queue[0]
		queue = queue[1:]
		// common.Hash is a comparable value type, so compare directly rather
		// than round-tripping through Hex() strings as before. The leftover
		// debug fmt.Println calls have been removed.
		if node.Hash == hash {
			return node
		}
		if node.Left != nil {
			queue = append(queue, node.Left)
		}
		if node.Right != nil {
			queue = append(queue, node.Right)
		}
	}
	return nil
}
// Traversal returns the node hashes of the subtree rooted at root, level by
// level, emitting at most depth levels (always at least one when root is
// non-nil, matching the original behavior for depth <= 0).
func Traversal(root *TreeNode, depth int) (retNodes [][]common.Hash) {
	if root == nil {
		return nil
	}
	level := []*TreeNode{root}
	for emitted := 1; len(level) > 0; emitted++ {
		hashes := make([]common.Hash, 0, len(level))
		var next []*TreeNode
		for _, node := range level {
			hashes = append(hashes, node.Hash)
			if node.Left != nil {
				next = append(next, node.Left)
			}
			if node.Right != nil {
				next = append(next, node.Right)
			}
		}
		retNodes = append(retNodes, hashes)
		// Stop once the requested number of levels has been emitted.
		if emitted >= depth {
			break
		}
		level = next
	}
	return retNodes
}
// buildTree links level-ordered hash layers (root level first) into a binary
// tree of TreeNodes and returns the root. Parents consume children in order:
// each parent takes the next pair from the level below, with a lone trailing
// child attached as a left child only.
func buildTree(nodes [][]common.Hash) *TreeNode {
	if len(nodes) == 0 {
		return nil
	}
	root := &TreeNode{Hash: nodes[0][0]}
	parents := []*TreeNode{root}
	for _, level := range nodes[1:] {
		children := make([]*TreeNode, 0, len(level))
		for j := 0; j < len(level); j += 2 {
			parent := parents[0]
			parents = parents[1:]
			parent.Left = &TreeNode{Hash: level[j]}
			children = append(children, parent.Left)
			if j+1 < len(level) {
				parent.Right = &TreeNode{Hash: level[j+1]}
				children = append(children, parent.Right)
			}
		}
		// Any unconsumed parents stay at the front of the queue, exactly as
		// in the original single-queue implementation.
		parents = append(parents, children...)
	}
	return root
}
package util
// CombineBytes concatenates any number of byte slices into one new slice,
// preserving argument order. With no arguments it returns a nil slice.
func CombineBytes(args ...[]byte) []byte {
	var combined []byte
	for _, chunk := range args {
		combined = append(combined, chunk...)
	}
	return combined
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment