Skip to content

Commit

Permalink
Extended knowledge vault sync functionality to accommodate different e…
Browse files Browse the repository at this point in the history
…vents (#1606)

* Extended knowledge vault sync functionality to accommodate different events

* Reduced code complexity and added corresponding test cases
  • Loading branch information
himanshugt16 authored Nov 26, 2024
1 parent 2338dde commit 9a425e0
Show file tree
Hide file tree
Showing 5 changed files with 862 additions and 64 deletions.
5 changes: 3 additions & 2 deletions kairon/api/app/routers/bot/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ async def download_error_csv(
async def knowledge_vault_sync(
primary_key_col: str,
collection_name: str,
event_type: str,
data: List[dict],
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
):
Expand All @@ -339,7 +340,7 @@ async def knowledge_vault_sync(
"""
data = [{key.lower(): value for key, value in row.items()} for row in data]

error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), data, current_user.get_bot())
error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), event_type.lower(), data, current_user.get_bot())

if error_summary:
return Response(
Expand All @@ -349,7 +350,7 @@ async def knowledge_vault_sync(
error_code=400
)

await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), data,
await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), event_type.lower(), data,
current_user.get_bot(), current_user.get_user())

return Response(
Expand Down
124 changes: 87 additions & 37 deletions kairon/shared/cognition/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from kairon.shared.cognition.data_objects import CognitionData, CognitionSchema, ColumnMetadata, CollectionData
from kairon.shared.data.constant import DEFAULT_LLM
from kairon.shared.data.processor import MongoProcessor
from kairon.shared.models import CognitionDataType, CognitionMetadataType
from kairon.shared.models import CognitionDataType, CognitionMetadataType, VaultSyncEventType


class CognitionDataProcessor:
Expand Down Expand Up @@ -428,66 +428,99 @@ def get_pydantic_type(data_type: str):
else:
raise ValueError(f"Unsupported data type: {data_type}")

def validate_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str) -> Dict:
def validate_data(self, primary_key_col: str, collection_name: str, event_type: str, data: List[Dict], bot: str) -> Dict:
"""
Validates each dictionary in the data list according to the expected schema from column_dict.
Args:
data: List of dictionaries where each dictionary represents a row to be validated.
collection_name: The name of the collection (table name).
event_type: The type of the event being validated.
bot: The bot identifier.
primary_key_col: The primary key column for identifying rows.
Returns:
Dict: Summary of validation errors, if any.
"""
if not CognitionSchema.objects(collection_name=collection_name).first():
raise AppException(f"Collection '{collection_name}' does not exist.")
self._validate_event_type(event_type)
event_validations = VaultSyncEventType[event_type].value

self._validate_collection_exists(collection_name)
column_dict = MongoProcessor().get_column_datatype_dict(bot, collection_name)

error_summary = {}

model_fields = {
column_name: self.get_pydantic_type(data_type)
for column_name, data_type in column_dict.items()
existing_documents = CognitionData.objects(bot=bot, collection=collection_name).as_pymongo()
existing_document_map = {
doc["data"].get(primary_key_col): doc
for doc in existing_documents
if doc["data"].get(primary_key_col) is not None # Ensure primary key exists in map
}
DynamicModel = create_model('DynamicModel', **model_fields)

for row in data:
row_key = row.get(primary_key_col)
if not row_key:
raise AppException(f"Primary key '{primary_key_col}' must exist in each row.")

row_errors = []
if set(row.keys()) != set(column_dict.keys()):
row_errors.append({
"status": "Column headers mismatch",
"expected_columns": list(column_dict.keys()),
"actual_columns": list(row.keys())
})

if "column_length_mismatch" in event_validations:
if len(row.keys()) != len(column_dict.keys()):
row_errors.append({
"status": "Column length mismatch",
"expected_columns": list(column_dict.keys()),
"actual_columns": list(row.keys())
})

if "invalid_columns" in event_validations:
expected_columns = list(column_dict.keys())
if event_type == "field_update":
expected_columns = [primary_key_col + " + any from " + str([col for col in column_dict.keys() if col != primary_key_col])]
if not set(row.keys()).issubset(set(column_dict.keys())):
row_errors.append({
"status": "Invalid columns in input data",
"expected_columns": expected_columns,
"actual_columns": list(row.keys())
})

if "document_non_existence" in event_validations:
if str(row_key) not in existing_document_map:
row_errors.append({
"status": "Document does not exist",
"primary_key": row_key,
"message": f"No document found for '{primary_key_col}': {row_key}"
})

if row_errors:
error_summary[row_key] = row_errors
continue

try:
DynamicModel(**row)
except ValidationError as e:
error_details = []
for error in e.errors():
column_name = error['loc'][0]
input_value = row.get(column_name)
status = "Required Field is Empty" if input_value == "" else "Invalid DataType"
error_details.append({
"column_name": column_name,
"input": input_value,
"status": status
})
error_summary[row_key] = error_details
model_fields = {}
for column_name in row.keys():
value = column_dict.get(column_name)
model_fields[column_name] = self.get_pydantic_type(value)

DynamicModel = create_model('DynamicModel', **model_fields)

if "pydantic_validation" in event_validations:
try:
DynamicModel(**row)
except ValidationError as e:
error_details = []
for error in e.errors():
column_name = error['loc'][0]
input_value = row.get(column_name)
status = "Required Field is Empty" if input_value == "" else "Invalid DataType"
error_details.append({
"column_name": column_name,
"input": input_value,
"status": status
})
error_summary[row_key] = error_details

return error_summary

async def upsert_data(self, primary_key_col: str, collection_name: str, data: List[Dict], bot: str, user: Text):
async def upsert_data(self, primary_key_col: str, collection_name: str, event_type: str, data: List[Dict], bot: str, user: Text):
"""
Upserts data into the CognitionData collection.
If document with the primary key exists, it will be updated.
Expand All @@ -496,6 +529,7 @@ async def upsert_data(self, primary_key_col: str, collection_name: str, data: Li
Args:
primary_key_col: The primary key column name to check for uniqueness.
collection_name: The collection name (table).
event_type: The type of the event being upserted
data: List of rows of data to upsert.
bot: The bot identifier associated with the data.
user: The user
Expand All @@ -509,24 +543,32 @@ async def upsert_data(self, primary_key_col: str, collection_name: str, data: Li
if await llm_processor.__collection_exists__(qdrant_collection) is False:
await llm_processor.__create_collection__(qdrant_collection)

existing_documents = CognitionData.objects(bot=bot, collection=collection_name).as_pymongo()

existing_document_map = {
doc["data"].get(primary_key_col): doc for doc in existing_documents
}

for row in data:
row = {str(key): str(value) for key, value in row.items()}
primary_key_value = row.get(primary_key_col)

existing_document = existing_document_map.get(primary_key_value)

if event_type == "field_update" and existing_document:
existing_data = existing_document.get("data", {})
merged_data = {**existing_data, **row}
logger.debug(f"Merged row for {primary_key_col} {primary_key_value}: {merged_data}")
else:
merged_data = row

payload = {
"data": row,
"data": merged_data,
"content_type": CognitionDataType.json.value,
"collection": collection_name
}
existing_document = CognitionData.objects(
Q(bot=bot) &
Q(collection=collection_name) &
Q(**{f"data__{primary_key_col}": str(primary_key_value)})
).first()

if existing_document:
if not isinstance(existing_document, dict):
existing_document = existing_document.to_mongo().to_dict()
row_id = str(existing_document["_id"])
self.update_cognition_data(row_id, payload, user, bot)
updated_document = CognitionData.objects(id=row_id).first()
Expand Down Expand Up @@ -570,3 +612,11 @@ async def sync_with_qdrant(self, llm_processor, collection_name, bot, document,
logger.info(f"Row with {primary_key_col}: {document['data'].get(primary_key_col)} upserted in Qdrant.")
except Exception as e:
raise AppException(f"Failed to sync document with Qdrant: {str(e)}")

def _validate_event_type(self, event_type: str):
    """
    Ensure the given event type is a member name of VaultSyncEventType.

    Args:
        event_type: Name of the sync event to check.

    Raises:
        AppException: If no VaultSyncEventType member has this name.
    """
    known_events = VaultSyncEventType.__members__
    if event_type in known_events:
        return
    raise AppException("Event type does not exist")

def _validate_collection_exists(self, collection_name: str):
    """
    Ensure a CognitionSchema entry exists for the given collection name.

    Args:
        collection_name: Collection name to look up.

    Raises:
        AppException: If the lookup returns no (truthy) schema document.
    """
    schema = CognitionSchema.objects(collection_name=collection_name).first()
    if schema:
        return
    raise AppException(f"Collection '{collection_name}' does not exist.")
5 changes: 5 additions & 0 deletions kairon/shared/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,8 @@ class CognitionMetadataType(str, Enum):
str = "str"
int = "int"
float = "float"

class VaultSyncEventType(str, Enum):
    """Validation checks to run for each knowledge vault sync event type.

    A member is looked up by event name (``VaultSyncEventType[event_type]``)
    and its ``value`` is a tuple of check names, so callers can test
    ``"<check>" in member.value`` with exact-match semantics.
    """

    def __new__(cls, *checks: str):
        # With a plain ``str`` mix-in, a list value is coerced to its repr
        # string, which makes ``"x" in member.value`` a substring search that
        # also matches fragments such as "column". Keep the member's string
        # content exactly as it was, but store the real tuple as the value.
        member = str.__new__(cls, str(list(checks)))
        member._value_ = checks
        return member

    push_menu = ("column_length_mismatch", "invalid_columns", "pydantic_validation")
    field_update = ("invalid_columns", "document_non_existence", "pydantic_validation")

Loading

0 comments on commit 9a425e0

Please sign in to comment.