diff --git a/libs/milvus/langchain_milvus/vectorstores/milvus.py b/libs/milvus/langchain_milvus/vectorstores/milvus.py index 279f963..684178a 100644 --- a/libs/milvus/langchain_milvus/vectorstores/milvus.py +++ b/libs/milvus/langchain_milvus/vectorstores/milvus.py @@ -2,7 +2,6 @@ import logging from typing import Any, Dict, Iterable, List, Optional, Tuple, Union -from uuid import uuid4 import numpy as np from langchain_core.documents import Document @@ -13,8 +12,25 @@ logger = logging.getLogger(__name__) +# - If you only need a local vector database for small scale data or prototyping, +# setting the uri as a local file, e.g.`./milvus.db`, is the most convenient method, +# as it automatically utilizes [Milvus Lite](https://milvus.io/docs/milvus_lite.md) +# to store all data in this file. +# +# - If you have large scale of data, say more than a million vectors, you can set up a +# more performant Milvus server on [Docker or Kubernetes](https://milvus.io/docs/quickstart.md). +# In this setup, please use the server address and port as your uri, e.g.`http://localhost:19530`. +# If you enable the authentication feature on Milvus, use +# ":" as the token, otherwise don't set the token. +# +# - If you use [Zilliz Cloud](https://zilliz.com/cloud), the fully managed cloud service +# for Milvus, adjust the `uri` and `token`, which correspond to the +# [Public Endpoint and API key](https://docs.zilliz.com/docs/on-zilliz-cloud-console#cluster-details) +# in Zilliz Cloud. + DEFAULT_MILVUS_CONNECTION = { "uri": "http://localhost:19530", + # "token": "", } Matrix = Union[List[List[float]], List[np.ndarray], np.ndarray] @@ -246,7 +262,7 @@ def __init__( ): """Initialize the Milvus vector store.""" try: - from pymilvus import Collection, utility + from pymilvus import Collection, MilvusClient, utility except ImportError: raise ValueError( "Could not import pymilvus python package. " @@ -323,7 +339,11 @@ def __init__( # Create the connection to the server if connection_args is None: connection_args = DEFAULT_MILVUS_CONNECTION - self.alias = self._create_connection_alias(connection_args) + self._milvus_client = MilvusClient( + **connection_args, + ) + self.alias = self.client._using + self.col: Optional[Collection] = None # Grab the existing collection if it exists @@ -350,64 +370,10 @@ def __init__( def embeddings(self) -> Union[Embeddings, BaseSparseEmbedding]: # type: ignore return self.embedding_func - def _create_connection_alias(self, connection_args: dict) -> str: - """Create the connection to the Milvus server.""" - from pymilvus import MilvusException, connections - - # Grab the connection arguments that are used for checking existing connection - host: str = connection_args.get("host", None) - port: Union[str, int] = connection_args.get("port", None) - address: str = connection_args.get("address", None) - uri: str = connection_args.get("uri", None) - user = connection_args.get("user", None) - db_name = connection_args.get("db_name", "default") - - # Order of use is host/port, uri, address - if host is not None and port is not None: - given_address = str(host) + ":" + str(port) - elif uri is not None: - if uri.startswith("https://"): - given_address = uri.split("https://")[1] - elif uri.startswith("http://"): - given_address = uri.split("http://")[1] - else: - given_address = uri # Milvus lite - elif address is not None: - given_address = address - else: - given_address = None - logger.debug("Missing standard address type for reuse attempt") - - # User defaults to empty string when getting connection info - if user is not None: - tmp_user = user - else: - tmp_user = "" - - # If a valid address was given, then check if a connection exists - if given_address is not None: - for con in connections.list_connections(): - addr = connections.get_connection_addr(con[0]) - if ( - con[1] - and ("address" in addr) - and (addr["address"] == given_address) - and ("user" in addr) - and (addr["user"] == tmp_user) - and (addr.get("db_name", "default") == db_name) - ): - logger.debug("Using previous connection: %s", con[0]) - return con[0] - - # Generate a new connection if one doesn't exist - alias = uuid4().hex - try: - connections.connect(alias=alias, **connection_args) - logger.debug("Created new connection using: %s", alias) - return alias - except MilvusException as e: - logger.error("Failed to create new connection using: %s", alias) - raise e + @property + def client(self) -> Any: + """Get client.""" + return self._milvus_client @property def _is_sparse_embedding(self) -> bool: