Skip to content

Commit

Permalink
support add_embeddings for elasticsearch (langchain-ai#11002)
Browse files Browse the repository at this point in the history
- **Description:** Provide a way to use different text for embedding.
- For example, if you are ingesting stack-overflow Q&As for RAG, you
would want to embed the questions and return the answer(s) for the hits.
With this change, the consumer of langchain can implement that easily.
- I noticed the similar function is added on faiss.py with langchain-ai#1912 which
was for performance reason, but I see the same function can be used to
achieve what I thought. So instead of changing Document class to have
embedding_content, I mimicked the implementation of faiss.py.
- The test should provide some guidance on how to use it. It would be
more intuitive if I just pass texts and embedding_texts as separate
arguments, but I chose to use `zip`-ed object for the consistency with
faiss.py implementation.
      - I plan to make similar pull request for OpenSearch.
  - **Issue:** N/A
  - **Dependencies:** None other than the existing ones.

Co-authored-by: Bagatur <[email protected]>
  • Loading branch information
kennethchoe and baskaryan authored Oct 19, 2023
1 parent 76d3afa commit 62efe1f
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 59 deletions.
167 changes: 109 additions & 58 deletions libs/langchain/langchain/vectorstores/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,33 +866,17 @@ def _create_index_if_not_exists(
)
self.client.indices.create(index=index_name, **indexSettings)

def add_texts(
def __add(
self,
texts: Iterable[str],
embeddings: Optional[List[List[float]]],
metadatas: Optional[List[Dict[Any, Any]]] = None,
ids: Optional[List[str]] = None,
refresh_indices: bool = True,
create_index_if_not_exists: bool = True,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids to associate with the texts.
refresh_indices: Whether to refresh the Elasticsearch indices
after adding the texts.
create_index_if_not_exists: Whether to create the Elasticsearch
index if it doesn't already exist.
*bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
- chunk_size: Optional. Number of texts to add to the
index at a time. Defaults to 500.
Returns:
List of ids from adding the texts into the vectorstore.
"""
try:
from elasticsearch.helpers import BulkIndexError, bulk
except ImportError:
Expand All @@ -901,53 +885,33 @@ def add_texts(
"Please install it with `pip install elasticsearch`."
)
bulk_kwargs = bulk_kwargs or {}
embeddings = []
ids = ids or [str(uuid.uuid4()) for _ in texts]
requests = []

if self.embedding is not None:
# If no search_type requires inference, we use the provided
# embedding function to embed the texts.
embeddings = self.embedding.embed_documents(list(texts))
dims_length = len(embeddings[0])

if create_index_if_not_exists:
self._create_index_if_not_exists(
index_name=self.index_name, dims_length=dims_length
)

for i, (text, vector) in enumerate(zip(texts, embeddings)):
metadata = metadatas[i] if metadatas else {}
if create_index_if_not_exists:
if embeddings:
dims_length = len(embeddings[0])
else:
dims_length = None

requests.append(
{
"_op_type": "index",
"_index": self.index_name,
self.query_field: text,
self.vector_query_field: vector,
"metadata": metadata,
"_id": ids[i],
}
)
self._create_index_if_not_exists(
index_name=self.index_name, dims_length=dims_length
)

else:
# the search_type doesn't require inference, so we don't need to
# embed the texts.
if create_index_if_not_exists:
self._create_index_if_not_exists(index_name=self.index_name)
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}

for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
request = {
"_op_type": "index",
"_index": self.index_name,
self.query_field: text,
"metadata": metadata,
"_id": ids[i],
}
if embeddings:
request[self.vector_query_field] = embeddings[i]

requests.append(
{
"_op_type": "index",
"_index": self.index_name,
self.query_field: text,
"metadata": metadata,
"_id": ids[i],
}
)
requests.append(request)

if len(requests) > 0:
try:
Expand All @@ -974,6 +938,93 @@ def add_texts(
logger.debug("No texts to add to index")
return []

def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[Any, Any]]] = None,
ids: Optional[List[str]] = None,
refresh_indices: bool = True,
create_index_if_not_exists: bool = True,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids to associate with the texts.
refresh_indices: Whether to refresh the Elasticsearch indices
after adding the texts.
create_index_if_not_exists: Whether to create the Elasticsearch
index if it doesn't already exist.
*bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
- chunk_size: Optional. Number of texts to add to the
index at a time. Defaults to 500.
Returns:
List of ids from adding the texts into the vectorstore.
"""
if self.embedding is not None:
# If no search_type requires inference, we use the provided
# embedding function to embed the texts.
embeddings = self.embedding.embed_documents(list(texts))
else:
# the search_type doesn't require inference, so we don't need to
# embed the texts.
embeddings = None

return self.__add(
texts,
embeddings,
metadatas=metadatas,
ids=ids,
refresh_indices=refresh_indices,
create_index_if_not_exists=create_index_if_not_exists,
bulk_kwargs=bulk_kwargs,
kwargs=kwargs,
)

def add_embeddings(
self,
text_embeddings: Iterable[Tuple[str, List[float]]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
refresh_indices: bool = True,
create_index_if_not_exists: bool = True,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> List[str]:
"""Add the given texts and embeddings to the vectorstore.
Args:
text_embeddings: Iterable pairs of string and embedding to
add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of unique IDs.
refresh_indices: Whether to refresh the Elasticsearch indices
after adding the texts.
create_index_if_not_exists: Whether to create the Elasticsearch
index if it doesn't already exist.
*bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
- chunk_size: Optional. Number of texts to add to the
index at a time. Defaults to 500.
Returns:
List of ids from adding the texts into the vectorstore.
"""
texts, embeddings = zip(*text_embeddings)
return self.__add(
list(texts),
list(embeddings),
metadatas=metadatas,
ids=ids,
refresh_indices=refresh_indices,
create_index_if_not_exists=create_index_if_not_exists,
bulk_kwargs=bulk_kwargs,
kwargs=kwargs,
)

@classmethod
def from_texts(
cls,
Expand Down
2 changes: 1 addition & 1 deletion libs/langchain/langchain/vectorstores/faiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def add_embeddings(
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
"""Add the given texts and embeddings to the vectorstore.
Args:
text_embeddings: Iterable pairs of string and embedding to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,34 @@ async def test_similarity_search_without_metadat_async(
output = await docsearch.asimilarity_search("foo", k=1)
assert output == [Document(page_content="foo")]

def test_add_embeddings(
self, elasticsearch_connection: dict, index_name: str
) -> None:
"""
Test add_embeddings, which accepts pre-built embeddings instead of
using inference for the texts.
This allows you to separate the embeddings text and the page_content
for better proximity between user's question and embedded text.
For example, your embedding text can be a question, whereas page_content
is the answer.
"""
embeddings = ConsistentFakeEmbeddings()
text_input = ["foo1", "foo2", "foo3"]
metadatas = [{"page": i} for i in range(len(text_input))]

"""In real use case, embedding_input can be questions for each text"""
embedding_input = ["foo2", "foo3", "foo1"]
embedding_vectors = embeddings.embed_documents(embedding_input)

docsearch = ElasticsearchStore._create_cls_from_kwargs(
embeddings,
**elasticsearch_connection,
index_name=index_name,
)
docsearch.add_embeddings(list(zip(text_input, embedding_vectors)), metadatas)
output = docsearch.similarity_search("foo1", k=1)
assert output == [Document(page_content="foo3", metadata={"page": 2})]

def test_similarity_search_with_metadata(
self, elasticsearch_connection: dict, index_name: str
) -> None:
Expand Down

0 comments on commit 62efe1f

Please sign in to comment.