Skip to content

Commit

Permalink
[Elasticsearch Document Store] improve error handling in `write_docum…
Browse files Browse the repository at this point in the history
…ents` (#59)

* improve error handling + tests

* improve logic to support skip

* Update document_stores/elasticsearch/src/elasticsearch_haystack/document_store.py

Co-authored-by: Silvano Cerza <[email protected]>

---------

Co-authored-by: Silvano Cerza <[email protected]>
  • Loading branch information
anakin87 and silvanocerza authored Nov 21, 2023
1 parent 6727367 commit c5da601
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from haystack.preview import default_from_dict, default_to_dict
from haystack.preview.dataclasses import Document
from haystack.preview.document_stores.decorator import document_store
from haystack.preview.document_stores.errors import DuplicateDocumentError
from haystack.preview.document_stores.errors import DocumentStoreError, DuplicateDocumentError
from haystack.preview.document_stores.protocols import DuplicatePolicy

from elasticsearch_haystack.filters import _normalize_filters
Expand Down Expand Up @@ -214,7 +214,10 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D
- skip: keep the existing document and ignore the new one.
- overwrite: remove the old document and write the new one.
- fail: an error is raised
:raises ValueError: if 'documents' parameter is not a list of Document objects
:raises DuplicateDocumentError: Exception trigger on duplicate document if `policy=DuplicatePolicy.FAIL`
:raises DocumentStoreError: Exception trigger on any other error when writing documents
:return: None
"""
if len(documents) > 0:
Expand All @@ -237,16 +240,27 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D
index=self._index,
raise_on_error=False,
)
if errors and policy == DuplicatePolicy.FAIL:
# TODO: Handle errors in a better way, we're assuming that all errors
# are related to duplicate documents but that could be very well be wrong.

# mypy complains that `errors`` could be either `int` or a `list` of `dict`s.
# Since the type depends on the parameters passed to `helpers.bulk()`` we know
# for sure that it will be a `list`.
ids = ", ".join(e["create"]["_id"] for e in errors) # type: ignore[union-attr]
msg = f"IDs '{ids}' already exist in the document store."
raise DuplicateDocumentError(msg)

if errors:
duplicate_errors_ids = []
other_errors = []
for e in errors:
error_type = e["create"]["error"]["type"]
if policy == DuplicatePolicy.FAIL and error_type == "version_conflict_engine_exception":
duplicate_errors_ids.append(e["create"]["_id"])
elif policy == DuplicatePolicy.SKIP and error_type == "version_conflict_engine_exception":
# when the policy is skip, duplication errors are OK and we should not raise an exception
continue
else:
other_errors.append(e)

if len(duplicate_errors_ids) > 0:
msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store."
raise DuplicateDocumentError(msg)

if len(other_errors) > 0:
msg = f"Failed to write documents to Elasticsearch. Errors:\n{other_errors}"
raise DocumentStoreError(msg)

def _deserialize_document(self, hit: Dict[str, Any]) -> Document:
"""
Expand Down
14 changes: 13 additions & 1 deletion document_stores/elasticsearch/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pytest
from elasticsearch.exceptions import BadRequestError # type: ignore[import-not-found]
from haystack.preview.dataclasses.document import Document
from haystack.preview.document_stores.errors import DuplicateDocumentError
from haystack.preview.document_stores.errors import DocumentStoreError, DuplicateDocumentError
from haystack.preview.document_stores.protocols import DuplicatePolicy
from haystack.preview.testing.document_store import DocumentStoreBaseTests

Expand Down Expand Up @@ -335,3 +335,15 @@ def test_embedding_retrieval_query_documents_different_embedding_sizes(self, doc

with pytest.raises(BadRequestError):
docstore._embedding_retrieval(query_embedding=[0.1, 0.1])

def test_write_documents_different_embedding_sizes_fail(self, docstore: ElasticsearchDocumentStore):
"""
Test that write_documents fails if the documents have different embedding sizes.
"""
docs = [
Document(content="Hello world", embedding=[0.1, 0.2, 0.3, 0.4]),
Document(content="Hello world", embedding=[0.1, 0.2]),
]

with pytest.raises(DocumentStoreError):
docstore.write_documents(docs)

0 comments on commit c5da601

Please sign in to comment.