Skip to content

Commit

Permalink
ensure all local commits pushed
Browse files Browse the repository at this point in the history
  • Loading branch information
pablodanswer committed Oct 5, 2024
1 parent b62218b commit 05fd2a4
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 19 deletions.
15 changes: 9 additions & 6 deletions backend/danswer/background/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from danswer.db.search_settings import get_current_search_settings
from danswer.db.search_settings import get_secondary_search_settings
from danswer.db.swap_index import check_index_swap
from danswer.document_index.vespa.index import VespaIndex
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
from danswer.utils.logger import setup_logger
Expand Down Expand Up @@ -151,11 +152,6 @@ def _mark_run_failed(
"""Main funcs"""


def cleanup_vespa_index(tenant_id: str | None) -> None:
    """Placeholder for per-tenant Vespa index cleanup.

    In single-tenant mode (MULTI_TENANT is False) there is nothing to clean
    up, so this returns immediately. No cleanup logic is implemented yet for
    the multi-tenant path either — the function is currently a no-op in both
    cases.
    """
    if not MULTI_TENANT:
        return


def create_indexing_jobs(
existing_jobs: dict[int, Future | SimpleJob], tenant_id: str | None
) -> None:
Expand Down Expand Up @@ -502,7 +498,14 @@ def update_loop(
f"Processing {'index attempts' if tenant_id is None else f'tenant {tenant_id}'}"
)
with get_session_with_tenant(tenant_id) as db_session:
check_index_swap(db_session=db_session)
index_to_expire = check_index_swap(db_session=db_session)

if index_to_expire and tenant_id and MULTI_TENANT:
VespaIndex.delete_entries_by_tenant_id(
tenant_id=tenant_id,
index_name=index_to_expire.index_name,
)

if not MULTI_TENANT:
search_settings = get_current_search_settings(db_session)
if search_settings.provider_type is None:
Expand Down
14 changes: 5 additions & 9 deletions backend/danswer/db/swap_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@
from danswer.db.index_attempt import (
count_unique_cc_pairs_with_successful_index_attempts,
)
from danswer.db.models import SearchSettings
from danswer.db.search_settings import get_current_search_settings
from danswer.db.search_settings import get_secondary_search_settings
from danswer.db.search_settings import update_search_settings_status
from danswer.document_index.vespa.index import VespaIndex
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.utils.logger import setup_logger
from shared_configs.configs import current_tenant_id


logger = setup_logger()


def check_index_swap(db_session: Session) -> None:
def check_index_swap(db_session: Session) -> SearchSettings | None:
"""Get count of cc-pairs and count of successful index_attempts for the
new model grouped by connector + credential, if it's the same, then assume
new index is done building. If so, swap the indices and expire the old one."""
Expand All @@ -31,7 +30,7 @@ def check_index_swap(db_session: Session) -> None:
search_settings = get_secondary_search_settings(db_session)

if not search_settings:
return
return None

unique_cc_indexings = count_unique_cc_pairs_with_successful_index_attempts(
search_settings_id=search_settings.id, db_session=db_session
Expand Down Expand Up @@ -69,8 +68,5 @@ def check_index_swap(db_session: Session) -> None:
resync_cc_pair(cc_pair, db_session=db_session)

if MULTI_TENANT:
# Delete all chunks for this tenant in the previous index
VespaIndex.expire_tenant_index(
tenant_id=current_tenant_id,
index_name=now_old_search_settings.index_name,
)
return now_old_search_settings
return None
161 changes: 157 additions & 4 deletions backend/danswer/document_index/vespa/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import os
import re
import time
import urllib
import urllib.parse
import zipfile
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from typing import BinaryIO
from typing import cast
from typing import List

import httpx  # type: ignore
import requests  # type: ignore
Expand Down Expand Up @@ -637,10 +639,6 @@ def hybrid_retrieval(

return query_vespa(params)

@classmethod
def expire_tenant_index(self, tenant_id: str, index_name: str) -> None:
pass

def admin_retrieval(
self,
query: str,
Expand Down Expand Up @@ -669,3 +667,158 @@ def admin_retrieval(
}

return query_vespa(params)

@classmethod
def delete_entries_by_tenant_id(cls, tenant_id: str, index_name: str) -> None:
    """Delete every document in *index_name* that belongs to *tenant_id*.

    First collects all matching document IDs via a paginated query, then
    issues the deletes in parallel batches. A tenant with no matching
    documents is a no-op.

    Parameters:
        tenant_id (str): The tenant ID whose documents are to be deleted.
        index_name (str): The name of the index from which to delete documents.
    """
    logger.info(
        f"Deleting entries with tenant_id: {tenant_id} from index: {index_name}"
    )

    matching_ids = cls._get_all_document_ids_by_tenant_id(tenant_id, index_name)

    if not matching_ids:
        logger.info(
            f"No documents found with tenant_id: {tenant_id} in index: {index_name}"
        )
        return

    cls._apply_deletes_batched(
        [
            _VespaDeleteRequest(document_id=doc_id, index_name=index_name)
            for doc_id in matching_ids
        ]
    )

@classmethod
def _get_all_document_ids_by_tenant_id(
    cls, tenant_id: str, index_name: str
) -> List[str]:
    """Retrieve all document IDs with the specified tenant_id, handling pagination.

    Parameters:
        tenant_id (str): The tenant ID to search for.
        index_name (str): The name of the index to search in.
    Returns:
        List[str]: A list of document IDs matching the tenant_id.
    """
    offset = 0
    limit = 1000  # Vespa's maximum hits per query
    document_ids: List[str] = []

    logger.debug(
        f"Starting document ID retrieval for tenant_id: {tenant_id} in index: {index_name}"
    )

    url = f"{VESPA_APPLICATION_ENDPOINT}/search/"

    # Reuse one HTTP client for every page instead of opening a fresh
    # connection per iteration (previously a new httpx.Client was created
    # inside the loop for each request).
    with httpx.Client(http2=True) as http_client:
        while True:
            # NOTE(review): tenant_id is interpolated directly into the YQL
            # string; if it can ever contain a double quote this is injectable.
            # Confirm it is a validated internal identifier.
            query_params = {
                "yql": f'select id from sources * where tenant_id contains "{tenant_id}";',
                "offset": str(offset),
                "hits": str(limit),
                "timeout": "10s",
                "format": "json",
                "summary": "id",
            }

            logger.debug(
                f"Querying for document IDs with tenant_id: {tenant_id}, offset: {offset}"
            )

            response = http_client.get(url, params=query_params)
            response.raise_for_status()

            search_result = response.json()
            hits = search_result.get("root", {}).get("children", [])

            # An empty page means all matches have been consumed.
            if not hits:
                break

            # NOTE(review): hit["id"] appears to be Vespa's full document ID
            # string; confirm the document/v1 delete endpoint expects this
            # form rather than only the user-specified docid portion.
            for hit in hits:
                doc_id = hit.get("id")
                if doc_id:
                    document_ids.append(doc_id)

            offset += limit  # Move to the next page

    logger.debug(
        f"Retrieved {len(document_ids)} document IDs for tenant_id: {tenant_id}"
    )
    return document_ids

@classmethod
def _apply_deletes_batched(
    cls,
    delete_requests: List["_VespaDeleteRequest"],
    batch_size: int = BATCH_SIZE,
) -> None:
    """Delete documents in batches using multiple threads.

    A single thread pool and a single HTTP/2 client are shared across all
    batches; each batch is submitted and fully drained before the next one
    starts. Individual failures are logged and do not abort the run.

    Parameters:
        delete_requests (List[_VespaDeleteRequest]): The list of delete requests.
        batch_size (int): The number of documents to delete in each batch.
    """

    def _delete_one(
        req: "_VespaDeleteRequest", client: httpx.Client
    ) -> None:
        # Issue a single DELETE against the precomputed document URL.
        logger.debug(f"Deleting document with ID {req.document_id}")
        resp = client.delete(
            req.url,
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()

    total = len(delete_requests)
    logger.debug(f"Starting batch deletion for {total} documents")

    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as pool:
        with httpx.Client(http2=True) as client:
            for start in range(0, total, batch_size):
                current_batch = delete_requests[start : start + batch_size]

                pending = {
                    pool.submit(_delete_one, req, client): req.document_id
                    for req in current_batch
                }

                for completed in concurrent.futures.as_completed(pending):
                    doc_id = pending[completed]
                    try:
                        completed.result()
                        logger.debug(f"Successfully deleted document: {doc_id}")
                    except httpx.HTTPError as e:
                        logger.error(f"Failed to delete document {doc_id}: {e}")
                        # Optionally, implement retry logic or error handling here

    logger.info("Batch deletion completed")


class _VespaDeleteRequest:
    """Precomputed delete request for a single Vespa document.

    Holds the document ID and the fully-encoded document/v1 DELETE URL so
    worker threads only need to issue the HTTP call.
    """

    def __init__(self, document_id: str, index_name: str) -> None:
        self.document_id = document_id
        # Percent-encode the ID for use as a URL *path* segment. quote_plus
        # (used previously) maps " " to "+", but "+" inside a path is a
        # literal plus sign rather than a space, so IDs containing spaces or
        # "+" produced a URL addressing the wrong document. quote with
        # safe="" percent-encodes every reserved character correctly.
        encoded_doc_id = urllib.parse.quote(self.document_id, safe="")
        self.url = (
            f"{VESPA_APPLICATION_ENDPOINT}/document/v1/"
            f"{index_name}/{index_name}/docid/{encoded_doc_id}"
        )

0 comments on commit 05fd2a4

Please sign in to comment.