Commit e410aba4 authored by Wade's avatar Wade

add user field

parent 654f62a4
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"strings"
"sync" "sync"
"github.com/firebase/genkit/go/ai" "github.com/firebase/genkit/go/ai"
...@@ -387,216 +388,219 @@ type docStore struct { ...@@ -387,216 +388,219 @@ type docStore struct {
// package graphrag package graphrag
// import ( import (
// "context" "context"
// "fmt" "fmt"
// "github.com/milvus-io/milvus-sdk-go/v2/entity" "github.com/milvus-io/milvus-sdk-go/v2/entity"
// "github.com/pkg/errors" "github.com/pkg/errors"
// ) )
// // newDocStore creates a docStore. // newDocStore creates a docStore.
// func (m *Milvus) newDocStore(ctx context.Context, cfg *CollectionConfig) (*docStore, error) { func (m *Milvus) newDocStore(ctx context.Context, cfg *CollectionConfig) (*docStore, error) {
// if m.client == nil { if m.client == nil {
// return nil, errors.New("milvus.Init not called") return nil, errors.New("milvus.Init not called")
// } }
// // Check/create collection. // Check/create collection.
// exists, err := m.client.HasCollection(ctx, cfg.Collection) exists, err := m.client.HasCollection(ctx, cfg.Collection)
// if err != nil { if err != nil {
// return nil, fmt.Errorf("failed to check collection %q: %v", cfg.Collection, err) return nil, fmt.Errorf("failed to check collection %q: %v", cfg.Collection, err)
// } }
// if !exists { if !exists {
// // Define schema with textField as primary key, plus user_id and username fields. // Define schema with textField as primary key, plus user_id and username fields.
// schema := &entity.Schema{ schema := &entity.Schema{
// CollectionName: cfg.Collection, CollectionName: cfg.Collection,
// Fields: []*entity.Field{ Fields: []*entity.Field{
// { {
// Name: vectorField, Name: vectorField,
// DataType: entity.FieldTypeFloatVector, DataType: entity.FieldTypeFloatVector,
// TypeParams: map[string]string{ TypeParams: map[string]string{
// "dim": fmt.Sprintf("%d", cfg.Dimension), "dim": fmt.Sprintf("%d", cfg.Dimension),
// }, },
// }, },
// { {
// Name: textField, Name: textField,
// DataType: entity.FieldTypeVarChar, DataType: entity.FieldTypeVarChar,
// PrimaryKey: true, // Enforce unique constraint on text field PrimaryKey: true, // Enforce unique constraint on text field
// TypeParams: map[string]string{ TypeParams: map[string]string{
// "max_length": "65535", // Maximum length for VARCHAR "max_length": "65535", // Maximum length for VARCHAR
// }, },
// }, },
// { {
// Name: metadataField, Name: metadataField,
// DataType: entity.FieldTypeJSON, DataType: entity.FieldTypeJSON,
// }, },
// { {
// Name: "user_id", Name: "user_id",
// DataType: entity.FieldTypeVarChar, DataType: entity.FieldTypeVarChar,
// TypeParams: map[string]string{ TypeParams: map[string]string{
// "max_length": "128", // Reasonable length for user_id "max_length": "128", // Reasonable length for user_id
// }, },
// }, },
// { {
// Name: "username", Name: "username",
// DataType: entity.FieldTypeVarChar, DataType: entity.FieldTypeVarChar,
// TypeParams: map[string]string{ TypeParams: map[string]string{
// "max_length": "128", // Reasonable length for username "max_length": "128", // Reasonable length for username
// }, },
// }, },
// }, },
// } }
err = m.client.CreateCollection(ctx, schema, entity.DefaultShardNumber)
if err != nil {
return nil, fmt.Errorf("failed to create collection %q: %v", cfg.Collection, err)
}
// err = m.client.CreateCollection(ctx, schema, entity.DefaultShardNumber) // Create HNSW index for vectorField.
// if err != nil { index, err := entity.NewIndexHNSW(
// return nil, fmt.Errorf("failed to create collection %q: %v", cfg.Collection, err) entity.L2,
// } 8, // M
96, // efConstruction
)
if err != nil {
return nil, fmt.Errorf("entity.NewIndexHNSW: %v", err)
}
// // Create HNSW index for vectorField. err = m.client.CreateIndex(ctx, cfg.Collection, vectorField, index, false)
// index, err := entity.NewIndexHNSW( if err != nil {
// entity.L2, return nil, fmt.Errorf("failed to create index: %v", err)
// 8, // M }
// 96, // efConstruction }
// )
// if err != nil {
// return nil, fmt.Errorf("entity.NewIndexHNSW: %v", err)
// }
// err = m.client.CreateIndex(ctx, cfg.Collection, vectorField, index, false) // Load collection.
// if err != nil { err = m.client.LoadCollection(ctx, cfg.Collection, false)
// return nil, fmt.Errorf("failed to create index: %v", err) if err != nil {
// } return nil, fmt.Errorf("failed to load collection %q: %v", cfg.Collection, err)
// } }
// // Load collection. // Convert EmbedderOptions to map[string]interface{}.
// err = m.client.LoadCollection(ctx, cfg.Collection, false) var embedderOptions map[string]interface{}
// if err != nil { if cfg.EmbedderOptions != nil {
// return nil, fmt.Errorf("failed to load collection %q: %v", cfg.Collection, err) opts, ok := cfg.EmbedderOptions.(map[string]interface{})
// } if !ok {
return nil, fmt.Errorf("EmbedderOptions must be a map[string]interface{}, got %T", cfg.EmbedderOptions)
}
embedderOptions = opts
} else {
embedderOptions = make(map[string]interface{})
}
// // Convert EmbedderOptions to map[string]interface{}. return &docStore{
// var embedderOptions map[string]interface{} client: m.client,
// if cfg.EmbedderOptions != nil { collection: cfg.Collection,
// opts, ok := cfg.EmbedderOptions.(map[string]interface{}) dimension: cfg.Dimension,
// if !ok { embedder: cfg.Embedder,
// return nil, fmt.Errorf("EmbedderOptions must be a map[string]interface{}, got %T", cfg.EmbedderOptions) embedderOptions: embedderOptions,
// } }, nil
// embedderOptions = opts }
// } else {
// embedderOptions = make(map[string]interface{})
// }
// return &docStore{
// client: m.client,
// collection: cfg.Collection,
// dimension: cfg.Dimension,
// embedder: cfg.Embedder,
// embedderOptions: embedderOptions,
// }, nil
// }
// Indexer returns the indexer for a collection.
func Indexer(g *genkit.Genkit, collection string) ai.Indexer {
return genkit.LookupIndexer(g, provider, collection)
}
// // Indexer returns the indexer for a collection. // Retriever returns the retriever for a collection.
// func Indexer(g *genkit.Genkit, collection string) ai.Indexer { func Retriever(g *genkit.Genkit, collection string) ai.Retriever {
// return genkit.LookupIndexer(g, provider, collection) return genkit.LookupRetriever(g, provider, collection)
// } }
// // Retriever returns the retriever for a collection. /*
// func Retriever(g *genkit.Genkit, collection string) ai.Retriever { 更新 删除 很少用到;
// return genkit.LookupRetriever(g, provider, collection) */
// }
// /* // Index implements the Indexer.Index method.
// 更新 删除 很少用到; func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
// */ if len(req.Documents) == 0 {
return nil
}
// // Index implements the Indexer.Index method.
// func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
// if len(req.Documents) == 0 {
// return nil
// }
// // Embed documents.
// ereq := &ai.EmbedRequest{
// Input: req.Documents,
// Options: ds.embedderOptions,
// }
// eres, err := ds.embedder.Embed(ctx, ereq)
// if err != nil {
// return fmt.Errorf("milvus index embedding failed: %w", err)
// }
// // Validate embedding count matches document count. // Embed documents.
// if len(eres.Embeddings) != len(req.Documents) { ereq := &ai.EmbedRequest{
// return fmt.Errorf("mismatch: got %d embeddings for %d documents", len(eres.Embeddings), len(req.Documents)) Input: req.Documents,
// } Options: ds.embedderOptions,
}
eres, err := ds.embedder.Embed(ctx, ereq)
if err != nil {
return fmt.Errorf("milvus index embedding failed: %w", err)
}
// // Prepare row-based data. // Validate embedding count matches document count.
// var rows []interface{} if len(eres.Embeddings) != len(req.Documents) {
// for i, emb := range eres.Embeddings { return fmt.Errorf("mismatch: got %d embeddings for %d documents", len(eres.Embeddings), len(req.Documents))
// doc := req.Documents[i] }
// if doc.Metadata == nil { // Prepare row-based data.
// // If ok, we don't use the User struct since the requirement is to error on non-nil var rows []interface{}
// return nil, fmt.Errorf("req.Query.Metadata must be not nil, got type %T", req.Options) for i, emb := range eres.Embeddings {
// } doc := req.Documents[i]
// // Extract username and user_id from req.Query.Metadata if doc.Metadata == nil {
// userName, ok := doc.Metadata[util.UserNameKey].(string) // If ok, we don't use the User struct since the requirement is to error on non-nil
// if !ok { return nil, fmt.Errorf("req.Query.Metadata must be not nil, got type %T", req.Options)
// return nil, fmt.Errorf("req.Query.Metadata must provide username key") }
// }
// userId, ok := doc.Metadata[util.UserIdKey].(string)
// if !ok {
// return nil, fmt.Errorf("req.Query.Metadata must provide user_id key")
// }
// Extract username and user_id from req.Query.Metadata
userName, ok := doc.Metadata[util.UserNameKey].(string)
if !ok {
return nil, fmt.Errorf("req.Query.Metadata must provide username key")
}
userId, ok := doc.Metadata[util.UserIdKey].(string)
if !ok {
return nil, fmt.Errorf("req.Query.Metadata must provide user_id key")
}
// var sb strings.Builder
// for _, p := range doc.Content {
// if p.IsText() {
// sb.WriteString(p.Text)
// }
// }
// text := sb.String()
// metadata := doc.Metadata
// if metadata == nil {
// metadata = make(map[string]interface{})
// }
// // Create row with explicit metadata field. var sb strings.Builder
// row := make(map[string]interface{}) for _, p := range doc.Content {
// row["vector"] = emb.Embedding // []float32 if p.IsText() {
// row["text"] = text sb.WriteString(p.Text)
// row["user_id"] = userId }
// row["username"] = userName }
// row["metadata"] = metadata // Explicitly set metadata as JSON-compatible map text := sb.String()
// rows = append(rows, row) metadata := doc.Metadata
if metadata == nil {
metadata = make(map[string]interface{})
}
// // Debug: Log row contents. // Create row with explicit metadata field.
// fmt.Printf("Row %d: vector_len=%d, text=%q,userId=%s,username=%s,metadata=%v\n", i, len(emb.Embedding), text,userId,userName metadata) row := make(map[string]interface{})
// } row["vector"] = emb.Embedding // []float32
row["text"] = text
row["user_id"] = userId
row["username"] = userName
row["metadata"] = metadata // Explicitly set metadata as JSON-compatible map
rows = append(rows, row)
// // Debug: Log total rows. // Debug: Log row contents.
// fmt.Printf("Inserting %d rows into collection %q\n", len(rows), ds.collection) fmt.Printf("Row %d: vector_len=%d, text=%q,userId=%s,username=%s,metadata=%v\n", i, len(emb.Embedding), text,userId,userName metadata)
}
// // Insert rows into Milvus. // Debug: Log total rows.
// _, err = ds.client.InsertRows(ctx, ds.collection, "", rows) fmt.Printf("Inserting %d rows into collection %q\n", len(rows), ds.collection)
// if err != nil {
// return fmt.Errorf("milvus insert rows failed: %w", err)
// }
// return nil // Insert rows into Milvus.
// } _, err = ds.client.InsertRows(ctx, ds.collection, "", rows)
if err != nil {
return fmt.Errorf("milvus insert rows failed: %w", err)
}
// // RetrieverOptions for Milvus retrieval. return nil
// type RetrieverOptions struct { }
// Count int `json:"count,omitempty"` // Max documents to retrieve.
// MetricType string `json:"metric_type,omitempty"` // Similarity metric (e.g., "L2", "IP").
// } // RetrieverOptions for Milvus retrieval.
type RetrieverOptions struct {
Count int `json:"count,omitempty"` // Max documents to retrieve.
MetricType string `json:"metric_type,omitempty"` // Similarity metric (e.g., "L2", "IP").
}
// // Retrieve implements the Retriever.Retrieve method. // // Retrieve implements the Retriever.Retrieve method.
// func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) { // func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
...@@ -740,13 +744,6 @@ type docStore struct { ...@@ -740,13 +744,6 @@ type docStore struct {
// RetrieverOptions for Milvus retrieval.
type RetrieverOptions struct {
Count int `json:"count,omitempty"` // Max documents to retrieve.
MetricType string `json:"metric_type,omitempty"` // Similarity metric (e.g., "L2", "IP").
}
// Retrieve implements the Retriever.Retrieve method. // Retrieve implements the Retriever.Retrieve method.
func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) { func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
...@@ -876,8 +873,8 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai ...@@ -876,8 +873,8 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai
if metadata == nil { if metadata == nil {
metadata = make(map[string]interface{}) metadata = make(map[string]interface{})
} }
metadata["user_id"] = userId metadata[util.UserIdKey] = userId
metadata["username"] = userName metadata[util.UserNameKey] = userName
// Create document. // Create document.
doc := ai.DocumentFromText(text, metadata) doc := ai.DocumentFromText(text, metadata)
...@@ -889,4 +886,3 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai ...@@ -889,4 +886,3 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai
Documents: docs, Documents: docs,
}, nil }, nil
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment