Commit 470a1620 authored by vicotor

fix bug for kafka

parent 5af25f2f
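
This commit decouples receipt delivery from task handling: postReceipt now only pushes the receipt onto a buffered channel, and a dedicated goroutine (startProducer) owns the Kafka producer, recreating it whenever sarama reports the client was closed. Below is a minimal, self-contained sketch of that pattern; the function and variable names are illustrative, it assumes the IBM fork of sarama (this repo may import github.com/Shopify/sarama), and it uses a plain SyncProducer where the repo wraps creation in utils.NewKafkaProducer.

// kafka_receipt_sketch.go - illustrative only, not the repository's code.
package kafkasketch

import (
	"log"
	"time"

	"github.com/IBM/sarama" // assumption: the repo may use github.com/Shopify/sarama
)

// produceReceipts plays the role of Node.startProducer: it drains a buffered
// receipt channel and rebuilds the producer when it reports ErrClosedClient.
func produceReceipts(brokers []string, topic string, receipts <-chan []byte, quit <-chan struct{}) {
	for {
		cfg := sarama.NewConfig()
		cfg.Producer.Return.Successes = true // required by SyncProducer
		producer, err := sarama.NewSyncProducer(brokers, cfg)
		if err != nil {
			log.Printf("create kafka producer failed: %v", err)
			time.Sleep(time.Second)
			continue
		}
		alive := true
		for alive {
			select {
			case <-quit:
				producer.Close()
				return
			case payload, ok := <-receipts:
				if !ok {
					producer.Close()
					return
				}
				_, _, err := producer.SendMessage(&sarama.ProducerMessage{
					Topic: topic,
					Value: sarama.ByteEncoder(payload),
				})
				if err == sarama.ErrClosedClient {
					alive = false // producer is gone; rebuild it in the outer loop
				} else if err != nil {
					log.Printf("fire task receipt failed: %v", err)
				}
			}
		}
	}
}
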
...@@ -31,6 +31,8 @@ type Node struct {
wg sync.WaitGroup
status string
reg *registry.Registry
receiptCh chan *odysseus.TaskReceipt
}
func (n *Node) ServiceType() common.ServiceType {
...@@ -70,9 +72,8 @@ func NewNode() *Node {
rdb: rdb,
quit: make(chan struct{}),
conf: config.GetConfig(),
receiptCh: make(chan *odysseus.TaskReceipt, 100000),
}
brokers := strings.Split(node.conf.Kafka.Brokers, ";")
node.kafkaProducer, _ = utils.NewKafkaProducer(brokers)
node.status = "before running" node.status = "before running"
node.register() node.register()
...@@ -81,12 +82,44 @@ func NewNode() *Node { ...@@ -81,12 +82,44 @@ func NewNode() *Node {
func (n *Node) Start() error { func (n *Node) Start() error {
n.status = "running" n.status = "running"
go n.startProducer()
if n.reg != nil {
go n.reg.Start()
}
return n.startAllTask()
}
// startProducer owns the kafka producer: it drains receiptCh and rebuilds the producer if the client is lost.
func (n *Node) startProducer() {
brokers := strings.Split(n.conf.Kafka.Brokers, ";")
for {
producer, err := utils.NewKafkaProducer(brokers)
if err != nil {
log.WithError(err).Error("create kafka producer failed")
time.Sleep(time.Second)
continue
}
n.kafkaProducer = producer
running := true
for running {
select {
case <-n.quit:
return
case receipt, ok := <-n.receiptCh:
if !ok {
return
}
if err := utils.FireTaskReceipt(n.kafkaProducer, receipt, config.GetConfig().Kafka.ReceiptTopic); err != nil {
log.WithError(err).WithField("task-id", receipt.TaskId).Error("fire task receipt failed")
if err == sarama.ErrClosedClient {
// the producer client is closed; leave the inner loop and create a new one
running = false
}
}
}
}
}
}
func (n *Node) Stop() {
close(n.quit)
n.wg.Wait()
...@@ -113,28 +146,7 @@ func (n *Node) register() {
n.reg = reg
}
func (n *Node) Loop(idx int) {
defer n.wg.Done()
defer log.WithField("routine", idx).Info("node loop routine exit")
// monitor kafka
taskCh := make(chan *odysseus.TaskContent, 1000)
ctx, cancel := context.WithCancel(context.Background())
var client sarama.ConsumerGroup
var err error
for {
client, err = n.attachKafkaConsumer(ctx, taskCh)
if err != nil {
log.WithError(err).Error("attach kafka consumer failed try again later")
time.Sleep(time.Second)
continue
}
break
}
log.WithField("routine", idx).Info("attach kafka consumer success")
postResult := func(task *odysseus.TaskContent, result *odysseus.TaskResponse) error {
func (n *Node) postResult(task *odysseus.TaskContent, result *odysseus.TaskResponse) error {
d, _ := proto.Marshal(result)
err := utils.Post(task.TaskCallback, d)
if err != nil {
...@@ -147,8 +159,9 @@ func (n *Node) Loop(idx int) {
n.cache.RollbackForFee(uid, fee)
return err
}
postReceipt := func(task *odysseus.TaskContent, result *odysseus.TaskResponse, err error) error {
func (n *Node) postReceipt(task *odysseus.TaskContent, result *odysseus.TaskResponse, err error) error {
receipt := new(odysseus.TaskReceipt)
receipt.TaskId = task.TaskId
receipt.TaskTimestamp = task.TaskTimestamp
...@@ -169,11 +182,42 @@ func (n *Node) Loop(idx int) {
default:
receipt.TaskResult = "internal error"
}
return utils.FireTaskReceipt(n.kafkaProducer, receipt, config.GetConfig().Kafka.ReceiptTopic)
}
n.receiptCh <- receipt
return nil
}
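// Loop pulls tasks from kafka and re-attaches the consumer group whenever the client is lost.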
func (n *Node) Loop(idx int) {
defer n.wg.Done()
defer log.WithField("routine", idx).Info("node loop routine exit")
var (
err error
running bool = true
client sarama.ConsumerGroup
)
// monitor kafka
taskCh := make(chan *odysseus.TaskContent, 1000)
ctx, cancel := context.WithCancel(context.Background())
for {
client, err = attachKafkaConsumer(ctx, taskCh, n)
if err != nil {
log.WithError(err).Error("attach kafka consumer failed try again later")
time.Sleep(time.Second)
n.status = "attach kafka failed"
continue
}
log.WithField("routine", idx).Info("attach kafka consumer success")
n.status = "running"
for running {
select {
case clientErr := <-client.Errors():
log.WithError(clientErr).Error("kafka consumer error")
if clientErr == sarama.ErrClosedClient {
running = false // consumer client closed; drop out and re-attach
}
case <-n.quit:
cancel()
client.Close()
...@@ -184,7 +228,6 @@ func (n *Node) Loop(idx int) {
go func(ctx context.Context, task *odysseus.TaskContent) {
l := log.WithField("task-id", task.TaskId)
l.WithField("task", task).Info("get task")
// todo: add parameter for re-dispatch count.
for {
worker, err := PopWorker(ctx, n.rdb)
if err == ErrNoWorker || err == ErrTimeout {
...@@ -196,10 +239,10 @@ func (n *Node) Loop(idx int) {
TaskError: err.Error(),
}
l.WithError(err).Error("pop worker failed")
if e := postReceipt(task, result, err); e != nil {
if e := n.postReceipt(task, result, err); e != nil {
l.WithError(e).Error("post task receipt failed") l.WithError(e).Error("post task receipt failed")
} }
if e := postResult(task, result); e != nil {
if e := n.postResult(task, result); e != nil {
l.WithError(e).Error("post task result failed") l.WithError(e).Error("post task result failed")
} }
break break
...@@ -219,12 +262,12 @@ func (n *Node) Loop(idx int) { ...@@ -219,12 +262,12 @@ func (n *Node) Loop(idx int) {
} }
} }
}(fctx, t) }(fctx, t)
}
}
}
}
func (n *Node) attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.TaskContent) (sarama.ConsumerGroup, error) {
func attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.TaskContent, n *Node) (sarama.ConsumerGroup, error) {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
...@@ -275,7 +318,6 @@ type Consumer struct {
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(c.ready)
return nil
}
...
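
On the consume side, the reworked Loop keeps re-attaching: it watches the consumer group's Errors() channel and rebuilds the group when it sees sarama.ErrClosedClient, instead of attaching once and giving up. A rough, self-contained sketch of that retry shape follows; the names are illustrative, handler stands in for this repo's Consumer type, the import path is assumed as above, and the direct error comparison simply mirrors the check used in the diff.

// kafka_consume_sketch.go - illustrative only, not the repository's code.
package kafkasketch

import (
	"context"
	"log"
	"time"

	"github.com/IBM/sarama" // assumption: the repo may use github.com/Shopify/sarama
)

// consumeWithRetry plays the role of Node.Loop's kafka handling: attach, watch
// the error channel, and re-attach when the client reports it was closed.
func consumeWithRetry(ctx context.Context, brokers []string, group, topic string, handler sarama.ConsumerGroupHandler) {
	cfg := sarama.NewConfig()
	cfg.Consumer.Return.Errors = true // deliver errors on client.Errors()
	for {
		client, err := sarama.NewConsumerGroup(brokers, group, cfg)
		if err != nil {
			log.Printf("attach kafka consumer failed, retrying: %v", err)
			time.Sleep(time.Second)
			continue
		}
		go func() {
			// Consume must be called in a loop so the session is re-created after rebalances.
			for {
				if err := client.Consume(ctx, []string{topic}, handler); err != nil {
					return
				}
				if ctx.Err() != nil {
					return
				}
			}
		}()
		attached := true
		for attached {
			select {
			case <-ctx.Done():
				client.Close()
				return
			case cerr := <-client.Errors():
				log.Printf("kafka consumer error: %v", cerr)
				if cerr == sarama.ErrClosedClient {
					attached = false // drop out and build a new consumer group
				}
			}
		}
		client.Close()
	}
}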