Commit 307d2e28 authored by vicotor's avatar vicotor

update code for task

parent 29a2b9e0
......@@ -7,15 +7,14 @@ import (
"code.wuban.net.cn/movabridge/token-bridge/dao"
dbModel "code.wuban.net.cn/movabridge/token-bridge/model/db"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3"
"math/big"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
log "github.com/sirupsen/logrus"
)
type ChainSync struct {
......@@ -26,6 +25,7 @@ type ChainSync struct {
bridgeCa *bridge.BridgeContract
quit chan struct{}
stopOnce sync.Once
wg sync.WaitGroup
}
func NewChainSync(_chain *config.ChainConfig, _d *dao.Dao) (sync *ChainSync) {
......@@ -45,7 +45,9 @@ func NewChainSync(_chain *config.ChainConfig, _d *dao.Dao) (sync *ChainSync) {
return sync
}
func (s *ChainSync) Start() {
func (s *ChainSync) loop() {
defer s.wg.Done()
lastHeight, err := s.d.GetStorageHeight(s.heightKey)
if err != nil {
if err == dao.ErrRecordNotFound {
......@@ -109,11 +111,14 @@ func (s *ChainSync) Start() {
}
}
func (s *ChainSync) Start() {
s.wg.Add(1)
go s.loop()
}
func (s *ChainSync) Stop() {
s.stopOnce.Do(func() {
log.WithField("chain", s.name).Info("stopping chain sync")
close(s.quit)
})
s.wg.Wait()
}
func (s *ChainSync) SyncLogs(beginHeight, endHeight int64) error {
......
......@@ -37,7 +37,7 @@ func main() {
for _, chainConfig := range conf.Chains {
syncer := chain.NewChainSync(chainConfig, d)
go syncer.Start()
syncer.Start()
syncers = append(syncers, syncer)
}
......
debug = true
[[chains]]
name = "cad"
rpc = "https://1rpc.io/sepolia"
initial_height = 1
batch_block = 100
name = "hpb"
rpc = "https://hpbnode.com"
initial_height = 23240125
batch_block = 2
confirm_block_count = 2
bridge_contract = "0x19Bd3121fEC07F047ac991e7b35C265a2B1F51eE"
validator_private_key = "af426ee077b1eb602fd011714cccf4398d0fc879fd3001a4c648487b1c3e7d2b"
bridge_contract = "0x9a06d0CfAFc19a4bfe0ecd5f8bC20A26a88fA227"
validator_private_key = "fc35cdedfab10b7218ae68b45146736bc66513452000f1fa411ff7a9c1f33439"
[[chains]]
name = "mova"
rpc = "https://pegasus.rpc.caduceus.foundation"
initial_height = 1
batch_block = 100
name = "bit"
rpc = "https://rpc.mova.bitheart.org"
initial_height = 1123988
batch_block = 2
confirm_block_count = 2
bridge_contract = "0xC160b598505c034A820f19e1C8b83ee5d2805A41"
validator_private_key = "af426ee077b1eb602fd011714cccf4398d0fc879fd3001a4c648487b1c3e7d2b"
bridge_contract = "0x9a06d0CfAFc19a4bfe0ecd5f8bC20A26a88fA227"
validator_private_key = "fc35cdedfab10b7218ae68b45146736bc66513452000f1fa411ff7a9c1f33439"
[mysql]
host = "bridgedb"
......
......@@ -5,6 +5,7 @@ import (
dbModel "code.wuban.net.cn/movabridge/token-bridge/model/db"
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/ethclient"
......@@ -17,12 +18,15 @@ type Dao struct {
c *config.Config
db *gorm.DB
ethClient map[int64]*ethclient.Client
quit chan struct{}
wg sync.WaitGroup
}
func New(_c *config.Config) (dao *Dao, err error) {
dao = &Dao{
c: _c,
ethClient: make(map[int64]*ethclient.Client),
quit: make(chan struct{}),
}
// Connect to all configured chains
......@@ -76,3 +80,27 @@ func New(_c *config.Config) (dao *Dao, err error) {
return dao, nil
}
func (d *Dao) Start() {
d.wg.Add(1)
go d.HandleTasks()
}
func (d *Dao) Stop() {
close(d.quit)
d.wg.Wait()
for _, client := range d.ethClient {
if client != nil {
client.Close()
}
}
if d.db != nil {
sqlDB, _ := d.db.DB()
sqlDB.Close()
}
d.ethClient = nil
d.db = nil
d.c = nil
}
......@@ -26,6 +26,18 @@ func (d *Dao) SetStorageHeight(key string, intValue int64) (err error) {
}).Error
}
func (d *Dao) GetUnprocessedBridgeEvents(limit int) (events []*dbModel.BridgeEvent, err error) {
err = d.db.Model(&dbModel.BridgeEvent{}).
Where("`to_chain_status` = ? AND `validator_status` = ?", 0, 0).
Order("created_at ASC").
Limit(limit).
Find(&events).Error
if err == gorm.ErrRecordNotFound {
return nil, ErrRecordNotFound
}
return events, err
}
// func (d *Dao) CreateBridgeEvent(event *dbModel.BridgeEvent) (err error) {
// return d.db.Clauses(clause.OnConflict{DoNothing: true}).Create(event).Error
// }
......
......@@ -196,3 +196,49 @@ func (d *Dao) CheckEventValid() bool {
// This is a placeholder implementation.
return true
}
func (d *Dao) HandleTasks() {
defer d.wg.Done()
ticker := time.NewTicker(5 * time.Second)
maxCount := 5
defer ticker.Stop()
for {
select {
case <-d.quit:
log.Info("stopping Dao task.")
return
case <-ticker.C:
// select unprocessed bridge event with maxCount.
events, err := d.GetUnprocessedBridgeEvents(maxCount)
if err != nil {
log.WithError(err).Error("failed to get unprocessed bridge events")
continue
}
if len(events) == 0 {
log.Info("no unprocessed bridge events found")
continue
}
log.Infof("found %d unprocessed bridge events", len(events))
for _, event := range events {
log.WithFields(log.Fields{
"fromChain": event.FromChain,
"txhash": event.FromChainTxHash,
}).Info("processing bridge event")
if err := d.SubmitInTransfer(event); err != nil {
log.WithError(err).WithFields(log.Fields{
"fromChain": event.FromChain,
"txhash": event.FromChainTxHash,
}).Error("failed to submit in transfer")
} else {
log.WithFields(log.Fields{
"fromChain": event.FromChain,
"txhash": event.FromChainTxHash,
}).Info("successfully submitted in transfer")
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment