Commit 16d5d242 authored by Wade's avatar Wade

add log

parent 8b3ec242
......@@ -9,7 +9,7 @@ paths:
summary: Store Milvus index data
description: Stores content data for Milvus indexing, including user information and optional metadata.
tags:
- Indexing
- RAG
requestBody:
required: true
content:
......@@ -55,7 +55,7 @@ paths:
summary: Store GraphRAG index data
description: Stores content data for GraphRAG indexing, including user information and optional metadata.
tags:
- Indexing
- RAG
requestBody:
required: true
content:
......
......@@ -48,6 +48,8 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mbleigh/raymond v0.0.0-20250414171441-6b3a58ab9e0a // indirect
github.com/microsoft/kiota-abstractions-go v1.9.2 // indirect
github.com/microsoft/kiota-authentication-azure-go v1.3.0 // indirect
......@@ -60,11 +62,13 @@ require (
github.com/microsoftgraph/msgraph-sdk-go-core v1.3.2 // indirect
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.10-0.20240819025435-512e3b98866a // indirect
github.com/milvus-io/milvus-sdk-go/v2 v2.4.2 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/ollama/ollama v0.6.5 // indirect
github.com/pgvector/pgvector-go v0.3.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/rs/zerolog v1.34.0 // indirect
github.com/std-uritemplate/std-uritemplate/go/v2 v2.0.3 // indirect
github.com/stretchr/testify v1.10.0 // indirect
github.com/swaggo/files v1.0.1 // indirect
......
......@@ -49,6 +49,7 @@ github.com/cohesion-org/deepseek-go v1.3.1/go.mod h1:bOVyKj38r90UEYZFrmJOzJKPxuA
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
......@@ -101,6 +102,7 @@ github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6Wezm
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/goccy/go-yaml v1.17.1 h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY=
github.com/goccy/go-yaml v1.17.1/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/googleapis v1.4.1/go.mod h1:2lpHqI5OcWCtVElxXnPt+s8oJvMpySlOyM6xDCrzib4=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
......@@ -206,11 +208,17 @@ github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUt
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
github.com/mbleigh/raymond v0.0.0-20250414171441-6b3a58ab9e0a h1:v2cBA3xWKv2cIOVhnzX/gNgkNXqiHfUgJtA3r61Hf7A=
github.com/mbleigh/raymond v0.0.0-20250414171441-6b3a58ab9e0a/go.mod h1:Y6ghKH+ZijXn5d9E7qGGZBmjitx7iitZdQiIW97EpTU=
......@@ -245,6 +253,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
......@@ -274,6 +284,9 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g=
......@@ -445,7 +458,10 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
......
package main
import (
"io"
"os"
"runtime"
"strings"
"github.com/natefinch/lumberjack"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// 获取调用者的包名
func getPackageName() string {
pc, _, _, ok := runtime.Caller(2) // 2 表示跳过当前函数和上一层
if !ok {
return "unknown"
}
fullName := runtime.FuncForPC(pc).Name()
parts := strings.Split(fullName, ".")
pkg := strings.Join(parts[:len(parts)-1], ".")
return pkg
}
func loggingInit() {
// debug := flag.Bool("debug", false, "sets log level to debug")
// flag.Parse()
zerolog.SetGlobalLevel(zerolog.InfoLevel)
// if *debug {
// zerolog.SetGlobalLevel(zerolog.DebugLevel)
// }
// // Configure log rotation with lumberjack
lumberjackLogger := &lumberjack.Logger{
Filename: "/var/log/agent_chat.log",
//Filename: "./tweet.log",
MaxSize: 1, // Max size in megabytes before log is rotated
MaxBackups: 3, // Max number of old log files to retain
MaxAge: 28, // Max number of days to retain old log files
Compress: true, // Compress old log files
}
// // Initialize a logger with rotating file output
// logger := zerolog.New(lumberjackLogger).With().Timestamp().Logger()
// log.Logger = logger
//log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout})
multi := io.MultiWriter(zerolog.ConsoleWriter{Out: os.Stdout}, lumberjackLogger)
//zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}
logger := zerolog.New(multi).
Level(zerolog.TraceLevel).
With().
Timestamp().
// Str("package", getPackageName()). // 动态添加包名
Caller().
Int("pid", os.Getpid()).
Logger()
log.Logger = logger
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
log.Debug().Msg("This message appears only when log level set to Debug")
log.Info().Msg("This message appears when log level set to Debug or Info")
//updatePool()
}
......@@ -3,13 +3,14 @@ package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"strings"
"github.com/firebase/genkit/go/ai"
"github.com/firebase/genkit/go/genkit"
"github.com/rs/zerolog"
"github.com/wade-liwei/agentchat/plugins/deepseek"
"github.com/wade-liwei/agentchat/plugins/graphrag"
"github.com/wade-liwei/agentchat/plugins/milvus"
......@@ -21,18 +22,18 @@ import (
httpSwagger "github.com/swaggo/http-swagger"
_ "github.com/wade-liwei/agentchat/docs" // 导入生成的 Swagger 文档
"github.com/rs/zerolog/log"
"github.com/wade-liwei/agentchat/util"
)
type ChatInput struct {
Content string `json:"content,omitempty"`
Model string `json:"model,omitempty"`
APIKey string `json:"apiKey,omitempty"`
From string `json:"from,omitempty"` // 替换 Username
FromID string `json:"from_id,omitempty"` // 替换 UserID
To string `json:"to,"`
ToID string `json:"to_id,omitempty"`
Content string `json:"content,omitempty"`
Model string `json:"model,omitempty"`
APIKey string `json:"apiKey,omitempty"`
From string `json:"from,omitempty"` // 替换 Username
FromID string `json:"from_id,omitempty"` // 替换 UserID
To string `json:"to,"`
ToID string `json:"to_id,omitempty"`
}
// DocumentInput 结构体用于文档索引接口
......@@ -52,26 +53,33 @@ type GraphInput struct {
}
const simpleQaPromptTemplate = `
You're a helpful agent that answers the user's common questions based on the context provided.
You're a helpful agent that answers the user's questions with a tone and style shaped by the specified personality.
Here is the user's query: {{query}}
Here is the context you should use: {{context}}
Here is the context you should use: {{context}} from Milvus
Graph context: {{graph}}
Previous conversation summary: {{summary}}
Please provide the best answer you can.
Personality to adopt: {{personality}}
Please provide a response that aligns with the given personality while leveraging the provided context, graph, and conversation summary.
`
func main() {
ctx := context.Background()
debug := flag.Bool("debug", false, "sets log level to debug")
flag.Parse()
ds := deepseek.DeepSeek{
APIKey: "sk-9f70df871a7c4b8aa566a3c7a0603706",
}
loggingInit()
mil := milvus.Milvus{
Addr: "54.92.111.204:19530", // Milvus gRPC endpoint
if *debug {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}
ctx := context.Background()
metrics := []evaluators.MetricConfig{
{
MetricType: evaluators.EvaluatorDeepEqual,
......@@ -84,25 +92,20 @@ func main() {
},
}
graph := graphrag.GraphKnowledge{
Addr: "54.92.111.204:5670",
}
g, err := genkit.Init(ctx, genkit.WithPlugins(
&deepseek.DeepSeek{APIKey: "sk-9f70df871a7c4b8aa566a3c7a0603706"},
&milvus.Milvus{Addr: "54.92.111.204:19530"},
&graphrag.GraphKnowledge{Addr: "54.92.111.204:5670"},
&googlegenai.GoogleAI{APIKey: "AIzaSyCoYBOmnwRWlH_-nT25lpn8pMg3T18Q0uI"},
&evaluators.GenkitEval{Metrics: metrics}))
g, err := genkit.Init(ctx, genkit.WithPlugins(&ds, &mil, &graph, &googlegenai.GoogleAI{APIKey: "AIzaSyCoYBOmnwRWlH_-nT25lpn8pMg3T18Q0uI"}, &evaluators.GenkitEval{Metrics: metrics}))
if err != nil {
log.Fatal(err)
log.Fatal().Msg(err.Error())
}
// m := ds.DefineModel(g,
// deepseek.ModelDefinition{
// Name: "deepseek-chat", // Choose an appropriate model
// Type: "chat", // Must be chat for tool support
// },
// nil)
embedder := googlegenai.GoogleAIEmbedder(g, "embedding-001")
if embedder == nil {
log.Fatal("embedder is not defined")
log.Fatal().Msg(err.Error())
}
// Configure collection
......@@ -116,37 +119,43 @@ func main() {
// Define indexer and retriever
indexer, retriever, err := milvus.DefineIndexerAndRetriever(ctx, g, cfg)
if err != nil {
log.Fatalf("DefineIndexerAndRetriever failed: %v", err)
log.Fatal().Msgf("DefineIndexerAndRetriever failed: %v", err)
}
_ = retriever
// 定义文档索引流
genkit.DefineFlow(g, "index/document", func(ctx context.Context, input *DocumentInput) (string, error) {
genkit.DefineFlow(g, "index/document", func(ctx context.Context, input *DocumentInput) (Response, error) {
if input.Metadata == nil{
input.Metadata =make(map[string]any)
if input.Metadata == nil {
input.Metadata = make(map[string]any)
}
input.Metadata[util.UserIdKey]=input.UserID
input.Metadata[util.UserNameKey]= input.Username
input.Metadata[util.UserIdKey] = input.UserID
input.Metadata[util.UserNameKey] = input.Username
doc := ai.DocumentFromText(input.Content, input.Metadata)
err := indexer.Index(ctx, &ai.IndexerRequest{
Documents: []*ai.Document{doc},
})
if err != nil {
return "", fmt.Errorf("index document: %w", err)
return Response{
Code: 500,
Msg: err.Error(),
}, nil
}
return "Document indexed successfully", nil
return Response{
Code: 200,
Msg: "Document indexed successfully",
}, nil
})
graphIndexer, graphRetriever, err := graphrag.DefineIndexerAndRetriever(ctx, g)
_ = graphRetriever
if err != nil {
log.Fatal().Msgf("graphrag.DefineIndexerAndRetriever failed: %v", err)
}
genkit.DefineFlow(g, "index/graph", func(ctx context.Context, input *GraphInput) (string, error) {
genkit.DefineFlow(g, "index/graph", func(ctx context.Context, input *GraphInput) (Response, error) {
opt := graphrag.IndexReqOption{
UserId: input.UserID,
......@@ -159,7 +168,10 @@ func main() {
// Generate random docName.
docName, err := graphrag.GenerateRandomDocName(8)
if err != nil {
return "", fmt.Errorf("generate random docName for document %w", err)
return Response{
Code: 500,
Msg: fmt.Sprintf("generate random docName for document %w", err),
}, nil
}
input.Metadata[graphrag.DocNameKey] = docName
resDocName = docName
......@@ -175,9 +187,16 @@ func main() {
Options: &opt,
})
if err != nil {
return "", fmt.Errorf("index document: %w", err)
return Response{
Code: 500,
Msg: fmt.Sprintf("index document: %w", err),
}, nil
}
return fmt.Sprintf("Document indexed successfully, docname %s", resDocName), nil
return Response{
Code: 200,
Msg: fmt.Sprintf("Document indexed successfully, docname %s", resDocName),
}, nil
})
simpleQaPrompt, err := genkit.DefinePrompt(g, "simpleQaPrompt",
......@@ -187,17 +206,16 @@ func main() {
ai.WithOutputFormat(ai.OutputFormatText),
)
if err != nil {
log.Fatal(err)
log.Fatal().Msg(err.Error())
}
qa,err :=InitQAStore()
if err != nil{
log.Fatalf("InitQAStore failed: %v", err)
qa, err := InitQAStore()
if err != nil {
log.Fatal().Msgf("InitQAStore failed: %v", err)
}
// Define a simple flow that generates jokes about a given topic
genkit.DefineFlow(g, "chat", func(ctx context.Context, input *ChatInput) (string, error) {
genkit.DefineFlow(g, "chat", func(ctx context.Context, input *ChatInput) (Response, error) {
inputAsJson, err := json.Marshal(input)
......@@ -205,71 +223,84 @@ func main() {
return "", err
}
fmt.Println("input-------------------------------", string(inputAsJson))
log.Info().Msgf("input--------%s", string(inputAsJson))
qa.WriteQA(context.Background(),QA{
FromID: &input.FromID, //*string // 可空的 from_id
From: &input.From, //*string // 可空的 from
Question: &input.Content, //*string // 可空的问题
//Answer: //*string // 可空的答案
//Summary //*string // 可空的摘要
To: &input.To, //*string // 可空的 to
ToID: &input.ToID, //*string // 可空的 to_id
idx, lastQa, lastok, err := qa.WriteAndGetLatestQA(context.Background(), QA{
FromID: &input.FromID,
From: &input.From,
Question: &input.Content,
To: &input.To,
ToID: &input.ToID,
})
//qa.GetLatestQA(context.Background(),&input.FromID)
metaData := make(map[string]any)
metaData[util.UserIdKey]= input.ToID
metaData[util.UserNameKey]= input.To
dRequest := ai.DocumentFromText(input.Content, metaData)
response, err := ai.Retrieve(ctx, retriever, ai.WithDocs(dRequest))
if err != nil {
return "", err
}
for _, d := range response.Documents {
fmt.Println("d.Content[0].Text", d.Content[0].Text)
}
qaAsJson, err := json.Marshal(lastQa)
graphResponse, err := ai.Retrieve(ctx, graphRetriever, ai.WithDocs(dRequest))
if err != nil {
return "", err
}
for _, d := range graphResponse.Documents {
fmt.Println("d.Content[0].Text", d.Content[0].Text)
}
log.Info().Msgf("qaAsJson--------%s", string(qaAsJson))
promptInput := &simpleQaPromptInput{
Query: input.Content,
}
var sb strings.Builder
for _, d := range response.Documents {
sb.WriteString(d.Content[0].Text)
sb.WriteByte('\n')
if lastok && lastQa.Summary != nil {
promptInput.Summary = *lastQa.Summary
}
metaData := make(map[string]any)
metaData[util.UserIdKey] = input.ToID
metaData[util.UserNameKey] = input.To
dRequest := ai.DocumentFromText(input.Content, metaData)
response, err := ai.Retrieve(ctx, retriever, ai.WithDocs(dRequest))
if err != nil {
log.Error().Msgf("milvus Retrieve err.Error() %s", err.Error())
} else {
var sb strings.Builder
for _, d := range response.Documents {
sb.WriteString(d.Content[0].Text)
sb.WriteByte('\n')
}
promptInput := &simpleQaPromptInput{
Query: input.Content,
Context: sb.String(),
promptInput.Context = sb.String()
log.Info().Msgf("promptInput.Context: %s", promptInput.Context)
}
fmt.Println("sb.String():",sb.String(),"input.Content:",input.Content)
graphResponse, err := ai.Retrieve(ctx, graphRetriever, ai.WithDocs(dRequest))
if err != nil {
log.Error().Msgf("graph Retrieve err.Error() %s", err.Error())
} else {
var sb strings.Builder
for _, d := range graphResponse.Documents {
sb.WriteString(d.Content[0].Text)
sb.WriteByte('\n')
}
promptInput.Graph = sb.String()
log.Info().Msgf("promptInput.Graph : %s", promptInput.Graph)
}
resp, err := simpleQaPrompt.Execute(ctx, ai.WithInput(promptInput))
if err != nil {
return "", err
return Response{
Code: 500,
Msg: fmt.Sprintf("index document: %w", err),
}, nil
}
return resp.Text(), nil
//ai.WithPrompt(promptInput))
//ai.WithPrompt(`Tell silly short jokes about apple`)
qa.UpdateQAFields(context.Background(), idx, "", resp.Text())
return Response{
Data: resp.Text(),
Code: 200,
Msg: fmt.Sprintf("Document indexed successfully, docname %s", resDocName),
}, nil
})
// 配置限速器:每秒 10 次请求,突发容量 20,最大并发 5
......@@ -294,11 +325,19 @@ func main() {
// 启动服务器,监听
log.Printf("Server starting on 0.0.0.0:8000")
if err := server.Start(ctx, "0.0.0.0:8000", mux); err != nil {
log.Fatalf("Server failed: %v", err)
log.Fatal().Msgf("Server failed: %v", err)
}
}
type simpleQaPromptInput struct {
Query string `json:"query"`
Context string `json:"context"`
Graph string `json:"graph"`
Summary string `json:"summary"`
}
type Response struct {
Data string `json:"data"`
Code int `json:"code"`
Msg string `json:"msg"`
}
......@@ -815,8 +815,8 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai
return nil, fmt.Errorf("req.Query.Metadata must be not nil, got type %T", req.Options)
}
for k,v := range req.Query.Metadata {
fmt.Println("k",k,"v",v)
for k, v := range req.Query.Metadata {
fmt.Println("k", k, "v", v)
}
// Extract username and user_id from req.Query.Metadata
......
......@@ -386,108 +386,105 @@ type docStore struct {
// }, nil
// }
// newDocStore creates a docStore.
func (m *Milvus) newDocStore(ctx context.Context, cfg *CollectionConfig) (*docStore, error) {
if m.client == nil {
return nil, errors.New("milvus.Init not called")
}
// Check/create collection.
exists, err := m.client.HasCollection(ctx, cfg.Collection)
if err != nil {
return nil, fmt.Errorf("failed to check collection %q: %v", cfg.Collection, err)
}
if !exists {
// Define schema with textField as primary key, plus user_id and username fields.
schema := &entity.Schema{
CollectionName: cfg.Collection,
Fields: []*entity.Field{
{
Name: vectorField,
DataType: entity.FieldTypeFloatVector,
TypeParams: map[string]string{
"dim": fmt.Sprintf("%d", cfg.Dimension),
},
},
{
Name: textField,
DataType: entity.FieldTypeVarChar,
PrimaryKey: true, // Enforce unique constraint on text field
TypeParams: map[string]string{
"max_length": "65535", // Maximum length for VARCHAR
},
},
{
Name: metadataField,
DataType: entity.FieldTypeJSON,
},
{
Name: "user_id",
DataType: entity.FieldTypeVarChar,
TypeParams: map[string]string{
"max_length": "128", // Reasonable length for user_id
},
},
{
Name: "username",
DataType: entity.FieldTypeVarChar,
TypeParams: map[string]string{
"max_length": "128", // Reasonable length for username
},
},
},
}
err = m.client.CreateCollection(ctx, schema, entity.DefaultShardNumber)
if err != nil {
return nil, fmt.Errorf("failed to create collection %q: %v", cfg.Collection, err)
}
// Create HNSW index for vectorField.
index, err := entity.NewIndexHNSW(
entity.L2,
8, // M
96, // efConstruction
)
if err != nil {
return nil, fmt.Errorf("entity.NewIndexHNSW: %v", err)
}
err = m.client.CreateIndex(ctx, cfg.Collection, vectorField, index, false)
if err != nil {
return nil, fmt.Errorf("failed to create index: %v", err)
}
}
// Load collection.
err = m.client.LoadCollection(ctx, cfg.Collection, false)
if err != nil {
return nil, fmt.Errorf("failed to load collection %q: %v", cfg.Collection, err)
}
// Convert EmbedderOptions to map[string]interface{}.
var embedderOptions map[string]interface{}
if cfg.EmbedderOptions != nil {
opts, ok := cfg.EmbedderOptions.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("EmbedderOptions must be a map[string]interface{}, got %T", cfg.EmbedderOptions)
}
embedderOptions = opts
} else {
embedderOptions = make(map[string]interface{})
}
return &docStore{
client: m.client,
collection: cfg.Collection,
dimension: cfg.Dimension,
embedder: cfg.Embedder,
embedderOptions: embedderOptions,
}, nil
}
if m.client == nil {
return nil, errors.New("milvus.Init not called")
}
// Check/create collection.
exists, err := m.client.HasCollection(ctx, cfg.Collection)
if err != nil {
return nil, fmt.Errorf("failed to check collection %q: %v", cfg.Collection, err)
}
if !exists {
// Define schema with textField as primary key, plus user_id and username fields.
schema := &entity.Schema{
CollectionName: cfg.Collection,
Fields: []*entity.Field{
{
Name: vectorField,
DataType: entity.FieldTypeFloatVector,
TypeParams: map[string]string{
"dim": fmt.Sprintf("%d", cfg.Dimension),
},
},
{
Name: textField,
DataType: entity.FieldTypeVarChar,
PrimaryKey: true, // Enforce unique constraint on text field
TypeParams: map[string]string{
"max_length": "65535", // Maximum length for VARCHAR
},
},
{
Name: metadataField,
DataType: entity.FieldTypeJSON,
},
{
Name: "user_id",
DataType: entity.FieldTypeVarChar,
TypeParams: map[string]string{
"max_length": "128", // Reasonable length for user_id
},
},
{
Name: "username",
DataType: entity.FieldTypeVarChar,
TypeParams: map[string]string{
"max_length": "128", // Reasonable length for username
},
},
},
}
err = m.client.CreateCollection(ctx, schema, entity.DefaultShardNumber)
if err != nil {
return nil, fmt.Errorf("failed to create collection %q: %v", cfg.Collection, err)
}
// Create HNSW index for vectorField.
index, err := entity.NewIndexHNSW(
entity.L2,
8, // M
96, // efConstruction
)
if err != nil {
return nil, fmt.Errorf("entity.NewIndexHNSW: %v", err)
}
err = m.client.CreateIndex(ctx, cfg.Collection, vectorField, index, false)
if err != nil {
return nil, fmt.Errorf("failed to create index: %v", err)
}
}
// Load collection.
err = m.client.LoadCollection(ctx, cfg.Collection, false)
if err != nil {
return nil, fmt.Errorf("failed to load collection %q: %v", cfg.Collection, err)
}
// Convert EmbedderOptions to map[string]interface{}.
var embedderOptions map[string]interface{}
if cfg.EmbedderOptions != nil {
opts, ok := cfg.EmbedderOptions.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("EmbedderOptions must be a map[string]interface{}, got %T", cfg.EmbedderOptions)
}
embedderOptions = opts
} else {
embedderOptions = make(map[string]interface{})
}
return &docStore{
client: m.client,
collection: cfg.Collection,
dimension: cfg.Dimension,
embedder: cfg.Embedder,
embedderOptions: embedderOptions,
}, nil
}
// Indexer returns the indexer for a collection.
func Indexer(g *genkit.Genkit, collection string) ai.Indexer {
......@@ -509,8 +506,6 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
return nil
}
// Embed documents.
ereq := &ai.EmbedRequest{
Input: req.Documents,
......@@ -535,17 +530,16 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
// If ok, we don't use the User struct since the requirement is to error on non-nil
return fmt.Errorf("req.Query.Metadata must be not nil, got type %T", req.Options)
}
// Extract username and user_id from req.Query.Metadata
userName, ok := doc.Metadata[util.UserNameKey].(string)
if !ok {
return fmt.Errorf("req.Query.Metadata must provide username key")
return fmt.Errorf("req.Query.Metadata must provide username key")
}
userId, ok := doc.Metadata[util.UserIdKey].(string)
if !ok {
return fmt.Errorf("req.Query.Metadata must provide user_id key")
return fmt.Errorf("req.Query.Metadata must provide user_id key")
}
var sb strings.Builder
for _, p := range doc.Content {
......@@ -569,7 +563,7 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
rows = append(rows, row)
// Debug: Log row contents.
fmt.Printf("Row %d: vector_len=%d, text=%q,userId=%s,username=%s,metadata=%v\n", i, len(emb.Embedding), text,userId,userName,metadata)
fmt.Printf("Row %d: vector_len=%d, text=%q,userId=%s,username=%s,metadata=%v\n", i, len(emb.Embedding), text, userId, userName, metadata)
}
// Debug: Log total rows.
......@@ -584,7 +578,6 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
return nil
}
// RetrieverOptions for Milvus retrieval.
type RetrieverOptions struct {
Count int `json:"count,omitempty"` // Max documents to retrieve.
......@@ -594,13 +587,11 @@ type RetrieverOptions struct {
// // Retrieve implements the Retriever.Retrieve method.
// func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
// if req.Query.Metadata == nil {
// // If ok, we don't use the User struct since the requirement is to error on non-nil
// return nil, fmt.Errorf("req.Query.Metadata must be not nil, got type %T", req.Options)
// }
// // Extract username and user_id from req.Query.Metadata
// userName, ok := req.Query.Metadata[util.UserNameKey].(string)
// if !ok {
......@@ -611,7 +602,6 @@ type RetrieverOptions struct {
// return nil, fmt.Errorf("req.Query.Metadata must provide user_id key")
// }
// count := 3 // Default.
// metricTypeStr := "L2"
// if req.Options != nil {
......@@ -731,147 +721,144 @@ type RetrieverOptions struct {
// }, nil
// }
// Retrieve implements the Retriever.Retrieve method.
func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
if req.Query.Metadata == nil {
return nil, fmt.Errorf("req.Query.Metadata must be not nil, got type %T", req.Query.Metadata)
}
// Extract username and user_id from req.Query.Metadata
userName, ok := req.Query.Metadata[util.UserNameKey].(string)
if !ok {
return nil, fmt.Errorf("req.Query.Metadata must provide username key")
}
userId, ok := req.Query.Metadata[util.UserIdKey].(string)
if !ok {
return nil, fmt.Errorf("req.Query.Metadata must provide user_id key")
}
count := 3 // Default.
metricTypeStr := "L2"
if req.Options != nil {
ropt, ok := req.Options.(*RetrieverOptions)
if !ok {
return nil, fmt.Errorf("milvus.Retrieve options have type %T, want %T", req.Options, &RetrieverOptions{})
}
if ropt.Count > 0 {
count = ropt.Count
}
if ropt.MetricType != "" {
metricTypeStr = ropt.MetricType
}
}
// Retrieve implements the Retriever.Retrieve method.
func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
if req.Query.Metadata == nil {
return nil, fmt.Errorf("req.Query.Metadata must be not nil, got type %T", req.Query.Metadata)
}
// Extract username and user_id from req.Query.Metadata
userName, ok := req.Query.Metadata[util.UserNameKey].(string)
if !ok {
return nil, fmt.Errorf("req.Query.Metadata must provide username key")
}
userId, ok := req.Query.Metadata[util.UserIdKey].(string)
if !ok {
return nil, fmt.Errorf("req.Query.Metadata must provide user_id key")
}
count := 3 // Default.
metricTypeStr := "L2"
if req.Options != nil {
ropt, ok := req.Options.(*RetrieverOptions)
if !ok {
return nil, fmt.Errorf("milvus.Retrieve options have type %T, want %T", req.Options, &RetrieverOptions{})
}
if ropt.Count > 0 {
count = ropt.Count
}
if ropt.MetricType != "" {
metricTypeStr = ropt.MetricType
}
}
// Map string metric type to entity.MetricType.
var metricType entity.MetricType
switch metricTypeStr {
case "L2":
metricType = entity.L2
case "IP":
metricType = entity.IP
default:
return nil, fmt.Errorf("unsupported metric type: %s", metricTypeStr)
}
// Embed query.
ereq := &ai.EmbedRequest{
Input: []*ai.Document{req.Query},
Options: ds.embedderOptions,
}
eres, err := ds.embedder.Embed(ctx, ereq)
if err != nil {
return nil, fmt.Errorf("milvus retrieve embedding failed: %v", err)
}
if len(eres.Embeddings) == 0 {
return nil, errors.New("no embeddings generated for query")
}
queryVector := entity.FloatVector(eres.Embeddings[0].Embedding)
// Create search parameters.
searchParams, err := entity.NewIndexHNSWSearchParam(64) // ef
if err != nil {
return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err)
}
// Define filter expression for user_id
expr := fmt.Sprintf("user_id == %q", userId)
// Perform vector search to get IDs, text, and metadata.
results, err := ds.client.Search(
ctx,
ds.collection,
[]string{}, // partitions
expr, // Filter by user_id
[]string{textField, metadataField}, // Output fields: text and metadata
[]entity.Vector{queryVector},
vectorField,
metricType,
count,
searchParams,
)
if err != nil {
return nil, fmt.Errorf("milvus search failed: %v", err)
}
// Process search results.
var docs []*ai.Document
for _, result := range results {
// Find text and metadata columns in search results.
var textCol, metaCol entity.Column
for _, col := range result.Fields {
if col.Name() == textField {
textCol = col
}
if col.Name() == metadataField {
metaCol = col
}
}
// Ensure text column exists.
if textCol == nil {
return nil, fmt.Errorf("text column %s not found in search results", textField)
}
// Iterate over rows (assuming columns have same length).
for i := 0; i < result.ResultCount; i++ {
// Get text value.
text, err := textCol.GetAsString(i)
if err != nil {
fmt.Printf("Failed to parse text at index %d: %v\n", i, err)
continue
}
// Get metadata value (optional, as metadata column may be missing).
var metadata map[string]interface{}
if metaCol != nil {
metaStr, err := metaCol.GetAsString(i)
if err == nil && metaStr != "" {
if err := json.Unmarshal([]byte(metaStr), &metadata); err != nil {
fmt.Printf("Failed to parse metadata at index %d: %v\n", i, err)
continue
}
} else if err != nil {
fmt.Printf("Failed to get metadata string at index %d: %v\n", i, err)
}
}
// Ensure metadata includes user_id and username from query
if metadata == nil {
metadata = make(map[string]interface{})
}
metadata[util.UserIdKey] = userId
metadata[util.UserNameKey] = userName
// Create document.
doc := ai.DocumentFromText(text, metadata)
docs = append(docs, doc)
}
}
return &ai.RetrieverResponse{
Documents: docs,
}, nil
}
\ No newline at end of file
// Map string metric type to entity.MetricType.
var metricType entity.MetricType
switch metricTypeStr {
case "L2":
metricType = entity.L2
case "IP":
metricType = entity.IP
default:
return nil, fmt.Errorf("unsupported metric type: %s", metricTypeStr)
}
// Embed query.
ereq := &ai.EmbedRequest{
Input: []*ai.Document{req.Query},
Options: ds.embedderOptions,
}
eres, err := ds.embedder.Embed(ctx, ereq)
if err != nil {
return nil, fmt.Errorf("milvus retrieve embedding failed: %v", err)
}
if len(eres.Embeddings) == 0 {
return nil, errors.New("no embeddings generated for query")
}
queryVector := entity.FloatVector(eres.Embeddings[0].Embedding)
// Create search parameters.
searchParams, err := entity.NewIndexHNSWSearchParam(64) // ef
if err != nil {
return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err)
}
// Define filter expression for user_id
expr := fmt.Sprintf("user_id == %q", userId)
// Perform vector search to get IDs, text, and metadata.
results, err := ds.client.Search(
ctx,
ds.collection,
[]string{}, // partitions
expr, // Filter by user_id
[]string{textField, metadataField}, // Output fields: text and metadata
[]entity.Vector{queryVector},
vectorField,
metricType,
count,
searchParams,
)
if err != nil {
return nil, fmt.Errorf("milvus search failed: %v", err)
}
// Process search results.
var docs []*ai.Document
for _, result := range results {
// Find text and metadata columns in search results.
var textCol, metaCol entity.Column
for _, col := range result.Fields {
if col.Name() == textField {
textCol = col
}
if col.Name() == metadataField {
metaCol = col
}
}
// Ensure text column exists.
if textCol == nil {
return nil, fmt.Errorf("text column %s not found in search results", textField)
}
// Iterate over rows (assuming columns have same length).
for i := 0; i < result.ResultCount; i++ {
// Get text value.
text, err := textCol.GetAsString(i)
if err != nil {
fmt.Printf("Failed to parse text at index %d: %v\n", i, err)
continue
}
// Get metadata value (optional, as metadata column may be missing).
var metadata map[string]interface{}
if metaCol != nil {
metaStr, err := metaCol.GetAsString(i)
if err == nil && metaStr != "" {
if err := json.Unmarshal([]byte(metaStr), &metadata); err != nil {
fmt.Printf("Failed to parse metadata at index %d: %v\n", i, err)
continue
}
} else if err != nil {
fmt.Printf("Failed to get metadata string at index %d: %v\n", i, err)
}
}
// Ensure metadata includes user_id and username from query
if metadata == nil {
metadata = make(map[string]interface{})
}
metadata[util.UserIdKey] = userId
metadata[util.UserNameKey] = userName
// Create document.
doc := ai.DocumentFromText(text, metadata)
docs = append(docs, doc)
}
}
return &ai.RetrieverResponse{
Documents: docs,
}, nil
}
......@@ -32,6 +32,10 @@ type QAStore interface {
GetLatestQA(ctx context.Context, fromID *string) ([]QA, error)
// WriteQA 插入或更新 qa 表记录
WriteQA(ctx context.Context, qa QA) (int64, error)
// WriteAndGetLatestQA 先查询最新记录,再写入 QA 记录,返回写入的 ID、第一条查询结果、是否查询到结果和错误
WriteAndGetLatestQA(ctx context.Context, qa QA) (int64, *QA, bool, error)
// UpdateQAFields 根据 idx 更新 qa 表中指定记录的 summary 和 answer 字段
UpdateQAFields(ctx context.Context, idx int64, summary, answer string) error
}
// qaStore 是 QAStore 接口的实现
......@@ -166,6 +170,54 @@ func (s *qaStore) WriteQA(ctx context.Context, qa QA) (int64, error) {
return newID, nil
}
func (s *qaStore) WriteAndGetLatestQA(ctx context.Context, qa QA) (int64, *QA, bool, error) {
// 先查询最新记录
results, err := s.GetLatestQA(ctx, qa.FromID)
if err != nil {
return 0, nil, false, fmt.Errorf("get latest qa: %w", err)
}
// 检查查询结果是否为空
var latestQA *QA
hasResult := len(results) > 0
if hasResult {
latestQA = &results[0] // 取第一条记录
} else {
latestQA = nil // 空结果
}
// 写入记录
id, err := s.WriteQA(ctx, qa)
if err != nil {
return 0, latestQA, hasResult, fmt.Errorf("write qa: %w", err)
}
// 返回写入的 ID、第一条查询结果和 hasResult
return id, latestQA, hasResult, nil
}
// UpdateQAFields 根据 idx 更新 qa 表中指定记录的 summary 和 answer 字段
func (s *qaStore) UpdateQAFields(ctx context.Context, idx int64, summary, answer string) error {
query := `
UPDATE qa
SET summary = $1, answer = $2
WHERE id = $3`
result, err := s.db.ExecContext(ctx, query, summary, answer, idx)
if err != nil {
return fmt.Errorf("update qa fields: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("check rows affected: %w", err)
}
if rowsAffected == 0 {
return fmt.Errorf("no record found with id %d", idx)
}
return nil
}
// 辅助函数:处理指针类型的空值
func stringPtr(s string) *string {
return &s
......@@ -177,3 +229,624 @@ func derefString(p *string) interface{} {
}
return *p
}
// package main
// import (
// "context"
// "database/sql"
// "flag"
// "fmt"
// "time"
// _ "github.com/lib/pq"
// )
// var (
// connString = flag.String("dbconn", "", "database connection string")
// )
// type QA struct {
// ID int64 // 主键
// CreatedAt time.Time // 创建时间
// FromID *string // 可空的 from_id
// From *string // 可空的 from
// Question *string // 可空的问题
// Answer *string // 可空的答案
// Summary *string // 可空的摘要
// To *string // 可空的 to
// ToID *string // 可空的 to_id
// }
// // QAStore 定义 DAO 接口
// type QAStore interface {
// // GetLatestQA 从 qa_latest_from_id 视图读取指定 from_id 的最新记录
// GetLatestQA(ctx context.Context, fromID *string) ([]QA, error)
// // WriteQA 插入或更新 qa 表记录
// WriteQA(ctx context.Context, qa QA) (int64, error)
// // WriteAndGetLatestQA 先查询最新记录,再写入 QA 记录,返回写入的 ID、第一条查询结果、是否查询到结果和错误
// WriteAndGetLatestQA(ctx context.Context, qa QA) (int64, *QA, bool, error)
// // UpdateSummary 根据 idx 更新 qa 表中指定记录的 summary 字段
// UpdateSummary(ctx context.Context, idx int64, summary string) error
// }
// // qaStore 是 QAStore 接口的实现
// type qaStore struct {
// db *sql.DB
// }
// // NewQAStore 创建新的 QAStore 实例
// func NewQAStore(db *sql.DB) QAStore {
// return &qaStore{db: db}
// }
// // 初始化数据库连接并返回 QAStore
// func InitQAStore() (QAStore, error) {
// // Supabase 提供的连接字符串
// connString := "postgresql://postgres.awcfgdodiuqnlsobcivq:P99IU9NEoDRPsBfb@aws-0-ap-southeast-1.pooler.supabase.com:5432/postgres"
// // 打开数据库连接
// db, err := sql.Open("postgres", connString)
// if err != nil {
// return nil, fmt.Errorf("open database: %w", err)
// }
// // 测试数据库连接
// if err := db.Ping(); err != nil {
// db.Close()
// return nil, fmt.Errorf("ping database: %w", err)
// }
// // 返回 QAStore 实例
// return NewQAStore(db), nil
// }
// func (s *qaStore) GetLatestQA(ctx context.Context, fromID *string) ([]QA, error) {
// query := `
// SELECT id, created_at, question, answer, summary, "from", "to", from_id, to_id
// FROM qa_latest_from_id
// WHERE from_id = $1 OR (from_id IS NULL AND $1 IS NULL)`
// args := []interface{}{fromID}
// if fromID == nil {
// args = []interface{}{nil}
// }
// rows, err := s.db.QueryContext(ctx, query, args...)
// if err != nil {
// return nil, fmt.Errorf("query qa_latest_from_id: %w", err)
// }
// defer rows.Close()
// var results []QA
// for rows.Next() {
// var qa QA
// var question, answer, summary, from, to, fromIDVal, toIDVal sql.NullString
// if err := rows.Scan(&qa.ID, &qa.CreatedAt, &question, &answer, &summary, &from, &to, &fromIDVal, &toIDVal); err != nil {
// return nil, fmt.Errorf("scan row: %w", err)
// }
// if question.Valid {
// qa.Question = &question.String
// }
// if answer.Valid {
// qa.Answer = &answer.String
// }
// if summary.Valid {
// qa.Summary = &summary.String
// }
// if from.Valid {
// qa.From = &from.String
// }
// if to.Valid {
// qa.To = &to.String
// }
// if fromIDVal.Valid {
// qa.FromID = &fromIDVal.String
// }
// if toIDVal.Valid {
// qa.ToID = &toIDVal.String
// }
// results = append(results, qa)
// }
// if err := rows.Err(); err != nil {
// return nil, fmt.Errorf("row iteration: %w", err)
// }
// return results, nil
// }
// func (s *qaStore) WriteQA(ctx context.Context, qa QA) (int64, error) {
// if qa.ID != 0 {
// // 更新记录
// query := `
// UPDATE qa
// SET question = $1, answer = $2, summary = $3, "from" = $4, "to" = $5, from_id = $6, to_id = $7
// WHERE id = $8
// RETURNING id`
// var updatedID int64
// err := s.db.QueryRowContext(ctx, query,
// derefString(qa.Question),
// derefString(qa.Answer),
// derefString(qa.Summary),
// derefString(qa.From),
// derefString(qa.To),
// derefString(qa.FromID),
// derefString(qa.ToID),
// qa.ID,
// ).Scan(&updatedID)
// if err == sql.ErrNoRows {
// return 0, fmt.Errorf("no record found with id %d", qa.ID)
// }
// if err != nil {
// return 0, fmt.Errorf("update qa: %w", err)
// }
// return updatedID, nil
// }
// // 插入新记录
// query := `
// INSERT INTO qa (question, answer, summary, "from", "to", from_id, to_id)
// VALUES ($1, $2, $3, $4, $5, $6, $7)
// RETURNING id`
// var newID int64
// err := s.db.QueryRowContext(ctx, query,
// derefString(qa.Question),
// derefString(qa.Answer),
// derefString(qa.Summary),
// derefString(qa.From),
// derefString(qa.To),
// derefString(qa.FromID),
// derefString(qa.ToID),
// ).Scan(&newID)
// if err != nil {
// return 0, fmt.Errorf("insert qa: %w", err)
// }
// return newID, nil
// }
// func (s *qaStore) WriteAndGetLatestQA(ctx context.Context, qa QA) (int64, *QA, bool, error) {
// // 先查询最新记录
// results, err := s.GetLatestQA(ctx, qa.FromID)
// if err != nil {
// return 0, nil, false, fmt.Errorf("get latest qa: %w", err)
// }
// // 检查查询结果是否为空
// var latestQA *QA
// hasResult := len(results) > 0
// if hasResult {
// latestQA = &results[0] // 取第一条记录
// } else {
// latestQA = nil // 空结果
// }
// // 写入记录
// id, err := s.WriteQA(ctx, qa)
// if err != nil {
// return 0, latestQA, hasResult, fmt.Errorf("write qa: %w", err)
// }
// // 返回写入的 ID、第一条查询结果和 hasResult
// return id, latestQA, hasResult, nil
// }
// // UpdateSummary 根据 idx 更新 qa 表中指定记录的 summary 字段
// func (s *qaStore) UpdateSummary(ctx context.Context, idx int64, summary string) error {
// query := `
// UPDATE qa
// SET summary = $1
// WHERE id = $2`
// result, err := s.db.ExecContext(ctx, query, summary, idx)
// if err != nil {
// return fmt.Errorf("update summary: %w", err)
// }
// rowsAffected, err := result.RowsAffected()
// if err != nil {
// return fmt.Errorf("check rows affected: %w", err)
// }
// if rowsAffected == 0 {
// return fmt.Errorf("no record found with id %d", idx)
// }
// return nil
// }
// // 辅助函数:处理指针类型的空值
// func stringPtr(s string) *string {
// return &s
// }
// func derefString(p *string) interface{} {
// if p == nil {
// return nil
// }
// return *p
// }
// package main
// import (
// "context"
// "database/sql"
// "flag"
// "fmt"
// "time"
// _ "github.com/lib/pq"
// )
// var (
// connString = flag.String("dbconn", "", "database connection string")
// )
// type QA struct {
// ID int64 // 主键
// CreatedAt time.Time // 创建时间
// FromID *string // 可空的 from_id
// From *string // 可空的 from
// Question *string // 可空的问题
// Answer *string // 可空的答案
// Summary *string // 可空的摘要
// To *string // 可空的 to
// ToID *string // 可空的 to_id
// }
// // QAStore 定义 DAO 接口
// type QAStore interface {
// // GetLatestQA 从 qa_latest_from_id 视图读取指定 from_id 的最新记录
// GetLatestQA(ctx context.Context, fromID *string) ([]QA, error)
// // WriteQA 插入或更新 qa 表记录
// WriteQA(ctx context.Context, qa QA) (int64, error)
// // WriteAndGetLatestQA 先查询最新记录,再写入 QA 记录,返回写入的 ID、第一条查询结果、是否查询到结果和错误
// WriteAndGetLatestQA(ctx context.Context, qa QA) (int64, *QA, bool, error)
// }
// // qaStore 是 QAStore 接口的实现
// type qaStore struct {
// db *sql.DB
// }
// // NewQAStore 创建新的 QAStore 实例
// func NewQAStore(db *sql.DB) QAStore {
// return &qaStore{db: db}
// }
// // 初始化数据库连接并返回 QAStore
// func InitQAStore() (QAStore, error) {
// // Supabase 提供的连接字符串
// connString := "postgresql://postgres.awcfgdodiuqnlsobcivq:P99IU9NEoDRPsBfb@aws-0-ap-southeast-1.pooler.supabase.com:5432/postgres"
// // 打开数据库连接
// db, err := sql.Open("postgres", connString)
// if err != nil {
// return nil, fmt.Errorf("open database: %w", err)
// }
// // 测试数据库连接
// if err := db.Ping(); err != nil {
// db.Close()
// return nil, fmt.Errorf("ping database: %w", err)
// }
// // 返回 QAStore 实例
// return NewQAStore(db), nil
// }
// func (s *qaStore) GetLatestQA(ctx context.Context, fromID *string) ([]QA, error) {
// query := `
// SELECT id, created_at, question, answer, summary, "from", "to", from_id, to_id
// FROM qa_latest_from_id
// WHERE from_id = $1 OR (from_id IS NULL AND $1 IS NULL)`
// args := []interface{}{fromID}
// if fromID == nil {
// args = []interface{}{nil}
// }
// rows, err := s.db.QueryContext(ctx, query, args...)
// if err != nil {
// return nil, fmt.Errorf("query qa_latest_from_id: %w", err)
// }
// defer rows.Close()
// var results []QA
// for rows.Next() {
// var qa QA
// var question, answer, summary, from, to, fromIDVal, toIDVal sql.NullString
// if err := rows.Scan(&qa.ID, &qa.CreatedAt, &question, &answer, &summary, &from, &to, &fromIDVal, &toIDVal); err != nil {
// return nil, fmt.Errorf("scan row: %w", err)
// }
// if question.Valid {
// qa.Question = &question.String
// }
// if answer.Valid {
// qa.Answer = &answer.String
// }
// if summary.Valid {
// qa.Summary = &summary.String
// }
// if from.Valid {
// qa.From = &from.String
// }
// if to.Valid {
// qa.To = &to.String
// }
// if fromIDVal.Valid {
// qa.FromID = &fromIDVal.String
// }
// if toIDVal.Valid {
// qa.ToID = &toIDVal.String
// }
// results = append(results, qa)
// }
// if err := rows.Err(); err != nil {
// return nil, fmt.Errorf("row iteration: %w", err)
// }
// return results, nil
// }
// func (s *qaStore) WriteQA(ctx context.Context, qa QA) (int64, error) {
// if qa.ID != 0 {
// // 更新记录
// query := `
// UPDATE qa
// SET question = $1, answer = $2, summary = $3, "from" = $4, "to" = $5, from_id = $6, to_id = $7
// WHERE id = $8
// RETURNING id`
// var updatedID int64
// err := s.db.QueryRowContext(ctx, query,
// derefString(qa.Question),
// derefString(qa.Answer),
// derefString(qa.Summary),
// derefString(qa.From),
// derefString(qa.To),
// derefString(qa.FromID),
// derefString(qa.ToID),
// qa.ID,
// ).Scan(&updatedID)
// if err == sql.ErrNoRows {
// return 0, fmt.Errorf("no record found with id %d", qa.ID)
// }
// if err != nil {
// return 0, fmt.Errorf("update qa: %w", err)
// }
// return updatedID, nil
// }
// // 插入新记录
// query := `
// INSERT INTO qa (question, answer, summary, "from", "to", from_id, to_id)
// VALUES ($1, $2, $3, $4, $5, $6, $7)
// RETURNING id`
// var newID int64
// err := s.db.QueryRowContext(ctx, query,
// derefString(qa.Question),
// derefString(qa.Answer),
// derefString(qa.Summary),
// derefString(qa.From),
// derefString(qa.To),
// derefString(qa.FromID),
// derefString(qa.ToID),
// ).Scan(&newID)
// if err != nil {
// return 0, fmt.Errorf("insert qa: %w", err)
// }
// return newID, nil
// }
// // WriteAndGetLatestQA 先查询最新记录,再写入 QA 记录,返回写入的 ID、第一条查询结果、是否查询到结果和错误
// func (s *qaStore) WriteAndGetLatestQA(ctx context.Context, qa QA) (int64, *QA, bool, error) {
// // 先查询最新记录
// results, err := s.GetLatestQA(ctx, qa.FromID)
// if err != nil {
// return 0, nil, false, fmt.Errorf("get latest qa: %w", err)
// }
// // 检查查询结果是否为空
// var latestQA *QA
// hasResult := len(results) > 0
// if hasResult {
// latestQA = &results[0] // 取第一条记录
// } else {
// latestQA = nil // 空结果
// }
// // 写入记录
// id, err := s.WriteQA(ctx, qa)
// if err != nil {
// return 0, latestQA, hasResult, fmt.Errorf("write qa: %w", err)
// }
// // 返回写入的 ID、第一条查询结果和 hasResult
// return id, latestQA, hasResult, nil
// }
// // 辅助函数:处理指针类型的空值
// func stringPtr(s string) *string {
// return &s
// }
// func derefString(p *string) interface{} {
// if p == nil {
// return nil
// }
// return *p
// }
// package main
// import (
// "context"
// "database/sql"
// "flag"
// "fmt"
// "time"
// _ "github.com/lib/pq"
// )
// var (
// connString = flag.String("dbconn", "", "database connection string")
// )
// type QA struct {
// ID int64 // 主键
// CreatedAt time.Time // 创建时间
// FromID *string // 可空的 from_id
// From *string // 可空的 from
// Question *string // 可空的问题
// Answer *string // 可空的答案
// Summary *string // 可空的摘要
// To *string // 可空的 to
// ToID *string // 可空的 to_id
// }
// // QAStore 定义 DAO 接口
// type QAStore interface {
// // GetLatestQA 从 qa_latest_from_id 视图读取指定 from_id 的最新记录
// GetLatestQA(ctx context.Context, fromID *string) ([]QA, error)
// // WriteQA 插入或更新 qa 表记录
// WriteQA(ctx context.Context, qa QA) (int64, error)
// }
// // qaStore 是 QAStore 接口的实现
// type qaStore struct {
// db *sql.DB
// }
// // NewQAStore 创建新的 QAStore 实例
// func NewQAStore(db *sql.DB) QAStore {
// return &qaStore{db: db}
// }
// // 初始化数据库连接并返回 QAStore
// func InitQAStore() (QAStore, error) {
// // Supabase 提供的连接字符串
// connString := "postgresql://postgres.awcfgdodiuqnlsobcivq:P99IU9NEoDRPsBfb@aws-0-ap-southeast-1.pooler.supabase.com:5432/postgres"
// // 打开数据库连接
// db, err := sql.Open("postgres", connString)
// if err != nil {
// return nil, fmt.Errorf("open database: %w", err)
// }
// // 测试数据库连接
// if err := db.Ping(); err != nil {
// db.Close()
// return nil, fmt.Errorf("ping database: %w", err)
// }
// // 返回 QAStore 实例
// return NewQAStore(db), nil
// }
// func (s *qaStore) GetLatestQA(ctx context.Context, fromID *string) ([]QA, error) {
// query := `
// SELECT id, created_at, question, answer, summary, "from", "to", from_id, to_id
// FROM qa_latest_from_id
// WHERE from_id = $1 OR (from_id IS NULL AND $1 IS NULL)`
// args := []interface{}{fromID}
// if fromID == nil {
// args = []interface{}{nil}
// }
// rows, err := s.db.QueryContext(ctx, query, args...)
// if err != nil {
// return nil, fmt.Errorf("query qa_latest_from_id: %w", err)
// }
// defer rows.Close()
// var results []QA
// for rows.Next() {
// var qa QA
// var question, answer, summary, from, to, fromIDVal, toIDVal sql.NullString
// if err := rows.Scan(&qa.ID, &qa.CreatedAt, &question, &answer, &summary, &from, &to, &fromIDVal, &toIDVal); err != nil {
// return nil, fmt.Errorf("scan row: %w", err)
// }
// if question.Valid {
// qa.Question = &question.String
// }
// if answer.Valid {
// qa.Answer = &answer.String
// }
// if summary.Valid {
// qa.Summary = &summary.String
// }
// if from.Valid {
// qa.From = &from.String
// }
// if to.Valid {
// qa.To = &to.String
// }
// if fromIDVal.Valid {
// qa.FromID = &fromIDVal.String
// }
// if toIDVal.Valid {
// qa.ToID = &toIDVal.String
// }
// results = append(results, qa)
// }
// if err := rows.Err(); err != nil {
// return nil, fmt.Errorf("row iteration: %w", err)
// }
// return results, nil
// }
// func (s *qaStore) WriteQA(ctx context.Context, qa QA) (int64, error) {
// if qa.ID != 0 {
// // 更新记录
// query := `
// UPDATE qa
// SET question = $1, answer = $2, summary = $3, "from" = $4, "to" = $5, from_id = $6, to_id = $7
// WHERE id = $8
// RETURNING id`
// var updatedID int64
// err := s.db.QueryRowContext(ctx, query,
// derefString(qa.Question),
// derefString(qa.Answer),
// derefString(qa.Summary),
// derefString(qa.From),
// derefString(qa.To),
// derefString(qa.FromID),
// derefString(qa.ToID),
// qa.ID,
// ).Scan(&updatedID)
// if err == sql.ErrNoRows {
// return 0, fmt.Errorf("no record found with id %d", qa.ID)
// }
// if err != nil {
// return 0, fmt.Errorf("update qa: %w", err)
// }
// return updatedID, nil
// }
// // 插入新记录
// query := `
// INSERT INTO qa (question, answer, summary, "from", "to", from_id, to_id)
// VALUES ($1, $2, $3, $4, $5, $6, $7)
// RETURNING id`
// var newID int64
// err := s.db.QueryRowContext(ctx, query,
// derefString(qa.Question),
// derefString(qa.Answer),
// derefString(qa.Summary),
// derefString(qa.From),
// derefString(qa.To),
// derefString(qa.FromID),
// derefString(qa.ToID),
// ).Scan(&newID)
// if err != nil {
// return 0, fmt.Errorf("insert qa: %w", err)
// }
// return newID, nil
// }
// // 辅助函数:处理指针类型的空值
// func stringPtr(s string) *string {
// return &s
// }
// func derefString(p *string) interface{} {
// if p == nil {
// return nil
// }
// return *p
// }
package util
const UserNameKey = "username"
const UserIdKey = "user_id"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment