Commit 059ac74f authored by 贾浩@五瓣科技's avatar 贾浩@五瓣科技

init

parents
Pipeline #748 canceled with stages
.idea
*.iml
out
gen
*.sol
*.txt
.DS_Store
*.exe
FROM golang:1.21-alpine AS base
# Set up dependencies
ENV PACKAGES git openssh-client build-base
# Install dependencies
RUN apk add --update $PACKAGES
# Add source files
RUN mkdir -p ./claim-monitor
COPY ./ ./claim-monitor/
RUN git clone https://code.wuban.net.cn/odysseus/odysseus-protocol.git
FROM base AS build
RUN cd claim-monitor && go mod tidy && go build -v -o /tmp/api ./cmd/api && go build -v -o /tmp/api ./cmd/sync
FROM alpine
WORKDIR /app
COPY ./config.toml /config.toml
VOLUME /config.toml
VOLUME /app
COPY --from=build /tmp/api /usr/bin/api
COPY --from=build /tmp/api /usr/bin/sync
EXPOSE 8080
.PHONY: default all clean dev
GOBIN = $(shell pwd)/build/bin
GOVERSION=$(shell go version | awk '{print $$3}')
GITHASH=$(shell git show -s --format=%H)
GITBRANCH=$(shell git symbolic-ref --short -q HEAD)
BUILDTIME=$(shell git show -s --format=%cd)
default: all
all: clean sync api
BUILD_FLAGS=-ldflags "\
-X 'validator/version.GOVersion=$(GOVERSION)' \
-X 'validator/version.GitHash=$(GITHASH)' \
-X 'validator/version.BuildTime=$(BUILDTIME)' \
-X 'validator/version.GitBranch=$(GITBRANCH)'"
api:
go build $(BUILD_FLAGS) -v -o=${GOBIN}/$@ ./cmd/api
sync:
go build $(BUILD_FLAGS) -v -o=${GOBIN}/$@ -gcflags "all=-N -l" ./cmd/sync
docker:
docker build -t validator:latest -f Dockerfile .
clean:
rm -rf build
package api
package main
import (
"claim-monitor/config"
"claim-monitor/dao"
"claim-monitor/server"
"claim-monitor/service"
"flag"
log "github.com/sirupsen/logrus"
)
func init() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
}
func main() {
flag.Parse()
conf, err := config.New()
if err != nil {
panic(err)
}
da, err := dao.New(conf)
if err != nil {
panic(err)
}
if conf.Debug {
log.SetLevel(log.DebugLevel)
}
// http server
srv := service.New(conf, da)
server.Run(srv, conf)
}
package main
import (
"claim-monitor/config"
"claim-monitor/dao"
"claim-monitor/sync"
"flag"
"time"
log "github.com/sirupsen/logrus"
)
func init() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
}
var migrate = flag.Bool("migrate", false, "migrate database")
func main() {
flag.Parse()
conf, err := config.New()
if err != nil {
panic(err)
}
conf.Mysql.Migrate = *migrate
da, err := dao.New(conf)
if err != nil {
panic(err)
}
if conf.Debug {
log.SetLevel(log.DebugLevel)
}
sy := sync.New(conf, da)
for {
sy.Start()
log.Error("sync error, retry after 20 seconds")
time.Sleep(time.Second * 20)
}
}
debug = true
# 单次获取数据量
batch_size = 99
[server]
# api服务监听
listen = '0.0.0.0:8080'
[chain]
# rpc
rpc = 'https://dev.rpc.agicoin.ai'
[mysql]
host = 'db'
port = 3306
user = 'root'
password = 'XN2UARuys3zy4Oux'
database = 'agi'
max_conn = 20
max_idle_conn = 10
\ No newline at end of file
package config
import (
"flag"
"github.com/BurntSushi/toml"
)
type Config struct {
Debug bool `toml:"debug"`
BatchSize int `toml:"batch_size"`
Chain ChainConfig `toml:"chain"`
Mysql MysqlConfig `toml:"mysql"`
Server ServerConfig `toml:"server"`
}
type ChainConfig struct {
RPC string `toml:"rpc"`
DistributionAddress string `toml:"distribution_address"`
}
type MysqlConfig struct {
Host string `toml:"host"`
Port int `toml:"port"`
User string `toml:"user"`
Password string `toml:"password"`
Database string `toml:"database"`
MaxConn int `toml:"max_conn"`
MaxIdleConn int `toml:"max_idle_conn"`
Migrate bool `toml:"migrate"`
}
type ServerConfig struct {
Listen string `toml:"listen"`
}
var confPath = flag.String("c", "config.toml", "config file path")
func New() (config *Config, err error) {
config = new(Config)
_, err = toml.DecodeFile(*confPath, config)
return
}
This diff is collapsed.
package dao
import (
"claim-monitor/config"
dbmodel "claim-monitor/model/db"
"fmt"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"gorm.io/driver/mysql"
_ "gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"gorm.io/gorm/schema"
)
type Dao struct {
c *config.Config
ethClient *ethclient.Client
ethRPC *rpc.Client
db *gorm.DB
}
func New(_c *config.Config) (dao *Dao, err error) {
dao = &Dao{
c: _c,
}
dao.ethClient, err = ethclient.Dial(_c.Chain.RPC)
if err != nil {
return
}
dao.ethRPC, err = rpc.Dial(_c.Chain.RPC)
if err != nil {
return
}
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True",
_c.Mysql.User, _c.Mysql.Password, _c.Mysql.Host, _c.Mysql.Port, _c.Mysql.Database)
dao.db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
SingularTable: true,
},
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return
}
sqlDB, err := dao.db.DB()
if err != nil {
return
}
sqlDB.SetMaxOpenConns(_c.Mysql.MaxConn)
sqlDB.SetMaxIdleConns(_c.Mysql.MaxIdleConn)
sqlDB.SetConnMaxIdleTime(time.Hour)
if _c.Mysql.Migrate {
err = dao.db.AutoMigrate(&dbmodel.Height{}, &dbmodel.Record{})
if err != nil {
return
}
}
return dao, nil
}
package dao
import (
dbmodel "claim-monitor/model/db"
"strings"
"github.com/shopspring/decimal"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// CreateRecord 创建领取记录
func (d *Dao) CreateRecord(record *dbmodel.Record) (err error) {
record.Address = strings.ToLower(record.Address)
return d.db.Clauses(clause.OnConflict{DoNothing: true}).Create(record).Error
}
// GetRecord 获取领取记录
func (d *Dao) GetRecord(address string, page, pageSize int) (records []*dbmodel.Record, total int64, err error) {
address = strings.ToLower(address)
err = d.db.Model(&dbmodel.Record{}).Where("address = ?", address).Count(&total).Error
if err != nil {
return
}
err = d.db.Model(&dbmodel.Record{}).
Where("address = ?", address).
Order("`id` desc").
Offset((page - 1) * pageSize).
Limit(pageSize).
Find(&records).Error
return
}
func (d *Dao) GetClaimedAmount(address string) (amt *decimal.Decimal, err error) {
address = strings.ToLower(address)
amt = new(decimal.Decimal)
err = d.db.Model(&dbmodel.Record{}).
Where("address = ?", address).
Select("sum(claimed_amount) as claimed_amount").
Scan(amt).Error
return
}
// GetStorageHeight 获取上次缓存的高度
func (d *Dao) GetStorageHeight(key string) (value int, err error) {
storage := new(dbmodel.Height)
err = d.db.Model(storage).Where("`key` = ?", key).First(storage).Error
if err == gorm.ErrRecordNotFound {
return 1, nil
}
return storage.IntValue, err
}
// SetStorageHeight 设置上次缓存的高度
func (d *Dao) SetStorageHeight(key string, intValue int) (err error) {
ret := d.db.Model(&dbmodel.Height{}).Where("`key` = ?", key).Update("int_value", intValue)
if ret.Error != nil {
return
}
if ret.RowsAffected == 0 {
err = d.db.Create(&dbmodel.Height{
Key: key,
IntValue: intValue,
}).Error
}
return
}
package dao
import (
"context"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
func (d *Dao) GetBlockHeight(behindBlock ...int) (height int, err error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
n, err := d.ethClient.BlockNumber(ctx)
if len(behindBlock) > 0 {
n -= uint64(behindBlock[0])
if n < 0 {
n = 0
}
}
return int(n), err
}
func (d *Dao) GetLatestBockHash() (hash string, err error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
block, err := d.ethClient.BlockByNumber(ctx, nil)
if err != nil {
return
}
return block.Hash().Hex(), nil
}
func (d *Dao) GetBlockTime(height int) (timestamp int, err error) {
for i := 0; i < 2; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
block, err := d.ethClient.BlockByNumber(ctx, big.NewInt(int64(height)))
if err == nil {
return int(block.Time()), nil
}
}
return
}
func (d *Dao) GetLogs(beginHeight, endHeight int, topics []string) (logs []types.Log, err error) {
for i := 0; i < 2; i++ {
// 重试2次
logs, err = d.getLogs(beginHeight, endHeight, topics)
if err == nil {
return logs, nil
}
}
return
}
func (d *Dao) getLogs(beginHeight, endHeight int, topics []string) (logs []types.Log, err error) {
q := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(beginHeight)),
ToBlock: big.NewInt(int64(endHeight)),
Topics: [][]common.Hash{{}},
}
for _, topic := range topics {
q.Topics[0] = append(q.Topics[0], common.HexToHash(topic))
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return d.ethClient.FilterLogs(ctx, q)
}
package dao
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
func (d *Dao) RequestMetadata(endpoint string) (isImage bool, format string, data []byte, err error) {
if strings.HasPrefix(endpoint, "ipfs://") {
endpoint = d.c.IPFS.Gateway + endpoint[7:]
}
_url, err := url.Parse(endpoint)
if err != nil {
return
}
req := &http.Request{
Method: http.MethodGet,
URL: _url,
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("request metadata error: status_code %s", resp.Status)
return false, "", nil, nil
}
// 暂时只判断json和image
contentType := resp.Header.Get("Content-Type")
_sps := strings.SplitN(contentType, "/", 2)
if len(_sps) != 2 {
err = fmt.Errorf("invalid content type: %s", contentType)
return
}
// 允许以下mime type
if !strings.HasPrefix(contentType, "image/") &&
!strings.HasPrefix(contentType, "application/json") &&
!strings.HasPrefix(contentType, "text/plain") &&
!strings.HasPrefix(contentType, "text/html") &&
!strings.HasPrefix(contentType, "application/octet-stream") {
err = fmt.Errorf("invalid content type: %s", contentType)
return
}
// 通过文件后缀确定格式
if contentType == "application/octet-stream" {
if strings.HasSuffix(_url.Path, ".jpg") ||
strings.HasSuffix(_url.Path, ".jpeg") ||
strings.HasSuffix(_url.Path, ".png") ||
strings.HasSuffix(_url.Path, ".gif") {
isImage = true
contentType = _url.Path[strings.LastIndex(_url.Path, ".")+1:]
} else {
err = fmt.Errorf("invalid content type: %s", contentType)
return false, "", nil, err
}
}
mime, format := _sps[0], _sps[1]
if mime == "image" {
isImage = true
}
data, err = io.ReadAll(resp.Body)
return
}
version: "3.5"
networks:
default:
name: claim-monitor
services:
db:
image: mysql:8
ports:
- "3306:3306"
volumes:
- ./data:/var/lib/mysql
- ./mysql-config:/etc/mysql
environment:
MYSQL_ROOT_PASSWORD: "XN2UARuys3zy4Oux"
MYSQL_DATABASE: "agi"
sync:
image: claim-monitor:latest
depends_on:
- db
command:
- "/bin/sh"
- "-c"
- "/usr/bin/sync -c config.toml --migrate"
api:
image: claim-monitor:latest
ports:
- "8080:8080"
depends_on:
- db
- sync
command:
- "/bin/sh"
- "-c"
- "/usr/bin/api -c config.toml"
module claim-monitor
go 1.21.4
require (
github.com/BurntSushi/toml v1.3.2
github.com/aws/aws-sdk-go v1.52.5
github.com/disintegration/imaging v1.6.2
github.com/ethereum/go-ethereum v1.14.2
github.com/gin-contrib/cors v1.7.2
github.com/gin-gonic/gin v1.10.0
github.com/sirupsen/logrus v1.9.3
github.com/tidwall/gjson v1.17.1
gorm.io/driver/mysql v1.5.6
gorm.io/gorm v1.25.10
)
require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/supranational/blst v0.3.11 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
This diff is collapsed.
package dbmodel
import (
"time"
"github.com/shopspring/decimal"
"gorm.io/gorm"
)
type Record struct {
ID uint `gorm:"primaryKey"`
TxHash string `gorm:"type:varchar(255);uniqueIndex;not null;comment:交易hash"`
Address string `gorm:"type:varchar(255);not null;comment:用户地址"`
ClaimedAmount decimal.Decimal `gorm:"type:decimal(65,0);not null;comment:领取数量"`
TotalAmount decimal.Decimal `gorm:"type:decimal(65,0);not null;comment:总奖励数量"`
Timestamp int64 `gorm:"not null;comment:领取区块时间"`
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt gorm.DeletedAt `gorm:"index"`
}
type Height struct {
Key string `gorm:"primaryKey"`
IntValue int `gorm:"type:int;not null"` // 配置value
}
package http
type GetRecordResponse struct {
Records []Record `json:"records"`
TotalCount int `json:"totalCount"`
}
type Record struct {
TxHash string `json:"txHash"`
Date string `json:"date"`
Timestamp int `json:"timestamp"`
Amount string `json:"amount"`
}
type GetClaimedResponse struct {
Claimed string `json:"claimed"`
}
# claim-monitor
\ No newline at end of file
package server
import (
"claim-monitor/config"
"claim-monitor/service"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
var srv *service.Service
var conf *config.Config
func Run(_srv *service.Service, _conf *config.Config) {
srv = _srv
conf = _conf
if !conf.Debug {
gin.SetMode(gin.ReleaseMode)
}
engine := gin.New()
engine.Use(gin.Recovery())
_cors := cors.DefaultConfig()
_cors.AllowAllOrigins = true
engine.Use(cors.New(_cors))
router(engine)
log.Infof("start http server listening %s", conf.Server.Listen)
if err := engine.Run(conf.Server.Listen); err != nil {
log.Error("http server run error: ", err)
}
}
package server
import (
"strconv"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gin-gonic/gin"
)
func getClaimed(c *gin.Context) {
address := c.Query("address")
if _, err := hexutil.Decode(address); err != nil || len(address) != 42 {
c.JSON(200, withError("invalid params"))
return
}
resp, err := srv.GetClaimed(address)
if err != nil {
c.JSON(200, withError("internal error"))
return
}
c.JSON(200, withSuccess(resp))
}
func getRecord(c *gin.Context) {
address := c.Query("address")
if _, err := hexutil.Decode(address); err != nil || len(address) != 42 {
c.JSON(200, withError("invalid params"))
return
}
pageSize, _ := strconv.Atoi(c.Query("pageSize"))
if pageSize > 100 || pageSize < 1 { // 默认每页100条
pageSize = 100
}
page, _ := strconv.Atoi(c.Query("page"))
if page < 1 {
page = 1
}
resp, err := srv.GetRecord(address, page, pageSize)
if err != nil {
c.JSON(200, withError("internal error"))
return
}
c.JSON(200, withSuccess(resp))
}
func withSuccess(data interface{}) interface{} {
return map[string]interface{}{
"code": 0,
"msg": "ok",
"data": data,
}
}
func withError(msg string) interface{} {
return map[string]interface{}{
"code": 1,
"msg": "",
"error": msg,
}
}
package server
import (
"github.com/gin-gonic/gin"
)
func router(router gin.IRouter) {
api := router.Group("/api/v1")
{
api.GET("/record", getRecord)
api.GET("/claimed", getClaimed)
}
}
package service
import (
"claim-monitor/model/http"
"fmt"
"math/big"
"time"
log "github.com/sirupsen/logrus"
)
func (srv *Service) GetRecord(address string, page, pageSize int) (resp *http.GetRecordResponse, err error) {
resp = new(http.GetRecordResponse)
records, totalCount, err := srv.d.GetRecord(address, page, pageSize)
if err != nil {
log.WithField("address", address).WithError(err).Error("failed to get contract")
return
}
ether := big.NewInt(1000000000000000000)
for _, record := range records {
amt, _ := big.NewFloat(0).Quo(
big.NewFloat(0).SetInt(record.ClaimedAmount.BigInt()),
big.NewFloat(0).SetInt(ether),
).Float64()
resp.Records = append(resp.Records, http.Record{
TxHash: record.TxHash,
Date: time.Unix(record.Timestamp, 0).UTC().Format("2006-01-02 15:04:05"),
Timestamp: int(record.Timestamp),
Amount: fmt.Sprintf("%.6f", amt),
})
}
resp.TotalCount = int(totalCount)
return
}
func (srv *Service) GetClaimed(address string) (resp *http.GetClaimedResponse, err error) {
resp = new(http.GetClaimedResponse)
claimed, err := srv.d.GetClaimedAmount(address)
if err != nil {
log.WithField("address", address).WithError(err).Error("failed to get claimed")
return
}
ether := big.NewInt(1000000000000000000)
amt, _ := big.NewFloat(0).Quo(
big.NewFloat(0).SetInt(claimed.BigInt()),
big.NewFloat(0).SetInt(ether),
).Float64()
resp.Claimed = fmt.Sprintf("%.6f", amt)
return
}
package service
import (
"claim-monitor/config"
"claim-monitor/dao"
)
type Service struct {
c *config.Config
d *dao.Dao
}
func New(_c *config.Config, _d *dao.Dao) (service *Service) {
service = &Service{
c: _c,
d: _d,
}
return service
}
package sync
import (
dbmodel "claim-monitor/model/db"
"strings"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
"gorm.io/gorm"
)
func (s *Sync) SyncClaimed(beginHeight, endHeight int) {
if endHeight < 0 {
return
}
if beginHeight < 0 {
beginHeight = 0
}
claimTopics := []string{ClaimedTopic}
logs, err := s.d.GetLogs(beginHeight, endHeight, claimTopics)
if err != nil {
log.WithFields(log.Fields{"begin": beginHeight, "end": endHeight}).WithError(err).Error("rpc: get logs")
return
}
for _, txLog := range logs {
s.FilterClaimed(txLog)
}
}
func (s *Sync) FilterClaimed(txLog types.Log) {
if len(txLog.Topics) == 0 {
return
}
claimedLog, err := s.ca.ParseClaimed(txLog)
if err != nil {
log.WithError(err).Error("parse claimed log")
return
}
claimedAmount := decimal.NewFromBigInt(claimedLog.Amount, 0)
totalAmount := decimal.NewFromBigInt(claimedLog.TotalClaimedAmount, 0)
timestamp, err := s.d.GetBlockTime(int(txLog.BlockNumber))
if err != nil {
log.WithError(err).Error("get block time")
return
}
r := &dbmodel.Record{
TxHash: txLog.TxHash.Hex(),
Address: strings.ToLower(txLog.Address.String()),
ClaimedAmount: claimedAmount,
TotalAmount: totalAmount,
Timestamp: int64(timestamp),
CreatedAt: time.Time{},
UpdatedAt: time.Time{},
DeletedAt: gorm.DeletedAt{},
}
err = s.d.CreateRecord(r)
if err != nil {
log.WithError(err).Error("create record")
return
}
log.WithFields(log.Fields{
"tx": r.TxHash,
"address": r.Address,
"amount": r.ClaimedAmount,
}).Info("sync claimed record")
}
package sync
import (
"claim-monitor/config"
"claim-monitor/contract"
"claim-monitor/dao"
"flag"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
log "github.com/sirupsen/logrus"
)
var (
ClaimedTopic = crypto.Keccak256Hash([]byte("Claimed(address, uint256,uint256)")).Hex()
LastBlockKey = "last_block"
BehindBlock = 1
)
type Sync struct {
c *config.Config
d *dao.Dao
ca *contract.Distribution
}
var manualSync = flag.Int("sync", 0, "sync block height")
func New(_c *config.Config, _d *dao.Dao) (sync *Sync) {
sync = &Sync{
c: _c,
d: _d,
}
ca, err := contract.NewDistribution(common.HexToAddress(sync.c.Chain.DistributionAddress), nil)
if err != nil {
panic(err)
}
sync.ca = ca
return sync
}
func (s *Sync) Start() {
lastHeight, err := s.d.GetStorageHeight(LastBlockKey)
if err != nil {
log.WithError(err).Error("get last block height")
return
}
if lastHeight != 1 {
// 数据库里保存的是已完成的区块, 再次同步时+1
lastHeight++
}
if *manualSync > 0 {
lastHeight = *manualSync
}
log.WithField("height", lastHeight).Info("last sync block height")
var latestHeight int
var beginHeight = lastHeight
var endHeight = beginHeight + s.c.BatchSize
for {
latestHeight, err = s.d.GetBlockHeight(BehindBlock)
if err != nil {
log.WithError(err).Error("get latest block height")
return
}
if (latestHeight-s.c.BatchSize)-beginHeight < s.c.BatchSize+1 {
time.Sleep(20 * time.Second)
continue
}
s.SyncClaimed(beginHeight-s.c.BatchSize, endHeight-s.c.BatchSize)
if err = s.d.SetStorageHeight(LastBlockKey, endHeight); err != nil {
log.WithError(err).Error("set last block height")
}
beginHeight = endHeight + 1
endHeight = beginHeight + s.c.BatchSize
log.WithFields(log.Fields{
"begin height": beginHeight,
"end height": endHeight,
"latest height": latestHeight,
}).Info("sync block height")
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment