Skip to content

Commit

Permalink
better logging for actions being taken inside document_by_cc_pair_cleanup (#2713)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuo-danswer authored Oct 7, 2024
1 parent 6fa8fab commit 79d3715
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
18 changes: 14 additions & 4 deletions backend/danswer/background/celery/tasks/shared/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ def document_by_cc_pair_cleanup_task(
connector / credential pair from the access list
(6) delete all relevant entries from postgres
"""
task_logger.info(f"document_id={document_id}")

try:
with Session(get_sqlalchemy_engine()) as db_session:
action = "skip"
chunks_affected = 0

curr_ind_name, sec_ind_name = get_both_index_names(db_session)
document_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
Expand All @@ -56,12 +57,16 @@ def document_by_cc_pair_cleanup_task(
if count == 1:
# count == 1 means this is the only remaining cc_pair reference to the doc
# delete it from vespa and the db
document_index.delete(doc_ids=[document_id])
action = "delete"

chunks_affected = document_index.delete_single(document_id)
delete_documents_complete__no_commit(
db_session=db_session,
document_ids=[document_id],
)
elif count > 1:
action = "update"

# count > 1 means the document still has cc_pair references
doc = get_document(document_id, db_session)
if not doc:
Expand All @@ -84,7 +89,9 @@ def document_by_cc_pair_cleanup_task(
)

# update Vespa. OK if doc doesn't exist. Raises exception otherwise.
document_index.update_single(document_id, fields=fields)
chunks_affected = document_index.update_single(
document_id, fields=fields
)

# there are still other cc_pair references to the doc, so just resync to Vespa
delete_document_by_connector_credential_pair__no_commit(
Expand All @@ -100,6 +107,9 @@ def document_by_cc_pair_cleanup_task(
else:
pass

task_logger.info(
f"document_id={document_id} refcount={count} action={action} chunks={chunks_affected}"
)
db_session.commit()
except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc_id={document_id}")
Expand Down
4 changes: 2 additions & 2 deletions backend/danswer/document_index/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class Deletable(abc.ABC):
"""

@abc.abstractmethod
def delete_single(self, doc_id: str) -> None:
def delete_single(self, doc_id: str) -> int:
"""
Given a single document id, hard delete it from the document index
Expand Down Expand Up @@ -203,7 +203,7 @@ class Updatable(abc.ABC):
"""

@abc.abstractmethod
def update_single(self, doc_id: str, fields: VespaDocumentFields) -> None:
def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
"""
Updates all chunks for a document with the specified fields.
None values mean that the field does not need an update.
Expand Down
20 changes: 12 additions & 8 deletions backend/danswer/document_index/vespa/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,14 @@ def update(self, update_requests: list[UpdateRequest]) -> None:
time.monotonic() - update_start,
)

def update_single(self, doc_id: str, fields: VespaDocumentFields) -> None:
def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
"""Note: if the document id does not exist, the update will be a no-op and the
function will complete with no errors or exceptions.
Handle other exceptions if you wish to implement retry behavior
"""

total_chunks_updated = 0

# Handle Vespa character limitations
# Mutating update_request but it's not used later anyway
normalized_doc_id = replace_invalid_doc_id_characters(doc_id)
Expand All @@ -411,7 +413,7 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> None:

if not update_dict["fields"]:
logger.error("Update request received but nothing to update")
return
return 0

index_names = [self.index_name]
if self.secondary_index_name:
Expand All @@ -426,7 +428,6 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> None:
}
)

total_chunks_updated = 0
while True:
try:
resp = http_client.put(
Expand Down Expand Up @@ -462,9 +463,10 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> None:
f"VespaIndex.update_single: "
f"index={index_name} "
f"doc={normalized_doc_id} "
f"chunks_deleted={total_chunks_updated}"
f"chunks_updated={total_chunks_updated}"
)
return

return total_chunks_updated

def delete(self, doc_ids: list[str]) -> None:
logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
Expand All @@ -484,10 +486,12 @@ def delete(self, doc_ids: list[str]) -> None:
)
return

def delete_single(self, doc_id: str) -> None:
def delete_single(self, doc_id: str) -> int:
"""Possibly faster overall than the delete method due to using a single
delete call with a selection query."""

total_chunks_deleted = 0

# Vespa deletion is poorly documented ... luckily we found this
# https://docs.vespa.ai/en/operations/batch-delete.html#example

Expand All @@ -508,7 +512,6 @@ def delete_single(self, doc_id: str) -> None:
}
)

total_chunks_deleted = 0
while True:
try:
resp = http_client.delete(
Expand Down Expand Up @@ -543,7 +546,8 @@ def delete_single(self, doc_id: str) -> None:
f"doc={doc_id} "
f"chunks_deleted={total_chunks_deleted}"
)
return

return total_chunks_deleted

def id_based_retrieval(
self,
Expand Down

0 comments on commit 79d3715

Please sign in to comment.