Commit e81211b2 authored by vicotor's avatar vicotor

add retry for connect kafka

parent 5086d64a
......@@ -82,11 +82,20 @@ func (n *Node) Loop(idx int) {
// monitor kafka
taskCh := make(chan *odysseus.TaskContent, 1000)
ctx, cancel := context.WithCancel(context.Background())
client, err := n.attachKafkaConsumer(ctx, taskCh)
var client sarama.ConsumerGroup
var err error
for {
client, err = n.attachKafkaConsumer(ctx, taskCh)
if err != nil {
log.WithError(err).Error("attach kafka consumer failed")
return
log.WithError(err).Error("attach kafka consumer failed try again later")
time.Sleep(time.Second)
continue
}
break
}
log.WithField("routine", idx).Info("attach kafka consumer success")
postResult := func(task *odysseus.TaskContent, result *odysseus.TaskResponse) error {
d, _ := proto.Marshal(result)
err := utils.Post(task.TaskCallback, d)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment