Commit 657295ea authored by vicotor's avatar vicotor

add task retry interval param.

parent 8d7b2819
debug = false
task_loop_interval = 30
task_retry_interval = 30
aeskey = "/app/keys/aes.key"
otpkey = "/app/keys/val.otp"
......
......@@ -5,12 +5,12 @@ import (
)
type Config struct {
Debug bool
TaskLoopInterval int `toml:"task_loop_interval"`
AesKeyPath string `toml:"aeskey"`
OTPKeyPath string `toml:"otpkey"`
Chains map[string]*ChainConfig `toml:"chains"`
MySQL MySQLConfig
Debug bool
TaskRetryInterval int `toml:"task_retry_interval"`
AesKeyPath string `toml:"aeskey"`
OTPKeyPath string `toml:"otpkey"`
Chains map[string]*ChainConfig `toml:"chains"`
MySQL MySQLConfig
}
type ChainConfig struct {
......
......@@ -29,8 +29,8 @@ func (d *Dao) SetStorageHeight(key string, intValue int64) (err error) {
func (d *Dao) GetUnprocessedBridgeEvents(limit int, offset int) (events []*dbModel.BridgeEvent, err error) {
err = d.db.Model(&dbModel.BridgeEvent{}).
Where("`from_chain_tx_hash` != '' AND `to_chain_status` < ? AND `validator_status` = ?", 2, constant.ValidatorStatusNoPrecess).
Order("created_at ASC").
Where("`from_chain_tx_hash` != '' AND `to_chain_status` < ? AND `validator_status` = ?", constant.TransferChainExecuted, constant.ValidatorStatusNoPrecess).
Order("updated_at ASC").
Limit(limit).
Offset(offset).
Find(&events).Error
......
......@@ -183,7 +183,7 @@ func (d *Dao) SubmitInTransfer(event *dbModel.BridgeEvent) error {
if tx, err := ca.SubmitInTransfer(opts, param); err != nil {
log.WithField("chainId", chain.conf.ChainId).WithError(err).Error("failed to submit in transfer")
return err
return d.UpdateBridgeValidatorOperation(event, constant.ValidatorStatusNoPrecess)
} else {
// wait tx result and update validator status.
receipt, err := bind.WaitMined(context.Background(), chain.cli, tx)
......@@ -218,7 +218,7 @@ func (d *Dao) CheckEventValid() bool {
func (d *Dao) HandleTasks() {
defer d.wg.Done()
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(10 * time.Second)
maxCount := 30
offset := 0
defer ticker.Stop()
......@@ -232,9 +232,6 @@ func (d *Dao) HandleTasks() {
if d.validatorPk == nil {
continue
}
if d.c.TaskLoopInterval >= 1 {
ticker.Reset(time.Duration(d.c.TaskLoopInterval) * time.Second)
}
// select unprocessed bridge event with maxCount.
events, err := d.GetUnprocessedBridgeEvents(maxCount, offset)
if err != nil {
......@@ -261,6 +258,11 @@ func (d *Dao) HandleTasks() {
continue
}
if event.UpdatedAt.Compare(event.CreatedAt) > 0 && (int(event.UpdatedAt.Sub(time.Now()).Seconds()) < d.c.TaskRetryInterval) {
// skip recently failed tasks.
continue
}
if err := d.SubmitInTransfer(event); err != nil {
log.WithError(err).WithFields(log.Fields{
"fromChain": event.FromChain,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment