Commit ddcc501a authored by Wade's avatar Wade

graph idx ok

parent 6ea40366
......@@ -19,10 +19,12 @@ package graphrag
import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
"net/http"
"strconv"
"strings"
......@@ -137,6 +139,51 @@ func (c *Client) AddDocument(spaceID string, req DocumentRequest) (*http.Respons
return resp, nil
}
// SyncDocumentsRequest defines the request body for the sync documents endpoint.
type SyncDocumentsRequest struct {
DocIDs []string `json:"doc_ids"`
}
// SyncDocuments sends a POST request to sync documents for the given spaceID.
func (c *Client) SyncDocuments(spaceID string, docIDs []string) (success bool, err error) {
url := fmt.Sprintf("%s/knowledge/%s/document/sync", c.BaseURL, spaceID)
reqBody := SyncDocumentsRequest{
DocIDs: docIDs,
}
body, err := json.Marshal(reqBody)
if err != nil {
return false, fmt.Errorf("failed to marshal request: %w", err)
}
httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
if err != nil {
return false, fmt.Errorf("failed to create request: %w", err)
}
httpReq.Header.Set("Accept", "application/json")
httpReq.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(httpReq)
if err != nil {
return false, fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return false, fmt.Errorf("failed to read response body: %w", err)
}
if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("request failed with status %d: %s", resp.StatusCode, string(respBody))
}
return success, nil
}
// SyncBatchDocument 同步批量处理文档
func (c *Client) SyncBatchDocument(spaceID string, req []SyncBatchRequest) (*http.Response, error) {
url := fmt.Sprintf("%s/knowledge/%s/document/sync_batch", c.BaseURL, spaceID)
......@@ -280,21 +327,69 @@ func Retriever(g *genkit.Genkit, spaceID string) ai.Retriever {
return genkit.LookupRetriever(g, provider, spaceID)
}
// generateRandomDocName generates a random alphanumeric string of the specified length.
func generateRandomDocName(length int) (string, error) {
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
var result strings.Builder
result.Grow(length)
for i := 0; i < length; i++ {
idx, err := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
if err != nil {
return "", fmt.Errorf("failed to generate random index: %w", err)
}
result.WriteByte(charset[idx.Int64()])
}
return result.String(), nil
}
// ParseJSONResponse parses a JSON byte slice and extracts the success boolean and data fields as a string.
func ParseJSONResponse(jsonBytes []byte) (success bool, data string, err error) {
// Define struct to capture only the needed fields
type jsonResponse struct {
Success bool `json:"success"`
Data int `json:"data"` // Use string to capture JSON string data
}
var resp jsonResponse
if err := json.Unmarshal(jsonBytes, &resp); err != nil {
return false, "", fmt.Errorf("failed to unmarshal JSON: %w", err)
}
return resp.Success, fmt.Sprintf("%d",resp.Data), nil
}
// Index implements the Indexer.Index method.
func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
if len(req.Documents) == 0 {
return nil
}
userid := ""
usernmae := ""
for _, doc := range req.Documents {
if v, ok := doc.Metadata["user_id"]; ok {
if str, isString := v.(string); isString {
userid = str
}
}
if v, ok := doc.Metadata["username"]; ok {
if str, isString := v.(string); isString {
usernmae = str
}
}
}
// Create knowledge space.
spaceReq := SpaceRequest{
ID: 1,
Name: ds.spaceID,
VectorType: "hnsw",
Name: userid,
VectorType: "KnowledgeGraph",
DomainType: "Normal",
Desc: "Default knowledge space",
Owner: "admin",
SpaceID: 1,
Desc: usernmae,
Owner: userid,
}
resp, err := ds.client.AddSpace(spaceReq)
if err != nil {
......@@ -306,19 +401,38 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
return fmt.Errorf("add space failed with status %d: %s", resp.StatusCode, string(body))
}
fmt.Println("space ok")
spaceId := userid
// Index each document.
for i, doc := range req.Documents {
// Ensure metadata includes user_id and username.
if doc.Metadata == nil {
doc.Metadata = make(map[string]interface{})
docName := ""
if v, ok := doc.Metadata["doc_name"]; ok {
if str, isString := v.(string); isString {
docName = str
} else {
// Generate random docName.
var err error
docName, err = generateRandomDocName(8)
if err != nil {
return fmt.Errorf("generate random docName for document %d: %w", i+1, err)
}
}
if _, ok := doc.Metadata["user_id"]; !ok {
doc.Metadata["user_id"] = "user123" // Mock data.
} else {
// Generate random docName.
var err error
docName, err = generateRandomDocName(8)
if err != nil {
return fmt.Errorf("generate random docName for document %d: %w", i+1, err)
}
if _, ok := doc.Metadata["username"]; !ok {
doc.Metadata["username"] = "Alice" // Mock data.
}
fmt.Println("docName: ", docName)
// Add document.
var sb strings.Builder
for _, p := range doc.Content {
......@@ -327,52 +441,43 @@ func (ds *docStore) Index(ctx context.Context, req *ai.IndexerRequest) error {
}
}
text := sb.String()
fmt.Println("text: ",text)
docReq := DocumentRequest{
DocName: fmt.Sprintf("doc_%d", i+1),
DocID: i + 1,
DocType: "text",
DocToken: "",
Content: text,
DocName: docName,
Source: "api",
DocType: "TEXT",
Content: text,
Labels: "",
Questions: []string{},
// Questions: []string{},
Metadata: doc.Metadata,
}
resp, err := ds.client.AddDocument(ds.spaceID, docReq)
resp, err := ds.client.AddDocument(spaceId, docReq)
if err != nil {
return fmt.Errorf("add document %d: %w", i+1, err)
}
body, _ := io.ReadAll(resp.Body)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("add document %d failed with status %d: %s", i+1, resp.StatusCode, string(body))
}
// Sync document for embedding.
syncReq := []SyncBatchRequest{
{
DocID: docReq.DocID,
SpaceID: ds.spaceID,
ModelName: ds.modelName,
ChunkParameters: ChunkParameters{
ChunkStrategy: "sentence",
TextSplitter: "recursive",
SplitterType: "user_define",
ChunkSize: 512,
ChunkOverlap: 50,
Separator: "\n",
EnableMerge: true,
},
},
}
syncResp, err := ds.client.SyncBatchDocument(ds.spaceID, syncReq)
ok, idx, err := ParseJSONResponse(body)
if err != nil {
return fmt.Errorf("sync batch document %d: %w", i+1, err)
return fmt.Errorf("ParseJSONResponse %d: %w", i+1, err)
}
defer syncResp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(syncResp.Body)
return fmt.Errorf("sync batch document %d failed with status %d: %s", i+1, syncResp.StatusCode, string(body))
if !ok{
return fmt.Errorf("ParseJSONResponse body %d: %w", i+1, err)
}
fmt.Println("document ok",string(body),idx)
ok ,err =ds.client.SyncDocuments(spaceId,[]string{idx})
if err != nil{
return err
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment