diff --git a/kairon/api/app/routers/bot/data.py b/kairon/api/app/routers/bot/data.py index 42b79b217..36cb4b4d4 100644 --- a/kairon/api/app/routers/bot/data.py +++ b/kairon/api/app/routers/bot/data.py @@ -331,6 +331,7 @@ async def download_error_csv( async def knowledge_vault_sync( primary_key_col: str, collection_name: str, + event_type: str, data: List[dict], current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS), ): @@ -339,7 +340,7 @@ async def knowledge_vault_sync( """ data = [{key.lower(): value for key, value in row.items()} for row in data] - error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), data, current_user.get_bot()) + error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), event_type.lower(), data, current_user.get_bot()) if error_summary: return Response( @@ -349,7 +350,7 @@ async def knowledge_vault_sync( error_code=400 ) - await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), data, + await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), event_type.lower(), data, current_user.get_bot(), current_user.get_user()) return Response( diff --git a/kairon/shared/cognition/processor.py b/kairon/shared/cognition/processor.py index 4ac49c531..ae6182b21 100644 --- a/kairon/shared/cognition/processor.py +++ b/kairon/shared/cognition/processor.py @@ -11,7 +11,7 @@ from kairon.shared.cognition.data_objects import CognitionData, CognitionSchema, ColumnMetadata, CollectionData from kairon.shared.data.constant import DEFAULT_LLM from kairon.shared.data.processor import MongoProcessor -from kairon.shared.models import CognitionDataType, CognitionMetadataType +from kairon.shared.models import CognitionDataType, CognitionMetadataType, VaultSyncEventType class CognitionDataProcessor: @@ -428,31 +428,34 @@ def get_pydantic_type(data_type: str): else: raise ValueError(f"Unsupported data type: {data_type}") - def validate_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str) -> Dict: + def validate_data(self, primary_key_col: str, collection_name: str, event_type: str, data: List[Dict], bot: str) -> Dict: """ Validates each dictionary in the data list according to the expected schema from column_dict. Args: data: List of dictionaries where each dictionary represents a row to be validated. collection_name: The name of the collection (table name). + event_type: The type of the event being validated. bot: The bot identifier. primary_key_col: The primary key column for identifying rows. Returns: Dict: Summary of validation errors, if any. 
""" - if not CognitionSchema.objects(collection_name=collection_name).first(): - raise AppException(f"Collection '{collection_name}' does not exist.") + self._validate_event_type(event_type) + event_validations = VaultSyncEventType[event_type].value + self._validate_collection_exists(collection_name) column_dict = MongoProcessor().get_column_datatype_dict(bot, collection_name) error_summary = {} - model_fields = { - column_name: self.get_pydantic_type(data_type) - for column_name, data_type in column_dict.items() + existing_documents = CognitionData.objects(bot=bot, collection=collection_name).as_pymongo() + existing_document_map = { + doc["data"].get(primary_key_col): doc + for doc in existing_documents + if doc["data"].get(primary_key_col) is not None # Ensure primary key exists in map } - DynamicModel = create_model('DynamicModel', **model_fields) for row in data: row_key = row.get(primary_key_col) @@ -460,34 +463,64 @@ def validate_data(self, primary_key_col: str, collection_name: str, data: List[D raise AppException(f"Primary key '{primary_key_col}' must exist in each row.") row_errors = [] - if set(row.keys()) != set(column_dict.keys()): - row_errors.append({ - "status": "Column headers mismatch", - "expected_columns": list(column_dict.keys()), - "actual_columns": list(row.keys()) - }) + + if "column_length_mismatch" in event_validations: + if len(row.keys()) != len(column_dict.keys()): + row_errors.append({ + "status": "Column length mismatch", + "expected_columns": list(column_dict.keys()), + "actual_columns": list(row.keys()) + }) + + if "invalid_columns" in event_validations: + expected_columns = list(column_dict.keys()) + if event_type == "field_update": + expected_columns = [primary_key_col + " + any from " + str([col for col in column_dict.keys() if col != primary_key_col])] + if not set(row.keys()).issubset(set(column_dict.keys())): + row_errors.append({ + "status": "Invalid columns in input data", + "expected_columns": expected_columns, + "actual_columns": list(row.keys()) + }) + + if "document_non_existence" in event_validations: + if str(row_key) not in existing_document_map: + row_errors.append({ + "status": "Document does not exist", + "primary_key": row_key, + "message": f"No document found for '{primary_key_col}': {row_key}" + }) + if row_errors: error_summary[row_key] = row_errors continue - try: - DynamicModel(**row) - except ValidationError as e: - error_details = [] - for error in e.errors(): - column_name = error['loc'][0] - input_value = row.get(column_name) - status = "Required Field is Empty" if input_value == "" else "Invalid DataType" - error_details.append({ - "column_name": column_name, - "input": input_value, - "status": status - }) - error_summary[row_key] = error_details + model_fields = {} + for column_name in row.keys(): + value = column_dict.get(column_name) + model_fields[column_name] = self.get_pydantic_type(value) + + DynamicModel = create_model('DynamicModel', **model_fields) + + if "pydantic_validation" in event_validations: + try: + DynamicModel(**row) + except ValidationError as e: + error_details = [] + for error in e.errors(): + column_name = error['loc'][0] + input_value = row.get(column_name) + status = "Required Field is Empty" if input_value == "" else "Invalid DataType" + error_details.append({ + "column_name": column_name, + "input": input_value, + "status": status + }) + error_summary[row_key] = error_details return error_summary - async def upsert_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str, user: 
Text): + async def upsert_data(self, primary_key_col: str, collection_name: str, event_type: str, data: List[Dict], bot: str, user: Text): """ Upserts data into the CognitionData collection. If document with the primary key exists, it will be updated. @@ -496,6 +529,7 @@ async def upsert_data(self, primary_key_col: str, collection_name: str, data: Li Args: primary_key_col: The primary key column name to check for uniqueness. collection_name: The collection name (table). + event_type: The type of the event being upserted data: List of rows of data to upsert. bot: The bot identifier associated with the data. user: The user @@ -509,24 +543,32 @@ async def upsert_data(self, primary_key_col: str, collection_name: str, data: Li if await llm_processor.__collection_exists__(qdrant_collection) is False: await llm_processor.__create_collection__(qdrant_collection) + existing_documents = CognitionData.objects(bot=bot, collection=collection_name).as_pymongo() + + existing_document_map = { + doc["data"].get(primary_key_col): doc for doc in existing_documents + } + for row in data: row = {str(key): str(value) for key, value in row.items()} primary_key_value = row.get(primary_key_col) + existing_document = existing_document_map.get(primary_key_value) + + if event_type == "field_update" and existing_document: + existing_data = existing_document.get("data", {}) + merged_data = {**existing_data, **row} + logger.debug(f"Merged row for {primary_key_col} {primary_key_value}: {merged_data}") + else: + merged_data = row + payload = { - "data": row, + "data": merged_data, "content_type": CognitionDataType.json.value, "collection": collection_name } - existing_document = CognitionData.objects( - Q(bot=bot) & - Q(collection=collection_name) & - Q(**{f"data__{primary_key_col}": str(primary_key_value)}) - ).first() if existing_document: - if not isinstance(existing_document, dict): - existing_document = existing_document.to_mongo().to_dict() row_id = str(existing_document["_id"]) self.update_cognition_data(row_id, payload, user, bot) updated_document = CognitionData.objects(id=row_id).first() @@ -570,3 +612,11 @@ async def sync_with_qdrant(self, llm_processor, collection_name, bot, document, logger.info(f"Row with {primary_key_col}: {document['data'].get(primary_key_col)} upserted in Qdrant.") except Exception as e: raise AppException(f"Failed to sync document with Qdrant: {str(e)}") + + def _validate_event_type(self, event_type: str): + if event_type not in VaultSyncEventType.__members__.keys(): + raise AppException("Event type does not exist") + + def _validate_collection_exists(self, collection_name: str): + if not CognitionSchema.objects(collection_name=collection_name).first(): + raise AppException(f"Collection '{collection_name}' does not exist.") diff --git a/kairon/shared/models.py b/kairon/shared/models.py index 2c38afccb..0651dd35e 100644 --- a/kairon/shared/models.py +++ b/kairon/shared/models.py @@ -116,3 +116,8 @@ class CognitionMetadataType(str, Enum): str = "str" int = "int" float = "float" + +class VaultSyncEventType(str, Enum): + push_menu = ["column_length_mismatch", "invalid_columns", "pydantic_validation"] + field_update = ["invalid_columns", "document_non_existence", "pydantic_validation"] + diff --git a/tests/integration_test/services_test.py b/tests/integration_test/services_test.py index 717c1735f..b490d210b 100644 --- a/tests/integration_test/services_test.py +++ b/tests/integration_test/services_test.py @@ -1407,7 +1407,7 @@ def test_default_values(): @mock.patch.object(LLMProcessor, 
"__create_collection__", autospec=True) @mock.patch.object(LLMProcessor, "__collection_upsert__", autospec=True) @mock.patch.object(litellm, "aembedding", autospec=True) -def test_knowledge_vault_sync(mock_embedding, mock_collection_exists, mock_create_collection, mock_collection_upsert): +def test_knowledge_vault_sync_push_menu(mock_embedding, mock_collection_exists, mock_create_collection, mock_collection_upsert): bot_settings = BotSettings.objects(bot=pytest.bot).get() bot_settings.content_importer_limit_per_day = 10 bot_settings.cognition_collections_limit = 10 @@ -1476,13 +1476,12 @@ def test_knowledge_vault_sync(mock_embedding, mock_collection_exists, mock_creat ] response = client.post( - url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries", + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries&event_type=push_menu", json=sync_data, headers={"Authorization": pytest.token_type + " " + pytest.access_token} ) actual= response.json() - print(actual) assert actual["success"] assert actual["message"] == "Processing completed successfully" assert actual["error_code"] == 0 @@ -1525,6 +1524,187 @@ def test_knowledge_vault_sync(mock_embedding, mock_collection_exists, mock_creat LLMSecret.objects.delete() +@pytest.mark.asyncio +@responses.activate +@mock.patch.object(LLMProcessor, "__collection_exists__", autospec=True) +@mock.patch.object(LLMProcessor, "__create_collection__", autospec=True) +@mock.patch.object(LLMProcessor, "__collection_upsert__", autospec=True) +@mock.patch.object(litellm, "aembedding", autospec=True) +def test_knowledge_vault_sync_field_update(mock_embedding, mock_collection_exists, mock_create_collection, mock_collection_upsert): + bot_settings = BotSettings.objects(bot=pytest.bot).get() + bot_settings.content_importer_limit_per_day = 10 + bot_settings.cognition_collections_limit = 10 + bot_settings.llm_settings['enable_faq'] = True + bot_settings.save() + + mock_collection_exists.return_value = False + mock_create_collection.return_value = None + mock_collection_upsert.return_value = None + + embedding = list(np.random.random(LLMProcessor.__embedding__)) + mock_embedding.return_value = litellm.EmbeddingResponse(**{'data': [{'embedding': embedding}]}) + + secrets = [ + { + "llm_type": "openai", + "api_key": "common_openai_key", + "models": ["common_openai_model1", "common_openai_model2"], + "user": "123", + "timestamp": datetime.utcnow() + }, + ] + + for secret in secrets: + LLMSecret(**secret).save() + + response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/schema", + json={ + "metadata": [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ], + "collection_name": "groceries" + }, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + schema_response = response.json() + assert schema_response["message"] == "Schema saved!" 
+ assert schema_response["error_code"] == 0 + + dummy_data_one = { + "id": "1", + "item": "Juice", + "price": "2.80", + "quantity": "56" + } + dummy_doc = CognitionData( + data=dummy_data_one, + content_type="json", + collection="groceries", + user="himanshu.gupta@digite.com", + bot=pytest.bot, + timestamp=datetime.utcnow() + ) + dummy_doc.save() + dummy_data_two = { + "id": "2", + "item": "Milk", + "price": "2.80", + "quantity": "12" + } + dummy_doc = CognitionData( + data=dummy_data_two, + content_type="json", + collection="groceries", + user="himanshu.gupta@digite.com", + bot=pytest.bot, + timestamp=datetime.utcnow() + ) + dummy_doc.save() + + cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") + assert cognition_data.count() == 2 + + sync_data = [ + {"id": 1, "price": "80.50"}, + {"id": 2, "price": "27.00"} + ] + + response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries&event_type=field_update", + json=sync_data, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + actual= response.json() + assert actual["success"] + assert actual["message"] == "Processing completed successfully" + assert actual["error_code"] == 0 + + cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") + assert cognition_data.count() == 2 + + expected_data = [ + {"id": "1", "item": "Juice", "price": "80.50", "quantity": "56"}, + {"id": "2", "item": "Milk", "price": "27.00", "quantity": "12"} + ] + + for index, doc in enumerate(cognition_data): + doc_data = doc.to_mongo().to_dict()["data"] + assert doc_data == expected_data[index] + + expected_calls = [ + { + "model": "text-embedding-3-small", + "input": ['{"id":1,"item":"Juice","price":80.5,"quantity":56}'], + "metadata": {'user': 'integration@demo.ai', 'bot': pytest.bot, 'invocation': 'knowledge_vault_sync'}, + "api_key": "common_openai_key", + "num_retries": 3 + }, + { + "model": "text-embedding-3-small", + "input": ['{"id":2,"item":"Milk","price":27.0,"quantity":12}'], # Second input + "metadata": {'user': 'integration@demo.ai', 'bot': pytest.bot, 'invocation': 'knowledge_vault_sync'}, + "api_key": "common_openai_key", + "num_retries": 3 + } + ] + + for i, expected in enumerate(expected_calls): + actual_call = mock_embedding.call_args_list[i].kwargs + assert actual_call == expected + CognitionData.objects(bot=pytest.bot, collection="groceries").delete() + CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete() + LLMSecret.objects.delete() + +@pytest.mark.asyncio +@responses.activate +@mock.patch.object(litellm, "aembedding", autospec=True) +def test_knowledge_vault_sync_event_type_does_not_exist(mock_embedding): + bot_settings = BotSettings.objects(bot=pytest.bot).get() + bot_settings.content_importer_limit_per_day = 10 + bot_settings.cognition_collections_limit = 10 + bot_settings.llm_settings['enable_faq'] = True + bot_settings.save() + + embedding = list(np.random.random(LLMProcessor.__embedding__)) + mock_embedding.return_value = litellm.EmbeddingResponse(**{'data': [{'embedding': embedding}]}) + + secrets = [ + { + "llm_type": "openai", + "api_key": "common_openai_key", + "models": ["common_openai_model1", "common_openai_model2"], + "user": "123", + "timestamp": datetime.utcnow() + } + ] + for secret in secrets: + LLMSecret(**secret).save() + + sync_data = [ + {"id": 1, "item": "Juice", "price": 2.50, "quantity": 10}, + {"id": 2, "item": "Apples", "price": 1.20, "quantity": 20} + ] + + response 
= client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries&event_type=non_existent_event_type", + json=sync_data, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + actual = response.json() + assert not actual["success"] + assert actual["message"] == "Event type does not exist" + assert actual["error_code"] == 422 + + cognition_data = CognitionData.objects(bot=pytest.bot, collection="nonexistent_collection") + assert cognition_data.count() == 0 + LLMSecret.objects.delete() + @pytest.mark.asyncio @responses.activate @mock.patch.object(litellm, "aembedding", autospec=True) @@ -1556,7 +1736,7 @@ def test_knowledge_vault_sync_missing_collection(mock_embedding): ] response = client.post( - url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=nonexistent_collection", + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=nonexistent_collection&event_type=push_menu", json=sync_data, headers={"Authorization": pytest.token_type + " " + pytest.access_token} ) @@ -1571,11 +1751,10 @@ def test_knowledge_vault_sync_missing_collection(mock_embedding): LLMSecret.objects.delete() - @pytest.mark.asyncio @responses.activate @mock.patch.object(litellm, "aembedding", autospec=True) -def test_knowledge_vault_sync_column_header_mismatch(mock_embedding): +def test_knowledge_vault_sync_missing_primary_key(mock_embedding): bot_settings = BotSettings.objects(bot=pytest.bot).get() bot_settings.content_importer_limit_per_day = 10 bot_settings.cognition_collections_limit = 10 @@ -1616,12 +1795,12 @@ def test_knowledge_vault_sync_column_header_mismatch(mock_embedding): assert schema_response.json()["error_code"] == 0 sync_data = [ - {"id": 1, "item": "Juice", "quantity": 10, "description": "Orange juice"}, - {"id": 2, "item": "Apples", "quantity": 20, "description": "Fresh apples"} + {"item": "Juice", "price": 2.50, "quantity": 10}, + {"item": "Apples", "price": 1.20, "quantity": 20} ] response = client.post( - url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries", + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries&event_type=push_menu", json=sync_data, headers={"Authorization": pytest.token_type + " " + pytest.access_token} ) @@ -1629,20 +1808,86 @@ def test_knowledge_vault_sync_column_header_mismatch(mock_embedding): actual = response.json() print(actual) assert not actual["success"] + assert actual["message"] == "Primary key 'id' must exist in each row." 
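The "Event type does not exist" 422 above is raised before any row-level work: `_validate_event_type` checks the name against `VaultSyncEventType.__members__`, and `validate_data` then reads `VaultSyncEventType[event_type].value` to decide which checks to run. A minimal sketch of that dispatch (plain `Enum` here for clarity; the diff mixes in `str`):

```python
# How VaultSyncEventType gates the checks validate_data runs per event type.
from enum import Enum

class VaultSyncEventType(Enum):
    push_menu = ["column_length_mismatch", "invalid_columns", "pydantic_validation"]
    field_update = ["invalid_columns", "document_non_existence", "pydantic_validation"]

def validations_for(event_type: str) -> list:
    if event_type not in VaultSyncEventType.__members__:
        raise ValueError("Event type does not exist")  # mirrors _validate_event_type
    return VaultSyncEventType[event_type].value

assert "document_non_existence" in validations_for("field_update")
assert "column_length_mismatch" not in validations_for("field_update")
```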
+ assert actual["error_code"] == 422 + + cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") + assert cognition_data.count() == 0 + + CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete() + LLMSecret.objects.delete() + + +@pytest.mark.asyncio +@responses.activate +@mock.patch.object(litellm, "aembedding", autospec=True) +def test_knowledge_vault_sync_column_length_mismatch(mock_embedding): + bot_settings = BotSettings.objects(bot=pytest.bot).get() + bot_settings.content_importer_limit_per_day = 10 + bot_settings.cognition_collections_limit = 10 + bot_settings.llm_settings['enable_faq'] = True + bot_settings.save() + + embedding = list(np.random.random(LLMProcessor.__embedding__)) + mock_embedding.return_value = litellm.EmbeddingResponse(**{'data': [{'embedding': embedding}]}) + + secrets = [ + { + "llm_type": "openai", + "api_key": "common_openai_key", + "models": ["common_openai_model1", "common_openai_model2"], + "user": "123", + "timestamp": datetime.utcnow() + } + ] + for secret in secrets: + LLMSecret(**secret).save() + + schema_response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/schema", + json={ + "metadata": [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ], + "collection_name": "groceries" + }, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + assert schema_response.status_code == 200 + assert schema_response.json()["message"] == "Schema saved!" 
+ assert schema_response.json()["error_code"] == 0 + + sync_data = [ + {"id": 1, "item": "Juice", "quantity": 10}, + {"id": 2, "item": "Apples", "quantity": 20} + ] + + response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries&event_type=push_menu", + json=sync_data, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + actual = response.json() + assert not actual["success"] assert actual["message"] == "Validation failed" assert actual["error_code"] == 400 - assert actual["data"] == {'1': [{'status': 'Column headers mismatch', 'expected_columns': ['id', 'item', 'price', 'quantity'], 'actual_columns': ['id', 'item', 'quantity', 'description']}], '2': [{'status': 'Column headers mismatch', 'expected_columns': ['id', 'item', 'price', 'quantity'], 'actual_columns': ['id', 'item', 'quantity', 'description']}]} - + assert actual["data"] == {'1': [{'status': 'Column length mismatch', 'expected_columns': ['id', 'item', 'price', 'quantity'], 'actual_columns': ['id', 'item', 'quantity']}], '2': [{'status': 'Column length mismatch', 'expected_columns': ['id', 'item', 'price', 'quantity'], 'actual_columns': ['id', 'item', 'quantity']}]} cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") assert cognition_data.count() == 0 CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete() LLMSecret.objects.delete() + @pytest.mark.asyncio @responses.activate @mock.patch.object(litellm, "aembedding", autospec=True) -def test_knowledge_vault_sync_missing_primary_key(mock_embedding): +def test_knowledge_vault_sync_invalid_columns(mock_embedding): bot_settings = BotSettings.objects(bot=pytest.bot).get() bot_settings.content_importer_limit_per_day = 10 bot_settings.cognition_collections_limit = 10 @@ -1682,27 +1927,144 @@ def test_knowledge_vault_sync_missing_primary_key(mock_embedding): assert schema_response.json()["message"] == "Schema saved!" assert schema_response.json()["error_code"] == 0 + dummy_data = { + "id": "2", + "item": "Milk", + "price": "2.80", + "quantity": "12" + } + dummy_doc = CognitionData( + data=dummy_data, + content_type="json", + collection="groceries", + user="himanshu.gupta@digite.com", + bot=pytest.bot, + timestamp=datetime.utcnow() + ) + dummy_doc.save() + sync_data = [ - {"item": "Juice", "price": 2.50, "quantity": 10}, - {"item": "Apples", "price": 1.20, "quantity": 20} + {"id": 2, "discount": 0.75} ] response = client.post( - url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries", + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries&event_type=field_update", json=sync_data, headers={"Authorization": pytest.token_type + " " + pytest.access_token} ) actual = response.json() - print(actual) assert not actual["success"] - assert actual["message"] == "Primary key 'id' must exist in each row." 
- assert actual["error_code"] == 422 + assert actual["message"] == "Validation failed" + assert actual["error_code"] == 400 + assert actual["data"] == {'2': [{'status': 'Invalid columns in input data', 'expected_columns': ["id + any from ['item', 'price', 'quantity']"], 'actual_columns': ['id', 'discount']}]} cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") - assert cognition_data.count() == 0 + assert cognition_data.count() == 1 + expected_data = [ + {"id": "2", "item": "Milk", "price": "2.80", "quantity": "12"} + ] + for index, doc in enumerate(cognition_data): + doc_data = doc.to_mongo().to_dict()["data"] + assert doc_data == expected_data[index] CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete() + CognitionData.objects(bot=pytest.bot, collection="groceries").delete() + LLMSecret.objects.delete() + + +@pytest.mark.asyncio +@responses.activate +@mock.patch.object(litellm, "aembedding", autospec=True) +def test_knowledge_vault_sync_document_non_existence(mock_embedding): + bot_settings = BotSettings.objects(bot=pytest.bot).get() + bot_settings.content_importer_limit_per_day = 10 + bot_settings.cognition_collections_limit = 10 + bot_settings.llm_settings['enable_faq'] = True + bot_settings.save() + + embedding = list(np.random.random(LLMProcessor.__embedding__)) + mock_embedding.return_value = litellm.EmbeddingResponse(**{'data': [{'embedding': embedding}]}) + + secrets = [ + { + "llm_type": "openai", + "api_key": "common_openai_key", + "models": ["common_openai_model1", "common_openai_model2"], + "user": "123", + "timestamp": datetime.utcnow() + } + ] + for secret in secrets: + LLMSecret(**secret).save() + + schema_response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/schema", + json={ + "metadata": [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ], + "collection_name": "groceries" + }, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + assert schema_response.status_code == 200 + assert schema_response.json()["message"] == "Schema saved!" 
+ assert schema_response.json()["error_code"] == 0 + + dummy_data = { + "id": "1", + "item": "Juice", + "price": "2.80", + "quantity": "5" + } + dummy_doc = CognitionData( + data=dummy_data, + content_type="json", + collection="groceries", + user="himanshu.gupta@digite.com", + bot=pytest.bot, + timestamp=datetime.utcnow() + ) + dummy_doc.save() + + sync_data = [ + {"id": "2", "price": 27.0} + ] + + response = client.post( + url=f"/api/bot/{pytest.bot}/data/cognition/sync?primary_key_col=id&collection_name=groceries&event_type=field_update", + json=sync_data, + headers={"Authorization": pytest.token_type + " " + pytest.access_token} + ) + + actual = response.json() + assert not actual["success"] + assert actual["message"] == "Validation failed" + assert actual["error_code"] == 400 + assert actual["data"] == {'2': [{'status': 'Document does not exist', 'primary_key': '2', 'message': "No document found for 'id': 2"}]} + cognition_data = CognitionData.objects(bot=pytest.bot, collection="groceries") + assert cognition_data.count() == 1 + + expected_data = [ + { + "id": "1", + "item": "Juice", + "price": "2.80", + "quantity": "5" + } + ] + + for index, doc in enumerate(cognition_data): + doc_data = doc.to_mongo().to_dict()["data"] + assert doc_data == expected_data[index] + + CognitionSchema.objects(bot=pytest.bot, collection_name="groceries").delete() + CognitionData.objects(bot=pytest.bot, collection="groceries").delete() LLMSecret.objects.delete() @responses.activate diff --git a/tests/unit_test/data_processor/data_processor_test.py b/tests/unit_test/data_processor/data_processor_test.py index 4d28aa4da..452a9746c 100644 --- a/tests/unit_test/data_processor/data_processor_test.py +++ b/tests/unit_test/data_processor/data_processor_test.py @@ -92,7 +92,7 @@ from kairon.shared.live_agent.live_agent import LiveAgentHandler from kairon.shared.metering.constants import MetricType from kairon.shared.metering.data_object import Metering -from kairon.shared.models import StoryEventType, HttpContentType, CognitionDataType +from kairon.shared.models import StoryEventType, HttpContentType, CognitionDataType, VaultSyncEventType from kairon.shared.multilingual.processor import MultilingualLogProcessor from kairon.shared.test.data_objects import ModelTestingLogs from kairon.shared.test.processor import ModelTestingLogProcessor @@ -1365,11 +1365,12 @@ def test_bot_id_change(self): bot_id = Slots.objects(bot="test_load_yml", user="testUser", influence_conversation=False, name='bot').get() assert bot_id['initial_value'] == "test_load_yml" - def test_validate_data_success(self): + def test_validate_data_push_menu_success(self): bot = 'test_bot' user = 'test_user' collection_name = 'groceries' primary_key_col = "id" + event_type = 'push_menu' metadata = [ { @@ -1418,6 +1419,7 @@ def test_validate_data_success(self): validation_summary = processor.validate_data( primary_key_col=primary_key_col, collection_name=collection_name, + event_type=event_type, data=data, bot=bot ) @@ -1425,11 +1427,124 @@ def test_validate_data_success(self): assert validation_summary == {} CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + def test_validate_data_field_update_success(self): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + primary_key_col = "id" + event_type = "field_update" + + metadata = [ + { + "column_name": "id", + "data_type": "int", + "enable_search": True, + "create_embeddings": True + }, + { + "column_name": "item", + "data_type": "str", + "enable_search": True, + 
"create_embeddings": True + }, + { + "column_name": "price", + "data_type": "float", + "enable_search": True, + "create_embeddings": True + }, + { + "column_name": "quantity", + "data_type": "int", + "enable_search": True, + "create_embeddings": True + } + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + dummy_data_one = { + "id": "1", + "item": "Juice", + "price": "2.80", + "quantity": "5" + } + existing_document = CognitionData( + data=dummy_data_one, + content_type="json", + collection=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + existing_document.save() + + dummy_data_two = { + "id": "2", + "item": "Milk", + "price": "2.80", + "quantity": "50" + } + existing_document = CognitionData( + data=dummy_data_two, + content_type="json", + collection=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + existing_document.save() + + data = [ + {"id": 1, "price": 2.50}, + {"id": 2, "price": 1.20} + ] + + processor = CognitionDataProcessor() + validation_summary = processor.validate_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + event_type=event_type, + data=data, + bot=bot + ) + + assert validation_summary == {} + CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + CognitionData.objects(bot=bot, collection="groceries").delete() + + def test_validate_data_event_type_does_not_exist(self): + bot = 'test_bot' + collection_name = 'groceries' + primary_key_col = "id" + data = [{"id": 1, "item": "Juice", "price": 2.50, "quantity": 10}] + event_type = 'non_existent_event_type' + + processor = CognitionDataProcessor() + + with pytest.raises(AppException, match=f"Event type does not exist"): + processor.validate_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + event_type=event_type, + data=data, + bot=bot + ) + def test_validate_data_missing_collection(self): bot = 'test_bot' collection_name = 'nonexistent_collection' primary_key_col = "id" data = [{"id": 1, "item": "Juice", "price": 2.50, "quantity": 10}] + event_type = 'push_menu' processor = CognitionDataProcessor() @@ -1437,6 +1552,7 @@ def test_validate_data_missing_collection(self): processor.validate_data( primary_key_col=primary_key_col, collection_name=collection_name, + event_type=event_type, data=data, bot=bot ) @@ -1446,6 +1562,7 @@ def test_validate_data_missing_primary_key(self): user = 'test_user' collection_name = 'groceries' primary_key_col = "id" + event_type = 'push_menu' metadata = [ {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, @@ -1475,15 +1592,17 @@ def test_validate_data_missing_primary_key(self): primary_key_col=primary_key_col, collection_name=collection_name, data=data, + event_type=event_type, bot=bot ) CognitionSchema.objects(bot=bot, collection_name="groceries").delete() - def test_validate_data_column_header_mismatch(self): + def test_validate_data_column_length_mismatch(self): bot = 'test_bot' user = 'test_user' collection_name = 'groceries' primary_key_col = "id" + event_type = 'push_menu' metadata = [ {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, @@ -1503,34 +1622,136 @@ def test_validate_data_column_header_mismatch(self): cognition_schema.save() data = [ - {"id": "1", "item": "Juice", 
"quantity": 10, "discount": 0.10} + {"id": "1", "item": "Milk", "quantity": 10} ] processor = CognitionDataProcessor() validation_summary = processor.validate_data( primary_key_col=primary_key_col, collection_name=collection_name, + event_type=event_type, data=data, bot=bot ) + assert "1" in validation_summary + assert validation_summary["1"][0]["status"] == "Column length mismatch" + assert validation_summary["1"][0]["expected_columns"] == ["id", "item", "price", "quantity"] + assert validation_summary["1"][0]["actual_columns"] == ["id", "item", "quantity"] + CognitionSchema.objects(bot=bot, collection_name=collection_name).delete() + + def test_validate_data_invalid_columns(self): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + primary_key_col = "id" + event_type = 'push_menu' + + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True} + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + data = [ + {"id": "1", "item": "Juice", "quantity": 10, "discount": 0.10} # Invalid "discount" column + ] + processor = CognitionDataProcessor() + validation_summary = processor.validate_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + event_type=event_type, + data=data, + bot=bot + ) assert "1" in validation_summary - assert validation_summary["1"][0]["status"] == "Column headers mismatch" + assert validation_summary["1"][0]["status"] == "Invalid columns in input data" assert validation_summary["1"][0]["expected_columns"] == ["id", "item", "price", "quantity"] assert validation_summary["1"][0]["actual_columns"] == ["id", "item", "quantity", "discount"] + CognitionSchema.objects(bot=bot, collection_name=collection_name).delete() + + def test_validate_data_document_non_existence(self): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + primary_key_col = "id" + event_type = 'field_update' + + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True} + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + dummy_data = { + "id": "1", + "item": "Juice", + "price": "2.80", + "quantity": "5" + } + existing_document = CognitionData( + data=dummy_data, + content_type="json", + collection=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + existing_document.save() + + data = [ + {"id": "2", "price": 27.0} + ] + + processor = CognitionDataProcessor() + validation_summary = processor.validate_data( + 
primary_key_col=primary_key_col, + collection_name=collection_name, + event_type=event_type, + data=data, + bot=bot + ) + assert "2" in validation_summary + assert validation_summary["2"][0]["status"] == "Document does not exist" + assert validation_summary["2"][0]["primary_key"] == "2" + CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + CognitionData.objects(bot=bot, collection="groceries").delete() @pytest.mark.asyncio @patch.object(LLMProcessor, "__collection_exists__", autospec=True) @patch.object(LLMProcessor, "__create_collection__", autospec=True) @patch.object(LLMProcessor, "__collection_upsert__", autospec=True) @patch.object(litellm, "aembedding", autospec=True) - async def test_upsert_data_success(self, mock_embedding, mock_collection_upsert, mock_create_collection, + async def test_upsert_data_push_menu_success(self, mock_embedding, mock_collection_upsert, mock_create_collection, mock_collection_exists): bot = 'test_bot' user = 'test_user' collection_name = 'groceries' primary_key_col = 'id' + event_type = 'push_menu' metadata = [ {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, @@ -1592,6 +1813,7 @@ async def test_upsert_data_success(self, mock_embedding, mock_collection_upsert, result = await processor.upsert_data( primary_key_col=primary_key_col, collection_name=collection_name, + event_type=event_type, data=upsert_data, bot=bot, user=user @@ -1618,6 +1840,122 @@ async def test_upsert_data_success(self, mock_embedding, mock_collection_upsert, CognitionData.objects(bot=bot, collection="groceries").delete() LLMSecret.objects.delete() + @pytest.mark.asyncio + @patch.object(LLMProcessor, "__collection_exists__", autospec=True) + @patch.object(LLMProcessor, "__create_collection__", autospec=True) + @patch.object(LLMProcessor, "__collection_upsert__", autospec=True) + @patch.object(litellm, "aembedding", autospec=True) + async def test_upsert_data_field_update_success(self, mock_embedding, mock_collection_upsert, mock_create_collection, + mock_collection_exists): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + primary_key_col = 'id' + event_type = 'field_update' + + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, + {"column_name": "item", "data_type": "str", "enable_search": True, "create_embeddings": True}, + {"column_name": "price", "data_type": "float", "enable_search": True, "create_embeddings": True}, + {"column_name": "quantity", "data_type": "int", "enable_search": True, "create_embeddings": True}, + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + dummy_data_one = { + "id": "1", + "item": "Juice", + "price": "2.80", + "quantity": "56" + } + existing_document = CognitionData( + data=dummy_data_one, + content_type="json", + collection=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + existing_document.save() + + dummy_data_two = { + "id": "2", + "item": "Milk", + "price": "2.80", + "quantity": "12" + } + existing_document = CognitionData( + data=dummy_data_two, + content_type="json", + collection=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + existing_document.save() + + upsert_data = [ + {"id": 1, "price": "80.50"}, + {"id": 2, "price": "27.00"} + ] + + 
llm_secret = LLMSecret( + llm_type="openai", + api_key="openai_key", + models=["model1", "model2"], + api_base_url="https://api.example.com", + bot=bot, + user=user + ) + llm_secret.save() + + mock_collection_exists.return_value = False + mock_create_collection.return_value = None + mock_collection_upsert.return_value = None + + embedding = list(np.random.random(1532)) + mock_embedding.return_value = {'data': [{'embedding': embedding}, {'embedding': embedding}]} + + processor = CognitionDataProcessor() + + result = await processor.upsert_data( + primary_key_col=primary_key_col, + collection_name=collection_name, + event_type=event_type, + data=upsert_data, + bot=bot, + user=user + ) + + upserted_data = list(CognitionData.objects(bot=bot, collection=collection_name)) + + assert result["message"] == "Upsert complete!" + assert len(upserted_data) == 2 + + inserted_record = next((item for item in upserted_data if item.data["id"] == "1"), None) + assert inserted_record is not None + assert inserted_record.data["item"] == "Juice" + assert inserted_record.data["price"] == "80.50" + assert inserted_record.data["quantity"] == "56" + + updated_record = next((item for item in upserted_data if item.data["id"] == "2"), None) + assert updated_record is not None + assert updated_record.data["item"] == "Milk" + assert updated_record.data["price"] == "27.00" # Updated price + assert updated_record.data["quantity"] == "12" + + CognitionSchema.objects(bot=bot, collection_name="groceries").delete() + CognitionData.objects(bot=bot, collection="groceries").delete() + LLMSecret.objects.delete() + @pytest.mark.asyncio @patch.object(LLMProcessor, "__collection_exists__", autospec=True) @patch.object(LLMProcessor, "__create_collection__", autospec=True) @@ -1629,6 +1967,7 @@ async def test_upsert_data_empty_data_list(self, mock_embedding, mock_collection user = 'test_user' collection_name = 'groceries' primary_key_col = 'id' + event_type = 'push_menu' metadata = [ {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True}, @@ -1686,6 +2025,7 @@ async def test_upsert_data_empty_data_list(self, mock_embedding, mock_collection result = await processor.upsert_data( primary_key_col=primary_key_col, collection_name=collection_name, + event_type=event_type, data=upsert_data, bot=bot, user=user @@ -1910,6 +2250,46 @@ def test_get_pydantic_type_invalid(self): with pytest.raises(ValueError, match="Unsupported data type: unknown"): CognitionDataProcessor.get_pydantic_type('unknown') + def test_validate_event_type_valid(self): + processor = CognitionDataProcessor() + valid_event_type = list(VaultSyncEventType.__members__.keys())[0] + processor._validate_event_type(valid_event_type) + + def test_validate_event_type_invalid(self): + processor = CognitionDataProcessor() + invalid_event_type = "invalid_event" + with pytest.raises(AppException, match="Event type does not exist"): + processor._validate_event_type(invalid_event_type) + + def test_validate_collection_exists_valid(self): + bot = 'test_bot' + user = 'test_user' + collection_name = 'groceries' + metadata = [ + {"column_name": "id", "data_type": "int", "enable_search": True, "create_embeddings": True} + ] + + cognition_schema = CognitionSchema( + metadata=[ColumnMetadata(**item) for item in metadata], + collection_name=collection_name, + user=user, + bot=bot, + timestamp=datetime.utcnow() + ) + cognition_schema.validate(clean=True) + cognition_schema.save() + + processor = CognitionDataProcessor() + 
processor._validate_collection_exists(collection_name) + + CognitionSchema.objects(collection_name=collection_name).delete() + + def test_validate_collection_exists_invalid(self): + processor = CognitionDataProcessor() + invalid_collection_name = "non_existent_collection" + with pytest.raises(AppException, match=f"Collection '{invalid_collection_name}' does not exist."): + processor._validate_collection_exists(invalid_collection_name) + def test_save_and_validate_success(self): bot = 'test_bot' user = 'test_user'
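A note on the pydantic step these units exercise: the processor now builds `DynamicModel` from only the columns present in each row, which is what lets a partial `field_update` payload pass type checking while still catching bad values. A sketch, assuming `get_pydantic_type` returns `(type, ...)` field tuples as `create_model` expects:

```python
# Per-row dynamic model: only the submitted columns are required/validated.
from pydantic import ValidationError, create_model

column_types = {"id": (int, ...), "item": (str, ...),
                "price": (float, ...), "quantity": (int, ...)}

def validate_row(row: dict) -> list:
    DynamicModel = create_model("DynamicModel", **{c: column_types[c] for c in row})
    try:
        DynamicModel(**row)
        return []
    except ValidationError as e:
        return e.errors()

assert validate_row({"id": "2", "price": "27.00"}) == []   # numeric strings coerce
assert validate_row({"id": "x", "price": "27.00"}) != []   # "x" is not an int
```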