Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
A
ai-api-mgr
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Odysseus
ai-api-mgr
Commits
f04f45a6
Commit
f04f45a6
authored
Mar 01, 2024
by
Your Name
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
use slog.Attrs
parent
d95a2004
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
240 additions
and
69 deletions
+240
-69
main.go
main.go
+240
-69
No files found.
main.go
View file @
f04f45a6
...
...
@@ -71,6 +71,8 @@ var producerMessagesBytes = make(chan bytesAndHeader, 1000)
// Channel to send messages from the Kafka consumer to the /consumer endpoint
var
consumerMessages
=
make
(
chan
string
)
const
TaskIdAtrr
=
"Task-Id"
func
kafkaProducerBytes
()
{
// Create a new Sarama configuration for the Kafka producer.
...
...
@@ -83,7 +85,10 @@ func kafkaProducerBytes() {
// Create a new Kafka producer using the specified configuration and broker addresses.
producer
,
err
=
sarama
.
NewAsyncProducer
(
kafkaBrokers
,
config
)
if
err
!=
nil
{
slog
.
Error
(
"sarama.NewAsyncProducer"
,
"err"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaProducerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"NewAsyncProducer"
,
err
.
Error
()))
...
)
//slog.Error("sarama.NewAsyncProducer", "err", err.Error())
time
.
Sleep
(
time
.
Second
*
3
)
}
else
{
break
...
...
@@ -128,8 +133,10 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
consumer
,
err
=
sarama
.
NewConsumer
(
kafkaBrokers
,
config
)
if
err
!=
nil
{
slog
.
Error
(
"kafkaConsumerBytes"
,
"err"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"NewConsumer"
,
err
.
Error
()))
...
)
time
.
Sleep
(
time
.
Second
*
3
)
}
else
{
break
}
...
...
@@ -145,7 +152,10 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
reqConsumer
,
err
=
consumer
.
ConsumePartition
(
req
,
0
,
sarama
.
OffsetNewest
)
if
err
!=
nil
{
slog
.
Error
(
"consumer.ConsumePartition"
,
"err"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"ConsumePartition"
,
err
.
Error
()))
...
)
//slog.Error("consumer.ConsumePartition", "err", err.Error())
time
.
Sleep
(
time
.
Second
*
3
)
}
else
{
break
...
...
@@ -160,23 +170,30 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
case
message
:=
<-
reqConsumer
.
Messages
()
:
if
message
==
nil
{
slog
.
Error
(
"kafka consumer"
,
"topic"
,
req
,
"message"
,
"is nil"
)
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"topic"
,
req
),
slog
.
String
(
"message"
,
err
.
Error
()))
...
)
//slog.Error("kafka consumer", "topic", req, "message", "is nil")
continue
}
if
message
.
Value
==
nil
{
slog
.
Error
(
"kafka consumer"
,
"topic"
,
req
,
"message vaule"
,
"is nil"
)
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"topic"
,
req
),
slog
.
String
(
"value"
,
err
.
Error
()))
...
)
//slog.Error("kafka consumer", "topic", req, "message vaule", "is nil")
continue
}
pbMsg
:=
pbUpstream
.
TaskContent
{}
if
err
:=
gogoPbProto
.
Unmarshal
(
message
.
Value
,
&
pbMsg
);
err
!=
nil
{
slog
.
Error
(
"kafka consumer"
,
"topic"
,
req
,
"gogoPbProto.Unmarshal err"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"topic"
,
req
),
slog
.
String
(
"pbUnmarshal"
,
err
.
Error
()))
...
)
//slog.Error("kafka consumer", "topic", req, "gogoPbProto.Unmarshal err", err.Error())
continue
}
slog
.
Info
(
"kafka consumer"
,
"topic"
,
req
,
"pbMsg.TaskId"
,
pbMsg
.
TaskId
)
baseAttributes
:=
[]
slog
.
Attr
{}
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
TaskIdAtrr
,
pbMsg
.
TaskId
))
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelInfo
,
"<- kafka consumer"
,
append
(
baseAttributes
,
slog
.
String
(
"topic"
,
req
))
...
)
select
{
case
<-
done
:
...
...
@@ -200,8 +217,10 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
consumer
,
err
=
sarama
.
NewConsumer
(
kafkaBrokers
,
config
)
if
err
!=
nil
{
slog
.
Error
(
"kafkaConsumerBytes"
,
"err"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"NewConsumer"
,
err
.
Error
()))
...
)
time
.
Sleep
(
time
.
Second
*
3
)
}
else
{
break
}
...
...
@@ -232,7 +251,8 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
resConsumer
,
err
=
consumer
.
ConsumePartition
(
resTopic
,
0
,
sarama
.
OffsetNewest
)
if
err
!=
nil
{
slog
.
Error
(
"consumer.ConsumePartition"
,
"err"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"ConsumePartition"
,
err
.
Error
()),
slog
.
String
(
"topic"
,
resTopic
))
...
)
//slog.Error("consumer.ConsumePartition", "err", err.Error())
time
.
Sleep
(
time
.
Second
*
3
)
}
else
{
break
...
...
@@ -253,23 +273,35 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
case
message
:=
<-
resConsumer
.
Messages
()
:
if
message
==
nil
{
slog
.
Error
(
"kafka consumer"
,
"topic"
,
resTopic
,
"message"
,
"is nil"
)
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"message"
,
err
.
Error
()),
slog
.
String
(
"topic"
,
resTopic
))
...
)
//slog.Error("kafka consumer", "topic", resTopic, "message", "is nil")
continue
}
if
message
.
Value
==
nil
{
slog
.
Error
(
"kafka consumer"
,
"topic"
,
resTopic
,
"message vaule"
,
"is nil"
)
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"value"
,
err
.
Error
()),
slog
.
String
(
"topic"
,
resTopic
))
...
)
//slog.Error("kafka consumer", "topic", resTopic, "message vaule", "is nil")
continue
}
pbMsg
:=
pbUpstream
.
TaskReceipt
{}
if
err
:=
gogoPbProto
.
Unmarshal
(
message
.
Value
,
&
pbMsg
);
err
!=
nil
{
slog
.
Error
(
"kafka consumer"
,
"topic"
,
resTopic
,
"gogoPbProto.Unmarshal err"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"kafkaConsumerBytes"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"pbUnmarshal"
,
err
.
Error
()),
slog
.
String
(
"topic"
,
resTopic
))
...
)
//slog.Error("kafka consumer", "topic", resTopic, "gogoPbProto.Unmarshal err", err.Error())
continue
}
slog
.
Info
(
"kafka consumer"
,
"topic"
,
resTopic
,
"pbMsg.TaskId"
,
pbMsg
.
TaskId
)
baseAttributes
:=
[]
slog
.
Attr
{}
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
TaskIdAtrr
,
pbMsg
.
TaskId
))
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelInfo
,
"<- kafka consumer"
,
append
(
baseAttributes
,
slog
.
String
(
"topic"
,
resTopic
))
...
)
//slog.Info("kafka consumer", "topic", resTopic, "pbMsg.TaskId", pbMsg.TaskId)
select
{
case
<-
done
:
...
...
@@ -297,8 +329,10 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
for
{
sender
,
err
=
qdb
.
NewLineSender
(
ctx
,
addrOpt
)
if
err
!=
nil
{
slog
.
Error
(
"qdb.NewLineSender"
,
"err"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"batchToQuestDb"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"NewLineSender"
,
err
.
Error
()))
...
)
time
.
Sleep
(
time
.
Second
*
3
)
}
else
{
break
}
...
...
@@ -313,7 +347,9 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
nanoseconds
:=
int64
(
task
.
TaskTimestamp
)
seconds
:=
nanoseconds
/
1e9
slog
.
Debug
(
"questdb <- resStream"
,
"task.TaskId"
,
task
.
TaskId
,
"task.TaskUid"
,
task
.
TaskUid
)
baseAttributes
:=
[]
slog
.
Attr
{}
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
TaskIdAtrr
,
task
.
TaskId
))
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelDebug
,
"questdb <- resStream"
,
append
(
baseAttributes
,
slog
.
String
(
"TaskUid"
,
task
.
TaskUid
))
...
)
for
{
...
...
@@ -334,7 +370,9 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
err
=
sender
.
Flush
(
ctx
)
if
err
!=
nil
{
slog
.
Error
(
"task := <-resStream"
,
"error"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"batchToQuestDb"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"<-resStream"
,
err
.
Error
()))
...
)
//slog.Error("task := <-resStream", "error", err.Error())
time
.
Sleep
(
time
.
Second
*
3
)
}
else
{
break
...
...
@@ -346,11 +384,17 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
taskFeeAsInt
,
err
:=
strconv
.
Atoi
(
task
.
TaskFee
)
if
err
!=
nil
{
slog
.
Error
(
"task.TaskFee string to int"
,
"error"
,
err
)
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"batchToQuestDb"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"<-reqStream"
,
err
.
Error
()))
...
)
//slog.Error("task.TaskFee string to int", "error", err)
continue
}
slog
.
Debug
(
"questdb <- reqStream"
,
"task.TaskId"
,
task
.
TaskId
,
"task.TaskUid"
,
task
.
TaskUid
)
baseAttributes
:=
[]
slog
.
Attr
{}
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
TaskIdAtrr
,
task
.
TaskId
))
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelDebug
,
"questdb <- reqStream"
,
append
(
baseAttributes
,
slog
.
String
(
"task.TaskUid"
,
task
.
TaskUid
))
...
)
//slog.Debug("questdb <- reqStream", "task.TaskId", task.TaskId, "task.TaskUid", task.TaskUid)
nanoseconds
:=
int64
(
task
.
TaskTimestamp
)
seconds
:=
nanoseconds
/
1e9
...
...
@@ -369,7 +413,10 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
err
=
sender
.
Flush
(
ctx
)
if
err
!=
nil
{
slog
.
Error
(
"task := <-reqStream"
,
"error"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"batchToQuestDb"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"sender.Flush"
,
err
.
Error
()))
...
)
//slog.Error("task := <-reqStream", "error", err.Error())
time
.
Sleep
(
time
.
Second
*
3
)
}
else
{
break
...
...
@@ -464,7 +511,8 @@ func createTable(ipAddr, tableSql string) error {
return
err
}
slog
.
Info
(
"createTable"
,
"response body"
,
body
)
_
=
body
//slog.Info("createTable", "response-body", body)
return
nil
}
...
...
@@ -510,12 +558,10 @@ func main() {
flag
.
Parse
()
fmt
.
Println
(
"log server"
,
logserver
)
levelVar
:=
&
slog
.
LevelVar
{}
levelVar
.
Set
(
slog
.
Level
(
logLevel
))
slog
.
Info
(
"log level"
,
"level"
,
levelVar
.
String
())
slog
.
Info
(
"log level"
,
"level"
,
levelVar
.
String
()
,
"log server"
,
logserver
,
"version"
,
version
.
VERSION
)
// slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
// Level: levelVar,
...
...
@@ -531,31 +577,84 @@ func main() {
slog
.
SetDefault
(
slog
.
New
(
slogloki
.
Option
{
Level
:
levelVar
,
Client
:
client
}
.
NewLokiHandler
())
.
With
(
"api"
,
version
.
VERSION
))
//slog.SetDefault(slog.New(slogloki.Option{Level: levelVar, Client: client}.NewLokiHandler()).With("api", "v0.0.15")
)
defer
client
.
Stop
(
)
//logger := slog.New(slogloki.Option{Level: slog.LevelDebug, Client: client}.NewLokiHandler())
// logger = logger.With("release", "v1.0.0")
// for i := 0; i < 10000; i++ {
// //logger.Debug("debug log server", "i", i)
// slog.Debug("debug log server", "i", i)
// time.Sleep(time.Second)
// }
// logger.Error("A message")
questAttributes
:=
[]
slog
.
Attr
{
slog
.
String
(
"addr"
,
questAddr
),
}
defer
client
.
Stop
()
kafkaAttributes
:=
[]
slog
.
Attr
{
slog
.
String
(
"addr"
,
kafkaBroker
),
slog
.
String
(
"task-topic"
,
aigcProduceTopic
),
slog
.
String
(
"bill-topic"
,
aigcConsumerTopic
),
}
slog
.
Warn
(
"start param"
,
"quest"
,
slog
.
GroupValue
(
slog
.
String
(
"Addr"
,
questAddr
)))
slog
.
Warn
(
"start param"
,
"kafka"
,
slog
.
GroupValue
(
slog
.
String
(
"Addr"
,
kafkaBroker
)),
slog
.
Group
(
"topic"
,
"aigcProduceTopic"
,
aigcProduceTopic
,
"aigcConsumerTopic"
,
aigcConsumerTopic
))
slog
.
Warn
(
"start param"
,
"callback"
,
slog
.
GroupValue
(
slog
.
String
(
"Addr"
,
callbackAddrP
)))
mysqlAttributes
:=
[]
slog
.
Attr
{
slog
.
String
(
"addr"
,
mysqlAddr
),
slog
.
Int
(
"port"
,
mysqlPort
),
slog
.
String
(
"DbName"
,
mysqlDbName
),
slog
.
String
(
"User"
,
mysqlUser
),
slog
.
String
(
"PassWd"
,
mysqlPassWd
),
}
slog
.
Warn
(
"start param"
,
"listenIpPort"
,
slog
.
GroupValue
(
slog
.
String
(
"Addr"
,
listenIpPort
)))
slog
.
Warn
(
"start param"
,
"redis"
,
slog
.
GroupValue
(
slog
.
String
(
"Addr"
,
redisAddr
),
slog
.
String
(
"PassWd"
,
redisPassWd
)))
slog
.
Warn
(
"start param"
,
"mysql"
,
slog
.
GroupValue
(
slog
.
String
(
"Addr"
,
mysqlAddr
),
slog
.
Int
(
"port"
,
mysqlPort
),
slog
.
String
(
"DbName"
,
mysqlDbName
),
slog
.
String
(
"User"
,
mysqlUser
),
slog
.
String
(
"PassWd"
,
mysqlPassWd
)))
//, slog.GroupValue(slog.Int("port",mysqlPort),slog.String("mysqlDbName",mysqlDbName),slog.GroupValue(slog.String("mysqlUser",mysqlUser)))))
redisAttributes
:=
[]
slog
.
Attr
{
slog
.
String
(
"addr"
,
redisAddr
),
slog
.
String
(
"passWd"
,
redisPassWd
),
}
slog
.
Warn
(
"start param"
,
"!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!"
,
withBillDb
)
slog
.
Warn
(
"start param"
,
"!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!"
,
withBillDb
)
slog
.
Warn
(
"start param"
,
"!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!"
,
withBillDb
)
baseAttributes
:=
[]
slog
.
Attr
{}
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
TaskIdAtrr
,
"------------------------------"
))
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
"callback"
,
callbackAddrP
))
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
"listenIpPort"
,
listenIpPort
))
baseAttributes
=
append
(
baseAttributes
,
slog
.
Bool
(
"withBillDB"
,
withBillDb
))
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
"version"
,
version
.
VERSION
))
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
"buildTime"
,
version
.
BUILD_TIME
))
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
"gitBranch"
,
version
.
GIT_BRANCH
))
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
"gitCommit"
,
version
.
COMMIT_SHA1
))
attributes
:=
append
(
[]
slog
.
Attr
{
{
Key
:
"kafka"
,
Value
:
slog
.
GroupValue
(
kafkaAttributes
...
),
},
{
Key
:
"questDB"
,
Value
:
slog
.
GroupValue
(
questAttributes
...
),
},
{
Key
:
"mysql"
,
Value
:
slog
.
GroupValue
(
mysqlAttributes
...
),
},
{
Key
:
"redis"
,
Value
:
slog
.
GroupValue
(
redisAttributes
...
),
},
},
baseAttributes
...
,
)
// baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0]))
//newAttributes := append(baseAttributes, slog.String("match_path", c.Route().Path))
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelWarn
,
"start param"
,
attributes
...
)
// slog.Warn("start param", "quest", slog.GroupValue(slog.String("Addr", questAddr)))
// slog.Warn("start param", "kafka", slog.GroupValue(slog.String("Addr", kafkaBroker)), slog.Group("topic", "aigcProduceTopic", aigcProduceTopic, "aigcConsumerTopic", aigcConsumerTopic))
// slog.Warn("start param", "callback", slog.GroupValue(slog.String("Addr", callbackAddrP)))
// slog.Warn("start param", "listenIpPort", slog.GroupValue(slog.String("Addr", listenIpPort)))
// slog.Warn("start param", "redis", slog.GroupValue(slog.String("Addr", redisAddr), slog.String("PassWd", redisPassWd)))
// slog.Warn("start param", "mysql", slog.GroupValue(slog.String("Addr", mysqlAddr),
// slog.Int("port", mysqlPort), slog.String("DbName", mysqlDbName), slog.String("User", mysqlUser), slog.String("PassWd", mysqlPassWd))) //, slog.GroupValue(slog.Int("port",mysqlPort),slog.String("mysqlDbName",mysqlDbName),slog.GroupValue(slog.String("mysqlUser",mysqlUser)))))
// slog.Warn("start param", "With-Bill-DB", withBillDb)
//slog.Warn("start param", "!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!", withBillDb)
// slog.Warn("start param", "!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!", withBillDb)
// slog.Warn("start param", "!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!", withBillDb)
//go Registry(redisAddr, redisPassWd)
...
...
@@ -591,6 +690,22 @@ func main() {
// Create a new instance of the Fiber web framework.
app
:=
fiber
.
New
()
cfg
:=
slogfiber
.
Config
{
WithUserAgent
:
true
,
WithRequestID
:
true
,
WithRequestBody
:
false
,
WithRequestHeader
:
true
,
WithResponseBody
:
false
,
WithResponseHeader
:
true
,
WithSpanID
:
true
,
WithTraceID
:
true
,
}
_
=
cfg
//app := fiber.New()
//app.Use(slogfiber.NewWithConfig(slog.Default(), cfg))
app
.
Use
(
slogfiber
.
New
(
slog
.
Default
()))
app
.
Use
(
recover
.
New
())
...
...
@@ -599,16 +714,16 @@ func main() {
callbackGroup
:=
app
.
Group
(
"/callback"
)
apiGroupV1
:=
apiGroup
.
Group
(
"/v1"
)
apiGroupV1
.
Post
(
"/*"
,
Api
And
JWT
)
apiGroupV1
.
Post
(
"/*"
,
Api
Or
JWT
)
jwtGroupV1
:=
jwtGroup
.
Group
(
"/v1"
)
jwtGroupV1
.
Post
(
"/*"
,
Api
And
JWT
)
jwtGroupV1
.
Post
(
"/*"
,
Api
Or
JWT
)
callbackGroupV1
:=
callbackGroup
.
Group
(
"/v1"
)
callbackGroupV1
.
Post
(
"/"
,
func
(
c
*
fiber
.
Ctx
)
error
{
slog
.
Debug
(
"callback"
,
"path"
,
c
.
Route
()
.
Path
)
slog
.
LogAttrs
(
c
.
UserContext
(),
slog
.
LevelInfo
,
"callback"
,
slog
.
String
(
"path"
,
c
.
Route
()
.
Path
)
)
body
:=
c
.
Body
()
...
...
@@ -618,7 +733,10 @@ func main() {
return
c
.
SendString
(
fmt
.
Sprintf
(
"callback Unmarshal error %v"
,
err
.
Error
()))
}
slog
.
Debug
(
"callback"
,
"task-id"
,
resbody
.
TaskId
,
"result"
,
resbody
.
TaskIsSucceed
,
"uid"
,
resbody
.
TaskUid
)
baseAttributes
:=
[]
slog
.
Attr
{}
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
TaskIdAtrr
,
resbody
.
TaskId
))
slog
.
LogAttrs
(
c
.
UserContext
(),
slog
.
LevelDebug
,
"callback"
,
append
(
baseAttributes
,
slog
.
Bool
(
"TaskIsSucceed"
,
resbody
.
TaskIsSucceed
),
slog
.
String
(
"TaskUid"
,
resbody
.
TaskUid
))
...
)
res
(
resbody
)
...
...
@@ -634,17 +752,23 @@ func main() {
replanceQueryTask
=
&
task
if
err
:=
app
.
Listen
(
listenIpPort
);
err
!=
nil
{
slog
.
Error
(
"app.Listen"
,
"err"
,
err
.
Error
()
)
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"fiber listen"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"err"
,
err
.
Error
()))
...
)
}
}
var
replanceQueryTask
*
model
.
TaskType
func
ApiAndJWT
(
c
*
fiber
.
Ctx
)
error
{
slog
.
Debug
(
"ApiAndJWT"
,
"path"
,
c
.
Route
()
.
Path
)
func
ApiOrJWT
(
c
*
fiber
.
Ctx
)
error
{
reqHeaders
:=
c
.
GetReqHeaders
()
baseAttributes
:=
[]
slog
.
Attr
{}
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
TaskIdAtrr
,
reqHeaders
[
TaskIdAtrr
][
0
]))
newAttributes
:=
append
(
baseAttributes
,
slog
.
String
(
"match_path"
,
c
.
Route
()
.
Path
))
slog
.
LogAttrs
(
c
.
UserContext
(),
slog
.
LevelInfo
,
"new Api or JWT reuqest"
,
newAttributes
...
)
//slog.Debug("ApiAndJWT", "path", c.Route().Path)
//return c.SendString(reqHeaders["Task-Id"][0])
routePath
:=
c
.
Route
()
.
Path
...
...
@@ -665,7 +789,22 @@ func ApiAndJWT(c *fiber.Ctx) error {
uidAsInt
,
err
:=
strconv
.
Atoi
(
uid
[
0
])
slog
.
Info
(
"query db param"
,
"pathInDB"
,
pathInDB
,
"uid"
,
int64
(
uidAsInt
))
queryDbAttributes
:=
[]
slog
.
Attr
{
slog
.
String
(
"path"
,
pathInDB
),
slog
.
Int
(
"uid"
,
uidAsInt
),
}
attributes
:=
append
(
[]
slog
.
Attr
{
{
Key
:
"param"
,
Value
:
slog
.
GroupValue
(
queryDbAttributes
...
),
},
},
baseAttributes
...
,
)
slog
.
LogAttrs
(
c
.
UserContext
(),
slog
.
LevelDebug
,
"query db"
,
attributes
...
)
var
task
*
model
.
TaskType
...
...
@@ -696,24 +835,43 @@ func ApiAndJWT(c *fiber.Ctx) error {
ContainerPubkey
:
[]
byte
(
task
.
PublicKey
),
}
slog
.
Debug
(
"msg to kafka"
,
"TaskId"
,
pbMsg
.
TaskId
,
"TaskType"
,
pbMsg
.
TaskType
,
"TaskKind"
,
pbMsg
.
TaskKind
,
"TaskCmd"
,
pbMsg
.
TaskCmd
,
"TaskParam"
,
pbMsg
.
TaskParam
,
"TaskTimestamp"
,
pbMsg
.
TaskTimestamp
,
"TaskCallback"
,
pbMsg
.
TaskCallback
,
"TaskUid"
,
pbMsg
.
TaskUid
,
"TaskFee"
,
pbMsg
.
TaskFee
,
"TaskInLen"
,
pbMsg
.
TaskInLen
,
"TaskWorkload"
,
pbMsg
.
TaskWorkload
,
"ContainerPubkey"
,
pbMsg
.
ContainerPubkey
)
msgToKafkaAttributes
:=
[]
slog
.
Attr
{
slog
.
String
(
"TaskId"
,
pbMsg
.
TaskId
),
slog
.
Uint64
(
"TaskType"
,
pbMsg
.
TaskType
),
slog
.
Int
(
"TaskType"
,
int
(
pbMsg
.
TaskKind
)),
slog
.
String
(
"TaskCmd"
,
pbMsg
.
TaskCmd
),
}
kafkaattributes
:=
append
(
[]
slog
.
Attr
{
{
Key
:
"pb msg"
,
Value
:
slog
.
GroupValue
(
msgToKafkaAttributes
...
),
},
},
baseAttributes
...
,
)
slog
.
LogAttrs
(
c
.
UserContext
(),
slog
.
LevelDebug
,
"-> kafka producer"
,
kafkaattributes
...
)
// slog.Debug("msg to kafka",
// "TaskId", pbMsg.TaskId,
// "TaskType", pbMsg.TaskType,
// "TaskKind", pbMsg.TaskKind,
// "TaskCmd", pbMsg.TaskCmd,
// "TaskParam", pbMsg.TaskParam,
// "TaskTimestamp", pbMsg.TaskTimestamp,
// "TaskCallback", pbMsg.TaskCallback,
// "TaskUid", pbMsg.TaskUid,
// "TaskFee", pbMsg.TaskFee,
// "TaskInLen", pbMsg.TaskInLen,
// "TaskWorkload", pbMsg.TaskWorkload,
// "ContainerPubkey", pbMsg.ContainerPubkey)
pbBytes
,
err
:=
gogoPbProto
.
Marshal
(
&
pbMsg
)
if
err
!=
nil
{
slog
.
Error
(
"pbUpstream.TaskContent"
,
"gogoPbProto.Marshal"
,
err
.
Error
()
)
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"ApiOrJWT"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
"pbMarshal"
,
err
.
Error
()))
...
)
return
c
.
SendString
(
fmt
.
Sprintf
(
"pb error: %v"
,
err
.
Error
()))
}
...
...
@@ -736,7 +894,11 @@ func ApiAndJWT(c *fiber.Ctx) error {
}
}
slog
.
Info
(
"task info"
,
"taskid"
,
pbMsg
.
TaskId
,
"asyncMode"
,
asyncMode
)
//slog.Info("client", "taskid", pbMsg.TaskId, "asyncMode", asyncMode)
asyncModeAttributes
:=
append
(
baseAttributes
,
slog
.
Bool
(
"async"
,
asyncMode
))
slog
.
LogAttrs
(
c
.
UserContext
(),
slog
.
LevelInfo
,
"client request mode"
,
asyncModeAttributes
...
)
if
asyncMode
{
//time.Sleep(10 * time.Second)
...
...
@@ -768,7 +930,13 @@ func syncModeF(c *fiber.Ctx, taskid string) error {
wait
:=
req
(
taskid
)
resAsPb
:=
<-
wait
slog
.
Debug
(
"resAsPb.TaskResultHeader"
,
"resAsPb.TaskResultHeader"
,
resAsPb
.
TaskResultHeader
)
baseAttributes
:=
[]
slog
.
Attr
{}
baseAttributes
=
append
(
baseAttributes
,
slog
.
String
(
TaskIdAtrr
,
taskid
))
newAttributes
:=
append
(
baseAttributes
,
slog
.
String
(
"TaskResultHeader"
,
string
(
resAsPb
.
TaskResultHeader
)))
slog
.
LogAttrs
(
c
.
UserContext
(),
slog
.
LevelInfo
,
"new Api or JWT reuqest"
,
newAttributes
...
)
//slog.Debug("resAsPb.TaskResultHeader", "resAsPb.TaskResultHeader", resAsPb.TaskResultHeader)
if
resAsPb
.
TaskResultHeader
!=
nil
{
if
len
(
resAsPb
.
TaskResultHeader
)
!=
0
{
...
...
@@ -776,7 +944,10 @@ func syncModeF(c *fiber.Ctx, taskid string) error {
headers
:=
make
(
map
[
string
][]
string
)
if
err
:=
json
.
Unmarshal
(
resAsPb
.
TaskResultHeader
,
&
headers
);
err
!=
nil
{
slog
.
Error
(
"syncModeF"
,
"json.Unmarshal(resAsPb.TaskResultHeader"
,
err
.
Error
())
slog
.
LogAttrs
(
context
.
Background
(),
slog
.
LevelError
,
"syncModeF"
,
append
([]
slog
.
Attr
{},
slog
.
String
(
TaskIdAtrr
,
taskid
),
slog
.
String
(
"json.Unmarshal"
,
err
.
Error
()))
...
)
//slog.Error("syncModeF", "json.Unmarshal(resAsPb.TaskResultHeader", err.Error())
return
c
.
SendString
(
fmt
.
Sprintf
(
"json.Unmarshal(resAsPb.TaskResultHeader error: %v"
,
err
.
Error
()))
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment