From 71ddc55bed38291421ba210a7efba931476cb4a5 Mon Sep 17 00:00:00 2001
From: Ishaan Sehgal
Date: Tue, 5 Nov 2024 18:08:42 -0800
Subject: [PATCH] feat: Introduce Abstract Class for Integration Testing (#674)

**Reason for Change**: Introduce an abstract base class for testing new vector
store integrations, with ChromaDB as the first example.
---
 pkg/ragengine/services/requirements.txt       |   1 +
 .../tests/vector_store/test_base_store.py     | 119 ++++++++++++++
 .../tests/vector_store/test_chromadb_store.py |  33 ++++
 .../tests/vector_store/test_faiss_store.py    | 147 ++++--------------
 .../services/vector_store/chromadb_store.py   |  68 ++++++++
 5 files changed, 250 insertions(+), 118 deletions(-)
 create mode 100644 pkg/ragengine/services/tests/vector_store/test_base_store.py
 create mode 100644 pkg/ragengine/services/tests/vector_store/test_chromadb_store.py
 create mode 100644 pkg/ragengine/services/vector_store/chromadb_store.py

diff --git a/pkg/ragengine/services/requirements.txt b/pkg/ragengine/services/requirements.txt
index c91a95430..a5b460c7a 100644
--- a/pkg/ragengine/services/requirements.txt
+++ b/pkg/ragengine/services/requirements.txt
@@ -10,6 +10,7 @@ llama-index-llms-huggingface-api
 fastapi
 faiss-cpu
 llama-index-vector-stores-faiss
+llama-index-vector-stores-chroma
 llama-index-vector-stores-azurecosmosmongo
 uvicorn
 # For UTs
diff --git a/pkg/ragengine/services/tests/vector_store/test_base_store.py b/pkg/ragengine/services/tests/vector_store/test_base_store.py
new file mode 100644
index 000000000..017c036e6
--- /dev/null
+++ b/pkg/ragengine/services/tests/vector_store/test_base_store.py
@@ -0,0 +1,119 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+import os
+from unittest.mock import patch
+import pytest
+from abc import ABC, abstractmethod
+
+from services.vector_store.base import BaseVectorStore
+from services.models import Document
+from services.embedding.huggingface_local import LocalHuggingFaceEmbedding
+from services.config import MODEL_ID, INFERENCE_URL, INFERENCE_ACCESS_SECRET
+from services.config import PERSIST_DIR
+
+class BaseVectorStoreTest(ABC):
+    """Base class for vector store tests that defines the test structure."""
+
+    @pytest.fixture(scope='session')
+    def init_embed_manager(self):
+        return LocalHuggingFaceEmbedding(MODEL_ID)
+
+    @pytest.fixture
+    @abstractmethod
+    def vector_store_manager(self, init_embed_manager):
+        """Each implementation must provide its own vector store manager."""
+        pass
+
+    @property
+    @abstractmethod
+    def expected_query_score(self):
+        """Override this in implementation-specific test classes."""
+        pass
+
+    def test_index_documents(self, vector_store_manager):
+        first_doc_text, second_doc_text = "First document", "Second document"
+        documents = [
+            Document(text=first_doc_text, metadata={"type": "text"}),
+            Document(text=second_doc_text, metadata={"type": "text"})
+        ]
+
+        doc_ids = vector_store_manager.index_documents("test_index", documents)
+
+        assert len(doc_ids) == 2
+        assert set(doc_ids) == {BaseVectorStore.generate_doc_id(first_doc_text),
+                                BaseVectorStore.generate_doc_id(second_doc_text)}
+
+    def test_index_documents_isolation(self, vector_store_manager):
+        documents1 = [
+            Document(text="First document in index1", metadata={"type": "text"}),
+        ]
+        documents2 = [
+            Document(text="First document in index2", metadata={"type": "text"}),
+        ]
+
+        # Index documents in separate indices
+        index_name_1, index_name_2 = "index1", "index2"
+        vector_store_manager.index_documents(index_name_1, documents1)
+        vector_store_manager.index_documents(index_name_2, documents2)
+
+        # Call the backend-specific check method
+        self.check_indexed_documents(vector_store_manager)
+
+    @abstractmethod
+    def check_indexed_documents(self, vector_store_manager):
+        """Abstract method to check indexed documents in backend-specific format."""
+        pass
+
+    @patch('requests.post')
+    def test_query_documents(self, mock_post, vector_store_manager):
+        mock_response = {
+            "result": "This is the completion from the API"
+        }
+        mock_post.return_value.json.return_value = mock_response
+
+        documents = [
+            Document(text="First document", metadata={"type": "text"}),
+            Document(text="Second document", metadata={"type": "text"})
+        ]
+        vector_store_manager.index_documents("test_index", documents)
+
+        params = {"temperature": 0.7}
+        query_result = vector_store_manager.query("test_index", "First", top_k=1, llm_params=params)
+
+        assert query_result is not None
+        assert query_result["response"] == "{'result': 'This is the completion from the API'}"
+        assert query_result["source_nodes"][0]["text"] == "First document"
+        assert query_result["source_nodes"][0]["score"] == pytest.approx(self.expected_query_score, rel=1e-6)
+
+        mock_post.assert_called_once_with(
+            INFERENCE_URL,
+            json={"prompt": "Context information is below.\n---------------------\ntype: text\n\nFirst document\n---------------------\nGiven the context information and not prior knowledge, answer the query.\nQuery: First\nAnswer: ", "formatted": True, 'temperature': 0.7},
+            headers={"Authorization": f"Bearer {INFERENCE_ACCESS_SECRET}"}
+        )
+
+    def test_add_document(self, vector_store_manager):
+        documents = [Document(text="Third document", metadata={"type": "text"})]
+        vector_store_manager.index_documents("test_index", documents)
+
+        new_document = [Document(text="Fourth document", metadata={"type": "text"})]
+        vector_store_manager.index_documents("test_index", new_document)
+
+        assert vector_store_manager.document_exists("test_index", new_document[0],
+                                                    BaseVectorStore.generate_doc_id("Fourth document"))
+
+    def test_persist_index_1(self, vector_store_manager):
+        documents = [Document(text="Test document", metadata={"type": "text"})]
+        vector_store_manager.index_documents("test_index", documents)
+        vector_store_manager._persist("test_index")
+        assert os.path.exists(PERSIST_DIR)
+
+    def test_persist_index_2(self, vector_store_manager):
+        documents = [Document(text="Test document", metadata={"type": "text"})]
+        vector_store_manager.index_documents("test_index", documents)
+
+        documents = [Document(text="Another Test document", metadata={"type": "text"})]
+        vector_store_manager.index_documents("another_test_index", documents)
+
+        vector_store_manager._persist_all()
+        assert os.path.exists(PERSIST_DIR)
diff --git a/pkg/ragengine/services/tests/vector_store/test_chromadb_store.py b/pkg/ragengine/services/tests/vector_store/test_chromadb_store.py
new file mode 100644
index 000000000..37ddbe1e4
--- /dev/null
+++ b/pkg/ragengine/services/tests/vector_store/test_chromadb_store.py
@@ -0,0 +1,33 @@
+
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+import pytest
+import os
+
+from tempfile import TemporaryDirectory
+from services.tests.vector_store.test_base_store import BaseVectorStoreTest
+from services.vector_store.chromadb_store import ChromaDBVectorStoreHandler
+
+class TestChromaDBVectorStore(BaseVectorStoreTest):
+    """Test implementation for ChromaDB vector store."""
+
+    @pytest.fixture
+    def vector_store_manager(self, init_embed_manager):
+        with TemporaryDirectory() as temp_dir:
+            print(f"Saving temporary test storage at: {temp_dir}")
+            os.environ['PERSIST_DIR'] = temp_dir
+            manager = ChromaDBVectorStoreHandler(init_embed_manager)
+            manager._clear_collection_and_indexes()
+            yield manager
+
+    def check_indexed_documents(self, vector_store_manager):
+        indexed_docs = vector_store_manager.list_all_indexed_documents()
+        assert len(indexed_docs) == 2
+        assert list(indexed_docs["index1"].values())[0]["text"] == "First document in index1"
+        assert list(indexed_docs["index2"].values())[0]["text"] == "First document in index2"
+
+    @property
+    def expected_query_score(self):
+        """Expected query score for the ChromaDB backend."""
+        return 0.5601649858735368
diff --git a/pkg/ragengine/services/tests/vector_store/test_faiss_store.py b/pkg/ragengine/services/tests/vector_store/test_faiss_store.py
index 9d8595f2c..9cd9a41ea 100644
--- a/pkg/ragengine/services/tests/vector_store/test_faiss_store.py
+++ b/pkg/ragengine/services/tests/vector_store/test_faiss_store.py
@@ -1,126 +1,37 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT license.
 
-import os
-from tempfile import TemporaryDirectory
-from unittest.mock import patch
-
 import pytest
+import os
 
-from services.vector_store.base import BaseVectorStore
+from tempfile import TemporaryDirectory
+from services.tests.vector_store.test_base_store import BaseVectorStoreTest
 from services.vector_store.faiss_store import FaissVectorStoreHandler
-from services.models import Document
-from services.embedding.huggingface_local import LocalHuggingFaceEmbedding
-from services.config import MODEL_ID, INFERENCE_URL, INFERENCE_ACCESS_SECRET
-from services.config import PERSIST_DIR
-
-@pytest.fixture(scope='session')
-def init_embed_manager():
-    return LocalHuggingFaceEmbedding(MODEL_ID)
 
-@pytest.fixture
-def vector_store_manager(init_embed_manager):
-    with TemporaryDirectory() as temp_dir:
-        print(f"Saving temporary test storage at: {temp_dir}")
-        # Mock the persistence directory
-        os.environ['PERSIST_DIR'] = temp_dir
-        yield FaissVectorStoreHandler(init_embed_manager)
-
-def test_index_documents(vector_store_manager):
-    first_doc_text, second_doc_text = "First document", "Second document"
-    documents = [
-        Document(text=first_doc_text, metadata={"type": "text"}),
-        Document(text=second_doc_text, metadata={"type": "text"})
-    ]
-
-    doc_ids = vector_store_manager.index_documents("test_index", documents)
+class TestFaissVectorStore(BaseVectorStoreTest):
+    """Test implementation for FAISS vector store."""
 
-    assert len(doc_ids) == 2
-    assert set(doc_ids) == {BaseVectorStore.generate_doc_id(first_doc_text),
-                            BaseVectorStore.generate_doc_id(second_doc_text)}
-
-def test_index_documents_isolation(vector_store_manager):
-    documents1 = [
-        Document(text="First document in index1", metadata={"type": "text"}),
-    ]
-    documents2 = [
-        Document(text="First document in index2", metadata={"type": "text"}),
-    ]
-
-    # Index documents in separate indices
-    index_name_1, index_name_2 = "index1", "index2"
-    vector_store_manager.index_documents(index_name_1, documents1)
-    vector_store_manager.index_documents(index_name_2, documents2)
-
-    assert vector_store_manager.list_all_indexed_documents() == {
-        'index1': {"87117028123498eb7d757b1507aa3e840c63294f94c27cb5ec83c939dedb32fd":
-                       {'hash': '1e64a170be48c45efeaa8667ab35919106da0489ec99a11d0029f2842db133aa',
-                        'text': 'First document in index1'}},
-        'index2': {"49b198c0e126a99e1975f17b564756c25b4ad691a57eda583e232fd9bee6de91":
-                       {'hash': 'a222f875b83ce8b6eb72b3cae278b620de9bcc7c6b73222424d3ce979d1a463b',
-                        'text': 'First document in index2'}}
-    }
-
-@patch('requests.post')
-def test_query_documents(mock_post, vector_store_manager):
-    # Define Mock Response for Custom Inference API
-    mock_response = {
-        "result": "This is the completion from the API"
-    }
-
-    mock_post.return_value.json.return_value = mock_response
-
-    # Add documents to index
-    documents = [
-        Document(text="First document", metadata={"type": "text"}),
-        Document(text="Second document", metadata={"type": "text"})
-    ]
-    vector_store_manager.index_documents("test_index", documents)
-
-    params = {"temperature": 0.7}
-    # Mock query and results
-    query_result = vector_store_manager.query("test_index", "First", top_k=1, llm_params=params)
-
-    assert query_result is not None
-    assert query_result["response"] == "{'result': 'This is the completion from the API'}"
-    assert query_result["source_nodes"][0]["text"] == "First document"
-    assert query_result["source_nodes"][0]["score"] == pytest.approx(0.5795239210128784, rel=1e-6)
-
-    mock_post.assert_called_once_with(
-        INFERENCE_URL,
-        # Auto-Generated by LlamaIndex
-        json={"prompt": "Context information is below.\n---------------------\ntype: text\n\nFirst document\n---------------------\nGiven the context information and not prior knowledge, answer the query.\nQuery: First\nAnswer: ", "formatted": True, 'temperature': 0.7},
-        headers={"Authorization": f"Bearer {INFERENCE_ACCESS_SECRET}"}
-    )
-
-def test_add_document(vector_store_manager):
-    documents = [Document(text="Third document", metadata={"type": "text"})]
-    vector_store_manager.index_documents("test_index", documents)
-
-    # Add a document to the existing index
-    new_document = [Document(text="Fourth document", metadata={"type": "text"})]
-    vector_store_manager.index_documents("test_index", new_document)
-
-    # Assert that the document exists
-    assert vector_store_manager.document_exists("test_index", new_document[0],
-                                                BaseVectorStore.generate_doc_id("Fourth document"))
-
-def test_persist_index_1(vector_store_manager):
-    """Test that the index store is persisted."""
-    # Add a document and persist the index
-    documents = [Document(text="Test document", metadata={"type": "text"})]
-    vector_store_manager.index_documents("test_index", documents)
-    vector_store_manager._persist("test_index")
-    assert os.path.exists(PERSIST_DIR)
-
-def test_persist_index_2(vector_store_manager):
-    """Test that an index store is persisted."""
-    # Add a document and persist the index
-    documents = [Document(text="Test document", metadata={"type": "text"})]
-    vector_store_manager.index_documents("test_index", documents)
-
-    documents = [Document(text="Another Test document", metadata={"type": "text"})]
-    vector_store_manager.index_documents("another_test_index", documents)
-
-    vector_store_manager._persist_all()
-    assert os.path.exists(PERSIST_DIR)
+
+    @pytest.fixture
+    def vector_store_manager(self, init_embed_manager):
+        with TemporaryDirectory() as temp_dir:
+            print(f"Saving temporary test storage at: {temp_dir}")
+            os.environ['PERSIST_DIR'] = temp_dir
+            yield FaissVectorStoreHandler(init_embed_manager)
+
+    def check_indexed_documents(self, vector_store_manager):
+        expected_output = {
+            'index1': {"87117028123498eb7d757b1507aa3e840c63294f94c27cb5ec83c939dedb32fd": {
+                'hash': '1e64a170be48c45efeaa8667ab35919106da0489ec99a11d0029f2842db133aa',
+                'text': 'First document in index1'
+            }},
+            'index2': {"49b198c0e126a99e1975f17b564756c25b4ad691a57eda583e232fd9bee6de91": {
+                'hash': 'a222f875b83ce8b6eb72b3cae278b620de9bcc7c6b73222424d3ce979d1a463b',
+                'text': 'First document in index2'
+            }}
+        }
+        assert vector_store_manager.list_all_indexed_documents() == expected_output
+
+    @property
+    def expected_query_score(self):
+        """Expected query score for the FAISS backend."""
+        return 0.5795239210128784
\ No newline at end of file
diff --git a/pkg/ragengine/services/vector_store/chromadb_store.py b/pkg/ragengine/services/vector_store/chromadb_store.py
new file mode 100644
index 000000000..639facae4
--- /dev/null
+++ b/pkg/ragengine/services/vector_store/chromadb_store.py
@@ -0,0 +1,68 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+from typing import Dict, List
+from services.models import Document
+import logging
+
+import chromadb
+import json
+from llama_index.vector_stores.chroma import ChromaVectorStore
+
+from .base import BaseVectorStore
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+class ChromaDBVectorStoreHandler(BaseVectorStore):
+    def __init__(self, embedding_manager):
+        super().__init__(embedding_manager)
+        self.chroma_client = chromadb.EphemeralClient()
+
+    def _create_new_index(self, index_name: str, documents: List[Document]) -> List[str]:
+        chroma_collection = self.chroma_client.create_collection(index_name)
+        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
+        return self._create_index_common(index_name, documents, vector_store)
+
+    def document_exists(self, index_name: str, doc: Document, doc_id: str) -> bool:
+        """Check whether a document exists in the given ChromaDB collection."""
+        if index_name not in self.index_map:
+            logger.warning(f"No such index '{index_name}' exists in the vector store.")
+            return False
+        return doc.text in self.chroma_client.get_collection(name=index_name).get()["documents"]
+
+    def list_all_indexed_documents(self) -> Dict[str, Dict[str, Dict[str, str]]]:
+        indexed_docs = {}  # Accumulate documents across all indexes
+        try:
+            for collection in self.chroma_client.list_collections():
+                collection_info = collection.get()
+                for doc_id, text, metadata in zip(collection_info["ids"], collection_info["documents"], collection_info["metadatas"]):
+                    indexed_docs.setdefault(collection.name, {})[doc_id] = {
+                        "text": text,
+                        "metadata": json.dumps(metadata),
+                    }
+        except Exception as e:
+            logger.error(f"Failed to get all collections in the ChromaDB instance: {e}")
+        return indexed_docs
+
+    def _clear_collection_and_indexes(self):
+        """Clears all collections and drops all indexes in the ChromaDB instance.
+
+        This method is primarily intended for testing purposes to ensure
+        a clean state between tests, preventing index and document conflicts.
+
+        """
+        try:
+            # Get all collections
+            collections = self.chroma_client.list_collections()
+
+            # Delete each collection
+            for collection in collections:
+                collection_name = collection.name
+                self.chroma_client.delete_collection(name=collection_name)
+                logger.info(f"Collection '{collection_name}' has been deleted.")
+
+            logger.info("All collections in the ChromaDB instance have been deleted.")
+        except Exception as e:
+            logger.error(f"Failed to clear collections in the ChromaDB instance: {e}")
+    
\ No newline at end of file
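
Usage sketch (reviewer note, not part of the patch): with `BaseVectorStoreTest` in place, a future backend should only need to supply its own fixture, its backend-specific document check, and an expected score. The sketch below assumes a hypothetical `MyBackendVectorStoreHandler` in a hypothetical `services.vector_store.my_backend_store` module that implements `BaseVectorStore`; it mirrors the FAISS and ChromaDB test classes above.

```python
# Minimal sketch, assuming a hypothetical MyBackendVectorStoreHandler that
# implements services.vector_store.base.BaseVectorStore.
import os
from tempfile import TemporaryDirectory

import pytest

from services.tests.vector_store.test_base_store import BaseVectorStoreTest
from services.vector_store.my_backend_store import MyBackendVectorStoreHandler  # hypothetical

class TestMyBackendVectorStore(BaseVectorStoreTest):
    """Test implementation for a hypothetical new vector store backend."""

    @pytest.fixture
    def vector_store_manager(self, init_embed_manager):
        # Point persistence at a throwaway directory, as the FAISS and
        # ChromaDB fixtures do.
        with TemporaryDirectory() as temp_dir:
            os.environ['PERSIST_DIR'] = temp_dir
            yield MyBackendVectorStoreHandler(init_embed_manager)

    def check_indexed_documents(self, vector_store_manager):
        # Assert on the backend-specific shape returned by
        # list_all_indexed_documents(); one document per isolated index.
        indexed_docs = vector_store_manager.list_all_indexed_documents()
        assert len(indexed_docs) == 2

    @property
    def expected_query_score(self):
        # Similarity scores differ per backend; take this value from a
        # known-good run of the inherited test_query_documents.
        return 0.5  # placeholder value
```

Every test defined on `BaseVectorStoreTest` (indexing, index isolation, querying, persistence) then runs against the new backend with no further changes.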