Commit f3a3c7aa authored by vicotor's avatar vicotor

update code

parent ccf08a43
......@@ -114,7 +114,6 @@ func (s *ChainSync) loop() {
return
case <-tm.C:
var endHeight = beginHeight + int64(s.chain.BatchBlock)
latestHeight, err = s.d.GetBlockHeight(s.chain, s.chain.BehindBlock)
if err != nil {
log.WithField("chain", s.name).WithError(err).Error("get latest block height")
......@@ -133,12 +132,12 @@ func (s *ChainSync) loop() {
} else {
s.syncmode = true
}
log.WithField("chain", s.name).WithFields(log.Fields{
"begin height": beginHeight,
"block time": blockTime,
"sync mode": s.syncmode,
"sinceTime": time.Since(blockTime).String(),
}).Debug("begin block time")
//log.WithField("chain", s.name).WithFields(log.Fields{
// "begin height": beginHeight,
// "block time": blockTime,
// "sync mode": s.syncmode,
// "sinceTime": time.Since(blockTime).String(),
//}).Trace("begin block time")
}
if latestHeight < endHeight {
......@@ -161,7 +160,7 @@ func (s *ChainSync) loop() {
"end height": endHeight,
"latest height": latestHeight,
"diff height": latestHeight - endHeight,
}).Debug("backend block")
}).Trace("backend block")
beginHeight = endHeight + 1
if s.syncmode {
......
......@@ -39,27 +39,61 @@ var (
func (s *Dao) HandleEvents(chain ChainInterface, logs []types.Log) error {
cname := chain.Name()
// sort logs by topic.
var allTransferOut = make([]types.Log, 0)
var allTransferIn = make([]types.Log, 0)
var allTokenConfigChanged = make([]types.Log, 0)
var allExecuted = make([]types.Log, 0)
var allRejected = make([]types.Log, 0)
var allConfirmed = make([]types.Log, 0)
// refetch all logs
var txs []common.Hash
var txexist = make(map[common.Hash]bool)
for _, lg := range logs {
if _, ok := txexist[lg.TxHash]; !ok {
txs = append(txs, lg.TxHash)
txexist[lg.TxHash] = true
switch lg.Topics[0].String() {
case TransferOutEvent.ID.Hex():
allTransferOut = append(allTransferOut, lg)
case TransferInEvent.ID.Hex():
allTransferIn = append(allTransferIn, lg)
case TokenConfigChangedEvent.ID.Hex():
allTokenConfigChanged = append(allTokenConfigChanged, lg)
case TransferInExecutionEvent.ID.Hex():
allExecuted = append(allExecuted, lg)
case TransferInRejectionEvent.ID.Hex():
allRejected = append(allRejected, lg)
case TransferInConfirmationEvent.ID.Hex():
allConfirmed = append(allConfirmed, lg)
}
}
var allLogs []*types.Log
client := s.chainGroup[chain.GetChain().ChainId].cli
for _, tx := range txs {
receipt, err := client.TransactionReceipt(context.Background(), tx)
if err != nil {
log.WithField("chain", cname).WithError(err).Error("get tx receipt failed")
return err
var allLogs []types.Log
allLogs = append(allLogs, allTransferOut...)
allLogs = append(allLogs, allTransferIn...)
allLogs = append(allLogs, allTokenConfigChanged...)
allLogs = append(allLogs, allRejected...)
allLogs = append(allLogs, allConfirmed...)
allLogs = append(allLogs, allExecuted...)
// disable the code.
if false {
var txs []common.Hash
var txexist = make(map[common.Hash]bool)
for _, lg := range logs {
if _, ok := txexist[lg.TxHash]; !ok {
txs = append(txs, lg.TxHash)
txexist[lg.TxHash] = true
}
}
var allLogs []*types.Log
client := s.chainGroup[chain.GetChain().ChainId].cli
for _, tx := range txs {
receipt, err := client.TransactionReceipt(context.Background(), tx)
if err != nil {
log.WithField("chain", cname).WithError(err).Error("get tx receipt failed")
return err
}
allLogs = append(allLogs, receipt.Logs...)
}
allLogs = append(allLogs, receipt.Logs...)
}
s.handleMux.Lock()
defer s.handleMux.Unlock()
// begin orm transaction
......@@ -80,7 +114,8 @@ func (s *Dao) HandleEvents(chain ChainInterface, logs []types.Log) error {
//}
var ormTxErr error
for _, txLog := range allLogs {
for _, tlog := range allLogs {
txLog := &tlog
if err := s.filterTransferOut(chain, txLog, ctx); err != nil {
ormTxErr = err
break
......@@ -131,6 +166,11 @@ func (s *Dao) filterTransferOut(chain ChainInterface, txLog *types.Log, ctx cont
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferOut log")
return err
}
log.WithFields(log.Fields{
"chain": chain.Name(),
"from_chain": event.FromChainID.Int64(),
"out_id": event.OutId.Int64(),
}).Info("process transfer out event")
blocktime, _ := s.GetBlockTime(chain.GetChain(), int64(txLog.BlockNumber))
dbEvent := &dbModel.BridgeEvent{
FromChain: event.FromChainID.Int64(),
......@@ -169,6 +209,13 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog *types.Log, ctx conte
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferIn log")
return err
}
log.WithFields(log.Fields{
"chain": chain.Name(),
"from_chain": event.FromChainID.Int64(),
"out_id": event.OutId.Int64(),
"to_chain": chain.GetChain().ChainId,
"in_id": event.InId.Int64(),
}).Info("process transfer in event")
blocktime, _ := s.GetBlockTime(chain.GetChain(), int64(txLog.BlockNumber))
dbEvent := &dbModel.BridgeEvent{
......@@ -201,6 +248,11 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog *types.Log, ctx conte
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInExecution log")
return err
}
log.WithFields(log.Fields{
"chain": chain.Name(),
"to_chain": chain.GetChain().ChainId,
"in_id": event.InId.Int64(),
}).Info("process transfer in execution event")
dbevent, err := s.GetBridgeEventByInId(ctx, chain.GetChain().ChainId, event.InId.Int64())
if err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
......@@ -231,6 +283,11 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog *types.Log, ctx conte
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInConfirmation log")
return err
}
log.WithFields(log.Fields{
"chain": chain.Name(),
"to_chain": chain.GetChain().ChainId,
"in_id": event.InId.Int64(),
}).Info("process transfer in confirmation event")
dbevent, err := s.GetBridgeEventByInId(ctx, chain.GetChain().ChainId, event.InId.Int64())
if err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
......@@ -258,6 +315,11 @@ func (s *Dao) filterTransferIn(chain ChainInterface, txLog *types.Log, ctx conte
log.WithField("chain", chain.Name()).WithError(err).Error("parse TransferInExecution log")
return err
}
log.WithFields(log.Fields{
"chain": chain.Name(),
"to_chain": chain.GetChain().ChainId,
"in_id": event.InId.Int64(),
}).Info("process transfer in rejection event")
dbevent, err := s.GetBridgeEventByInId(ctx, chain.GetChain().ChainId, event.InId.Int64())
if err != nil {
log.WithField("chain", chain.Name()).WithFields(log.Fields{
......
......@@ -168,7 +168,8 @@ func (s *Dao) AddValidatorOp(ctx context.Context, toChainId int64, inId int64, v
}
filter := bson.M{"to_chain": toChainId, "in_id": inId}
_, err := collection.UpdateOne(ctx, filter, update)
opts := options.Update().SetUpsert(true)
_, err := collection.UpdateOne(ctx, filter, update, opts)
return err
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment