Commit a99c5c92 authored by Wade's avatar Wade

fix milvus search

parent c535c829
......@@ -93,7 +93,7 @@ func main() {
// Configure collection
cfg := milvus.CollectionConfig{
Collection: "useridx",
Collection: "knowledage",
Dimension: 768, // Match mock embedder dimension
Embedder: embedder,
EmbedderOptions: map[string]interface{}{}, // Explicitly set as map
......@@ -168,6 +168,19 @@ func main() {
fmt.Println("input-------------------------------", string(inputAsJson))
dRequest := ai.DocumentFromText(input.Content, nil)
response, err := ai.Retrieve(ctx, retriever, ai.WithDocs(dRequest))
if err != nil {
return "", err
}
for _, d := range response.Documents {
fmt.Println("d.Content[0].Text",d.Content[0].Text)
}
return "",nil
resp, err := genkit.Generate(ctx, g,
ai.WithModel(m),
ai.WithPrompt(`Tell silly short jokes about apple`))
......
......@@ -174,6 +174,101 @@ type docStore struct {
embedderOptions map[string]interface{}
}
// // newDocStore creates a docStore.
// func (m *Milvus) newDocStore(ctx context.Context, cfg *CollectionConfig) (*docStore, error) {
// if m.client == nil {
// return nil, errors.New("milvus.Init not called")
// }
// // Check/create collection.
// exists, err := m.client.HasCollection(ctx, cfg.Collection)
// if err != nil {
// return nil, fmt.Errorf("failed to check collection %q: %v", cfg.Collection, err)
// }
// if !exists {
// // Define schema.
// schema := &entity.Schema{
// CollectionName: cfg.Collection,
// Fields: []*entity.Field{
// {
// Name: idField,
// DataType: entity.FieldTypeInt64,
// PrimaryKey: true,
// AutoID: true,
// },
// {
// Name: vectorField,
// DataType: entity.FieldTypeFloatVector,
// TypeParams: map[string]string{
// "dim": fmt.Sprintf("%d", cfg.Dimension),
// },
// },
// {
// Name: textField,
// DataType: entity.FieldTypeVarChar,
// TypeParams: map[string]string{
// "max_length": "65535",
// },
// },
// {
// Name: metadataField,
// DataType: entity.FieldTypeJSON,
// },
// },
// }
// err = m.client.CreateCollection(ctx, schema, entity.DefaultShardNumber)
// if err != nil {
// return nil, fmt.Errorf("failed to create collection %q: %v", cfg.Collection, err)
// }
// // Create HNSW index.
// index, err := entity.NewIndexHNSW(
// entity.L2,
// 8, // M
// 96, // efConstruction
// )
// if err != nil {
// return nil, fmt.Errorf("entity.NewIndexHNSW: %v", err)
// }
// err = m.client.CreateIndex(ctx, cfg.Collection, vectorField, index, false)
// if err != nil {
// return nil, fmt.Errorf("failed to create index: %v", err)
// }
// }
// // Load collection.
// err = m.client.LoadCollection(ctx, cfg.Collection, false)
// if err != nil {
// return nil, fmt.Errorf("failed to load collection %q: %v", cfg.Collection, err)
// }
// // Convert EmbedderOptions to map[string]interface{}.
// var embedderOptions map[string]interface{}
// if cfg.EmbedderOptions != nil {
// opts, ok := cfg.EmbedderOptions.(map[string]interface{})
// if !ok {
// return nil, fmt.Errorf("EmbedderOptions must be a map[string]interface{}, got %T", cfg.EmbedderOptions)
// }
// embedderOptions = opts
// } else {
// embedderOptions = make(map[string]interface{})
// }
// return &docStore{
// client: m.client,
// collection: cfg.Collection,
// dimension: cfg.Dimension,
// embedder: cfg.Embedder,
// embedderOptions: embedderOptions,
// }, nil
// }
// newDocStore creates a docStore.
func (m *Milvus) newDocStore(ctx context.Context, cfg *CollectionConfig) (*docStore, error) {
if m.client == nil {
......@@ -186,16 +281,43 @@ func (m *Milvus) newDocStore(ctx context.Context, cfg *CollectionConfig) (*docSt
return nil, fmt.Errorf("failed to check collection %q: %v", cfg.Collection, err)
}
if !exists {
// Define schema.
// Define schema with textField as primary key for unique constraint.
schema := &entity.Schema{
CollectionName: cfg.Collection,
Fields: []*entity.Field{
// {
// Name: idField, // Optional non-primary ID field
// DataType: entity.FieldTypeInt64,
// //AutoID: true,
// // No PrimaryKey or AutoID, as textField is the primary key
// },
{
Name: vectorField,
DataType: entity.FieldTypeFloatVector,
TypeParams: map[string]string{
"dim": fmt.Sprintf("%d", cfg.Dimension),
},
},
{
Name: textField,
DataType: entity.FieldTypeVarChar,
PrimaryKey: true, // Enforce unique constraint on text field
TypeParams: map[string]string{
"max_length": "65535", // Maximum length for VARCHAR, adjust if needed
},
},
{
Name: idField,
DataType: entity.FieldTypeInt64,
PrimaryKey: true,
AutoID: true,
Name: metadataField,
DataType: entity.FieldTypeJSON,
},
},
}
// Alternative: Remove idField if not needed
/*
schema := &entity.Schema{
CollectionName: cfg.Collection,
Fields: []*entity.Field{
{
Name: vectorField,
DataType: entity.FieldTypeFloatVector,
......@@ -206,6 +328,7 @@ func (m *Milvus) newDocStore(ctx context.Context, cfg *CollectionConfig) (*docSt
{
Name: textField,
DataType: entity.FieldTypeVarChar,
PrimaryKey: true, // Enforce unique constraint on text field
TypeParams: map[string]string{
"max_length": "65535",
},
......@@ -216,6 +339,7 @@ func (m *Milvus) newDocStore(ctx context.Context, cfg *CollectionConfig) (*docSt
},
},
}
*/
err = m.client.CreateCollection(ctx, schema, entity.DefaultShardNumber)
if err != nil {
......@@ -265,6 +389,15 @@ func (m *Milvus) newDocStore(ctx context.Context, cfg *CollectionConfig) (*docSt
}, nil
}
// Indexer returns the indexer for a collection.
func Indexer(g *genkit.Genkit, collection string) ai.Indexer {
return genkit.LookupIndexer(g, provider, collection)
......@@ -278,6 +411,7 @@ func Retriever(g *genkit.Genkit, collection string) ai.Retriever {
/*
更新 删除 很少用到;
*/
// Index implements the Indexer.Index method.
func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
if len(req.Documents) == 0 {
......@@ -339,310 +473,6 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
}
// // RetrieverOptions for Milvus retrieval.
// type RetrieverOptions struct {
// Count int `json:"count,omitempty"` // Max documents to retrieve.
// MetricType string `json:"metric_type,omitempty"` // Similarity metric (e.g., "L2", "IP").
// }
// // Retrieve implements the Retriever.Retrieve method.
// func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
// count := 3 // Default.
// metricTypeStr := "L2"
// if req.Options != nil {
// ropt, ok := req.Options.(*RetrieverOptions)
// if !ok {
// return nil, fmt.Errorf("milvus.Retrieve options have type %T, want %T", req.Options, &RetrieverOptions{})
// }
// if ropt.Count > 0 {
// count = ropt.Count
// }
// if ropt.MetricType != "" {
// metricTypeStr = ropt.MetricType
// }
// }
// // Map string metric type to entity.MetricType.
// var metricType entity.MetricType
// switch metricTypeStr {
// case "L2":
// metricType = entity.L2
// case "IP":
// metricType = entity.IP
// default:
// return nil, fmt.Errorf("unsupported metric type: %s", metricTypeStr)
// }
// // Embed query.
// ereq := &ai.EmbedRequest{
// Input: []*ai.Document{req.Query},
// Options: ds.embedderOptions,
// }
// eres, err := ds.embedder.Embed(ctx, ereq)
// if err != nil {
// return nil, fmt.Errorf("milvus retrieve embedding failed: %v", err)
// }
// if len(eres.Embeddings) == 0 {
// return nil, errors.New("no embeddings generated for query")
// }
// queryVector := entity.FloatVector(eres.Embeddings[0].Embedding)
// // Create search parameters.
// searchParams, err := entity.NewIndexHNSWSearchParam(64) // ef
// if err != nil {
// return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err)
// }
// // Perform vector search to get IDs.
// results, err := ds.client.Search(
// ctx,
// ds.collection,
// []string{}, // partitions
// "", // expr (TODO: add metadata filter if needed)
// []string{}, // Only need IDs for now, no output fields
// []entity.Vector{queryVector},
// vectorField,
// metricType,
// count,
// searchParams,
// )
// if err != nil {
// return nil, fmt.Errorf("milvus search failed: %v", err)
// }
// // Extract IDs from search results.
// var ids []int64
// for _, result := range results {
// for i := 0; i < result.ResultCount; i++ {
// id, err := result.IDs.GetAsInt64(i)
// if err != nil {
// continue
// }
// ids = append(ids, id)
// }
// }
// if len(ids) == 0 {
// return &ai.RetrieverResponse{
// Documents: []*ai.Document{},
// }, nil
// }
// // Construct filter expression for Query (e.g., "id IN [id1, id2, ...]").
// filterExpr := fmt.Sprintf("id IN [%s]", joinInt64s(ids, ","))
// // Perform query to get text and metadata.
// queryResults, err := ds.client.Query(
// ctx,
// ds.collection,
// []string{}, // partitions
// filterExpr, // filter by IDs
// []string{textField, metadataField}, // output fields
// client.WithConsistencyLevel(entity.ConsistencyBounded),
// client.WithLimit(count),
// )
// if err != nil {
// return nil, fmt.Errorf("milvus query failed: %v", err)
// }
// // Process query results.
// var docs []*ai.Document
// for _, result := range queryResults {
// textCol := result.GetColumn(textField)
// metaCol := result.GetColumn(metadataField)
// for i := 0; i < result.Len(); i++ {
// text, err := textCol.GetAsString(i)
// if err != nil {
// continue
// }
// // Assume metadata is stored as JSON or map.
// var metadata map[string]interface{}
// if metaCol != nil {
// // Handle metadata based on its type (e.g., JSON string or map).
// // Assuming metadata is a JSON string; adjust if it's stored differently.
// metaStr, err := metaCol.GetAsString(i)
// if err == nil && metaStr != "" {
// if err := json.Unmarshal([]byte(metaStr), &metadata); err != nil {
// continue
// }
// }
// }
// doc := ai.DocumentFromText(text, metadata)
// docs = append(docs, doc)
// }
// }
// return &ai.RetrieverResponse{
// Documents: docs,
// }, nil
// }
// // joinInt64s converts a slice of int64 to a comma-separated string.
// func joinInt64s(ids []int64, sep string) string {
// if len(ids) == 0 {
// return ""
// }
// strs := make([]string, len(ids))
// for i, id := range ids {
// strs[i] = fmt.Sprintf("%d", id)
// }
// return strings.Join(strs, sep)
// }
// // RetrieverOptions for Milvus retrieval.
// type RetrieverOptions struct {
// Count int `json:"count,omitempty"` // Max documents to retrieve.
// MetricType string `json:"metric_type,omitempty"` // Similarity metric (e.g., "L2", "IP").
// }
// // Retrieve implements the Retriever.Retrieve method.
// func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
// count := 3 // Default.
// metricTypeStr := "L2"
// if req.Options != nil {
// ropt, ok := req.Options.(*RetrieverOptions)
// if !ok {
// return nil, fmt.Errorf("milvus.Retrieve options have type %T, want %T", req.Options, &RetrieverOptions{})
// }
// if ropt.Count > 0 {
// count = ropt.Count
// }
// if ropt.MetricType != "" {
// metricTypeStr = ropt.MetricType
// }
// }
// // Map string metric type to entity.MetricType.
// var metricType entity.MetricType
// switch metricTypeStr {
// case "L2":
// metricType = entity.L2
// case "IP":
// metricType = entity.IP
// default:
// return nil, fmt.Errorf("unsupported metric type: %s", metricTypeStr)
// }
// // Embed query.
// ereq := &ai.EmbedRequest{
// Input: []*ai.Document{req.Query},
// Options: ds.embedderOptions,
// }
// eres, err := ds.embedder.Embed(ctx, ereq)
// if err != nil {
// return nil, fmt.Errorf("milvus retrieve embedding failed: %v", err)
// }
// if len(eres.Embeddings) == 0 {
// return nil, errors.New("no embeddings generated for query")
// }
// queryVector := entity.FloatVector(eres.Embeddings[0].Embedding)
// // Create search parameters.
// searchParams, err := entity.NewIndexHNSWSearchParam(64) // ef
// if err != nil {
// return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err)
// }
// // Perform vector search to get IDs.
// results, err := ds.client.Search(
// ctx,
// ds.collection,
// []string{}, // partitions
// "", // expr (TODO: add metadata filter if needed)
// []string{}, // Only need IDs for now, no output fields
// []entity.Vector{queryVector},
// vectorField,
// metricType,
// count,
// searchParams,
// )
// if err != nil {
// return nil, fmt.Errorf("milvus search failed: %v", err)
// }
// // Extract IDs from search results.
// var ids []int64
// for _, result := range results {
// for i := 0; i < result.ResultCount; i++ {
// id, err := result.IDs.GetAsInt64(i)
// if err != nil {
// continue
// }
// ids = append(ids, id)
// }
// }
// if len(ids) == 0 {
// return &ai.RetrieverResponse{
// Documents: []*ai.Document{},
// }, nil
// }
// // Construct filter expression for Query (e.g., "id IN [id1, id2, ...]").
// filterExpr := fmt.Sprintf("id IN [%s]", joinInt64s(ids, ","))
// // Perform query to get text and metadata.
// queryResults, err := ds.client.Query(
// ctx,
// ds.collection,
// []string{}, // partitions
// filterExpr, // filter by IDs
// []string{textField, metadataField}, // output fields
// client.WithQueryConsistencyLevel(entity.ConsistencyBounded), // Corrected option
// client.WithLimit(count),
// )
// if err != nil {
// return nil, fmt.Errorf("milvus query failed: %v", err)
// }
// // Process query results.
// var docs []*ai.Document
// for _, result := range queryResults {
// textCol := result.GetColumn(textField)
// metaCol := result.GetColumn(metadataField)
// for i := 0; i < result.Len(); i++ {
// text, err := textCol.GetAsString(i)
// if err != nil {
// continue
// }
// // Assume metadata is stored as JSON or map.
// var metadata map[string]interface{}
// if metaCol != nil {
// // Handle metadata based on its type (e.g., JSON string or map).
// // Assuming metadata is a JSON string; adjust if stored differently.
// metaStr, err := metaCol.GetAsString(i)
// if err == nil && metaStr != "" {
// if err := json.Unmarshal([]byte(metaStr), &metadata); err != nil {
// continue
// }
// }
// }
// doc := ai.DocumentFromText(text, metadata)
// docs = append(docs, doc)
// }
// }
// return &ai.RetrieverResponse{
// Documents: docs,
// }, nil
// }
// // joinInt64s converts a slice of int64 to a comma-separated string.
// func joinInt64s(ids []int64, sep string) string {
// if len(ids) == 0 {
// return ""
// }
// strs := make([]string, len(ids))
// for i, id := range ids {
// strs[i] = fmt.Sprintf("%d", id)
// }
// return strings.Join(strs, sep)
// }
......@@ -741,8 +571,8 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
// queryOptions := []client.SearchQueryOptionFunc{
// client.WithLimit(int64(count)),
// }
// // Add consistency level if supported by your SDK version.
// // If WithConsistencyLevel is not supported for Query, omit it or check SDK docs.
// // Note: Consistency level omitted due to undefined WithQueryConsistencyLevel.
// // If WithConsistencyLevel is supported for Query in your SDK, uncomment below:
// // queryOptions = append(queryOptions, client.WithConsistencyLevel(entity.ConsistencyBounded))
// queryResults, err := ds.client.Query(
......@@ -759,30 +589,52 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
// // Process query results.
// var docs []*ai.Document
// for _, result := range queryResults {
// textCol := result.GetColumn(textField)
// metaCol := result.GetColumn(metadataField)
// for i := 0; i < result.Len(); i++ {
// // Find text and metadata columns in query results.
// var textCol, metaCol entity.Column
// for _, col := range queryResults {
// if col.Name() == textField {
// textCol = col
// }
// if col.Name() == metadataField {
// metaCol = col
// }
// }
// // Ensure text column exists.
// if textCol == nil {
// return nil, fmt.Errorf("text column %s not found in query results", textField)
// }
// // Iterate over rows (assuming columns have same length).
// for i := 0; i < textCol.Len(); i++ {
// // Get text value.
// text, err := textCol.GetAsString(i)
// if err != nil {
// fmt.Printf("Failed to parse text at index %d: %v\n", i, err)
// continue
// }
// // Assume metadata is stored as JSON or map.
// // Get metadata value (optional, as metadata column may be missing).
// var metadata map[string]interface{}
// if metaCol != nil {
// // Handle metadata based on its type (e.g., JSON string or map).
// // Assuming metadata is a JSON string; adjust if stored differently.
// metaStr, err := metaCol.GetAsString(i)
// if err == nil && metaStr != "" {
// if err := json.Unmarshal([]byte(metaStr), &metadata); err != nil {
// fmt.Printf("Failed to parse metadata at index %d: %v\n", i, err)
// continue
// }
// } else if err != nil {
// fmt.Printf("Failed to get metadata string at index %d: %v\n", i, err)
// }
// }
// // Print text and metadata in a format similar to insertion debug log.
// fmt.Printf("Row %d: text=%q, metadata=%v\n", i, text, metadata)
// // Create document.
// doc := ai.DocumentFromText(text, metadata)
// docs = append(docs, doc)
// }
// }
// return &ai.RetrieverResponse{
// Documents: docs,
......@@ -808,6 +660,7 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
// RetrieverOptions for Milvus retrieval.
type RetrieverOptions struct {
Count int `json:"count,omitempty"` // Max documents to retrieve.
......@@ -862,14 +715,13 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai
return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err)
}
// Perform vector search to get IDs.
// Perform vector search to get IDs, text, and metadata.
results, err := ds.client.Search(
ctx,
ds.collection,
//ds.compostore.collection,
[]string{}, // partitions
"", // expr (TODO: add metadata filter if needed)
[]string{}, // Only need IDs for now, no output fields
[]string{textField, metadataField}, // Output fields: text and metadata
[]entity.Vector{queryVector},
vectorField,
metricType,
......@@ -880,51 +732,12 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai
return nil, fmt.Errorf("milvus search failed: %v", err)
}
// Extract IDs from search results.
var ids []int64
for _, result := range results {
for i := 0; i < result.ResultCount; i++ {
id, err := result.IDs.GetAsInt64(i)
if err != nil {
continue
}
ids = append(ids, id)
}
}
if len(ids) == 0 {
return &ai.RetrieverResponse{
Documents: []*ai.Document{},
}, nil
}
// Construct filter expression for Query (e.g., "id IN [id1, id2, ...]").
filterExpr := fmt.Sprintf("id IN [%s]", joinInt64s(ids, ","))
// Perform query to get text and metadata.
queryOptions := []client.SearchQueryOptionFunc{
client.WithLimit(int64(count)),
}
// Note: Consistency level omitted due to undefined WithQueryConsistencyLevel.
// If WithConsistencyLevel is supported for Query in your SDK, uncomment below:
// queryOptions = append(queryOptions, client.WithConsistencyLevel(entity.ConsistencyBounded))
queryResults, err := ds.client.Query(
ctx,
ds.collection,
[]string{}, // partitions
filterExpr, // filter by IDs
[]string{textField, metadataField}, // output fields
queryOptions...,
)
if err != nil {
return nil, fmt.Errorf("milvus query failed: %v", err)
}
// Process query results.
// Process search results.
var docs []*ai.Document
// Find text and metadata columns in query results.
for _, result := range results {
// Find text and metadata columns in search results.
var textCol, metaCol entity.Column
for _, col := range queryResults {
for _, col := range result.Fields {
if col.Name() == textField {
textCol = col
}
......@@ -935,11 +748,11 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai
// Ensure text column exists.
if textCol == nil {
return nil, fmt.Errorf("text column %s not found in query results", textField)
return nil, fmt.Errorf("text column %s not found in search results", textField)
}
// Iterate over rows (assuming columns have same length).
for i := 0; i < textCol.Len(); i++ {
for i := 0; i < result.ResultCount; i++ {
// Get text value.
text, err := textCol.GetAsString(i)
if err != nil {
......@@ -956,27 +769,22 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai
fmt.Printf("Failed to parse metadata at index %d: %v\n", i, err)
continue
}
} else if err != nil {
fmt.Printf("Failed to get metadata string at index %d: %v\n", i, err)
}
}
// Print text and metadata in a format similar to insertion debug log.
// fmt.Printf("Row %d: text=%q, metadata=%v\n", i, text, metadata)
// Create document.
doc := ai.DocumentFromText(text, metadata)
docs = append(docs, doc)
}
}
return &ai.RetrieverResponse{
Documents: docs,
}, nil
}
// joinInt64s converts a slice of int64 to a comma-separated string.
func joinInt64s(ids []int64, sep string) string {
if len(ids) == 0 {
return ""
}
strs := make([]string, len(ids))
for i, id := range ids {
strs[i] = fmt.Sprintf("%d", id)
}
return strings.Join(strs, sep)
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment