Skip to content

Commit

Permalink
Add support for multiple indexing workers (#322)
Browse files Browse the repository at this point in the history
  • Loading branch information
Weves authored Aug 23, 2023
1 parent 3ea2052 commit e307275
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 94 deletions.
309 changes: 224 additions & 85 deletions backend/danswer/background/update.py

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions backend/danswer/configs/app_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@
# Whether indexing keeps going after a connector errors on a document.
# Unset/"" or "false" (case-insensitive) disables; any other value enables.
CONTINUE_ON_CONNECTOR_FAILURE = os.environ.get(
    "CONTINUE_ON_CONNECTOR_FAILURE", ""
).lower() not in ["false", ""]
# Controls how many worker processes we spin up to index documents in the
# background. This is useful for speeding up indexing, but does require a
# fairly large amount of memory in order to increase substantially, since
# each worker loads the embedding models into memory.
# Clamped to a minimum of 1 so a misconfigured "0" or negative value cannot
# silently disable indexing; a non-numeric value still fails fast at startup.
NUM_INDEXING_WORKERS = max(int(os.environ.get("NUM_INDEXING_WORKERS") or 1), 1)


#####
Expand Down
6 changes: 3 additions & 3 deletions backend/danswer/db/connector_credential_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ def get_last_successful_attempt_time(


def update_connector_credential_pair(
db_session: Session,
connector_id: int,
credential_id: int,
attempt_status: IndexingStatus,
net_docs: int | None,
run_dt: datetime | None,
db_session: Session,
net_docs: int | None = None,
run_dt: datetime | None = None,
) -> None:
cc_pair = get_connector_credential_pair(connector_id, credential_id, db_session)
if not cc_pair:
Expand Down
7 changes: 7 additions & 0 deletions backend/danswer/db/index_attempt.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
logger = setup_logger()


def get_index_attempt(
    db_session: Session, index_attempt_id: int
) -> IndexAttempt | None:
    """Fetch a single IndexAttempt by primary key.

    Returns the matching row, or None when no attempt with the given ID
    exists.
    """
    return db_session.scalar(
        select(IndexAttempt).where(IndexAttempt.id == index_attempt_id)
    )


def create_index_attempt(
connector_id: int,
credential_id: int,
Expand Down
15 changes: 13 additions & 2 deletions backend/danswer/listeners/slack_listener.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
from collections.abc import Callable
from functools import wraps
Expand Down Expand Up @@ -183,7 +184,12 @@ def process_slack_event(client: SocketModeClient, req: SocketModeRequest) -> Non
# TODO: message should be enqueued and processed elsewhere,
# but doing it here for now for simplicity

@retry(tries=DANSWER_BOT_NUM_RETRIES, delay=0.25, backoff=2, logger=logger)
@retry(
tries=DANSWER_BOT_NUM_RETRIES,
delay=0.25,
backoff=2,
logger=cast(logging.Logger, logger),
)
def _get_answer(question: QuestionRequest) -> QAResponse:
answer = answer_question(
question=question,
Expand Down Expand Up @@ -227,7 +233,12 @@ def _get_answer(question: QuestionRequest) -> QAResponse:
else:
text = f"{answer.answer}\n\n*Warning*: no sources were quoted for this answer, so it may be unreliable 😔\n\n{top_documents_str_with_header}"

@retry(tries=DANSWER_BOT_NUM_RETRIES, delay=0.25, backoff=2, logger=logger)
@retry(
tries=DANSWER_BOT_NUM_RETRIES,
delay=0.25,
backoff=2,
logger=cast(logging.Logger, logger),
)
def _respond_in_thread(
channel: str,
text: str,
Expand Down
42 changes: 38 additions & 4 deletions backend/danswer/utils/logger.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
import logging
from logging import Logger
from collections.abc import MutableMapping
from typing import Any

from danswer.configs.app_configs import LOG_LEVEL


class IndexAttemptSingleton:
"""Used to tell if this process is an indexing job, and if so what is the
unique identifier for this indexing attempt. For things like the API server,
main background job (scheduler), etc. this will not be used."""

_INDEX_ATTEMPT_ID: None | int = None

@classmethod
def get_index_attempt_id(cls) -> None | int:
return cls._INDEX_ATTEMPT_ID

@classmethod
def set_index_attempt_id(cls, index_attempt_id: int) -> None:
cls._INDEX_ATTEMPT_ID = index_attempt_id


def get_log_level_from_str(log_level_str: str = LOG_LEVEL) -> int:
log_level_dict = {
"CRITICAL": logging.CRITICAL,
Expand All @@ -17,14 +34,31 @@ def get_log_level_from_str(log_level_str: str = LOG_LEVEL) -> int:
return log_level_dict.get(log_level_str.upper(), logging.INFO)


class _IndexAttemptLoggingAdapter(logging.LoggerAdapter):
    """Prefixes every log message with the current index attempt ID.

    Worker processes set the attempt ID via IndexAttemptSingleton, which lets
    logs be filtered per indexing attempt. When no attempt ID has been set
    (API server, scheduler, etc.) messages pass through untouched.
    """

    def process(
        self, msg: str, kwargs: MutableMapping[str, Any]
    ) -> tuple[str, MutableMapping[str, Any]]:
        if (attempt_id := IndexAttemptSingleton.get_index_attempt_id()) is not None:
            msg = f"[Attempt ID: {attempt_id}] {msg}"
        return msg, kwargs


def setup_logger(
name: str = __name__, log_level: int = get_log_level_from_str()
) -> Logger:
) -> logging.LoggerAdapter:
logger = logging.getLogger(name)

# If the logger already has handlers, assume it was already configured and return it.
if logger.handlers:
return logger
return _IndexAttemptLoggingAdapter(logger)

logger.setLevel(log_level)

Expand All @@ -39,4 +73,4 @@ def setup_logger(

logger.addHandler(handler)

return logger
return _IndexAttemptLoggingAdapter(logger)
2 changes: 2 additions & 0 deletions backend/requirements/default.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ alembic==1.10.4
asyncpg==0.27.0
atlassian-python-api==3.37.0
beautifulsoup4==4.12.0
dask==2023.8.1
distributed==2023.8.1
python-dateutil==2.8.2
fastapi==0.95.0
fastapi-users==11.0.0
Expand Down
1 change: 1 addition & 0 deletions deployment/docker_compose/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ services:
- API_VERSION_OPENAI=${API_VERSION_OPENAI:-}
- AZURE_DEPLOYMENT_ID=${AZURE_DEPLOYMENT_ID:-}
- CONTINUE_ON_CONNECTOR_FAILURE=${CONTINUE_ON_CONNECTOR_FAILURE:-}
- NUM_INDEXING_WORKERS=${NUM_INDEXING_WORKERS:-}
- DANSWER_BOT_SLACK_APP_TOKEN=${DANSWER_BOT_SLACK_APP_TOKEN:-}
- DANSWER_BOT_SLACK_BOT_TOKEN=${DANSWER_BOT_SLACK_BOT_TOKEN:-}
- LOG_LEVEL=${LOG_LEVEL:-info}
Expand Down

1 comment on commit e307275

@vercel
Copy link

@vercel vercel bot commented on e307275 Aug 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.