Skip to content

Commit

Permalink
feat: Introduce Abstract Class for Integration Testing (#674)
Browse files Browse the repository at this point in the history
**Reason for Change**:
Introduce abstract class for testing new integrations with ChromaDB as
the first example.
  • Loading branch information
ishaansehgal99 authored Nov 6, 2024
1 parent 79e425c commit 71ddc55
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 118 deletions.
1 change: 1 addition & 0 deletions pkg/ragengine/services/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ llama-index-llms-huggingface-api
fastapi
faiss-cpu
llama-index-vector-stores-faiss
llama-index-vector-stores-chroma
llama-index-vector-stores-azurecosmosmongo
uvicorn
# For UTs
Expand Down
119 changes: 119 additions & 0 deletions pkg/ragengine/services/tests/vector_store/test_base_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import os
from unittest.mock import patch
import pytest
from abc import ABC, abstractmethod

from services.vector_store.base import BaseVectorStore
from services.models import Document
from services.embedding.huggingface_local import LocalHuggingFaceEmbedding
from services.config import MODEL_ID, INFERENCE_URL, INFERENCE_ACCESS_SECRET
from services.config import PERSIST_DIR

class BaseVectorStoreTest(ABC):
"""Base class for vector store tests that defines the test structure."""

@pytest.fixture(scope='session')
def init_embed_manager(self):
return LocalHuggingFaceEmbedding(MODEL_ID)

@pytest.fixture
@abstractmethod
def vector_store_manager(self, init_embed_manager):
"""Each implementation must provide its own vector store manager."""
pass

@property
@abstractmethod
def expected_query_score(self):
"""Override this in implementation-specific test classes."""
pass

def test_index_documents(self, vector_store_manager):
first_doc_text, second_doc_text = "First document", "Second document"
documents = [
Document(text=first_doc_text, metadata={"type": "text"}),
Document(text=second_doc_text, metadata={"type": "text"})
]

doc_ids = vector_store_manager.index_documents("test_index", documents)

assert len(doc_ids) == 2
assert set(doc_ids) == {BaseVectorStore.generate_doc_id(first_doc_text),
BaseVectorStore.generate_doc_id(second_doc_text)}

def test_index_documents_isolation(self, vector_store_manager):
documents1 = [
Document(text="First document in index1", metadata={"type": "text"}),
]
documents2 = [
Document(text="First document in index2", metadata={"type": "text"}),
]

# Index documents in separate indices
index_name_1, index_name_2 = "index1", "index2"
vector_store_manager.index_documents(index_name_1, documents1)
vector_store_manager.index_documents(index_name_2, documents2)

# Call the backend-specific check method
self.check_indexed_documents(vector_store_manager)

@abstractmethod
def check_indexed_documents(self, vector_store_manager):
"""Abstract method to check indexed documents in backend-specific format."""
pass

@patch('requests.post')
def test_query_documents(self, mock_post, vector_store_manager):
mock_response = {
"result": "This is the completion from the API"
}
mock_post.return_value.json.return_value = mock_response

documents = [
Document(text="First document", metadata={"type": "text"}),
Document(text="Second document", metadata={"type": "text"})
]
vector_store_manager.index_documents("test_index", documents)

params = {"temperature": 0.7}
query_result = vector_store_manager.query("test_index", "First", top_k=1, llm_params=params)

assert query_result is not None
assert query_result["response"] == "{'result': 'This is the completion from the API'}"
assert query_result["source_nodes"][0]["text"] == "First document"
assert query_result["source_nodes"][0]["score"] == pytest.approx(self.expected_query_score, rel=1e-6)

mock_post.assert_called_once_with(
INFERENCE_URL,
json={"prompt": "Context information is below.\n---------------------\ntype: text\n\nFirst document\n---------------------\nGiven the context information and not prior knowledge, answer the query.\nQuery: First\nAnswer: ", "formatted": True, 'temperature': 0.7},
headers={"Authorization": f"Bearer {INFERENCE_ACCESS_SECRET}"}
)

def test_add_document(self, vector_store_manager):
documents = [Document(text="Third document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", documents)

new_document = [Document(text="Fourth document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", new_document)

assert vector_store_manager.document_exists("test_index", new_document[0],
BaseVectorStore.generate_doc_id("Fourth document"))

def test_persist_index_1(self, vector_store_manager):
documents = [Document(text="Test document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", documents)
vector_store_manager._persist("test_index")
assert os.path.exists(PERSIST_DIR)

def test_persist_index_2(self, vector_store_manager):
documents = [Document(text="Test document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", documents)

documents = [Document(text="Another Test document", metadata={"type": "text"})]
vector_store_manager.index_documents("another_test_index", documents)

vector_store_manager._persist_all()
assert os.path.exists(PERSIST_DIR)
33 changes: 33 additions & 0 deletions pkg/ragengine/services/tests/vector_store/test_chromadb_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import pytest
import os

from tempfile import TemporaryDirectory
from services.tests.vector_store.test_base_store import BaseVectorStoreTest
from services.vector_store.chromadb_store import ChromaDBVectorStoreHandler

class TestChromaDBVectorStore(BaseVectorStoreTest):
"""Test implementation for ChromaDB vector store."""

@pytest.fixture
def vector_store_manager(self, init_embed_manager):
with TemporaryDirectory() as temp_dir:
print(f"Saving temporary test storage at: {temp_dir}")
os.environ['PERSIST_DIR'] = temp_dir
manager = ChromaDBVectorStoreHandler(init_embed_manager)
manager._clear_collection_and_indexes()
yield manager

def check_indexed_documents(self, vector_store_manager):
indexed_docs = vector_store_manager.list_all_indexed_documents()
assert len(indexed_docs) == 2
assert list(indexed_docs["index1"].values())[0]["text"] == "First document in index1"
assert list(indexed_docs["index2"].values())[0]["text"] == "First document in index2"

@property
def expected_query_score(self):
"""Override this in implementation-specific test classes."""
return 0.5601649858735368
147 changes: 29 additions & 118 deletions pkg/ragengine/services/tests/vector_store/test_faiss_store.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,37 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import os
from tempfile import TemporaryDirectory
from unittest.mock import patch

import pytest
import os

from services.vector_store.base import BaseVectorStore
from tempfile import TemporaryDirectory
from services.tests.vector_store.test_base_store import BaseVectorStoreTest
from services.vector_store.faiss_store import FaissVectorStoreHandler
from services.models import Document
from services.embedding.huggingface_local import LocalHuggingFaceEmbedding
from services.config import MODEL_ID, INFERENCE_URL, INFERENCE_ACCESS_SECRET
from services.config import PERSIST_DIR

@pytest.fixture(scope='session')
def init_embed_manager():
return LocalHuggingFaceEmbedding(MODEL_ID)

@pytest.fixture
def vector_store_manager(init_embed_manager):
with TemporaryDirectory() as temp_dir:
print(f"Saving temporary test storage at: {temp_dir}")
# Mock the persistence directory
os.environ['PERSIST_DIR'] = temp_dir
yield FaissVectorStoreHandler(init_embed_manager)

def test_index_documents(vector_store_manager):
first_doc_text, second_doc_text = "First document", "Second document"
documents = [
Document(text=first_doc_text, metadata={"type": "text"}),
Document(text=second_doc_text, metadata={"type": "text"})
]

doc_ids = vector_store_manager.index_documents("test_index", documents)
class TestFaissVectorStore(BaseVectorStoreTest):
"""Test implementation for FAISS vector store."""

assert len(doc_ids) == 2
assert set(doc_ids) == {BaseVectorStore.generate_doc_id(first_doc_text),
BaseVectorStore.generate_doc_id(second_doc_text)}

def test_index_documents_isolation(vector_store_manager):
documents1 = [
Document(text="First document in index1", metadata={"type": "text"}),
]
documents2 = [
Document(text="First document in index2", metadata={"type": "text"}),
]

# Index documents in separate indices
index_name_1, index_name_2 = "index1", "index2"
vector_store_manager.index_documents(index_name_1, documents1)
vector_store_manager.index_documents(index_name_2, documents2)

assert vector_store_manager.list_all_indexed_documents() == {
'index1': {"87117028123498eb7d757b1507aa3e840c63294f94c27cb5ec83c939dedb32fd":
{'hash': '1e64a170be48c45efeaa8667ab35919106da0489ec99a11d0029f2842db133aa',
'text': 'First document in index1'}},
'index2': {"49b198c0e126a99e1975f17b564756c25b4ad691a57eda583e232fd9bee6de91":
{'hash': 'a222f875b83ce8b6eb72b3cae278b620de9bcc7c6b73222424d3ce979d1a463b',
'text': 'First document in index2'}}
}

@patch('requests.post')
def test_query_documents(mock_post, vector_store_manager):
# Define Mock Response for Custom Inference API
mock_response = {
"result": "This is the completion from the API"
}

mock_post.return_value.json.return_value = mock_response

# Add documents to index
documents = [
Document(text="First document", metadata={"type": "text"}),
Document(text="Second document", metadata={"type": "text"})
]
vector_store_manager.index_documents("test_index", documents)

params = {"temperature": 0.7}
# Mock query and results
query_result = vector_store_manager.query("test_index", "First", top_k=1, llm_params=params)

assert query_result is not None
assert query_result["response"] == "{'result': 'This is the completion from the API'}"
assert query_result["source_nodes"][0]["text"] == "First document"
assert query_result["source_nodes"][0]["score"] == pytest.approx(0.5795239210128784, rel=1e-6)

mock_post.assert_called_once_with(
INFERENCE_URL,
# Auto-Generated by LlamaIndex
json={"prompt": "Context information is below.\n---------------------\ntype: text\n\nFirst document\n---------------------\nGiven the context information and not prior knowledge, answer the query.\nQuery: First\nAnswer: ", "formatted": True, 'temperature': 0.7},
headers={"Authorization": f"Bearer {INFERENCE_ACCESS_SECRET}"}
)

def test_add_document(vector_store_manager):
documents = [Document(text="Third document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", documents)

# Add a document to the existing index
new_document = [Document(text="Fourth document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", new_document)

# Assert that the document exists
assert vector_store_manager.document_exists("test_index", new_document[0],
BaseVectorStore.generate_doc_id("Fourth document"))

def test_persist_index_1(vector_store_manager):
"""Test that the index store is persisted."""
# Add a document and persist the index
documents = [Document(text="Test document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", documents)
vector_store_manager._persist("test_index")
assert os.path.exists(PERSIST_DIR)

def test_persist_index_2(vector_store_manager):
"""Test that an index store is persisted."""
# Add a document and persist the index
documents = [Document(text="Test document", metadata={"type": "text"})]
vector_store_manager.index_documents("test_index", documents)

documents = [Document(text="Another Test document", metadata={"type": "text"})]
vector_store_manager.index_documents("another_test_index", documents)

vector_store_manager._persist_all()
assert os.path.exists(PERSIST_DIR)
@pytest.fixture
def vector_store_manager(self, init_embed_manager):
with TemporaryDirectory() as temp_dir:
print(f"Saving temporary test storage at: {temp_dir}")
os.environ['PERSIST_DIR'] = temp_dir
yield FaissVectorStoreHandler(init_embed_manager)

def check_indexed_documents(self, vector_store_manager):
expected_output = {
'index1': {"87117028123498eb7d757b1507aa3e840c63294f94c27cb5ec83c939dedb32fd": {
'hash': '1e64a170be48c45efeaa8667ab35919106da0489ec99a11d0029f2842db133aa',
'text': 'First document in index1'
}},
'index2': {"49b198c0e126a99e1975f17b564756c25b4ad691a57eda583e232fd9bee6de91": {
'hash': 'a222f875b83ce8b6eb72b3cae278b620de9bcc7c6b73222424d3ce979d1a463b',
'text': 'First document in index2'
}}
}
assert vector_store_manager.list_all_indexed_documents() == expected_output

@property
def expected_query_score(self):
"""Override this in implementation-specific test classes."""
return 0.5795239210128784
68 changes: 68 additions & 0 deletions pkg/ragengine/services/vector_store/chromadb_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from typing import Dict, List
from services.models import Document
import logging

import chromadb
import json
from llama_index.vector_stores.chroma import ChromaVectorStore

from .base import BaseVectorStore

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ChromaDBVectorStoreHandler(BaseVectorStore):
def __init__(self, embedding_manager):
super().__init__(embedding_manager)
self.chroma_client = chromadb.EphemeralClient()

def _create_new_index(self, index_name: str, documents: List[Document]) -> List[str]:
chroma_collection = self.chroma_client.create_collection(index_name)
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
return self._create_index_common(index_name, documents, vector_store)

def document_exists(self, index_name: str, doc: Document, doc_id: str) -> bool:
"""ChromaDB for checking document existence."""
if index_name not in self.index_map:
logger.warning(f"No such index: '{index_name}' exists in vector store.")
return False
return doc.text in self.chroma_client.get_collection(name=index_name).get()["documents"]

def list_all_indexed_documents(self) -> Dict[str, Dict[str, Dict[str, str]]]:
indexed_docs = {} # Accumulate documents across all indexes
try:
for collection in self.chroma_client.list_collections():
collection_info = collection.get()
for doc in zip(collection_info["ids"], collection_info["documents"], collection_info["metadatas"]):
indexed_docs.setdefault(collection.name, {})[doc[0]] = {
"text": doc[1],
"metadata": json.dumps(doc[2]),
}
except Exception as e:
print(f"Failed to get all collections in the ChromaDB instance: {e}")
return indexed_docs

def _clear_collection_and_indexes(self):
"""Clears all collections and drops all indexes in the ChromaDB instance.
This method is primarily intended for testing purposes to ensure
a clean state between tests, preventing index and document conflicts.
"""
try:
# Get all collections
collections = self.chroma_client.list_collections()

# Delete each collection
for collection in collections:
collection_name = collection.name
self.chroma_client.delete_collection(name=collection_name)
print(f"Collection '{collection_name}' has been deleted.")

print("All collections in the ChromaDB instance have been deleted.")
except Exception as e:
print(f"Failed to clear collections in the ChromaDB instance: {e}")

0 comments on commit 71ddc55

Please sign in to comment.