Commit c535c829 authored by Wade

fix search

parent 0fa9d4df
...@@ -19,6 +19,7 @@ package milvus ...@@ -19,6 +19,7 @@ package milvus
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"os" "os"
...@@ -337,6 +338,476 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error { ...@@ -337,6 +338,476 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
return nil return nil
} }
// // RetrieverOptions for Milvus retrieval.
// type RetrieverOptions struct {
// Count int `json:"count,omitempty"` // Max documents to retrieve.
// MetricType string `json:"metric_type,omitempty"` // Similarity metric (e.g., "L2", "IP").
// }
// // Retrieve implements the Retriever.Retrieve method.
// func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
// count := 3 // Default.
// metricTypeStr := "L2"
// if req.Options != nil {
// ropt, ok := req.Options.(*RetrieverOptions)
// if !ok {
// return nil, fmt.Errorf("milvus.Retrieve options have type %T, want %T", req.Options, &RetrieverOptions{})
// }
// if ropt.Count > 0 {
// count = ropt.Count
// }
// if ropt.MetricType != "" {
// metricTypeStr = ropt.MetricType
// }
// }
// // Map string metric type to entity.MetricType.
// var metricType entity.MetricType
// switch metricTypeStr {
// case "L2":
// metricType = entity.L2
// case "IP":
// metricType = entity.IP
// default:
// return nil, fmt.Errorf("unsupported metric type: %s", metricTypeStr)
// }
// // Embed query.
// ereq := &ai.EmbedRequest{
// Input: []*ai.Document{req.Query},
// Options: ds.embedderOptions,
// }
// eres, err := ds.embedder.Embed(ctx, ereq)
// if err != nil {
// return nil, fmt.Errorf("milvus retrieve embedding failed: %v", err)
// }
// if len(eres.Embeddings) == 0 {
// return nil, errors.New("no embeddings generated for query")
// }
// queryVector := entity.FloatVector(eres.Embeddings[0].Embedding)
// // Create search parameters.
// searchParams, err := entity.NewIndexHNSWSearchParam(64) // ef
// if err != nil {
// return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err)
// }
// // Perform vector search to get IDs.
// results, err := ds.client.Search(
// ctx,
// ds.collection,
// []string{}, // partitions
// "", // expr (TODO: add metadata filter if needed)
// []string{}, // Only need IDs for now, no output fields
// []entity.Vector{queryVector},
// vectorField,
// metricType,
// count,
// searchParams,
// )
// if err != nil {
// return nil, fmt.Errorf("milvus search failed: %v", err)
// }
// // Extract IDs from search results.
// var ids []int64
// for _, result := range results {
// for i := 0; i < result.ResultCount; i++ {
// id, err := result.IDs.GetAsInt64(i)
// if err != nil {
// continue
// }
// ids = append(ids, id)
// }
// }
// if len(ids) == 0 {
// return &ai.RetrieverResponse{
// Documents: []*ai.Document{},
// }, nil
// }
// // Construct filter expression for Query (e.g., "id IN [id1, id2, ...]").
// filterExpr := fmt.Sprintf("id IN [%s]", joinInt64s(ids, ","))
// // Perform query to get text and metadata.
// queryResults, err := ds.client.Query(
// ctx,
// ds.collection,
// []string{}, // partitions
// filterExpr, // filter by IDs
// []string{textField, metadataField}, // output fields
// client.WithConsistencyLevel(entity.ConsistencyBounded),
// client.WithLimit(count),
// )
// if err != nil {
// return nil, fmt.Errorf("milvus query failed: %v", err)
// }
// // Process query results.
// var docs []*ai.Document
// for _, result := range queryResults {
// textCol := result.GetColumn(textField)
// metaCol := result.GetColumn(metadataField)
// for i := 0; i < result.Len(); i++ {
// text, err := textCol.GetAsString(i)
// if err != nil {
// continue
// }
// // Assume metadata is stored as JSON or map.
// var metadata map[string]interface{}
// if metaCol != nil {
// // Handle metadata based on its type (e.g., JSON string or map).
// // Assuming metadata is a JSON string; adjust if it's stored differently.
// metaStr, err := metaCol.GetAsString(i)
// if err == nil && metaStr != "" {
// if err := json.Unmarshal([]byte(metaStr), &metadata); err != nil {
// continue
// }
// }
// }
// doc := ai.DocumentFromText(text, metadata)
// docs = append(docs, doc)
// }
// }
// return &ai.RetrieverResponse{
// Documents: docs,
// }, nil
// }
// // joinInt64s converts a slice of int64 to a comma-separated string.
// func joinInt64s(ids []int64, sep string) string {
// if len(ids) == 0 {
// return ""
// }
// strs := make([]string, len(ids))
// for i, id := range ids {
// strs[i] = fmt.Sprintf("%d", id)
// }
// return strings.Join(strs, sep)
// }
// // RetrieverOptions for Milvus retrieval.
// type RetrieverOptions struct {
// Count int `json:"count,omitempty"` // Max documents to retrieve.
// MetricType string `json:"metric_type,omitempty"` // Similarity metric (e.g., "L2", "IP").
// }
// // Retrieve implements the Retriever.Retrieve method.
// func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
// count := 3 // Default.
// metricTypeStr := "L2"
// if req.Options != nil {
// ropt, ok := req.Options.(*RetrieverOptions)
// if !ok {
// return nil, fmt.Errorf("milvus.Retrieve options have type %T, want %T", req.Options, &RetrieverOptions{})
// }
// if ropt.Count > 0 {
// count = ropt.Count
// }
// if ropt.MetricType != "" {
// metricTypeStr = ropt.MetricType
// }
// }
// // Map string metric type to entity.MetricType.
// var metricType entity.MetricType
// switch metricTypeStr {
// case "L2":
// metricType = entity.L2
// case "IP":
// metricType = entity.IP
// default:
// return nil, fmt.Errorf("unsupported metric type: %s", metricTypeStr)
// }
// // Embed query.
// ereq := &ai.EmbedRequest{
// Input: []*ai.Document{req.Query},
// Options: ds.embedderOptions,
// }
// eres, err := ds.embedder.Embed(ctx, ereq)
// if err != nil {
// return nil, fmt.Errorf("milvus retrieve embedding failed: %v", err)
// }
// if len(eres.Embeddings) == 0 {
// return nil, errors.New("no embeddings generated for query")
// }
// queryVector := entity.FloatVector(eres.Embeddings[0].Embedding)
// // Create search parameters.
// searchParams, err := entity.NewIndexHNSWSearchParam(64) // ef
// if err != nil {
// return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err)
// }
// // Perform vector search to get IDs.
// results, err := ds.client.Search(
// ctx,
// ds.collection,
// []string{}, // partitions
// "", // expr (TODO: add metadata filter if needed)
// []string{}, // Only need IDs for now, no output fields
// []entity.Vector{queryVector},
// vectorField,
// metricType,
// count,
// searchParams,
// )
// if err != nil {
// return nil, fmt.Errorf("milvus search failed: %v", err)
// }
// // Extract IDs from search results.
// var ids []int64
// for _, result := range results {
// for i := 0; i < result.ResultCount; i++ {
// id, err := result.IDs.GetAsInt64(i)
// if err != nil {
// continue
// }
// ids = append(ids, id)
// }
// }
// if len(ids) == 0 {
// return &ai.RetrieverResponse{
// Documents: []*ai.Document{},
// }, nil
// }
// // Construct filter expression for Query (e.g., "id IN [id1, id2, ...]").
// filterExpr := fmt.Sprintf("id IN [%s]", joinInt64s(ids, ","))
// // Perform query to get text and metadata.
// queryResults, err := ds.client.Query(
// ctx,
// ds.collection,
// []string{}, // partitions
// filterExpr, // filter by IDs
// []string{textField, metadataField}, // output fields
// client.WithQueryConsistencyLevel(entity.ConsistencyBounded), // Corrected option
// client.WithLimit(count),
// )
// if err != nil {
// return nil, fmt.Errorf("milvus query failed: %v", err)
// }
// // Process query results.
// var docs []*ai.Document
// for _, result := range queryResults {
// textCol := result.GetColumn(textField)
// metaCol := result.GetColumn(metadataField)
// for i := 0; i < result.Len(); i++ {
// text, err := textCol.GetAsString(i)
// if err != nil {
// continue
// }
// // Assume metadata is stored as JSON or map.
// var metadata map[string]interface{}
// if metaCol != nil {
// // Handle metadata based on its type (e.g., JSON string or map).
// // Assuming metadata is a JSON string; adjust if stored differently.
// metaStr, err := metaCol.GetAsString(i)
// if err == nil && metaStr != "" {
// if err := json.Unmarshal([]byte(metaStr), &metadata); err != nil {
// continue
// }
// }
// }
// doc := ai.DocumentFromText(text, metadata)
// docs = append(docs, doc)
// }
// }
// return &ai.RetrieverResponse{
// Documents: docs,
// }, nil
// }
// // joinInt64s converts a slice of int64 to a comma-separated string.
// func joinInt64s(ids []int64, sep string) string {
// if len(ids) == 0 {
// return ""
// }
// strs := make([]string, len(ids))
// for i, id := range ids {
// strs[i] = fmt.Sprintf("%d", id)
// }
// return strings.Join(strs, sep)
// }
// // RetrieverOptions for Milvus retrieval.
// type RetrieverOptions struct {
// Count int `json:"count,omitempty"` // Max documents to retrieve.
// MetricType string `json:"metric_type,omitempty"` // Similarity metric (e.g., "L2", "IP").
// }
// // Retrieve implements the Retriever.Retrieve method.
// func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai.RetrieverResponse, error) {
// count := 3 // Default.
// metricTypeStr := "L2"
// if req.Options != nil {
// ropt, ok := req.Options.(*RetrieverOptions)
// if !ok {
// return nil, fmt.Errorf("milvus.Retrieve options have type %T, want %T", req.Options, &RetrieverOptions{})
// }
// if ropt.Count > 0 {
// count = ropt.Count
// }
// if ropt.MetricType != "" {
// metricTypeStr = ropt.MetricType
// }
// }
// // Map string metric type to entity.MetricType.
// var metricType entity.MetricType
// switch metricTypeStr {
// case "L2":
// metricType = entity.L2
// case "IP":
// metricType = entity.IP
// default:
// return nil, fmt.Errorf("unsupported metric type: %s", metricTypeStr)
// }
// // Embed query.
// ereq := &ai.EmbedRequest{
// Input: []*ai.Document{req.Query},
// Options: ds.embedderOptions,
// }
// eres, err := ds.embedder.Embed(ctx, ereq)
// if err != nil {
// return nil, fmt.Errorf("milvus retrieve embedding failed: %v", err)
// }
// if len(eres.Embeddings) == 0 {
// return nil, errors.New("no embeddings generated for query")
// }
// queryVector := entity.FloatVector(eres.Embeddings[0].Embedding)
// // Create search parameters.
// searchParams, err := entity.NewIndexHNSWSearchParam(64) // ef
// if err != nil {
// return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err)
// }
// // Perform vector search to get IDs.
// results, err := ds.client.Search(
// ctx,
// ds.collection,
// []string{}, // partitions
// "", // expr (TODO: add metadata filter if needed)
// []string{}, // Only need IDs for now, no output fields
// []entity.Vector{queryVector},
// vectorField,
// metricType,
// count,
// searchParams,
// )
// if err != nil {
// return nil, fmt.Errorf("milvus search failed: %v", err)
// }
// // Extract IDs from search results.
// var ids []int64
// for _, result := range results {
// for i := 0; i < result.ResultCount; i++ {
// id, err := result.IDs.GetAsInt64(i)
// if err != nil {
// continue
// }
// ids = append(ids, id)
// }
// }
// if len(ids) == 0 {
// return &ai.RetrieverResponse{
// Documents: []*ai.Document{},
// }, nil
// }
// // Construct filter expression for Query (e.g., "id IN [id1, id2, ...]").
// filterExpr := fmt.Sprintf("id IN [%s]", joinInt64s(ids, ","))
// // Perform query to get text and metadata.
// queryOptions := []client.SearchQueryOptionFunc{
// client.WithLimit(int64(count)),
// }
// // Add consistency level if supported by your SDK version.
// // If WithConsistencyLevel is not supported for Query, omit it or check SDK docs.
// // queryOptions = append(queryOptions, client.WithConsistencyLevel(entity.ConsistencyBounded))
// queryResults, err := ds.client.Query(
// ctx,
// ds.collection,
// []string{}, // partitions
// filterExpr, // filter by IDs
// []string{textField, metadataField}, // output fields
// queryOptions...,
// )
// if err != nil {
// return nil, fmt.Errorf("milvus query failed: %v", err)
// }
// // Process query results.
// var docs []*ai.Document
// for _, result := range queryResults {
// textCol := result.GetColumn(textField)
// metaCol := result.GetColumn(metadataField)
// for i := 0; i < result.Len(); i++ {
// text, err := textCol.GetAsString(i)
// if err != nil {
// continue
// }
// // Assume metadata is stored as JSON or map.
// var metadata map[string]interface{}
// if metaCol != nil {
// // Handle metadata based on its type (e.g., JSON string or map).
// // Assuming metadata is a JSON string; adjust if stored differently.
// metaStr, err := metaCol.GetAsString(i)
// if err == nil && metaStr != "" {
// if err := json.Unmarshal([]byte(metaStr), &metadata); err != nil {
// continue
// }
// }
// }
// doc := ai.DocumentFromText(text, metadata)
// docs = append(docs, doc)
// }
// }
// return &ai.RetrieverResponse{
// Documents: docs,
// }, nil
// }
// // joinInt64s converts a slice of int64 to a comma-separated string.
// func joinInt64s(ids []int64, sep string) string {
// if len(ids) == 0 {
// return ""
// }
// strs := make([]string, len(ids))
// for i, id := range ids {
// strs[i] = fmt.Sprintf("%d", id)
// }
// return strings.Join(strs, sep)
// }
// RetrieverOptions for Milvus retrieval. // RetrieverOptions for Milvus retrieval.
type RetrieverOptions struct { type RetrieverOptions struct {
Count int `json:"count,omitempty"` // Max documents to retrieve. Count int `json:"count,omitempty"` // Max documents to retrieve.
...@@ -391,15 +862,14 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai ...@@ -391,15 +862,14 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai
return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err) return nil, fmt.Errorf("NewIndexHNSWSearchParam failed: %v", err)
} }
// TODO 元数据 过滤条件 // Perform vector search to get IDs.
// Perform search.
results, err := ds.client.Search( results, err := ds.client.Search(
ctx, ctx,
ds.collection, ds.collection,
//ds.compostore.collection,
[]string{}, // partitions []string{}, // partitions
"", // expr "", // expr (TODO: add metadata filter if needed)
[]string{textField, metadataField}, // output fields []string{}, // Only need IDs for now, no output fields
[]entity.Vector{queryVector}, []entity.Vector{queryVector},
vectorField, vectorField,
metricType, metricType,
...@@ -410,22 +880,103 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai ...@@ -410,22 +880,103 @@ func (ds *docStore) Retrieve(ctx context.Context, req *ai.RetrieverRequest) (*ai
return nil, fmt.Errorf("milvus search failed: %v", err) return nil, fmt.Errorf("milvus search failed: %v", err)
} }
// Process results. // Extract IDs from search results.
var docs []*ai.Document var ids []int64
for _, result := range results { for _, result := range results {
for i := 0; i < result.ResultCount; i++ { for i := 0; i < result.ResultCount; i++ {
textCol := result.Fields.GetColumn(textField) id, err := result.IDs.GetAsInt64(i)
if err != nil {
continue
}
ids = append(ids, id)
}
}
if len(ids) == 0 {
return &ai.RetrieverResponse{
Documents: []*ai.Document{},
}, nil
}
// Construct filter expression for Query (e.g., "id IN [id1, id2, ...]").
filterExpr := fmt.Sprintf("id IN [%s]", joinInt64s(ids, ","))
// Perform query to get text and metadata.
queryOptions := []client.SearchQueryOptionFunc{
client.WithLimit(int64(count)),
}
// Note: Consistency level omitted due to undefined WithQueryConsistencyLevel.
// If WithConsistencyLevel is supported for Query in your SDK, uncomment below:
// queryOptions = append(queryOptions, client.WithConsistencyLevel(entity.ConsistencyBounded))
queryResults, err := ds.client.Query(
ctx,
ds.collection,
[]string{}, // partitions
filterExpr, // filter by IDs
[]string{textField, metadataField}, // output fields
queryOptions...,
)
if err != nil {
return nil, fmt.Errorf("milvus query failed: %v", err)
}
// Process query results.
var docs []*ai.Document
// Find text and metadata columns in query results.
var textCol, metaCol entity.Column
for _, col := range queryResults {
if col.Name() == textField {
textCol = col
}
if col.Name() == metadataField {
metaCol = col
}
}
// Ensure text column exists.
if textCol == nil {
return nil, fmt.Errorf("text column %s not found in query results", textField)
}
// Iterate over rows (assuming columns have same length).
for i := 0; i < textCol.Len(); i++ {
// Get text value.
text, err := textCol.GetAsString(i) text, err := textCol.GetAsString(i)
if err != nil { if err != nil {
fmt.Printf("Failed to parse text at index %d: %v\n", i, err)
continue continue
} }
// Get metadata value (optional, as metadata column may be missing).
var metadata map[string]interface{} var metadata map[string]interface{}
if metaCol != nil {
metaStr, err := metaCol.GetAsString(i)
if err == nil && metaStr != "" {
if err := json.Unmarshal([]byte(metaStr), &metadata); err != nil {
fmt.Printf("Failed to parse metadata at index %d: %v\n", i, err)
continue
}
}
}
// Create document.
doc := ai.DocumentFromText(text, metadata) doc := ai.DocumentFromText(text, metadata)
docs = append(docs, doc) docs = append(docs, doc)
} }
}
return &ai.RetrieverResponse{ return &ai.RetrieverResponse{
Documents: docs, Documents: docs,
}, nil }, nil
} }
// joinInt64s formats each element of ids as a base-10 integer and concatenates
// the results, placing sep between consecutive values. A nil or empty slice
// produces the empty string.
func joinInt64s(ids []int64, sep string) string {
	var sb strings.Builder
	for pos, value := range ids {
		// The separator goes between elements only, never before the first.
		if pos > 0 {
			sb.WriteString(sep)
		}
		fmt.Fprintf(&sb, "%d", value)
	}
	return sb.String()
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment