Commit 81f2a89f authored by vicotor's avatar vicotor

update code

parent f896ec8b
...@@ -43,44 +43,64 @@ func (s *Dao) HandleEvents(chain ChainInterface, logs []types.Log) error { ...@@ -43,44 +43,64 @@ func (s *Dao) HandleEvents(chain ChainInterface, logs []types.Log) error {
cname := chain.Name() cname := chain.Name()
// begin orm transaction // begin orm transaction
ctx := context.Background() //supportSession := s.SupportsTransactions()
ormTx, err := s.BeginTx(ctx) var ctx context.Context = context.Background()
if err != nil { //var tx *Transaction
log.WithField("chain", cname).WithError(err).Error("begin db transaction") //var err error
return err //if supportSession {
} // tx, err = s.BeginTx(context.Background())
// if err != nil {
// supportSession = false
// }
//}
//if tx != nil {
// ctx = tx.ctx
//} else {
// ctx = context.Background()
//}
var ormTxErr error var ormTxErr error
for _, txLog := range logs { for _, txLog := range logs {
if err := s.filterTransferOut(chain, txLog, ormTx); err != nil { if err := s.filterTransferOut(chain, txLog, ctx); err != nil {
ormTxErr = err ormTxErr = err
break break
} }
if err := s.filterTransferIn(chain, txLog, ormTx); err != nil { if err := s.filterTransferIn(chain, txLog, ctx); err != nil {
ormTxErr = err ormTxErr = err
break break
} }
if err := s.filterTokenConfigChanged(chain, txLog, ormTx); err != nil { if err := s.filterTokenConfigChanged(chain, txLog, ctx); err != nil {
ormTxErr = err ormTxErr = err
break break
} }
} }
// Commit or rollback transaction based on error
// commit or rollback.
if ormTxErr != nil { if ormTxErr != nil {
if rbErr := ormTx.Rollback(); rbErr != nil { //if tx != nil {
log.WithField("chain", cname).WithError(rbErr).Error("failed to rollback transaction") // if rbErr := tx.Rollback(); rbErr != nil {
} // log.WithField("chain", cname).WithError(rbErr).Error("failed to rollback transaction")
log.WithField("chain", cname).WithError(ormTxErr).Error("error processing logs, transaction rolled back") // } else {
// log.WithField("chain", cname).WithError(ormTxErr).Error("error processing logs, transaction rolled back")
// }
//} else {
log.WithFields(log.Fields{
"chain": cname,
"error": ormTxErr.Error(),
}).Error("failed to process logs")
//}
} else { } else {
if cmtErr := ormTx.Commit(); cmtErr != nil { //if tx != nil {
log.WithField("chain", cname).WithError(cmtErr).Error("failed to commit transaction") // if cmtErr := tx.Commit(); cmtErr != nil {
} // log.WithField("chain", cname).WithError(cmtErr).Error("failed to commit transaction")
// }
//}
} }
return nil return nil
} }
// filterTransferOut 用户从当前链跨出事件. // filterTransferOut 用户从当前链跨出事件.
func (s *Dao) filterTransferOut(chain ChainInterface, txLog types.Log, tx *Transaction) error { func (s *Dao) filterTransferOut(chain ChainInterface, txLog types.Log, ctx context.Context) error {
if len(txLog.Topics) == 0 { if len(txLog.Topics) == 0 {
return nil return nil
} }
...@@ -107,7 +127,7 @@ func (s *Dao) filterTransferOut(chain ChainInterface, txLog types.Log, tx *Trans ...@@ -107,7 +127,7 @@ func (s *Dao) filterTransferOut(chain ChainInterface, txLog types.Log, tx *Trans
ReceiveAmount: new(big.Int).Sub(event.Amount, event.Fee).Text(10), ReceiveAmount: new(big.Int).Sub(event.Amount, event.Fee).Text(10),
ToChainStatus: int(TransferChainNoProcess), ToChainStatus: int(TransferChainNoProcess),
} }
if err := s.FillOutTransferEventInfo(tx, dbEvent); err != nil { if err := s.FillOutTransferEventInfo(ctx, dbEvent); err != nil {
log.WithField("chain", chain.Name()).WithError(err).Error("fill out transfer event info") log.WithField("chain", chain.Name()).WithError(err).Error("fill out transfer event info")
return err return err
} }
...@@ -117,7 +137,7 @@ func (s *Dao) filterTransferOut(chain ChainInterface, txLog types.Log, tx *Trans ...@@ -117,7 +137,7 @@ func (s *Dao) filterTransferOut(chain ChainInterface, txLog types.Log, tx *Trans
} }
// filterTransferIn 用户从目标链跨入事件及执行结束事件. // filterTransferIn 用户从目标链跨入事件及执行结束事件.
func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transaction) error { func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, ctx context.Context) error {
if len(txLog.Topics) == 0 { if len(txLog.Topics) == 0 {
return nil return nil
} }
...@@ -146,7 +166,7 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transa ...@@ -146,7 +166,7 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transa
ToChainTxHash: strings.ToLower(txLog.TxHash.String()), ToChainTxHash: strings.ToLower(txLog.TxHash.String()),
} }
if err := s.FillInTransferEventInfo(tx, dbEvent); err != nil { if err := s.FillInTransferEventInfo(ctx, dbEvent); err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{ log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(), "error": err.Error(),
}).Error("db fill in transfer event") }).Error("db fill in transfer event")
...@@ -160,7 +180,7 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transa ...@@ -160,7 +180,7 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transa
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInExecution log") log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInExecution log")
return err return err
} }
if err := s.UpdateBridgeResult(tx, chain.GetChain().ChainId, event.InId.Int64(), TransferChainExecuted); err != nil { if err := s.UpdateBridgeResult(ctx, chain.GetChain().ChainId, event.InId.Int64(), TransferChainExecuted); err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{ log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(), "error": err.Error(),
}).Error("db update transfer in execution event") }).Error("db update transfer in execution event")
...@@ -172,7 +192,7 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transa ...@@ -172,7 +192,7 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transa
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInExecution log") log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInExecution log")
return err return err
} }
if err := s.UpdateBridgeResult(tx, chain.GetChain().ChainId, event.InId.Int64(), TransferChainRejected); err != nil { if err := s.UpdateBridgeResult(ctx, chain.GetChain().ChainId, event.InId.Int64(), TransferChainRejected); err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{ log.WithField("chain", chain.Name()).WithFields(log.Fields{
"error": err.Error(), "error": err.Error(),
}).Error("db update transfer in execution event") }).Error("db update transfer in execution event")
...@@ -182,7 +202,7 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transa ...@@ -182,7 +202,7 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog types.Log, tx *Transa
return nil return nil
} }
func (s *Dao) filterTokenConfigChanged(chain ChainInterface, txLog types.Log, tx *Transaction) error { func (s *Dao) filterTokenConfigChanged(chain ChainInterface, txLog types.Log, ctx context.Context) error {
if len(txLog.Topics) == 0 { if len(txLog.Topics) == 0 {
return nil return nil
} }
...@@ -211,7 +231,7 @@ func (s *Dao) filterTokenConfigChanged(chain ChainInterface, txLog types.Log, tx ...@@ -211,7 +231,7 @@ func (s *Dao) filterTokenConfigChanged(chain ChainInterface, txLog types.Log, tx
return err return err
} }
err = s.CreateOrUpdateBridgeTokenInfo(tx, info) err = s.CreateOrUpdateBridgeTokenInfo(ctx, info)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"chain": chain.Name(), "chain": chain.Name(),
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
"time"
) )
var ( var (
...@@ -19,6 +20,18 @@ type Transaction struct { ...@@ -19,6 +20,18 @@ type Transaction struct {
ctx context.Context ctx context.Context
} }
func (d *Dao) SupportsTransactions() bool {
// 检查是否支持事务
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := d.db.Client().UseSession(ctx, func(sc mongo.SessionContext) error {
return sc.StartTransaction()
})
return err == nil
}
// BeginTx starts a new MongoDB transaction // BeginTx starts a new MongoDB transaction
func (d *Dao) BeginTx(ctx context.Context) (*Transaction, error) { func (d *Dao) BeginTx(ctx context.Context) (*Transaction, error) {
client := d.db.Client() client := d.db.Client()
...@@ -51,7 +64,7 @@ func (tx *Transaction) Rollback() error { ...@@ -51,7 +64,7 @@ func (tx *Transaction) Rollback() error {
return tx.session.AbortTransaction(tx.ctx) return tx.session.AbortTransaction(tx.ctx)
} }
func (d *Dao) FillInTransferEventInfo(tx *Transaction, inEvent *dbModel.BridgeEvent) error { func (d *Dao) FillInTransferEventInfo(ctx context.Context, inEvent *dbModel.BridgeEvent) error {
collection := d.db.Collection(inEvent.TableName()) collection := d.db.Collection(inEvent.TableName())
filter := bson.M{"from_chain": inEvent.FromChain, "out_id": inEvent.OutId} filter := bson.M{"from_chain": inEvent.FromChain, "out_id": inEvent.OutId}
...@@ -66,12 +79,13 @@ func (d *Dao) FillInTransferEventInfo(tx *Transaction, inEvent *dbModel.BridgeEv ...@@ -66,12 +79,13 @@ func (d *Dao) FillInTransferEventInfo(tx *Transaction, inEvent *dbModel.BridgeEv
"to_chain_status": inEvent.ToChainStatus, "to_chain_status": inEvent.ToChainStatus,
}, },
} }
opts := options.Update().SetUpsert(true)
_, err := collection.UpdateOne(tx.ctx, filter, update) _, err := collection.UpdateOne(ctx, filter, update, opts)
return err return err
} }
func (d *Dao) FillOutTransferEventInfo(tx *Transaction, outEvent *dbModel.BridgeEvent) error { func (d *Dao) FillOutTransferEventInfo(ctx context.Context, outEvent *dbModel.BridgeEvent) error {
collection := d.db.Collection(outEvent.TableName()) collection := d.db.Collection(outEvent.TableName())
filter := bson.M{"from_chain": outEvent.FromChain, "out_id": outEvent.OutId} filter := bson.M{"from_chain": outEvent.FromChain, "out_id": outEvent.OutId}
...@@ -92,11 +106,13 @@ func (d *Dao) FillOutTransferEventInfo(tx *Transaction, outEvent *dbModel.Bridge ...@@ -92,11 +106,13 @@ func (d *Dao) FillOutTransferEventInfo(tx *Transaction, outEvent *dbModel.Bridge
}, },
} }
_, err := collection.UpdateOne(tx.ctx, filter, update) opts := options.Update().SetUpsert(true)
_, err := collection.UpdateOne(ctx, filter, update, opts)
return err return err
} }
func (d *Dao) UpdateBridgeResult(tx *Transaction, toChainId int64, inId int64, result constant.TransferStatus) error { func (d *Dao) UpdateBridgeResult(ctx context.Context, toChainId int64, inId int64, result constant.TransferStatus) error {
collection := d.db.Collection(new(dbModel.BridgeEvent).TableName()) collection := d.db.Collection(new(dbModel.BridgeEvent).TableName())
filter := bson.M{"to_chain": toChainId, "in_id": inId} filter := bson.M{"to_chain": toChainId, "in_id": inId}
...@@ -105,12 +121,13 @@ func (d *Dao) UpdateBridgeResult(tx *Transaction, toChainId int64, inId int64, r ...@@ -105,12 +121,13 @@ func (d *Dao) UpdateBridgeResult(tx *Transaction, toChainId int64, inId int64, r
"to_chain_status": int(result), "to_chain_status": int(result),
}, },
} }
opts := options.Update().SetUpsert(true)
_, err := collection.UpdateOne(tx.ctx, filter, update) _, err := collection.UpdateOne(ctx, filter, update, opts)
return err return err
} }
func (d *Dao) CreateOrUpdateBridgeTokenInfo(tx *Transaction, info *dbModel.BridgeTokenInfo) error { func (d *Dao) CreateOrUpdateBridgeTokenInfo(ctx context.Context, info *dbModel.BridgeTokenInfo) error {
collection := d.db.Collection(info.TableName()) collection := d.db.Collection(info.TableName())
filter := bson.M{"chain_id": info.ChainId, "token": info.Token, "to_chain_id": info.ToChainId} filter := bson.M{"chain_id": info.ChainId, "token": info.Token, "to_chain_id": info.ToChainId}
...@@ -119,6 +136,6 @@ func (d *Dao) CreateOrUpdateBridgeTokenInfo(tx *Transaction, info *dbModel.Bridg ...@@ -119,6 +136,6 @@ func (d *Dao) CreateOrUpdateBridgeTokenInfo(tx *Transaction, info *dbModel.Bridg
} }
opts := options.Update().SetUpsert(true) opts := options.Update().SetUpsert(true)
_, err := collection.UpdateOne(tx.ctx, filter, update, opts) _, err := collection.UpdateOne(ctx, filter, update, opts)
return err return err
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment