Commit 0cd18a9d authored by 贾浩@五瓣科技's avatar 贾浩@五瓣科技

update async job

parent d568bd33
......@@ -13,7 +13,7 @@ COPY ./ ./sdk-api/
FROM base AS build
RUN cd sdk-api && go mod tidy && go build -v -o /tmp/api ./cmd
RUN cd sdk-api && go mod tidy && go build -v -o /tmp/api ./cmd/api && go build -v -o /tmp/exec ./cmd/executor
FROM alpine
......@@ -23,4 +23,6 @@ COPY ./config.toml /config.toml
COPY --from=build /tmp/api /usr/bin/sdk_api
COPY --from=build /tmp/executor /usr/bin/sdk_executor
EXPOSE 8080
\ No newline at end of file
......@@ -4,10 +4,13 @@ GOBIN = $(shell pwd)/build/bin
default: all
all: sdk-api
all: sdk-api sdk-executor
sdk-api:
go build $(BUILD_FLAGS) -v -o=${GOBIN}/$@ ./cmd
go build $(BUILD_FLAGS) -v -o=${GOBIN}/$@ ./cmd/api
sdk-executor:
go build $(BUILD_FLAGS) -v -o=${GOBIN}/$@ ./cmd/executor
dev:
go build $(BUILD_FLAGS) -v -o=${GOBIN}/$@ -gcflags "all=-N -l" ./cmd
......
......@@ -4,7 +4,6 @@ import (
"flag"
"sdk_api/config"
"sdk_api/dao"
"sdk_api/gassender"
"sdk_api/server"
"sdk_api/service"
......@@ -24,8 +23,6 @@ func main() {
panic(err)
}
gs := runGasSender(cfg)
da, err := dao.New(cfg)
if err != nil {
panic(err)
......@@ -35,18 +32,7 @@ func main() {
log.SetLevel(log.DebugLevel)
}
svs := service.New(cfg, da, gs)
svs := service.New(cfg, da)
server.StartServer(svs, cfg)
}
func runGasSender(cfg *config.Config) *gassender.GasSender {
gs, err := gassender.NewGasSender(cfg.GasSender.RPC, cfg.GasSender.PrivateKey, cfg.GasSender.UserContract)
if err != nil {
panic(err)
}
gs.Run()
// gs.AonLogin(common.HexToAddress("11111"), "1", "1")
// gs.SendAONGas(common.HexToAddress("11111").Hex(), 1)
return gs
}
package main
import (
"flag"
"sdk_api/config"
"sdk_api/dao"
exec "sdk_api/executor"
log "github.com/sirupsen/logrus"
)
func init() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
}
func main() {
flag.Parse()
cfg, err := config.New()
if err != nil {
panic(err)
}
da, err := dao.New(cfg)
if err != nil {
panic(err)
}
if cfg.Debug {
log.SetLevel(log.DebugLevel)
}
tasker := exec.NewExecutor(cfg, da)
tasker.Start()
select {}
}
......@@ -13,11 +13,11 @@ max_idle_conn = 2
[server]
listen = "0.0.0.0:8080"
[gas_sender]
[chain]
rpc = "https://sepolia.rpc.aonnet.io"
user_contract = "0x64ea0CC733f9B899545aE454f1890b5eb512560F"
# 0x0000000077024042e797Ae28A163C27E389CC5b2
private_key = "39494cd233573c94d6b4d24847f2f4d5da9d0b384b61f3ad4fae9abd5c48e6fc"
sender_private_key = "39494cd233573c94d6b4d24847f2f4d5da9d0b384b61f3ad4fae9abd5c48e6fc"
[tg_bot]
tokens = [
......
......@@ -4,14 +4,16 @@ import (
"flag"
"github.com/BurntSushi/toml"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
type Config struct {
Debug bool `toml:"debug"`
MySQL MysqlConfig `toml:"mysql"`
Server ServerConfig `toml:"server"`
GasSender GasSenderConfig `toml:"gas_sender"`
TGBot TGBotConfig `toml:"tg_bot"`
Chain ChainConfig `toml:"chain"`
}
type MysqlConfig struct {
......@@ -28,20 +30,23 @@ type ServerConfig struct {
Listen string `toml:"listen"`
}
type GasSenderConfig struct {
PrivateKey string `toml:"private_key"`
UserContract string `toml:"user_contract"`
RPC string `toml:"rpc"`
}
type TGBotConfig struct {
Tokens []string `toml:"tokens"`
}
type ChainConfig struct {
SenderPrivateKey string `toml:"sender_private_key"`
SenderAddress string `toml:"sender_address"`
UserContract string `toml:"user_contract"`
RPC string `toml:"rpc"`
}
var confPath = flag.String("c", "config.toml", "config file path")
func New() (config *Config, err error) {
config = new(Config)
_, err = toml.DecodeFile(*confPath, config)
config.Chain.SenderAddress = crypto.PubkeyToAddress(crypto.ToECDSAUnsafe(common.FromHex(config.Chain.SenderPrivateKey)).PublicKey).Hex()
return
}
package constant
import (
"math/big"
)
const JwtSecret = "cxcZa005Y5zWH1wFgXvPGDL02Ey4ZCLA"
const (
......@@ -12,3 +16,19 @@ const (
PlatformTelegram = "telegram"
PlatformFingerprint = "fingerprint"
)
var (
ZeroValue = big.NewInt(0)
Gwei = big.NewInt(1000000000)
Ether = big.NewInt(1000000000000000000)
)
const (
TransactionNotExecuted = 0
TransactionPending = 1
TransactionSuccessful = 2
TransactionFailed = 3
TransactionNotFound = 4
TransactionBroadcastError = 5
TransactionReceiptError = 6
)
package dao
import (
"context"
"fmt"
"math/big"
"sdk_api/config"
dbModel "sdk_api/model/db"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
......@@ -15,12 +18,24 @@ import (
type Dao struct {
c *config.Config
db *gorm.DB
ethClient *ethclient.Client
chainId *big.Int
}
func New(_c *config.Config) (dao *Dao, err error) {
dao = &Dao{
c: _c,
}
dao.ethClient, err = ethclient.Dial(_c.Chain.RPC)
if err != nil {
return
}
dao.chainId, err = dao.ethClient.ChainID(context.Background())
if err != nil {
panic(fmt.Sprintf("failed to get l1 chain id %+v", err))
}
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True",
_c.MySQL.User, _c.MySQL.Password, _c.MySQL.Host, _c.MySQL.Port, _c.MySQL.Database)
dao.db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
......@@ -39,7 +54,7 @@ func New(_c *config.Config) (dao *Dao, err error) {
sqlDB.SetMaxOpenConns(_c.MySQL.MaxConn)
sqlDB.SetMaxIdleConns(_c.MySQL.MaxIdleConn)
sqlDB.SetConnMaxIdleTime(time.Hour)
err = dao.db.AutoMigrate(&dbModel.User{})
err = dao.db.AutoMigrate(&dbModel.User{}, &dbModel.Task{})
if err != nil {
return
}
......
package dao
import (
"sdk_api/constant"
dbModel "sdk_api/model/db"
"gorm.io/gorm"
......@@ -47,3 +48,38 @@ func (d *Dao) SetKeystore(uid, address, keystore string) (err error) {
Updates(map[string]interface{}{"keystore": keystore, "address": address}).
Error
}
func (d *Dao) CreateTask(task *dbModel.Task) (err error) {
return d.db.Clauses(clause.OnConflict{DoNothing: true}).Create(task).Error
}
func (d *Dao) UpdateTaskStatus(taskId, txHash string, status int) (err error) {
return d.db.Model(&dbModel.Task{}).Where("task_id = ?", taskId).Updates(map[string]interface{}{
"tx_hash": txHash,
"tx_status": status,
}).Error
}
func (d *Dao) GetTxHashesByStatus(status int) (txHashes []string, err error) {
err = d.db.Model(&dbModel.Task{}).Select("tx_hash").Where("tx_status = ?", status).Limit(10).Scan(&txHashes).Error
if err != nil {
return
}
return
}
func (d *Dao) GetNotExecutedTasks() (tasks []*dbModel.Task, err error) {
err = d.db.Model(&dbModel.Task{}).
Where("tx_status = ?", constant.TransactionNotExecuted).
Limit(10).
Find(&tasks).Error
return
}
func (d *Dao) GetTaskStatus(taskId string) (status int, err error) {
err = d.db.Model(&dbModel.Task{}).Select("tx_status").Where("task_id = ?", taskId).First(&status).Error
if err == gorm.ErrRecordNotFound {
return -1, nil
}
return
}
package dao
import (
"context"
"errors"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
log "github.com/sirupsen/logrus"
)
func (d *Dao) BroadcastTx(to common.Address, nonce uint64, value *big.Int, calldata []byte) (txHash common.Hash, send bool, err error) {
// todo 未来改成并发可以传privatekey进来
ecdsaKey := crypto.ToECDSAUnsafe(common.FromHex(d.c.Chain.SenderPrivateKey))
tx := &types.LegacyTx{
Nonce: nonce,
GasPrice: big.NewInt(1000000000),
Gas: 2000000,
To: &to,
Value: value,
Data: calldata,
}
signer := types.NewEIP155Signer(d.chainId)
signedTx, err := types.SignNewTx(ecdsaKey, signer, tx)
if err != nil {
log.WithError(err).Error("sign tx failed")
return
}
err = d.ethClient.SendTransaction(context.Background(), signedTx)
if err != nil {
return
}
return signedTx.Hash(), true, nil
}
func (d *Dao) WaitForReceipt(txHash common.Hash, timeout ...time.Duration) (receipt *types.Receipt, err error) {
ctx := context.Background()
if len(timeout) > 0 {
ctx, _ = context.WithTimeout(ctx, timeout[0])
}
queryTicker := time.NewTicker(time.Second)
defer queryTicker.Stop()
for {
receipt, err := d.ethClient.TransactionReceipt(ctx, txHash)
if err == nil {
return receipt, nil
}
if !errors.Is(err, ethereum.NotFound) {
return nil, err
}
// Wait for the next round.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-queryTicker.C:
}
}
}
func (d *Dao) GetNonce(address common.Address) (uint64, error) {
return d.ethClient.NonceAt(context.Background(), address, nil)
}
version: "3.5"
version: "3.9"
networks:
default:
name: aon-sdk
......@@ -21,7 +21,8 @@ services:
start_period: 1s
sdk-api:
image: caduceus/ai-sdk-api:v0.0.2
image: caduceus/ai-sdk-api:v0.0.7
pull_policy: always
container_name: sdk-api
ports:
- "16666:8080"
......@@ -38,6 +39,23 @@ services:
restart:
unless-stopped
sdk-executor:
image: caduceus/ai-sdk-api:v0.0.7
pull_policy: always
container_name: sdk-executor
depends_on:
sdk-db:
condition: service_healthy
volumes:
- ./conf/sdk-api/config.toml:/config.toml
command:
- "/bin/sh"
- "-c"
- "/usr/bin/sdk_executor -c /config.toml"
restart:
unless-stopped
sdk-kv-api:
image: caduceus/ai-sdk-kv-api:v0.0.1
container_name: sdk-kv-api
......
package exec
import (
"math/big"
"sdk_api/config"
"sdk_api/constant"
"sdk_api/dao"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
log "github.com/sirupsen/logrus"
)
type Executor struct {
c *config.Config
d *dao.Dao
nonces map[common.Address]uint64
sync.Mutex
}
func NewExecutor(_c *config.Config, _d *dao.Dao) *Executor {
t := &Executor{
c: _c,
d: _d,
}
nonce, err := _d.GetNonce(common.HexToAddress(_c.Chain.SenderAddress))
if err != nil {
panic(err)
}
log.WithFields(log.Fields{"nonce": nonce, "addr": _c.Chain.SenderAddress}).Info("get nonce from chain")
t.nonces = make(map[common.Address]uint64)
t.nonces[common.HexToAddress(_c.Chain.SenderAddress)] = nonce
return t
}
func (e *Executor) Start() {
e.ProcessReceipt(true)
go e.ProcessTransaction()
go e.ProcessReceipt(false)
}
func (e *Executor) ProcessReceipt(sync bool) {
// sync:true 启动时运行
log.WithField("sync", sync).Info("start process receipt")
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
txHashes, err := e.d.GetTxHashesByStatus(constant.TransactionNotExecuted)
if err != nil {
log.WithError(err).Error("get txHashes failed")
time.Sleep(time.Millisecond * 500)
continue
}
if len(txHashes) == 0 && sync {
log.Info("no tx to process receipt, stop")
return
}
for _, txHash := range txHashes {
var status int
timeout := 30 * time.Second
if sync {
timeout = 5 * time.Second
}
receipt, err := e.d.WaitForReceipt(common.HexToHash(txHash), timeout)
if err != nil {
log.WithFields(log.Fields{
"txHash": txHash,
"err": err,
"sync": sync,
}).Error("wait receipt failed")
status = constant.TransactionReceiptError
} else if receipt == nil {
status = constant.TransactionNotFound
} else if receipt.Status == 1 {
status = constant.TransactionSuccessful
} else if receipt.Status == 0 {
status = constant.TransactionFailed
}
log.WithFields(log.Fields{"txHash": txHash, "status": status}).Info("get tx receipt")
err = e.d.UpdateTaskStatus(txHash, txHash, status)
if err != nil {
log.WithFields(log.Fields{
"txHash": txHash,
"err": err,
"sync": sync,
}).Error("update task status failed")
}
}
<-ticker.C
}
}
func (e *Executor) ProcessTransaction() {
log.Info("start process transaction")
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
tasks, err := e.d.GetNotExecutedTasks()
if err != nil {
log.WithError(err).Error("get not executed tasks failed")
time.Sleep(time.Millisecond * 500)
continue
}
for _, task := range tasks {
e.Lock()
nonce := e.nonces[common.HexToAddress(e.c.Chain.SenderAddress)]
e.Unlock()
val, _ := new(big.Int).SetString(task.Value, 10)
txHash, sent, err := e.d.BroadcastTx(
common.HexToAddress(task.ToAddress),
nonce,
val,
common.FromHex(task.Calldata),
)
if sent {
e.Lock()
e.nonces[common.HexToAddress(e.c.Chain.SenderAddress)]++
e.Unlock()
}
// 交易成功
if err == nil {
err = e.d.UpdateTaskStatus(task.TaskId, txHash.String(), constant.TransactionPending)
if err != nil {
log.WithFields(log.Fields{"taskId": task.TaskId, "err": err}).Error("update task status failed")
continue
}
log.WithFields(log.Fields{"taskId": task.TaskId, "action": task.Action, "txHash": txHash}).Info("broadcast tx success")
continue
}
// 交易失败
log.WithFields(log.Fields{"taskId": task.TaskId, "err": err}).Error("broadcast tx failed")
err = e.d.UpdateTaskStatus(task.TaskId, "", constant.TransactionBroadcastError)
if err != nil {
log.WithFields(log.Fields{"taskId": task.TaskId, "err": err}).Error("update task status failed")
}
}
<-ticker.C
}
}
......@@ -10,6 +10,7 @@ type CheckUserResponse struct {
IsNewUser bool `json:"isNewUser"`
Keystore string `json:"keystore"`
Token string `json:"token"`
TaskId string `json:"task_id"`
}
type CreateUserRequest struct {
......
......@@ -6,10 +6,23 @@ import (
type User struct {
Id int `gorm:"primaryKey"`
Uid string `gorm:"type:varchar(255);uniqueIndex;not null;column:uid;comment:用户id"`
Platform string `gorm:"type:varchar(255);uniqueIndex:platform_id;not null;column:platform;comment:所在平台,telegram,fingerprint"`
PlatformId string `gorm:"type:varchar(255);uniqueIndex:platform_id;not null;column:platform_id;comment:所在平台id,telegram时id,指纹时hash"`
Address string `json:"type:varchar(255);index;column:address;comment:地址"`
Keystore string `gorm:"type:text;column:keystore;comment:keystore"`
Uid string `gorm:"type:varchar(255);uniqueIndex;not null;comment:用户id"`
Platform string `gorm:"type:varchar(255);uniqueIndex:platform_id;not null;comment:所在平台,telegram,fingerprint"`
PlatformId string `gorm:"type:varchar(255);uniqueIndex:platform_id;not null;comment:所在平台id,telegram时id,指纹时hash"`
Address string `json:"type:varchar(255);index;comment:地址"`
Keystore string `gorm:"type:text;comment:keystore"`
gorm.Model
}
type Task struct {
Id int `gorm:"primaryKey"`
TaskId string `gorm:"type:varchar(255);uniqueIndex;not null;comment:任务id"`
Uid string `gorm:"type:varchar(255);index;not null;comment:用户id"`
Action string `gorm:"type:varchar(255);not null;comment:动作"`
ToAddress string `gorm:"type:varchar(255);not null;comment:目标地址"`
Value string `gorm:"type:varchar(255);not null;comment:金额"`
Calldata string `gorm:"type:varchar(2048);not null;comment:参数"`
TxHash string `gorm:"type:varchar(255);not null;comment:交易hash"`
TxStatus int `gorm:"type:int;not null;column:comment:交易状态"`
gorm.Model
}
# sdk-api
## telegram
验证,请求入库
## email
# sdk-executor
## sms
遍历数据库,执行交易,获取回执
## fingerprint
\ No newline at end of file
# sdk-kv-api
kv 存储服务
\ No newline at end of file
......@@ -17,5 +17,8 @@ func initRouter(e *gin.Engine) {
user.POST("/serverLogin", login)
user.POST("/create", middleware.JWTMiddleware, createUser)
}
{
v1.GET("/task/status", getTaskStatus)
}
}
package server
import (
"sdk_api/constant"
"github.com/gin-gonic/gin"
)
func getTaskStatus(c *gin.Context) {
taskId := c.Query("taskId")
if taskId == "" {
c.JSON(200, withError(constant.InvalidParam))
return
}
status, err := srv.GetTaskStatus(taskId)
if err != nil {
c.JSON(200, withError(constant.InternalError))
return
}
c.JSON(200, withSuccess(gin.H{
"status": status,
}))
}
......@@ -193,7 +193,12 @@ func login(c *gin.Context) {
resp.Keystore = req.Keystore
}
srv.AONServerLogin(address, req.UserId, req.InviterId)
taskId, err := srv.AONServerLogin(address, req.UserId, req.InviterId)
if err != nil {
c.JSON(200, withError(constant.InternalError))
return
}
resp.TaskId = taskId
c.JSON(200, withSuccess(resp))
return
}
......@@ -3,19 +3,16 @@ package service
import (
"sdk_api/config"
"sdk_api/dao"
"sdk_api/gassender"
)
type Service struct {
d *dao.Dao
cfg *config.Config
gs *gassender.GasSender
}
func New(conf *config.Config, da *dao.Dao, g *gassender.GasSender) *Service {
func New(conf *config.Config, da *dao.Dao) *Service {
return &Service{
d: da,
cfg: conf,
gs: g,
}
}
package service
import (
log "github.com/sirupsen/logrus"
)
func (s *Service) GetTaskStatus(taskId string) (status int, err error) {
status, err = s.d.GetTaskStatus(taskId)
if err != nil {
log.WithError(err).Error("get task status failed")
return
}
return
}
package service
import (
"encoding/hex"
"sdk_api/constant"
"sdk_api/contract/aonUser"
dbModel "sdk_api/model/db"
"sdk_api/util"
"strings"
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
......@@ -49,11 +54,49 @@ func (s *Service) SetKeystore(uid, address, keystore string) (ok bool, err error
return true, nil
}
func (s *Service) AONSendGas(address string) {
s.gs.SendAONGas(address, 1)
func (s *Service) AONSendGas(address string) (err error) {
sendGasTask := &dbModel.Task{
TaskId: util.GenUUID(),
Action: "sendGas",
ToAddress: address,
Value: constant.Ether.String(),
}
err = s.d.CreateTask(sendGasTask)
if err != nil {
log.WithError(err).Error("create task failed")
return
}
return
}
func (s *Service) AONServerLogin(address, userId, inviter string) {
s.gs.AonLogin(common.HexToAddress(address), userId, inviter)
func (s *Service) AONServerLogin(address, userId, inviter string) (taskId string, err error) {
abi, _ := aonUser.AonUserMetaData.GetAbi()
calldata, err := abi.Pack("loginByServer", common.HexToAddress(address), userId, inviter)
if err != nil {
log.WithError(err).Error("pack calldata failed")
log.WithFields(log.Fields{
"address": address,
"userId": userId,
"inviter": inviter,
"error": err.Error(),
}).Error("pack calldata failed")
return
}
serverLoginTask := &dbModel.Task{
TaskId: util.GenUUID(),
Uid: userId,
ToAddress: strings.ToLower(s.cfg.Chain.UserContract),
Action: "serverLogin",
Value: constant.ZeroValue.String(),
Calldata: hex.EncodeToString(calldata),
}
err = s.d.CreateTask(serverLoginTask)
if err != nil {
log.WithError(err).Error("create task failed")
return
}
return serverLoginTask.TaskId, nil
}
package util
import (
"github.com/google/uuid"
)
func GenUUID() string {
return uuid.New().String()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment