Commit 6363dce5 authored by vicotor's avatar vicotor

update code

parent 031f7acd
...@@ -2,41 +2,17 @@ package chain ...@@ -2,41 +2,17 @@ package chain
import ( import (
"code.wuban.net.cn/movabridge/token-bridge/config" "code.wuban.net.cn/movabridge/token-bridge/config"
"code.wuban.net.cn/movabridge/token-bridge/constant"
"code.wuban.net.cn/movabridge/token-bridge/contract/bridge" "code.wuban.net.cn/movabridge/token-bridge/contract/bridge"
"code.wuban.net.cn/movabridge/token-bridge/dao" "code.wuban.net.cn/movabridge/token-bridge/dao"
dbModel "code.wuban.net.cn/movabridge/token-bridge/model/db"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3"
"math/big"
"strings"
"sync" "sync"
"time" "time"
) )
var ( var _ dao.ChainInterface = (*ChainSync)(nil)
EVENT_TRANSFER_OUT = "TransferOut"
EVENT_TRANSFER_IN = "TransferIn"
EVENT_TRANSFER_IN_CONFIRMATION = "TransferInConfirmation"
EVENT_TRANSFER_IN_REJECTION = "TransferInRejection"
EVENT_TRANSFER_IN_EXECUTION = "TransferInExecution"
)
type ValidatorOp int
func (op ValidatorOp) String() string {
switch op {
case constant.ValidatorStatusConfirmation:
return "TransferInConfirmation"
case constant.ValidatorStatusRejection:
return "TransferInRejection"
default:
return "Unknown"
}
}
type ChainSync struct { type ChainSync struct {
chain *config.ChainConfig chain *config.ChainConfig
...@@ -49,6 +25,26 @@ type ChainSync struct { ...@@ -49,6 +25,26 @@ type ChainSync struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
func (s *ChainSync) ParseTransferOut(log types.Log) (*bridge.BridgeContractTransferOut, error) {
return s.bridgeCa.ParseTransferOut(log)
}
func (s *ChainSync) ParseTransferIn(log types.Log) (*bridge.BridgeContractTransferIn, error) {
return s.bridgeCa.ParseTransferIn(log)
}
func (s *ChainSync) ParseTransferInExecution(log types.Log) (*bridge.BridgeContractTransferInExecution, error) {
return s.bridgeCa.ParseTransferInExecution(log)
}
func (s *ChainSync) ParseTransferInRejection(log types.Log) (*bridge.BridgeContractTransferInRejection, error) {
return s.bridgeCa.ParseTransferInRejection(log)
}
func (s *ChainSync) ParseTransferInConfirmation(log types.Log) (*bridge.BridgeContractTransferInConfirmation, error) {
return s.bridgeCa.ParseTransferInConfirmation(log)
}
func NewChainSync(_chain *config.ChainConfig, _d *dao.Dao) (sync *ChainSync) { func NewChainSync(_chain *config.ChainConfig, _d *dao.Dao) (sync *ChainSync) {
bridgeCa, err := bridge.NewBridgeContract(common.HexToAddress(_chain.BridgeContract), nil) bridgeCa, err := bridge.NewBridgeContract(common.HexToAddress(_chain.BridgeContract), nil)
if err != nil { if err != nil {
...@@ -132,6 +128,14 @@ func (s *ChainSync) loop() { ...@@ -132,6 +128,14 @@ func (s *ChainSync) loop() {
} }
} }
func (s *ChainSync) Name() string {
return s.name
}
func (s *ChainSync) GetChain() *config.ChainConfig {
return s.chain
}
func (s *ChainSync) Start() { func (s *ChainSync) Start() {
s.wg.Add(1) s.wg.Add(1)
go s.loop() go s.loop()
...@@ -151,13 +155,12 @@ func (s *ChainSync) SyncLogs(beginHeight, endHeight int64) error { ...@@ -151,13 +155,12 @@ func (s *ChainSync) SyncLogs(beginHeight, endHeight int64) error {
beginHeight = 0 beginHeight = 0
} }
abi, _ := bridge.BridgeContractMetaData.GetAbi()
topics := []string{ topics := []string{
abi.Events["TransferOut"].ID.Hex(), // TransferOut dao.TransferOutEvent.ID.Hex(),
abi.Events["TransferIn"].ID.Hex(), // TransferIn dao.TransferInEvent.ID.Hex(),
abi.Events["TransferInConfirmation"].ID.Hex(), // TransferInConfirmation dao.TransferInConfirmationEvent.ID.Hex(),
abi.Events["TransferInRejection"].ID.Hex(), // TransferInRejection dao.TransferInRejectionEvent.ID.Hex(),
abi.Events["TransferInExecution"].ID.Hex(), // TransferInExecution dao.TransferInExecutionEvent.ID.Hex(),
} }
logs, err := s.d.GetLogs(s.chain, beginHeight, endHeight, topics, []string{ logs, err := s.d.GetLogs(s.chain, beginHeight, endHeight, topics, []string{
...@@ -171,245 +174,6 @@ func (s *ChainSync) SyncLogs(beginHeight, endHeight int64) error { ...@@ -171,245 +174,6 @@ func (s *ChainSync) SyncLogs(beginHeight, endHeight int64) error {
if len(logs) > 0 { if len(logs) > 0 {
log.WithField("chain", s.name).WithFields(log.Fields{"begin": beginHeight, "end": endHeight}).Infof("get %d logs", len(logs)) log.WithField("chain", s.name).WithFields(log.Fields{"begin": beginHeight, "end": endHeight}).Infof("get %d logs", len(logs))
} }
// begin orm transaction
ormTx, err := s.d.BeginTx()
if err != nil {
log.WithField("chain", s.name).WithError(err).Error("begin db transaction")
return err
}
var ormTxErr error
for _, txLog := range logs {
if err := s.FilterTransferOut(txLog, ormTx); err != nil {
ormTxErr = err
break
}
if err := s.FilterTransferIn(txLog, ormTx); err != nil {
ormTxErr = err
break
}
if err := s.FilterValidatorEvents(txLog, ormTx); err != nil {
ormTxErr = err
break
}
}
// Commit or rollback transaction based on error
if ormTxErr != nil {
if rbErr := ormTx.Rollback(); rbErr != nil {
log.WithField("chain", s.name).WithError(rbErr).Error("failed to rollback transaction")
}
log.WithField("chain", s.name).WithError(ormTxErr).Error("error processing logs, transaction rolled back")
} else {
if cmtErr := ormTx.Commit(); cmtErr != nil {
log.WithField("chain", s.name).WithError(cmtErr).Error("failed to commit transaction")
}
}
return nil
}
// FilterTransferOut 用户从当前链跨出事件.
func (s *ChainSync) FilterTransferOut(txLog types.Log, tx *dao.Transaction) error {
if len(txLog.Topics) == 0 {
return nil
}
abi, _ := bridge.BridgeContractMetaData.GetAbi()
if txLog.Topics[0].Hex() == abi.Events["TransferOut"].ID.Hex() {
event, err := s.bridgeCa.ParseTransferOut(txLog)
if err != nil {
log.WithField("chain", s.name).WithError(err).Error("parse TransferOut log")
return err
}
// 防止重复入库.
eventHash := transferOutEventHash(event.FromChainID.Int64(), event.OutId.Int64(), strings.ToLower(txLog.Address.String()))
dbEvent := &dbModel.BridgeEvent{
FromChain: event.FromChainID.Int64(),
OutTimestamp: int64(txLog.BlockTimestamp),
FromContract: strings.ToLower(txLog.Address.String()),
FromAddress: strings.ToLower(event.Sender.String()),
FromToken: strings.ToLower(event.Token.String()),
FromChainTxHash: strings.ToLower(txLog.TxHash.String()),
SendAmount: event.Amount.Text(10),
FeeAmount: event.Fee.Text(10),
ToToken: strings.ToLower(event.ReceiveToken.String()),
ReceiveAmount: new(big.Int).Sub(event.Amount, event.Fee).Text(10),
OutId: event.OutId.Int64(),
Receiver: strings.ToLower(event.Receiver.String()),
ToChain: event.ToChainID.Int64(),
ToChainStatus: constant.TransferChainNoProcess,
Hash: eventHash,
}
err = s.d.CreateBridgeEventTx(tx, dbEvent)
if err != nil {
log.WithField("chain", s.name).WithFields(log.Fields{
"error": err.Error(),
}).Error("db create bridge in event")
return err
}
log.WithField("chain", s.name).WithField("txHash", txLog.TxHash.Hex()).Info("db create, TransferOut event")
}
return nil
}
// FilterTransferIn 用户从目标链跨入事件及执行结束事件.
func (s *ChainSync) FilterTransferIn(txLog types.Log, tx *dao.Transaction) error {
if len(txLog.Topics) == 0 {
return nil
}
abi, _ := bridge.BridgeContractMetaData.GetAbi()
switch txLog.Topics[0].Hex() {
case abi.Events["TransferIn"].ID.Hex():
event, err := s.bridgeCa.ParseTransferIn(txLog)
if err != nil {
log.WithField("chain", s.name).WithError(err).Error("parse TransferIn log")
return err
}
// find out if the event already exists in the database.
dbEvent, err := s.d.GetBridgeEventWithOutInfoTx(tx, event.FromChainID.Int64(), event.OutId.Int64())
if err == dao.ErrRecordNotFound {
log.WithField("chain", s.name).WithField("outId", event.OutId.Int64()).Error("transfer out event not found")
return nil
}
// update the event with the transfer in information.
dbEvent.ToContract = strings.ToLower(txLog.Address.String())
dbEvent.InTimestamp = int64(txLog.BlockTimestamp)
dbEvent.InId = event.InId.Int64()
dbEvent.ToChainTxHash = strings.ToLower(txLog.TxHash.String())
dbEvent.ToChainStatus = constant.TransferChainWaitConfirm
log.WithFields(log.Fields{
"chain": s.name,
"inTimestamp": txLog.BlockTimestamp,
}).Debug("got transfer in event")
if err := s.d.UpdateBridgeWithTransferInTx(tx, dbEvent); err != nil {
log.WithField("chain", s.name).WithFields(log.Fields{
"error": err.Error(),
}).Error("db update transfer in event")
return err
}
return nil
case abi.Events["TransferInExecution"].ID.Hex():
event, err := s.bridgeCa.ParseTransferInExecution(txLog)
if err != nil {
log.WithField("chain", s.name).WithError(err).Error("parse TransferInExecution log")
return err
}
dbEvent, err := s.d.GetBridgeEventWithInInfoTx(tx, s.chain.ChainId, event.InId.Int64())
if err == dao.ErrRecordNotFound {
log.WithField("chain", s.name).WithField("inId", event.InId.Int64()).Error("transfer in event not found")
return nil
}
dbEvent.ToChainStatus = constant.TransferChainExecuted
dbEvent.FinishTxHash = strings.ToLower(txLog.TxHash.String())
if err := s.d.UpdateBridgeResultTx(tx, dbEvent, dbEvent.FinishTxHash, dbEvent.ToChainStatus); err != nil {
log.WithField("chain", s.name).WithFields(log.Fields{
"error": err.Error(),
}).Error("db update transfer in execution event")
return err
}
case abi.Events["TransferInRejection"].ID.Hex():
event, err := s.bridgeCa.ParseTransferInRejection(txLog)
if err != nil {
log.WithField("chain", s.name).WithError(err).Error("parse TransferInExecution log")
return err
}
dbEvent, err := s.d.GetBridgeEventWithInInfoTx(tx, s.chain.ChainId, event.InId.Int64())
if err == dao.ErrRecordNotFound {
log.WithField("chain", s.name).WithField("inId", event.InId.Int64()).Error("transfer in event not found")
return nil
}
dbEvent.ToChainStatus = constant.TransferChainRejected
dbEvent.FinishTxHash = strings.ToLower(txLog.TxHash.String())
if err := s.d.UpdateBridgeResultTx(tx, dbEvent, dbEvent.FinishTxHash, dbEvent.ToChainStatus); err != nil {
log.WithField("chain", s.name).WithFields(log.Fields{
"error": err.Error(),
}).Error("db update transfer in execution event")
return err
}
}
return nil
}
// FilterValidatorEvents 当前链验证者事件.
func (s *ChainSync) FilterValidatorEvents(txLog types.Log, tx *dao.Transaction) error {
if len(txLog.Topics) == 0 {
return nil
}
var (
chainId = s.chain.ChainId
validator = ""
inId = int64(0)
eventHash = ""
txHash = txLog.TxHash.Hex()
valOp ValidatorOp = constant.TransferChainNoProcess
)
isMyselfOp := false
valAddr := strings.ToLower(s.d.GetChainValidatorAddr(s.chain).Hex())
abi, _ := bridge.BridgeContractMetaData.GetAbi()
switch txLog.Topics[0].Hex() {
case abi.Events[EVENT_TRANSFER_IN_CONFIRMATION].ID.Hex():
event, err := s.bridgeCa.ParseTransferInConfirmation(txLog)
if err != nil {
log.WithField("chain", s.name).WithError(err).Error("parse TransferInConfirmation log")
return err
}
valOp = constant.ValidatorStatusConfirmation
eventHash = validatorEventHash(s.chain.ChainId, event.Validator.String(), txLog.TxHash.Hex(), event.InId.Int64(), "TransferInConfirmation")
validator = strings.ToLower(event.Validator.String())
inId = event.InId.Int64()
if validator == valAddr {
isMyselfOp = true
}
case abi.Events[EVENT_TRANSFER_IN_REJECTION].ID.Hex():
event, err := s.bridgeCa.ParseTransferInRejection(txLog)
if err != nil {
log.WithField("chain", s.name).WithError(err).Error("parse TransferInRejection log")
return err
}
eventHash = validatorEventHash(s.chain.ChainId, event.Validator.String(), txLog.TxHash.Hex(), event.InId.Int64(), "TransferInRejection")
validator = strings.ToLower(event.Validator.String())
inId = event.InId.Int64()
valOp = constant.ValidatorStatusRejection
if validator == valAddr {
isMyselfOp = true
}
default:
log.WithField("chain", s.name).Error("unknown event")
return nil
}
err := s.d.CreateValidatorEventTx(tx, eventHash, chainId, validator, txHash, valOp.String(), inId)
if err != nil {
log.WithField("chain", s.name).WithFields(log.Fields{
"error": err.Error(),
}).Error("db create validator event")
return err
}
if isMyselfOp {
event, err := s.d.GetBridgeEventWithInInfoTx(tx, chainId, inId)
if event != nil {
if err = s.d.UpdateBridgeValidatorOperationTx(tx, event, int(valOp)); err != nil {
log.WithError(err).Error("db update validator operation event")
}
} else {
log.WithField("event", txLog.Topics[0].Hex()).WithError(err).Error("not found event for validator event")
}
}
return nil
}
func validatorEventHash(chainId int64, validator string, txHash string, inId int64, event string) string {
hash := sha3.NewLegacyKeccak256()
hash.Write([]byte(fmt.Sprintf("%d%s%s%d%s", chainId, validator, txHash, inId, event)))
return common.BytesToHash(hash.Sum(nil)).String()
}
func transferOutEventHash(fromChain int64, outId int64, fromContract string) string { return s.d.HandleEvents(s, logs)
hash := sha3.NewLegacyKeccak256()
hash.Write([]byte(fmt.Sprintf("%d%d%s", fromChain, outId, fromContract)))
return common.BytesToHash(hash.Sum(nil)).String()
} }
...@@ -21,3 +21,24 @@ const ( ...@@ -21,3 +21,24 @@ const (
ValidatorStatusRejection = 2 ValidatorStatusRejection = 2
ValidatorStatusFailure = 3 ValidatorStatusFailure = 3
) )
const (
EVENT_TRANSFER_OUT = "TransferOut"
EVENT_TRANSFER_IN = "TransferIn"
EVENT_TRANSFER_IN_CONFIRMATION = "TransferInConfirmation"
EVENT_TRANSFER_IN_REJECTION = "TransferInRejection"
EVENT_TRANSFER_IN_EXECUTION = "TransferInExecution"
)
type ValidatorOp int
func (op ValidatorOp) String() string {
switch op {
case ValidatorStatusConfirmation:
return "TransferInConfirmation"
case ValidatorStatusRejection:
return "TransferInRejection"
default:
return "Unknown"
}
}
...@@ -1847,7 +1847,7 @@ func (_BridgeContract *BridgeContractFilterer) ParseTokenConfigChanged(log types ...@@ -1847,7 +1847,7 @@ func (_BridgeContract *BridgeContractFilterer) ParseTokenConfigChanged(log types
return event, nil return event, nil
} }
// BridgeContractTransferInIterator is returned from FilterTransferIn and is used to iterate over the raw logs and unpacked data for TransferIn events raised by the BridgeContract contract. // BridgeContractTransferInIterator is returned from filterTransferIn and is used to iterate over the raw logs and unpacked data for TransferIn events raised by the BridgeContract contract.
type BridgeContractTransferInIterator struct { type BridgeContractTransferInIterator struct {
Event *BridgeContractTransferIn // Event containing the contract specifics and raw log Event *BridgeContractTransferIn // Event containing the contract specifics and raw log
...@@ -2393,7 +2393,7 @@ func (_BridgeContract *BridgeContractFilterer) ParseTransferInRejection(log type ...@@ -2393,7 +2393,7 @@ func (_BridgeContract *BridgeContractFilterer) ParseTransferInRejection(log type
return event, nil return event, nil
} }
// BridgeContractTransferOutIterator is returned from FilterTransferOut and is used to iterate over the raw logs and unpacked data for TransferOut events raised by the BridgeContract contract. // BridgeContractTransferOutIterator is returned from filterTransferOut and is used to iterate over the raw logs and unpacked data for TransferOut events raised by the BridgeContract contract.
type BridgeContractTransferOutIterator struct { type BridgeContractTransferOutIterator struct {
Event *BridgeContractTransferOut // Event containing the contract specifics and raw log Event *BridgeContractTransferOut // Event containing the contract specifics and raw log
......
package dao
import (
"code.wuban.net.cn/movabridge/token-bridge/config"
. "code.wuban.net.cn/movabridge/token-bridge/constant"
"code.wuban.net.cn/movabridge/token-bridge/contract/bridge"
dbModel "code.wuban.net.cn/movabridge/token-bridge/model/db"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3"
"math/big"
"strings"
)
type ChainInterface interface {
Name() string
GetChain() *config.ChainConfig
ParseTransferOut(log types.Log) (*bridge.BridgeContractTransferOut, error)
ParseTransferIn(log types.Log) (*bridge.BridgeContractTransferIn, error)
ParseTransferInExecution(log types.Log) (*bridge.BridgeContractTransferInExecution, error)
ParseTransferInRejection(log types.Log) (*bridge.BridgeContractTransferInRejection, error)
ParseTransferInConfirmation(log types.Log) (*bridge.BridgeContractTransferInConfirmation, error)
}
var (
bridgeAbi, _ = bridge.BridgeContractMetaData.GetAbi()
TransferOutEvent = bridgeAbi.Events[EVENT_TRANSFER_OUT]
TransferInEvent = bridgeAbi.Events[EVENT_TRANSFER_IN]
TransferInExecutionEvent = bridgeAbi.Events[EVENT_TRANSFER_IN_EXECUTION]
TransferInRejectionEvent = bridgeAbi.Events[EVENT_TRANSFER_IN_REJECTION]
TransferInConfirmationEvent = bridgeAbi.Events[EVENT_TRANSFER_IN_CONFIRMATION]
)
func (s *Dao) HandleEvents(chain ChainInterface, logs []types.Log) error {
s.handleMux.Lock()
defer s.handleMux.Unlock()
cname := chain.Name()
// begin orm transaction
ormTx, err := s.BeginTx()
if err != nil {
log.WithField("chain", cname).WithError(err).Error("begin db transaction")
return err
}
var ormTxErr error
for _, txLog := range logs {
if err := s.filterTransferOut(chain, txLog, ormTx); err != nil {
ormTxErr = err
break
}
if err := s.filterTransferIn(chain, txLog, ormTx); err != nil {
ormTxErr = err
break
}
if err := s.filterValidatorEvents(chain, txLog, ormTx); err != nil {
ormTxErr = err
break
}
}
// Commit or rollback transaction based on error
if ormTxErr != nil {
if rbErr := ormTx.Rollback(); rbErr != nil {
log.WithField("chain", cname).WithError(rbErr).Error("failed to rollback transaction")
}
log.WithField("chain", cname).WithError(ormTxErr).Error("error processing logs, transaction rolled back")
} else {
if cmtErr := ormTx.Commit(); cmtErr != nil {
log.WithField("chain", cname).WithError(cmtErr).Error("failed to commit transaction")
}
}
return nil
}
// filterTransferOut 用户从当前链跨出事件.
func (s *Dao) filterTransferOut(chain ChainInterface, txLog types.Log, tx *Transaction) error {
if len(txLog.Topics) == 0 {
return nil
}
if txLog.Topics[0].Hex() == TransferOutEvent.ID.Hex() {
event, err := chain.ParseTransferOut(txLog)
if err != nil {
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferOut log")
return err
}
// 防止重复入库.
eventHash := s.transferHash(event.FromChainID.Int64(), event.OutId.Int64())
existEvent, _ := s.GetBridgeEventByHashTx(tx, eventHash)
if existEvent == nil {
dbEvent := &dbModel.BridgeEvent{
FromChain: event.FromChainID.Int64(),
OutTimestamp: int64(txLog.BlockTimestamp),
FromContract: strings.ToLower(txLog.Address.String()),
FromAddress: strings.ToLower(event.Sender.String()),
FromToken: strings.ToLower(event.Token.String()),
FromChainTxHash: strings.ToLower(txLog.TxHash.String()),
SendAmount: event.Amount.Text(10),
FeeAmount: event.Fee.Text(10),
ToToken: strings.ToLower(event.ReceiveToken.String()),
ReceiveAmount: new(big.Int).Sub(event.Amount, event.Fee).Text(10),
OutId: event.OutId.Int64(),
Receiver: strings.ToLower(event.Receiver.String()),
ToChain: event.ToChainID.Int64(),
ToChainStatus: TransferChainNoProcess,
Hash: eventHash,
}
err = s.CreateBridgeEventTx(tx, dbEvent)
if err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(),
}).Error("db create bridge in event")
return err
}
log.WithField("chain", chain.Name()).WithField("txHash", txLog.TxHash.Hex()).Info("db create, TransferOut event")
} else {
if existEvent.FromChainTxHash == "" {
// update existing event with missing fields.
existEvent.FromChain = event.FromChainID.Int64()
existEvent.OutTimestamp = int64(txLog.BlockTimestamp)
existEvent.FromContract = strings.ToLower(txLog.Address.String())
existEvent.FromAddress = strings.ToLower(event.Sender.String())
existEvent.FromToken = strings.ToLower(event.Token.String())
existEvent.FromChainTxHash = strings.ToLower(txLog.TxHash.String())
existEvent.SendAmount = event.Amount.Text(10)
existEvent.FeeAmount = event.Fee.Text(10)
existEvent.ToToken = strings.ToLower(event.ReceiveToken.String())
existEvent.ReceiveAmount = new(big.Int).Sub(event.Amount, event.Fee).Text(10)
existEvent.OutId = event.OutId.Int64()
existEvent.Receiver = strings.ToLower(event.Receiver.String())
existEvent.ToChain = event.ToChainID.Int64()
existEvent.Hash = eventHash
err = s.UpdateFullBridgeTx(tx, existEvent)
if err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(),
}).Error("db update full bridge event")
return err
}
} else {
// already exist, do nothing.
}
}
}
return nil
}
// filterTransferIn 用户从目标链跨入事件及执行结束事件.
func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transaction) error {
if len(txLog.Topics) == 0 {
return nil
}
switch txLog.Topics[0].Hex() {
case TransferInEvent.ID.Hex():
event, err := chain.ParseTransferIn(txLog)
if err != nil {
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferIn log")
return err
}
eventHash := s.transferHash(event.FromChainID.Int64(), event.OutId.Int64())
existEvent, _ := s.GetBridgeEventByHashTx(tx, eventHash)
if existEvent == nil {
dbEvent := &dbModel.BridgeEvent{
Hash: eventHash,
ToContract: strings.ToLower(txLog.Address.String()),
InTimestamp: int64(txLog.BlockTimestamp),
InId: event.InId.Int64(),
ToChain: chain.GetChain().ChainId,
ToChainTxHash: strings.ToLower(txLog.TxHash.String()),
ToChainStatus: TransferChainWaitConfirm,
}
err = s.CreateBridgeEventTx(tx, dbEvent)
if err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(),
}).Error("db create bridge in event")
return err
}
log.WithField("chain", chain.Name()).WithField("txHash", txLog.TxHash.Hex()).Info("db create, TransferOut event")
} else {
if existEvent.ToChainTxHash == "" {
// update the event with the transfer in information.
existEvent.ToContract = strings.ToLower(txLog.Address.String())
existEvent.InTimestamp = int64(txLog.BlockTimestamp)
existEvent.InId = event.InId.Int64()
existEvent.ToChainTxHash = strings.ToLower(txLog.TxHash.String())
existEvent.ToChainStatus = TransferChainWaitConfirm
log.WithFields(log.Fields{
"chain": chain.Name(),
"inTimestamp": txLog.BlockTimestamp,
}).Debug("got transfer in event")
if err := s.UpdateFullBridgeTx(tx, existEvent); err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(),
}).Error("db update transfer in event")
return err
}
return nil
} else {
// already exist, do nothing.
return nil
}
}
case TransferInExecutionEvent.ID.Hex():
event, err := chain.ParseTransferInExecution(txLog)
if err != nil {
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInExecution log")
return err
}
dbEvent, err := s.GetBridgeEventWithInInfoTx(tx, chain.GetChain().ChainId, event.InId.Int64())
if err == ErrRecordNotFound {
log.WithField("chain", chain.Name()).WithField("inId", event.InId.Int64()).Error("transfer in event not found")
return nil
}
dbEvent.ToChainStatus = TransferChainExecuted
dbEvent.FinishTxHash = strings.ToLower(txLog.TxHash.String())
if err := s.UpdateBridgeResultTx(tx, dbEvent, dbEvent.FinishTxHash, dbEvent.ToChainStatus); err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(),
}).Error("db update transfer in execution event")
return err
}
case TransferInRejectionEvent.ID.Hex():
event, err := chain.ParseTransferInRejection(txLog)
if err != nil {
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInExecution log")
return err
}
dbEvent, err := s.GetBridgeEventWithInInfoTx(tx, chain.GetChain().ChainId, event.InId.Int64())
if err == ErrRecordNotFound {
log.WithField("chain", chain.Name()).WithField("inId", event.InId.Int64()).Error("transfer in event not found")
return nil
}
dbEvent.ToChainStatus = TransferChainRejected
dbEvent.FinishTxHash = strings.ToLower(txLog.TxHash.String())
if err := s.UpdateBridgeResultTx(tx, dbEvent, dbEvent.FinishTxHash, dbEvent.ToChainStatus); err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(),
}).Error("db update transfer in execution event")
return err
}
}
return nil
}
// filterValidatorEvents 当前链验证者事件.
func (s *Dao) filterValidatorEvents(chain ChainInterface, txLog types.Log, tx *Transaction) error {
if len(txLog.Topics) == 0 {
return nil
}
var (
chainId = chain.GetChain().ChainId
validator = ""
inId = int64(0)
eventHash = ""
txHash = txLog.TxHash.Hex()
valOp ValidatorOp = TransferChainNoProcess
)
isMyselfOp := false
valAddr := strings.ToLower(s.GetChainValidatorAddr(chain.GetChain()).Hex())
switch txLog.Topics[0].Hex() {
case TransferInConfirmationEvent.ID.Hex():
event, err := chain.ParseTransferInConfirmation(txLog)
if err != nil {
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInConfirmation log")
return err
}
valOp = ValidatorStatusConfirmation
eventHash = validatorEventHash(chainId, event.Validator.String(), txLog.TxHash.Hex(), event.InId.Int64(), "TransferInConfirmation")
validator = strings.ToLower(event.Validator.String())
inId = event.InId.Int64()
if validator == valAddr {
isMyselfOp = true
}
case TransferInRejectionEvent.ID.Hex():
event, err := chain.ParseTransferInRejection(txLog)
if err != nil {
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInRejection log")
return err
}
eventHash = validatorEventHash(chainId, event.Validator.String(), txLog.TxHash.Hex(), event.InId.Int64(), "TransferInRejection")
validator = strings.ToLower(event.Validator.String())
inId = event.InId.Int64()
valOp = ValidatorStatusRejection
if validator == valAddr {
isMyselfOp = true
}
default:
log.WithField("chain", chain.Name()).Error("unknown event")
return nil
}
err := s.CreateValidatorEventTx(tx, txLog.Address.String(), eventHash, chainId, validator, txHash, valOp.String(), inId)
if err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(),
}).Error("db create validator event")
return err
}
if isMyselfOp {
event, err := s.GetBridgeEventWithInInfoTx(tx, chainId, inId)
if event != nil {
if err = s.UpdateBridgeValidatorOperationTx(tx, event, int(valOp)); err != nil {
log.WithError(err).Error("db update validator operation event")
}
} else {
log.WithField("event", txLog.Topics[0].Hex()).WithError(err).Error("not found event for validator event")
}
}
return nil
}
func validatorEventHash(chainId int64, validator string, txHash string, inId int64, event string) string {
hash := sha3.NewLegacyKeccak256()
hash.Write([]byte(fmt.Sprintf("%d%s%s%d%s", chainId, validator, txHash, inId, event)))
return common.BytesToHash(hash.Sum(nil)).String()
}
func (s *Dao) transferHash(fromChainId int64, outId int64) string {
chainInfo, exist := s.chainGroup[fromChainId]
if !exist {
return ""
}
fromContract := chainInfo.conf.BridgeContract
hash := sha3.NewLegacyKeccak256()
hash.Write([]byte(fmt.Sprintf("%d%d%s", fromChainId, outId, fromContract)))
return common.BytesToHash(hash.Sum(nil)).String()
}
func transferOutEventHash(fromChain int64, outId int64, fromContract string) string {
hash := sha3.NewLegacyKeccak256()
hash.Write([]byte(fmt.Sprintf("%d%d%s", fromChain, outId, fromContract)))
return common.BytesToHash(hash.Sum(nil)).String()
}
...@@ -14,19 +14,25 @@ import ( ...@@ -14,19 +14,25 @@ import (
"gorm.io/gorm/schema" "gorm.io/gorm/schema"
) )
type ChainInfo struct {
conf *config.ChainConfig
cli *ethclient.Client
}
type Dao struct { type Dao struct {
c *config.Config c *config.Config
db *gorm.DB db *gorm.DB
ethClient map[int64]*ethclient.Client chainGroup map[int64]ChainInfo
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
handleMux sync.Mutex
} }
func New(_c *config.Config) (dao *Dao, err error) { func New(_c *config.Config) (dao *Dao, err error) {
dao = &Dao{ dao = &Dao{
c: _c, c: _c,
ethClient: make(map[int64]*ethclient.Client), chainGroup: make(map[int64]ChainInfo),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
// Connect to all configured chains // Connect to all configured chains
...@@ -45,7 +51,10 @@ func New(_c *config.Config) (dao *Dao, err error) { ...@@ -45,7 +51,10 @@ func New(_c *config.Config) (dao *Dao, err error) {
// Update the chain ID in the config // Update the chain ID in the config
chainConfig.ChainId = chainId.Int64() chainConfig.ChainId = chainId.Int64()
dao.ethClient[chainId.Int64()] = client dao.chainGroup[chainId.Int64()] = ChainInfo{
conf: chainConfig,
cli: client,
}
fmt.Printf("Connected to %s chain with ID %d\n", name, chainConfig.ChainId) fmt.Printf("Connected to %s chain with ID %d\n", name, chainConfig.ChainId)
} }
...@@ -90,16 +99,16 @@ func (d *Dao) Stop() { ...@@ -90,16 +99,16 @@ func (d *Dao) Stop() {
close(d.quit) close(d.quit)
d.wg.Wait() d.wg.Wait()
for _, client := range d.ethClient { for _, chain := range d.chainGroup {
if client != nil { if chain.cli != nil {
client.Close() chain.cli.Close()
} }
} }
if d.db != nil { if d.db != nil {
sqlDB, _ := d.db.DB() sqlDB, _ := d.db.DB()
sqlDB.Close() sqlDB.Close()
} }
d.ethClient = nil d.chainGroup = nil
d.db = nil d.db = nil
d.c = nil d.c = nil
} }
...@@ -58,6 +58,19 @@ func (d *Dao) GetBridgeEventWithInInfoTx(tx *Transaction, chain int64, inId int6 ...@@ -58,6 +58,19 @@ func (d *Dao) GetBridgeEventWithInInfoTx(tx *Transaction, chain int64, inId int6
return event, err return event, err
} }
func (d *Dao) UpdateFullBridgeTx(tx *Transaction, event *dbModel.BridgeEvent) error {
return tx.tx.Model(event).Where("`id` = ?", event.ID).Updates(event).Error
}
func (d *Dao) GetBridgeEventByHashTx(tx *Transaction, hash string) (event *dbModel.BridgeEvent, err error) {
event = new(dbModel.BridgeEvent)
err = tx.tx.Model(event).Where("`hash` = ?", hash).First(event).Error
if err == gorm.ErrRecordNotFound {
return nil, ErrRecordNotFound
}
return event, err
}
func (d *Dao) UpdateBridgeWithTransferInTx(tx *Transaction, event *dbModel.BridgeEvent) error { func (d *Dao) UpdateBridgeWithTransferInTx(tx *Transaction, event *dbModel.BridgeEvent) error {
return tx.tx.Model(event).Where("`id` = ?", event.ID).Updates(map[string]interface{}{ return tx.tx.Model(event).Where("`id` = ?", event.ID).Updates(map[string]interface{}{
"to_contract": event.ToContract, "to_contract": event.ToContract,
...@@ -81,8 +94,9 @@ func (d *Dao) UpdateBridgeResultTx(tx *Transaction, event *dbModel.BridgeEvent, ...@@ -81,8 +94,9 @@ func (d *Dao) UpdateBridgeResultTx(tx *Transaction, event *dbModel.BridgeEvent,
}).Error }).Error
} }
func (d *Dao) CreateValidatorEventTx(tx *Transaction, hash string, chain int64, validator string, txHash string, eventType string, transferInId int64) error { func (d *Dao) CreateValidatorEventTx(tx *Transaction, contract string, hash string, chain int64, validator string, txHash string, eventType string, transferInId int64) error {
event := &dbModel.ValidatorEvent{ event := &dbModel.ValidatorEvent{
Contract: contract,
ChainId: chain, ChainId: chain,
Validator: validator, Validator: validator,
TxHash: txHash, TxHash: txHash,
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"context" "context"
"errors" "errors"
"github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/crypto/sha3" "golang.org/x/crypto/sha3"
"math/big" "math/big"
"time" "time"
...@@ -23,10 +24,14 @@ import ( ...@@ -23,10 +24,14 @@ import (
func (d *Dao) GetBlockHeight(chain *config.ChainConfig, behindBlock ...int) (height int64, err error) { func (d *Dao) GetBlockHeight(chain *config.ChainConfig, behindBlock ...int) (height int64, err error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
if _, ok := d.ethClient[chain.ChainId]; !ok { if _, ok := d.chainGroup[chain.ChainId]; !ok {
return 0, errors.New("chain client not support") return 0, errors.New("chain client not support")
} }
n, err := d.ethClient[chain.ChainId].BlockNumber(ctx) chaininfo, ok := d.chainGroup[chain.ChainId]
if !ok {
return 0, errors.New("chain client not support")
}
n, err := chaininfo.cli.BlockNumber(ctx)
if len(behindBlock) > 0 { if len(behindBlock) > 0 {
n -= uint64(behindBlock[0]) n -= uint64(behindBlock[0])
if n < 0 { if n < 0 {
...@@ -39,10 +44,11 @@ func (d *Dao) GetBlockHeight(chain *config.ChainConfig, behindBlock ...int) (hei ...@@ -39,10 +44,11 @@ func (d *Dao) GetBlockHeight(chain *config.ChainConfig, behindBlock ...int) (hei
func (d *Dao) GetLatestBockHash(chain *config.ChainConfig) (hash string, err error) { func (d *Dao) GetLatestBockHash(chain *config.ChainConfig) (hash string, err error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
if _, ok := d.ethClient[chain.ChainId]; !ok { chainInfo, ok := d.chainGroup[chain.ChainId]
if !ok {
return "", errors.New("chain client not support") return "", errors.New("chain client not support")
} }
block, err := d.ethClient[chain.ChainId].BlockByNumber(ctx, nil) block, err := chainInfo.cli.BlockByNumber(ctx, nil)
if err != nil { if err != nil {
return return
} }
...@@ -50,14 +56,15 @@ func (d *Dao) GetLatestBockHash(chain *config.ChainConfig) (hash string, err err ...@@ -50,14 +56,15 @@ func (d *Dao) GetLatestBockHash(chain *config.ChainConfig) (hash string, err err
} }
func (d *Dao) GetBlockTime(chain *config.ChainConfig, height int) (timestamp int, err error) { func (d *Dao) GetBlockTime(chain *config.ChainConfig, height int) (timestamp int, err error) {
if _, ok := d.ethClient[chain.ChainId]; !ok { chainInfo, ok := d.chainGroup[chain.ChainId]
if !ok {
return 0, errors.New("chain client not support") return 0, errors.New("chain client not support")
} }
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
block, err := d.ethClient[chain.ChainId].BlockByNumber(ctx, big.NewInt(int64(height))) block, err := chainInfo.cli.BlockByNumber(ctx, big.NewInt(int64(height)))
if err == nil { if err == nil {
return int(block.Time()), nil return int(block.Time()), nil
} }
...@@ -66,12 +73,13 @@ func (d *Dao) GetBlockTime(chain *config.ChainConfig, height int) (timestamp int ...@@ -66,12 +73,13 @@ func (d *Dao) GetBlockTime(chain *config.ChainConfig, height int) (timestamp int
} }
func (d *Dao) GetLogs(chain *config.ChainConfig, beginHeight, endHeight int64, topics, addresses []string) (logs []types.Log, err error) { func (d *Dao) GetLogs(chain *config.ChainConfig, beginHeight, endHeight int64, topics, addresses []string) (logs []types.Log, err error) {
if _, ok := d.ethClient[chain.ChainId]; !ok { chainInfo, ok := d.chainGroup[chain.ChainId]
if !ok {
return nil, errors.New("chain client not support") return nil, errors.New("chain client not support")
} }
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
// 重试2次 // 重试2次
logs, err = d.getLogs(chain, beginHeight, endHeight, topics, addresses) logs, err = d.getLogs(chainInfo.cli, beginHeight, endHeight, topics, addresses)
if err == nil { if err == nil {
return logs, nil return logs, nil
} }
...@@ -79,7 +87,7 @@ func (d *Dao) GetLogs(chain *config.ChainConfig, beginHeight, endHeight int64, t ...@@ -79,7 +87,7 @@ func (d *Dao) GetLogs(chain *config.ChainConfig, beginHeight, endHeight int64, t
return return
} }
func (d *Dao) getLogs(chain *config.ChainConfig, beginHeight, endHeight int64, topics []string, addresses []string) (logs []types.Log, err error) { func (d *Dao) getLogs(client *ethclient.Client, beginHeight, endHeight int64, topics []string, addresses []string) (logs []types.Log, err error) {
addrs := make([]common.Address, 0) addrs := make([]common.Address, 0)
for _, addr := range addresses { for _, addr := range addresses {
addrs = append(addrs, common.HexToAddress(addr)) addrs = append(addrs, common.HexToAddress(addr))
...@@ -97,7 +105,7 @@ func (d *Dao) getLogs(chain *config.ChainConfig, beginHeight, endHeight int64, t ...@@ -97,7 +105,7 @@ func (d *Dao) getLogs(chain *config.ChainConfig, beginHeight, endHeight int64, t
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() defer cancel()
return d.ethClient[chain.ChainId].FilterLogs(ctx, q) return client.FilterLogs(ctx, q)
} }
func (d *Dao) buildParam(event *dbModel.BridgeEvent) (bridge.BridgesubmitParams, error) { func (d *Dao) buildParam(event *dbModel.BridgeEvent) (bridge.BridgesubmitParams, error) {
...@@ -144,59 +152,49 @@ func (d *Dao) buildParam(event *dbModel.BridgeEvent) (bridge.BridgesubmitParams, ...@@ -144,59 +152,49 @@ func (d *Dao) buildParam(event *dbModel.BridgeEvent) (bridge.BridgesubmitParams,
} }
func (d *Dao) SubmitInTransfer(event *dbModel.BridgeEvent) error { func (d *Dao) SubmitInTransfer(event *dbModel.BridgeEvent) error {
chain, ok := d.chainGroup[event.ToChain]
if _, ok := d.ethClient[event.ToChain]; !ok { if !ok {
return errors.New("chain client not support") return errors.New("chain not support")
}
var chain *config.ChainConfig
for _, c := range d.c.Chains {
if c.ChainId == event.ToChain {
chain = c
break
}
}
if chain == nil {
return errors.New("chain not found in config")
} }
// verify the event is valid. // verify the event is valid.
valid := d.CheckEventValid() valid := d.CheckEventValid()
if !valid { if !valid {
log.WithField("chainId", chain.ChainId).Error("event is not valid") log.WithField("chainId", chain.conf.ChainId).Error("event is not valid")
return errors.New("event is not valid") return errors.New("event is not valid")
} }
ca, err := bridge.NewBridgeContract(common.HexToAddress(chain.BridgeContract), d.ethClient[chain.ChainId]) ca, err := bridge.NewBridgeContract(common.HexToAddress(chain.conf.BridgeContract), chain.cli)
if err != nil { if err != nil {
return err return err
} }
k := chain.ValidatorPrivateKey k := chain.conf.ValidatorPrivateKey
if k == "" { if k == "" {
log.WithField("chain", chain.Name).Warn("validator private key is empty, skip submit in transfer") log.WithField("chain", chain.conf.Name).Warn("validator private key is empty, skip submit in transfer")
return nil return nil
} }
signPrivateKey, err := crypto.HexToECDSA(common.Bytes2Hex(common.FromHex(k))) signPrivateKey, err := crypto.HexToECDSA(common.Bytes2Hex(common.FromHex(k)))
if err != nil { if err != nil {
log.WithField("chainId", chain.ChainId).WithError(err).Error("failed to parse private key") log.WithField("chainId", chain.conf.ChainId).WithError(err).Error("failed to parse private key")
return err return err
} }
opts, err := bind.NewKeyedTransactorWithChainID(signPrivateKey, big.NewInt(int64(chain.ChainId))) opts, err := bind.NewKeyedTransactorWithChainID(signPrivateKey, big.NewInt(int64(chain.conf.ChainId)))
if err != nil { if err != nil {
log.WithField("chainId", chain.ChainId).WithError(err).Error("new keyed transfer failed") log.WithField("chainId", chain.conf.ChainId).WithError(err).Error("new keyed transfer failed")
return err return err
} }
param, err := d.buildParam(event) param, err := d.buildParam(event)
if err != nil { if err != nil {
log.WithField("chainId", chain.ChainId).WithError(err).Error("build param failed") log.WithField("chainId", chain.conf.ChainId).WithError(err).Error("build param failed")
return err return err
} }
if tx, err := ca.SubmitInTransfer(opts, param); err != nil { if tx, err := ca.SubmitInTransfer(opts, param); err != nil {
log.WithField("chainId", chain.ChainId).WithError(err).Error("failed to submit in transfer") log.WithField("chainId", chain.conf.ChainId).WithError(err).Error("failed to submit in transfer")
return err return err
} else { } else {
// update validator status. // update validator status.
log.WithField("chainId", chain.ChainId).Infof("submit in transfer tx hash: %s", tx.Hash().Hex()) log.WithField("chainId", chain.conf.ChainId).Infof("submit in transfer tx hash: %s", tx.Hash().Hex())
return d.UpdateBridgeValidatorOperation(event, constant.ValidatorStatusConfirmation) return d.UpdateBridgeValidatorOperation(event, constant.ValidatorStatusConfirmation)
} }
} }
......
...@@ -36,6 +36,7 @@ type BridgeEvent struct { ...@@ -36,6 +36,7 @@ type BridgeEvent struct {
type ValidatorEvent struct { type ValidatorEvent struct {
ChainId int64 `gorm:"type:int;comment:链ID"` ChainId int64 `gorm:"type:int;comment:链ID"`
Contract string `gorm:"type:varchar(255);comment:合约地址"` // 合约地址
Validator string `gorm:"type:varchar(255);index;comment:验证者地址"` // 验证者地址 Validator string `gorm:"type:varchar(255);index;comment:验证者地址"` // 验证者地址
TxHash string `gorm:"type:varchar(255);index;comment:交易hash"` // 交易hash TxHash string `gorm:"type:varchar(255);index;comment:交易hash"` // 交易hash
Event string `gorm:"type:varchar(255);index;comment:事件类型"` // 事件类型 Event string `gorm:"type:varchar(255);index;comment:事件类型"` // 事件类型
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment