Commit e7204109 authored by 贾浩@五瓣科技's avatar 贾浩@五瓣科技

update

parents
.idea
.DS_Store
.vscode
build
./log
ca.crt
\ No newline at end of file
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY . .
RUN go mod tidy && go build -v -o /tmp/api ./cmd/api && go build -v -o /tmp/sync ./cmd/server
FROM alpine:latest
WORKDIR /app
COPY ./config.toml .
COPY --from=builder /tmp/server /usr/bin/server
COPY --from=builder /tmp/sync /usr/bin/sync
EXPOSE 8080
\ No newline at end of file
.PHONY: default all clean dev sync server
GOBIN = $(shell pwd)/build/bin
default: all
all: server sync
server:
go build $(BUILD_FLAGS) -v -o=${GOBIN}/$@ ./cmd/server
sync:
go build $(BUILD_FLAGS) -v -o=${GOBIN}/$@ ./cmd/sync
docker:
docker build -t caduceus/contract_backend:latest -f Dockerfile .
package main
import (
"contract_backend/config"
"contract_backend/dao"
"contract_backend/server"
"flag"
"io"
"os"
log "github.com/sirupsen/logrus"
)
func initLog() {
file, _ := os.OpenFile("log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
mw := io.MultiWriter(os.Stdout, file)
log.SetOutput(mw)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
}
func main() {
initLog()
flag.Parse()
cfg, err := config.New()
if err != nil {
panic(err)
}
da, err := dao.New(cfg)
if err != nil {
panic(err)
}
if cfg.Debug {
log.SetLevel(log.DebugLevel)
}
server.StartServer(cfg, da)
}
package main
import (
"contract_backend/config"
"contract_backend/dao"
monitor "contract_backend/monitor_task"
"flag"
"io"
"os"
log "github.com/sirupsen/logrus"
)
func initLog() {
file, _ := os.OpenFile("log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
mw := io.MultiWriter(os.Stdout, file)
log.SetOutput(mw)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
}
func main() {
initLog()
flag.Parse()
cfg, err := config.New()
if err != nil {
panic(err)
}
da, err := dao.New(cfg)
if err != nil {
panic(err)
}
if cfg.Debug {
log.SetLevel(log.DebugLevel)
}
m := monitor.NewMonitor(cfg, da)
m.Start()
}
debug = true
[pgsql]
host = "aws-0-ap-northeast-1.pooler.supabase.com"
port = 5432
user = "postgres.vbxtvjffhsirnyxjcuku"
password = "igFIRs1WmA1VLR4o"
database = "postgres"
max_conn = 5
max_idle_conn = 2
enable_log = true
cert_file = "ca.crt"
[server]
listen = "0.0.0.0:8080"
[chain]
rpc = "https://opbnb-testnet-rpc.bnbchain.org"
user_contract = "0xb50e74F8903bb72b3bFf13c37CcC9114336102D1"
executor_contract = "0x7A47D1E515B448A5C18d12A5f603a1fd5C814B34"
x_private_key = "xprv9xwX8JDK1LBoHbfUiif8tmaj5dTGEnrYrDat8RUNowmbz2hCD8N1wydNao1TBhgTzvF1gojUwekF4J28BNo4mCVW81BY3XUwDCQhscSMaC3"
gas_sender_private_key = "2322e0021df509399321fa934e5869f914f7f82a1e366a29559d4a50bf1afa82" # 0x213EE93Ca7069C587e1a6ce5240B4A5eaD9Dd633
executor_private_key = "61acb7bf51fac4c34767ab0c8faa2205d01d006f41051d86cb284a1e2d1e6f2a"
model_price = 10
\ No newline at end of file
package config
import (
"flag"
"github.com/BurntSushi/toml"
)
type Config struct {
Debug bool `toml:"debug"`
PGSQL PGSQLConfig `toml:"pgsql"`
Server ServerConfig `toml:"server"`
Chain ChainConfig `toml:"chain"`
}
type ChainConfig struct {
RPC string `toml:"rpc"`
UserContract string `toml:"user_contract"`
ExecutorContract string `toml:"executor_contract"`
XPrivateKey string `toml:"x_private_key"`
GasSenderPrivateKey string `toml:"gas_sender_private_key"`
ExecutorPrivateKey string `toml:"executor_private_key"`
ModelPrice int `toml:"model_price"`
}
type PGSQLConfig struct {
Host string `toml:"host"`
Port int `toml:"port"`
User string `toml:"user"`
Password string `toml:"password"`
Database string `toml:"database"`
MaxConn int `toml:"max_conn"`
MaxIdleConn int `toml:"max_idle_conn"`
EnableLog bool `toml:"enable_log"`
CertFile string `toml:"cert_file"`
}
type ServerConfig struct {
Listen string `toml:"listen"`
}
var confPath = flag.String("c", "config.toml", "config file path")
func New() (config *Config, err error) {
config = new(Config)
_, err = toml.DecodeFile(*confPath, config)
return
}
package constant
const (
InvalidParam = "invalid param"
InternalError = "internal error"
)
const (
TxStatusPending = 2
TxStatusSuccess = 3
TxStatusFailed = 4
TxStatusRevert = 5
TxStatusUnknown = 6
)
[
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "executeId",
"type": "string"
},
{
"indexed": false,
"internalType": "uint256",
"name": "clusterId",
"type": "uint256"
}
],
"name": "FailExecute",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "string",
"name": "executeId",
"type": "string"
},
{
"indexed": true,
"internalType": "string",
"name": "userId",
"type": "string"
},
{
"indexed": true,
"internalType": "string",
"name": "appId",
"type": "string"
},
{
"indexed": false,
"internalType": "string[]",
"name": "modelIds",
"type": "string[]"
},
{
"indexed": false,
"internalType": "uint256",
"name": "price",
"type": "uint256"
}
],
"name": "StartExecute",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "executeId",
"type": "string"
},
{
"indexed": false,
"internalType": "uint256",
"name": "clusterId",
"type": "uint256"
}
],
"name": "SuccessExecute",
"type": "event"
},
{
"inputs": [
{
"internalType": "string",
"name": "_executeId",
"type": "string"
},
{
"internalType": "string",
"name": "_userId",
"type": "string"
},
{
"internalType": "string",
"name": "_appId",
"type": "string"
},
{
"internalType": "string[]",
"name": "_modelIds",
"type": "string[]"
},
{
"internalType": "uint256",
"name": "_price",
"type": "uint256"
}
],
"name": "executeByUserId",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "string",
"name": "_executeId",
"type": "string"
},
{
"internalType": "string",
"name": "_clusterId",
"type": "string"
}
],
"name": "failExecuteByClusterId",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "string",
"name": "_executeId",
"type": "string"
},
{
"internalType": "uint256",
"name": "_tokenId",
"type": "uint256"
}
],
"name": "failExecuteByTokenId",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "string",
"name": "_executeId",
"type": "string"
},
{
"internalType": "string",
"name": "_userId",
"type": "string"
},
{
"internalType": "string",
"name": "_appId",
"type": "string"
},
{
"internalType": "string[]",
"name": "_modelIds",
"type": "string[]"
},
{
"internalType": "uint256",
"name": "_price",
"type": "uint256"
}
],
"name": "startExecuteByUserId",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "string",
"name": "_executeId",
"type": "string"
},
{
"internalType": "string",
"name": "_clusterId",
"type": "string"
}
],
"name": "successExecuteByComputeId",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "string",
"name": "_executeId",
"type": "string"
},
{
"internalType": "uint256",
"name": "_tokenId",
"type": "uint256"
}
],
"name": "successExecuteByTokenId",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
}
]
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
package dao
import (
"contract_backend/contract/aon_executor"
"contract_backend/contract/aon_user"
"crypto/ecdsa"
"math/big"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
func (d *Dao) Login(privateKey *ecdsa.PrivateKey, userId string, inviter string) (txHash common.Hash, err error) {
instance, err := aon_user.NewAonUser(common.HexToAddress(d.c.Chain.UserContract), d.ethClient)
if err != nil {
return common.Hash{}, err
}
auth, err := bind.NewKeyedTransactorWithChainID(privateKey, d.chainId)
if err != nil {
return common.Hash{}, err
}
auth.GasPrice = d.gasPrice
auth.GasLimit = 800000
tx, err := instance.Login(auth, userId, inviter)
if err != nil {
return common.Hash{}, err
}
return tx.Hash(), nil
}
func (d *Dao) ExecuteTask(privateKey *ecdsa.PrivateKey, executeId, userId, appId string, modelIds []string, price *big.Int) (txHash common.Hash, err error) {
instance, err := aon_executor.NewAonExecutor(common.HexToAddress(d.c.Chain.UserContract), d.ethClient)
if err != nil {
return common.Hash{}, err
}
auth, err := bind.NewKeyedTransactorWithChainID(privateKey, d.chainId)
if err != nil {
return common.Hash{}, err
}
auth.GasPrice = d.gasPrice
tx, err := instance.ExecuteByUserId(auth, executeId, userId, appId, modelIds, price)
if err != nil {
return common.Hash{}, err
}
return tx.Hash(), nil
}
package dao
import (
"context"
"contract_backend/config"
"fmt"
"math/big"
"os"
"sync"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"gorm.io/gorm/schema"
)
type Dao struct {
chainId *big.Int
c *config.Config
db *gorm.DB
ethClient *ethclient.Client
ethRPC *rpc.Client
gasSenderNonce uint64
gasSenderLock sync.Mutex
gasPrice *big.Int
}
func New(_c *config.Config) (dao *Dao, err error) {
dao = &Dao{
c: _c,
}
dao.ethClient, err = ethclient.Dial(_c.Chain.RPC)
if err != nil {
return
}
dao.ethRPC, err = rpc.Dial(_c.Chain.RPC)
if err != nil {
return
}
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d",
_c.PGSQL.Host, _c.PGSQL.User, _c.PGSQL.Password, _c.PGSQL.Database, _c.PGSQL.Port,
)
chainId, err := dao.ethClient.ChainID(context.Background())
if err != nil {
return
}
dao.chainId = chainId
k, _ := crypto.HexToECDSA(_c.Chain.GasSenderPrivateKey)
dao.gasSenderNonce, _ = dao.ethClient.NonceAt(context.Background(), crypto.PubkeyToAddress(k.PublicKey), nil)
go dao.LoopGasPrice()
if _c.PGSQL.CertFile != "" {
dsn = fmt.Sprintf("%s sslmode=require sslrootcert=%s", dsn, _c.PGSQL.CertFile)
}
lgr := logger.Default
if _c.PGSQL.EnableLog {
lgr = logger.Default.LogMode(logger.Info)
}
dao.db, err = gorm.Open(postgres.Open(dsn), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
SingularTable: true,
},
Logger: lgr,
})
if err != nil {
return
}
sqlDB, err := dao.db.DB()
if err != nil {
return
}
sqlDB.SetMaxOpenConns(_c.PGSQL.MaxConn)
sqlDB.SetMaxIdleConns(_c.PGSQL.MaxIdleConn)
sqlDB.SetConnMaxIdleTime(time.Hour)
migrate := os.Getenv("MIGRATE")
if migrate == "true" {
// err = dao.db.AutoMigrate(&dbModel.User{}, &dbModel.Active{}, &dbModel.ChatGroup{})
if err != nil {
return
}
}
return dao, nil
}
package dao
import (
"contract_backend/model"
"encoding/json"
)
func (d *Dao) UpdateTask(id string, txHash string, status int) (err error) {
sql := `UPDATE public.exec_task SET tx_hash = ?, tx_status = ? WHERE id = ?`
return d.db.Exec(sql, txHash, status, id).Error
}
func (d *Dao) GetUnprocessedTasks(limit int) (tasks []*model.ExecTask, err error) {
sql := `SELECT id, user_id, app_key, model_ids, tx_hash, tx_status FROM public.exec_task WHERE tx_hash = '' AND tx_status = 1 LIMIT ?`
err = d.db.Raw(sql, limit).Scan(&tasks).Error
if err != nil {
return
}
for i := 0; i < len(tasks); i++ {
err = json.Unmarshal(tasks[i].ModelIds, &tasks[i].ModelIdsArray)
if err != nil {
return
}
}
return
}
package dao
import (
"context"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
log "github.com/sirupsen/logrus"
)
func (d *Dao) LoopGasPrice() {
gasPrice, err := d.ethClient.SuggestGasPrice(context.Background())
if err != nil {
log.WithError(err).Error("get gas price error")
panic(err)
}
d.gasPrice = big.NewInt(0).Div(
big.NewInt(0).Mul(gasPrice, big.NewInt(11)),
big.NewInt(10),
)
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
gasPrice, err := d.ethClient.SuggestGasPrice(context.Background())
if err != nil {
log.WithError(err).Error("get gas price error")
continue
}
d.gasPrice = big.NewInt(0).Div(
big.NewInt(0).Mul(gasPrice, big.NewInt(11)),
big.NewInt(10),
)
}
}
}
// EstimateGasRecharge 估算充值金额
func (d *Dao) EstimateGasRecharge(address common.Address) (val *big.Int, err error) {
_10val := new(big.Int).Mul(d.gasPrice, big.NewInt(10*500000)) // 10次 每次50w
_20val := new(big.Int).Mul(d.gasPrice, big.NewInt(20*500000)) // 20次 每次50w
currentBalance, err := d.ethClient.BalanceAt(context.Background(), address, nil)
if err != nil {
return nil, err
}
if currentBalance.Cmp(_10val) < 0 {
return new(big.Int).Sub(_20val, currentBalance), nil
}
return nil, nil
}
func (d *Dao) TransferETH(hexPrivateKey string, address common.Address, value *big.Int) (txHash common.Hash, err error) {
privateKey, _ := crypto.HexToECDSA(hexPrivateKey)
d.gasSenderLock.Lock()
defer func() {
d.gasSenderNonce++
d.gasSenderLock.Unlock()
}()
innerTx := types.LegacyTx{
Nonce: d.gasSenderNonce,
To: &address,
Value: value,
Gas: 21000,
GasPrice: d.gasPrice,
}
tx := types.NewTx(&innerTx)
from := crypto.PubkeyToAddress(privateKey.PublicKey)
gasLimit, err := d.ethClient.EstimateGas(context.Background(), ethereum.CallMsg{
From: from,
To: &address,
Value: value,
})
innerTx.Gas = gasLimit
innerTx.GasPrice = d.gasPrice
signer := types.NewCancunSigner(d.chainId)
signedTx, err := types.SignTx(tx, signer, privateKey)
if err != nil {
return common.Hash{}, err
}
return signedTx.Hash(), d.ethClient.SendTransaction(context.Background(), signedTx)
}
func (d *Dao) WaitForReceipt(hash common.Hash) (receipt *types.Receipt, err error) {
timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
queryTicker := time.NewTicker(time.Second)
defer queryTicker.Stop()
for {
receipt, err := d.ethClient.TransactionReceipt(timeoutCtx, hash)
if err == nil {
return receipt, nil
}
select {
case <-timeoutCtx.Done():
return nil, timeoutCtx.Err()
case <-queryTicker.C:
}
}
}
networks:
default:
name: aon-backend
services:
contract-backend-webhook:
image: caduceus/contract_backend:latest
pull_policy: always
container_name: contract-backend-webhook
ports:
- "16669:8080"
volumes:
- ./conf/contract_backend/config.toml:/config.toml
- ./conf/contract_backend/db.crt:/app/db.crt
- ./data/tg-contract_backend/webhook-log:/app
command:
- "/bin/sh"
- "-c"
- "/usr/bin/server -c /config.toml"
restart:
unless-stopped
contract-backend-sync:
image: caduceus/contract_backend:latest
pull_policy: always
container_name: contract-backend-sync
volumes:
- ./conf/contract_backend/config.toml:/config.toml
- ./conf/contract_backend/db.crt:/app/db.crt
- ./data/tg-contract_backend/sync-log:/app
command:
- "/bin/sh"
- "-c"
- "/usr/bin/sync -c /config.toml"
restart:
unless-stopped
\ No newline at end of file
module contract_backend
go 1.22
toolchain go1.22.8
require (
github.com/BurntSushi/toml v1.4.0
github.com/btcsuite/btcutil v1.0.2
github.com/ethereum/go-ethereum v1.14.11
github.com/gin-contrib/cors v1.7.2
github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.3.0
github.com/sirupsen/logrus v1.9.3
github.com/tidwall/gjson v1.18.0
gorm.io/driver/postgres v1.5.9
gorm.io/gorm v1.25.12
)
require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/btcsuite/btcd v0.20.1-beta // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c // indirect
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/holiman/uint256 v1.3.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/supranational/blst v0.3.13 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
This diff is collapsed.
package middleware
import (
"bytes"
"io"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
func PrintRequestResponseBodyMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
// 读取请求 body
var requestBody []byte
if c.Request.Body != nil {
requestBody, _ = io.ReadAll(c.Request.Body)
c.Request.Body = io.NopCloser(bytes.NewBuffer(requestBody))
}
log.WithFields(log.Fields{"method": c.Request.Method, "uri": c.Request.RequestURI, "body": string(requestBody)}).Debug("request body")
bodyWriter := &responseBodyWriter{body: bytes.NewBufferString(""), ResponseWriter: c.Writer}
c.Writer = bodyWriter
c.Next()
responseBody := bodyWriter.body.String()
log.WithFields(log.Fields{"status": c.Writer.Status(), "body": responseBody}).Debug("response body")
}
}
type responseBodyWriter struct {
gin.ResponseWriter
body *bytes.Buffer
}
func (r *responseBodyWriter) Write(b []byte) (int, error) {
r.body.Write(b)
return r.ResponseWriter.Write(b)
}
package model
import (
"encoding/json"
)
type ExecTask struct {
Id string
UserId string
AppKey string
ModelIds json.RawMessage
ModelIdsArray []string `gorm:"-"`
}
package monitor
import (
"contract_backend/config"
. "contract_backend/constant"
"contract_backend/dao"
"contract_backend/model"
"crypto/ecdsa"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
log "github.com/sirupsen/logrus"
)
type Monitor struct {
conf *config.Config
d *dao.Dao
privateKey *ecdsa.PrivateKey
}
func NewMonitor(_conf *config.Config, _dao *dao.Dao) *Monitor {
k, err := crypto.HexToECDSA(common.Bytes2Hex(common.FromHex(_conf.Chain.ExecutorPrivateKey)))
if err != nil {
panic(err)
}
return &Monitor{
conf: _conf,
d: _dao,
privateKey: k,
}
}
func (m *Monitor) Start() {
for {
tasks, err := m.d.GetUnprocessedTasks(1)
if err != nil {
log.WithError(err).Error("get unprocessed tasks error")
continue
}
if len(tasks) == 0 {
time.Sleep(2 * time.Second)
continue
}
log.WithField("tasks", len(tasks)).Info("get unprocessed tasks")
for _, task := range tasks {
m.ProcessOne(task)
}
return
}
}
func (m *Monitor) ProcessOne(task *model.ExecTask) {
err := m.d.UpdateTask(task.Id, "", TxStatusPending)
if err != nil {
log.WithFields(log.Fields{
"id": task.Id,
"err": err.Error(),
}).Error("update task error")
return
}
txHash, err := m.d.ExecuteTask(m.privateKey, task.Id, task.UserId, task.AppKey, task.ModelIdsArray, big.NewInt(int64(m.conf.Chain.ModelPrice)))
if err != nil {
log.WithFields(log.Fields{
"id": task.Id,
"err": err.Error(),
}).Error("execute task error")
err = m.d.UpdateTask(task.Id, "", TxStatusFailed)
return
}
receipt, err := m.d.WaitForReceipt(txHash)
if err != nil {
log.WithFields(log.Fields{
"id": task.Id,
"err": err.Error(),
}).Error("wait for receipt error")
err = m.d.UpdateTask(task.Id, txHash.String(), TxStatusUnknown)
}
if receipt.Status == 1 {
err = m.d.UpdateTask(task.Id, txHash.String(), TxStatusSuccess)
} else {
err = m.d.UpdateTask(task.Id, txHash.String(), TxStatusRevert)
}
if err != nil {
log.WithFields(log.Fields{
"id": task.Id,
"err": err.Error(),
}).Error("update task error")
return
}
}
package server
import (
"contract_backend/middleware"
"github.com/gin-gonic/gin"
)
func initRouter(e *gin.Engine) {
e.Use(middleware.PrintRequestResponseBodyMiddleware())
webhook := e.Group("/webhook")
webhook.POST("/handleUserChange", userChange)
}
package server
import (
"contract_backend/config"
"contract_backend/dao"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
var conf *config.Config
var d *dao.Dao
func StartServer(_conf *config.Config, _da *dao.Dao) {
conf = _conf
d = _da
if !conf.Debug {
gin.SetMode(gin.ReleaseMode)
}
engine := gin.Default()
_cors := cors.DefaultConfig()
_cors.AllowAllOrigins = true
_cors.AllowHeaders = []string{"*"}
engine.Use(cors.New(_cors))
initRouter(engine)
log.Infof("start http server listening %s", conf.Server.Listen)
if err := engine.Run(conf.Server.Listen); err != nil {
log.Error("http server run error: ", err)
}
}
func withSuccess(obj interface{}) interface{} {
return gin.H{
"code": 0,
"msg": "ok",
"data": obj,
}
}
func withError(msg string) interface{} {
return gin.H{
"code": 1,
"error": msg,
"data": "",
}
}
package server
import (
. "contract_backend/constant"
"contract_backend/util"
"crypto/ecdsa"
"io"
"math/big"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
func taskChange(c *gin.Context) {
// 获取原始json数据
rawJson, err := io.ReadAll(c.Request.Body)
if err != nil {
log.WithError(err).Error("read body error")
c.JSON(200, withError(InvalidParam))
return
}
// 判断是插入还是update
table := gjson.Get(string(rawJson), "table").String()
if table != "exec_task" {
log.WithField("table", table).Error("wrong table")
c.JSON(200, withError(InvalidParam))
return
}
var userId string
var taskId string
var appId string
var modelIds []string
var key *ecdsa.PrivateKey
operation := gjson.Get(string(rawJson), "operation").String()
switch operation {
case "UPDATE":
userId = gjson.Get(string(rawJson), "new_row.user_id").String()
taskId = gjson.Get(string(rawJson), "new_row.id").String()
appId = gjson.Get(string(rawJson), "new_row.app_key").String()
tmp := gjson.Get(string(rawJson), "new_row.model_ids").Array()
for _, v := range tmp {
modelIds = append(modelIds, v.String())
}
key, err = util.GetPrivKeyByPriv(conf.Chain.XPrivateKey, userId)
if err != nil {
log.WithError(err).Error("get private key error")
c.JSON(200, withError(InternalError))
return
}
default:
log.WithField("operation", operation).Error("wrong operation")
c.JSON(200, withError(InvalidParam))
return
}
oldStatus := gjson.Get(string(rawJson), "old_row.status").Int()
newStatus := gjson.Get(string(rawJson), "new_row.status").Int()
if !(oldStatus == 0 && newStatus == 1) {
c.JSON(200, withSuccess(""))
return
}
dbTxHash := gjson.Get(string(rawJson), "new_row.tx_hash").String()
txStatus := gjson.Get(string(rawJson), "new_row.tx_status").Int()
if dbTxHash != "" || txStatus != 0 {
log.WithField("txHash", dbTxHash).WithField("txStatus", txStatus).Error("already processed")
c.JSON(200, withError(InternalError))
return
}
txHash, err := d.ExecuteTask(key, taskId, userId, appId, modelIds, big.NewInt(10))
if err != nil {
log.WithError(err).Error("execute task error")
c.JSON(200, withError(InternalError))
return
}
err = d.UpdateTask(taskId, txHash.String(), 0)
if err != nil {
log.WithError(err).Error("update task error")
c.JSON(200, withError(InternalError))
return
}
c.JSON(200, withSuccess(txHash.String()))
}
package server
import (
. "contract_backend/constant"
"contract_backend/util"
"crypto/ecdsa"
"fmt"
"io"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
func userChange(c *gin.Context) {
// 获取原始json数据
rawJson, err := io.ReadAll(c.Request.Body)
if err != nil {
log.WithError(err).Error("read body error")
c.JSON(200, withError(InvalidParam))
return
}
// 判断是插入还是update
table := gjson.Get(string(rawJson), "table").String()
if table != "users" {
log.WithField("table", table).Error("wrong table")
c.JSON(200, withError(InvalidParam))
return
}
var userId string
var key *ecdsa.PrivateKey
operation := gjson.Get(string(rawJson), "operation").String()
switch operation {
case "UPDATE":
userId = gjson.Get(string(rawJson), "new_row.id").String()
key, err = util.GetPrivateKeyByUserId(conf.Chain.XPrivateKey, userId)
if err != nil {
log.WithError(err).Error("get private key error")
c.JSON(200, withError(InternalError))
return
}
default:
log.WithField("operation", operation).Error("wrong operation")
c.JSON(200, withError(InvalidParam))
return
}
needLogin := true
if operation == "UPDATE" {
lastLogin := gjson.Get(string(rawJson), "old_row.last_sign_in_at").String()
newLogin := gjson.Get(string(rawJson), "new_row.last_sign_in_at").String()
if lastLogin == newLogin {
needLogin = false
} else if lastLogin != "" {
fmt.Println(lastLogin)
loginTime, err := time.Parse(lastLogin, "2006-01-02T15:04:05.999999+07:00")
if err != nil {
log.WithError(err).Error("parse time error")
c.JSON(200, withError(InternalError))
return
}
// 如果loginTime是0点之前
todayUnix := time.Now().Unix() % 86400
lastLoginUnix := loginTime.Unix() % 86400
if lastLoginUnix >= todayUnix {
needLogin = false
}
}
}
if !needLogin {
c.JSON(200, withSuccess(""))
return
}
// 需要登录
userAddress := crypto.PubkeyToAddress(key.PublicKey)
val, err := d.EstimateGasRecharge(userAddress)
if err != nil {
log.WithError(err).Error("estimate gas error")
c.JSON(200, withError(InternalError))
return
}
if val != nil {
// 需要充值
txHash, err := d.TransferETH(conf.Chain.GasSenderPrivateKey, userAddress, val)
if err != nil {
log.WithError(err).Error("transfer eth error")
c.JSON(200, withError(InternalError))
return
}
receipt, err := d.WaitForReceipt(txHash)
if err != nil {
log.WithError(err).Error("wait for receipt error")
c.JSON(200, withError(InternalError))
return
}
if receipt.Status != 1 {
log.WithField("receipt", receipt).Error("receipt status error")
c.JSON(200, withError(InternalError))
return
}
log.WithFields(log.Fields{
"userId": userId,
"address": userAddress.String(),
"txHash": txHash,
"val": val.String(),
}).Info("user recharge success")
}
txHash, err := d.Login(key, userId, "")
if err != nil {
log.WithError(err).Error("call ca login error")
c.JSON(200, withError(InternalError))
return
}
log.WithFields(log.Fields{
"userId": userId,
"txHash": txHash,
}).Info("user login")
c.JSON(200, withSuccess(""))
}
CREATE
OR REPLACE FUNCTION auth.userChange () RETURNS TRIGGER SECURITY definer AS $$
BEGIN
IF new.last_sign_in_at IS NOT NULL OR old.last_sign_in_at IS NOT NULL THEN
perform "net"."http_post"(
url := 'http://webhook.atqg.top/webhook/handleUserChange'::text,
body := jsonb_build_object(
'old_row', jsonb_build_object(
'id', old.id,
'last_sign_in_at', old.last_sign_in_at
),
'new_row', jsonb_build_object(
'id', new.id,
'last_sign_in_at', new.last_sign_in_at
),
'table', TG_TABLE_NAME,
'operation', TG_OP
),
headers:='{"Content-Type": "application/json"}'::jsonb
) as request_id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE
OR REPLACE TRIGGER userChange
AFTER
UPDATE ON auth.users FOR EACH ROW
EXECUTE FUNCTION auth.userChange();
CREATE
OR REPLACE FUNCTION public.taskChange () RETURNS TRIGGER SECURITY definer AS $$
BEGIN
perform "net"."http_post"(
url := 'http://webhook.atqg.top/webhook/handleTaskChange'::text,
body := jsonb_build_object(
'old_row', jsonb_build_object(
'id', old.id,
'user_id', old.user_id,
'app_key', old.app_key,
'model_ids', old.model_ids,
'status', old.status,
'tx_hash', old.tx_hash,
'tx_status', old.tx_status
),
'new_row', jsonb_build_object(
'id', new.id,
'user_id', new.user_id,
'app_key', new.app_key,
'model_ids', new.model_ids,
'status', new.status,
'tx_hash', new.tx_hash,
'tx_status', new.tx_status
),
'table', TG_TABLE_NAME,
'operation', TG_OP
),
headers:='{"Content-Type": "application/json"}'::jsonb
) as request_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION public.taskChange()
RETURNS TRIGGER
SECURITY DEFINER AS $$
BEGIN
-- 只在状态从0变为1时执行
IF (TG_OP = 'UPDATE' AND OLD.status = 0 AND NEW.status = 1) THEN
UPDATE public.exec_task SET tx_status = 1 WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE
OR REPLACE TRIGGER taskChange
AFTER UPDATE ON public.exec_task FOR EACH ROW
EXECUTE FUNCTION public.taskChange();
package util
import (
"crypto/ecdsa"
"encoding/binary"
"fmt"
"strconv"
"strings"
"github.com/btcsuite/btcutil/hdkeychain"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/google/uuid"
)
func GetAddressByUserId(xpub, userId string) (output common.Address, err error) {
uid, err := uuid.Parse(userId)
if err != nil {
return common.Address{}, err
}
paths := make([]string, 0)
for i := 0; i < 16; i += 3 {
end := i + 3
if end > 16 {
end = 16
}
uint32Temp := binary.BigEndian.Uint32(common.LeftPadBytes(uid[i:end], 4))
paths = append(paths, fmt.Sprintf("%d", uint32Temp))
}
pubkey, err := GetPubKeyByPub(xpub, strings.Join(paths, "/"))
if err != nil {
return common.Address{}, err
}
return crypto.PubkeyToAddress(*pubkey), nil
}
func GetPrivateKeyByUserId(xpriv, userId string) (key *ecdsa.PrivateKey, err error) {
uid, err := uuid.Parse(userId)
if err != nil {
return nil, err
}
paths := make([]string, 0)
for i := 0; i < 16; i += 3 {
end := i + 3
if end > 16 {
end = 16
}
uint32Temp := binary.BigEndian.Uint32(common.LeftPadBytes(uid[i:end], 4))
paths = append(paths, fmt.Sprintf("%d", uint32Temp))
}
return GetPrivKeyByPriv(xpriv, strings.Join(paths, "/"))
}
func GetDepositAddress(input common.Address, xpub string) (output common.Address, err error) {
paths := make([]string, 7)
for i := 0; i < 7; i++ {
uint32Temp := uint32(0)
if i == 6 {
uint32Temp = binary.BigEndian.Uint32(common.LeftPadBytes(input.Bytes()[i*3:i*3+2], 4))
} else {
uint32Temp = binary.BigEndian.Uint32(common.LeftPadBytes(input.Bytes()[i*3:i*3+3], 4))
}
paths[i] = fmt.Sprintf("%d", uint32Temp)
}
pubkey, err := GetPubKeyByPub(xpub, strings.Join(paths, "/"))
if err != nil {
return common.Address{}, err
}
return crypto.PubkeyToAddress(*pubkey), nil
}
// pathList := parsePath(path)
// var next *hdkeychain.ExtendedKey
// for _, floor := range pathList {
// idx := floor[0]
// isHardened, _ := strconv.ParseBool(strconv.Itoa(floor[1]))
// next, err = nextFloor(xpub, isHardened, uint32(idx))
// if err != nil {
// return
// }
// xpub = next.String()
// }
// pk, err := next.ECPubKey()
// if err != nil {
// return
// }
// return pk.ToECDSA(), nil
func GetPrivKeyByPriv(xpriv string, path string) (privkey *ecdsa.PrivateKey, err error) {
pathList := parsePath(path)
var next *hdkeychain.ExtendedKey
for _, floor := range pathList {
idx := floor[0]
isHardened, _ := strconv.ParseBool(strconv.Itoa(floor[1]))
next, err = nextFloor(xpriv, isHardened, uint32(idx))
if err != nil {
return
}
xpriv = next.String()
}
pk, err := next.ECPrivKey()
if err != nil {
return
}
return pk.ToECDSA(), nil
}
func GetPubKeyByPub(xpub string, path string) (pubkey *ecdsa.PublicKey, err error) {
pathList := parsePath(path)
var next *hdkeychain.ExtendedKey
for _, floor := range pathList {
idx := floor[0]
isHardened, _ := strconv.ParseBool(strconv.Itoa(floor[1]))
next, err = nextFloor(xpub, isHardened, uint32(idx))
if err != nil {
return
}
xpub = next.String()
}
pk, err := next.ECPubKey()
if err != nil {
return
}
return pk.ToECDSA(), nil
}
// 返回一个二维数组 参数1 对应每一层偏移 参数2 1代表hardened 0普通
func parsePath(path string) [][]int {
l := strings.Split(path, "/")
var resList [][]int
// m开头或者/开头 去掉第一个
if l[0] == "m" || l[0] == "" {
l = l[1:]
}
// /结尾 去掉最后一个
if l[len(l)-1] == "" {
l = l[:len(l)-1]
}
for _, s := range l {
if strings.HasSuffix(s, "'") {
idx, _ := strconv.Atoi(s[:len(s)-1])
resList = append(resList, []int{idx, 1})
} else {
idx, _ := strconv.Atoi(s)
resList = append(resList, []int{idx, 0})
}
}
return resList
}
func nextFloor(key string, hardened bool, idx uint32) (*hdkeychain.ExtendedKey, error) {
key1, err := hdkeychain.NewKeyFromString(key)
if err != nil {
return nil, err
}
if hardened {
return key1.Child(hdkeychain.HardenedKeyStart + idx)
} else {
return key1.Child(idx)
}
}
package util
import (
"encoding/binary"
"fmt"
"strings"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
func TestXpriv(t *testing.T) {
xpri := "xprv9xwX8JDK1LBoHbfUiif8tmaj5dTGEnrYrDat8RUNowmbz2hCD8N1wydNao1TBhgTzvF1gojUwekF4J28BNo4mCVW81BY3XUwDCQhscSMaC3"
uid := "6fc7f308-af15-463f-b15b-fee798d3e296"
k, _ := GetPrivateKeyByUserId(xpri, uid)
fmt.Println(crypto.PubkeyToAddress(k.PublicKey))
}
func TestGenHDKey(t *testing.T) {
xpub := "xpub6C9QTBQW4NDiwahEdPibQbLUtnhfny3E464dgDXGDkWFdqnkddYicVqUvaeAe8QrmXoSqXrcKMxszf8DJ2xzaQhth3nAgoBwjfC1J8jYwFS"
// xpub := "xpub6C2ojpneBn4KHz1zaHpsyVHQMuJQbeDPbdkXywAUR43hXpjyQNcRv1ZQdvxnGmGKvXLoGPoN1G7cwfmW5CGZjPagpLnggXmqN52HhJk9F4B"
ret, err := GetAddressByUserId("001b6462-43e6-4e8e-88b9-9636c8dece0d", xpub)
t.Log(ret.String(), err)
return
// k, _ := GetPubKeyByPub(xpub, "0/1")
// addr := crypto.PubkeyToAddress(*k)
// t.Log(addr.String())
userA := common.HexToAddress("0xAaf459E071637dE222D0a9e3b439704f478ed767")
path := convertAddrToPath(userA.String())
st := time.Now()
k, _ := GetPubKeyByPub(xpub, path)
addr2 := crypto.PubkeyToAddress(*k)
t.Log(path, addr2.String())
t.Log(time.Since(st))
a, _ := GetDepositAddress(userA, xpub)
t.Log(a.String())
t.Log(addr2.String(), time.Since(st))
}
func convertAddrToPath(address string) string {
addrBytes := common.HexToAddress(address).Bytes()
// split to 8
addrByteArray := make([]string, 7)
for i := 0; i < 7; i++ {
uint32Temp := uint32(0)
if i == 6 {
// fmt.Println("tt", tt)
// binary.BigEndian.PutUint32(tt, addrByteArray[i])
// fmt.Println("after tt", addrByteArray[i])
uint32Temp = binary.BigEndian.Uint32(common.LeftPadBytes(addrBytes[i*3:i*3+2], 4))
} else {
// addrByteArray[i] = addrBytes[i*3 : i*3+3]
// binary.BigEndian.PutUint32(temp4Byte(addrBytes[i*3:i*3+3]), addrByteArray[i])
uint32Temp = binary.BigEndian.Uint32(common.LeftPadBytes(addrBytes[i*3:i*3+3], 4))
}
addrByteArray[i] = fmt.Sprintf("%d", uint32Temp)
}
fmt.Println(strings.Join(addrByteArray, "/"))
return strings.Join(addrByteArray, "/")
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment