# pgvector.py
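"""PGVector-backed vector store service.

A thin wrapper around the `vecs` client implementing the BaseVectorDatabase
interface: upsert, query, and delete against a pgvector collection.
"""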
from typing import List, Optional

import vecs
from qdrant_client.http import models as rest
from semantic_router.encoders import BaseEncoder
from tqdm import tqdm

from models.delete import DeleteResponse
from models.document import BaseDocumentChunk
from models.query import Filter
from vectordbs.base import BaseVectorDatabase

MAX_QUERY_TOP_K = 5


class PGVectorService(BaseVectorDatabase):
    def __init__(
        self, index_name: str, dimension: int, credentials: dict, encoder: BaseEncoder
    ):
        super().__init__(
            index_name=index_name,
            dimension=dimension,
            credentials=credentials,
            encoder=encoder,
        )
        client = vecs.create_client(connection_string=credentials["database_uri"])
        self.collection = client.get_or_create_collection(
            name=self.index_name,
            dimension=dimension,
        )

    # TODO: remove this
    async def convert_to_rerank_format(self, chunks: List[rest.PointStruct]):
        docs = [
            {
                "content": chunk.payload.get("content"),
                "page_label": chunk.payload.get("page_label"),
                "file_url": chunk.payload.get("file_url"),
            }
            for chunk in chunks
        ]
        return docs

    async def upsert(self, chunks: List[BaseDocumentChunk]) -> None:
        records = []
        for chunk in tqdm(chunks, desc="Upserting to PGVector"):
            records.append(
                (
                    chunk.id,
                    chunk.dense_embedding,
                    {
                        "document_id": chunk.document_id,
                        "content": chunk.content,
                        "doc_url": chunk.doc_url,
                        **(chunk.metadata if chunk.metadata else {}),
                    },
                )
            )
        self.collection.upsert(records)
        # Build (or rebuild) the vector index so the new records are searchable.
        self.collection.create_index()

    async def query(
        self, input: str, filter: Optional[Filter] = None, top_k: int = MAX_QUERY_TOP_K
    ) -> List:
        vectors = await self._generate_vectors(input=input)
        # With include_metadata=True and include_value=False, each result
        # is an (id, metadata) pair.
        results = self.collection.query(
            data=vectors[0],
            limit=top_k,
            include_metadata=True,
            include_value=False,
            filters=filter.model_dump() if filter else {},
        )
        chunks = []
        for result in results:
            chunk_id, metadata = result
            chunks.append(
                BaseDocumentChunk(
                    id=chunk_id,
                    source_type=metadata.get("filetype"),
                    source=metadata.get("doc_url"),
                    document_id=metadata.get("document_id"),
                    content=metadata.get("content"),
                    doc_url=metadata.get("doc_url"),
                    page_number=metadata.get("page_number"),
                    metadata={**metadata},
                )
            )
        return chunks

    async def delete(self, file_url: str) -> DeleteResponse:
        deleted = self.collection.delete(filters={"doc_url": {"$eq": file_url}})
        return DeleteResponse(num_of_deleted_chunks=len(deleted))
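

# Usage sketch: a minimal, illustrative example of driving this service.
# The connection string, index name, dimension, and encoder choice below
# are assumptions for the sketch, not values defined by this module.
if __name__ == "__main__":
    import asyncio

    from semantic_router.encoders import OpenAIEncoder  # hypothetical encoder choice

    async def main():
        service = PGVectorService(
            index_name="my_documents",  # hypothetical collection name
            dimension=1536,  # must match the encoder's embedding size
            credentials={"database_uri": "postgresql://user:pass@localhost:5432/db"},
            encoder=OpenAIEncoder(),
        )
        # Query for the most similar chunks to a text input.
        results = await service.query(input="what is pgvector?", top_k=3)
        for chunk in results:
            print(chunk.id, chunk.content)

    asyncio.run(main())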