Commit 3fde79b9 authored by vicotor's avatar vicotor

add producer for test

parent e14edcb0
.PHONY: default scheduler all clean fmt docker .PHONY: default scheduler producer all clean fmt docker
GOBIN = $(shell pwd)/build/bin GOBIN = $(shell pwd)/build/bin
TAG ?= latest TAG ?= latest
...@@ -10,7 +10,7 @@ AppName := scheduler ...@@ -10,7 +10,7 @@ AppName := scheduler
default: scheduler default: scheduler
all: scheduler all: scheduler producer
BUILD_FLAGS = -tags netgo -ldflags "\ BUILD_FLAGS = -tags netgo -ldflags "\
-X github.com/odysseus/scheduler/versions.AppName=${AppName} \ -X github.com/odysseus/scheduler/versions.AppName=${AppName} \
...@@ -25,6 +25,10 @@ scheduler: ...@@ -25,6 +25,10 @@ scheduler:
go build $(BUILD_FLAGS) -o=${GOBIN}/$@ -gcflags "all=-N -l" ./cmd/scheduler go build $(BUILD_FLAGS) -o=${GOBIN}/$@ -gcflags "all=-N -l" ./cmd/scheduler
@echo "Done building." @echo "Done building."
producer:
go build $(BUILD_FLAGS) -o=${GOBIN}/$@ -gcflags "all=-N -l" ./cmd/producer
@echo "Done building."
clean: clean:
rm -fr build/* rm -fr build/*
docker: docker:
......
package main
import (
"runtime"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
Execute()
}
package main
import (
"github.com/IBM/sarama"
"github.com/gogo/protobuf/proto"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"github.com/odysseus/scheduler/config"
"github.com/odysseus/scheduler/utils"
log "github.com/sirupsen/logrus"
"strconv"
"strings"
"time"
)
// NewKafka Create a new KafkaProducer.
func NewKafka(brokers []string) (sarama.AsyncProducer, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
kafkaConfig.Net.ResolveCanonicalBootstrapServers = false
producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig)
if err != nil {
return nil, err
}
go func() {
for _ = range producer.Errors() {
//log.Printf("Failed to send log entry to kafka: %v\n", err)
}
}()
return producer, nil
}
func Fire(producer sarama.AsyncProducer, task *odysseus.TaskContent, topic string) error {
// Check time for partition key
var partitionKey sarama.StringEncoder
partitionKey = sarama.StringEncoder(time.Now().Format("2006-01-02"))
b, err := proto.Marshal(task)
if err != nil {
return err
}
value := sarama.ByteEncoder(b)
producer.Input() <- &sarama.ProducerMessage{
Key: partitionKey,
Topic: topic,
Value: value,
}
return nil
}
func produceTask() {
addr := config.GetConfig().Kafka.Brokers
borkers := strings.Split(addr, ";")
producer, err := NewKafka(borkers)
if err != nil {
panic(err)
}
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()
for {
select {
case <-ticker.C:
task := makeTask()
if err := Fire(producer, task, config.GetConfig().Kafka.Topic); err != nil {
log.WithError(err).Error("fire task failed")
}
}
}
}
func makeTask() *odysseus.TaskContent {
task := &odysseus.TaskContent{
TaskId: strconv.FormatInt(utils.GetSnowflakeId(), 10),
TaskType: odysseus.TaskType_CustomTask,
TaskCmd: "echo \"hello world\"",
TaskParam: []byte("give me five"),
TaskTimestamp: uint64(time.Now().Unix()),
TaskCallback: "http://localhost:8080/callback",
TaskUid: "1111111111",
TaskFee: "120000",
}
return task
}
package main
import (
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/odysseus/scheduler/config"
"github.com/rifflock/lfshook"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"time"
"os"
"sync"
)
var logLevel string
var logPath string
// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
Use: "producer",
Short: "A simulate producer command-line interface",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
runProducer()
},
}
func Execute() {
if err := RootCmd.Execute(); err != nil {
log.Fatal(err)
os.Exit(-1)
}
}
func init() {
cobra.OnInitialize(initConfig)
RootCmd.PersistentFlags().StringVar(&logLevel, "loglevel", "info", "log level")
RootCmd.PersistentFlags().StringVar(&logPath, "logpath", "", "log path")
}
// initConfig reads in config file and ENV variables if set.
func initConfig() {
InitLog()
viper.SetConfigName("config") // name of config file (without extension)
viper.AddConfigPath(".")
viper.AutomaticEnv() // read in environment variables that match
// If a config file is found, read it in.
if err := viper.ReadInConfig(); err == nil {
//log.Info("Using config file:", viper.ConfigFileUsed())
} else {
log.WithField("error", err).Fatal("Read config failed")
return
}
conf, err := config.ParseConfig(viper.ConfigFileUsed())
if err != nil {
log.WithField("error", err).Fatal("parse config failed")
}
log.Infof("start with config: %+v", conf)
}
func runProducer() {
produceTask()
wg := sync.WaitGroup{}
wg.Add(1)
wg.Wait()
}
func getLogLevel(level string) log.Level {
switch level {
case "info":
return log.InfoLevel
case "debug":
return log.DebugLevel
case "error":
return log.ErrorLevel
default:
return log.InfoLevel
}
}
func InitLog() {
// standard setting
log.SetLevel(getLogLevel(logLevel))
log.SetFormatter(&log.TextFormatter{FullTimestamp: true, TimestampFormat: "2006-01-02 15:04:05.000"})
// file system logger setting
if logPath != "" {
localFilesystemLogger(logPath)
}
}
func logWriter(logPath string) *rotatelogs.RotateLogs {
logFullPath := logPath
logwriter, err := rotatelogs.New(
logFullPath+".%Y%m%d",
rotatelogs.WithLinkName(logFullPath),
rotatelogs.WithRotationSize(100*1024*1024), // 100MB
rotatelogs.WithRotationTime(24*time.Hour),
)
if err != nil {
panic(err)
}
return logwriter
}
func localFilesystemLogger(logPath string) {
lfHook := lfshook.NewHook(logWriter(logPath), &log.TextFormatter{FullTimestamp: true, TimestampFormat: "2006-01-02 15:04:05.000"})
log.AddHook(lfHook)
}
endpoint="127.0.0.1:10002" endpoint="127.0.0.1:10002"
metrics_port = 28012 metrics_port = 28012
routines = 10 routines = 1
[redis] [redis]
addr="127.0.0.1:6379" addr="127.0.0.1:6379"
...@@ -8,5 +8,6 @@ password="123456" ...@@ -8,5 +8,6 @@ password="123456"
db=0 db=0
[kafka] [kafka]
brokers="192.168.1.220:9092" #brokers="192.168.1.220:9092"
brokers="192.168.1.108:9092"
topic="pbaigc" topic="pbaigc"
...@@ -12,7 +12,7 @@ type RedisConfig struct { ...@@ -12,7 +12,7 @@ type RedisConfig struct {
DbIndex int `json:"db_index" toml:"db_index"` DbIndex int `json:"db_index" toml:"db_index"`
} }
type KafkaConfig struct { type KafkaConfig struct {
Brokers string `json:"addr" toml:"addr"` Brokers string `json:"brokers" toml:"brokers"`
Topic string `json:"topic" toml:"topic"` Topic string `json:"topic" toml:"topic"`
} }
......
...@@ -93,7 +93,8 @@ func (n *Node) attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.Ta ...@@ -93,7 +93,8 @@ func (n *Node) attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.Ta
config := sarama.NewConfig() config := sarama.NewConfig()
config.Consumer.Return.Errors = true config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
config.Consumer.Offsets.Initial = sarama.OffsetNewest config.Consumer.Offsets.Initial = sarama.OffsetOldest
//config.Consumer.Offsets.Initial = sarama.OffsetNewest
// split broker to list // split broker to list
brokers := strings.Split(n.conf.Kafka.Brokers, ";") brokers := strings.Split(n.conf.Kafka.Brokers, ";")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment