Skip to content

Commit

Permalink
Add optimize to get best performance
Browse files Browse the repository at this point in the history
VectorDBBench needs this so that we don't need to change
VDB every time the server breaks some rules.

This function makes sure VDB results are reproducible.

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Aug 22, 2023
1 parent 0a70b58 commit 2aa63e9
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
5 changes: 2 additions & 3 deletions pymilvus/client/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
# limitations under the License.
"""Base class for interceptors that operate on all RPC types."""

import collections
from typing import Any, Callable, List
from typing import Any, Callable, List, NamedTuple

import grpc

Expand Down Expand Up @@ -74,7 +73,7 @@ def intercept_stream_stream(


class _ClientCallDetails(
    NamedTuple(
        "_ClientCallDetails",
        # The functional form of `typing.NamedTuple` takes (name, type) pairs.
        # Passing bare field-name strings (the `collections.namedtuple` calling
        # convention) raises ValueError as soon as this class statement runs.
        [
            ("method", Any),
            ("timeout", Any),
            ("metadata", Any),
            ("credentials", Any),
        ],
    ),
    grpc.ClientCallDetails,
):
    """Tuple-backed `grpc.ClientCallDetails`.

    Carries the `method`, `timeout`, `metadata` and `credentials` fields, in
    that order, as named tuple fields.
    """
Expand Down
46 changes: 46 additions & 0 deletions pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import copy
import json
import time
from typing import Dict, List, Optional, Union

import pandas as pd
Expand All @@ -20,6 +21,7 @@
from pymilvus.client.types import (
CompactionPlans,
CompactionState,
LoadState,
Replica,
cmp_consistency_level,
get_consistency_level,
Expand All @@ -29,6 +31,7 @@
DataTypeNotMatchException,
ExceptionsMessage,
IndexNotExistException,
MilvusException,
PartitionAlreadyExistException,
PartitionNotExistException,
SchemaNotReadyException,
Expand Down Expand Up @@ -1360,3 +1363,46 @@ def get_replicas(self, timeout: Optional[float] = None, **kwargs) -> Replica:
def describe(self, timeout: Optional[float] = None):
conn = self._get_connection()
return conn.describe_collection(self.name, timeout=timeout)

def optimize(self, timeout: Optional[float] = None, **kwargs):
    """Optimize the server to gain the best performance.

    Flushes the collection, then repeatedly waits for the index to have no
    pending rows and compacts, until a compaction finishes with no pending
    index rows left. Finally, if the collection is loaded (or loading), the
    load is refreshed.

    Be careful, by default this method may block for a very long time.
    The collection should be INDEXED before calling optimize.

    Args:
        timeout: Overall time budget in seconds for the whole call. A falsy
            value (``None`` or ``0``) selects the default of 12 hours. The
            time left in this budget is passed to every blocking step that
            accepts a timeout.
        **kwargs: Accepted for interface compatibility; currently unused.

    Raises:
        MilvusException: If the collection has no index, or if the overall
            time budget is exhausted before optimization completes.
    """

    timeout = timeout or 12 * 60 * 60  # set default timeout to 12hrs

    # A monotonic clock is immune to wall-clock adjustments while waiting.
    deadline = time.monotonic() + timeout
    conn = self._get_connection()

    def remaining() -> float:
        """Return the seconds left in the budget, raising once it is spent."""
        left = deadline - time.monotonic()
        if left <= 0:
            raise MilvusException(message=f"Wait for optimize timeout in {timeout}s")
        return left

    # check if indexed
    if not self.has_index():
        raise MilvusException(message="Please index before calling optimize")

    self.flush(timeout=remaining())
    index = self.index()

    def has_pending_rows() -> bool:
        info = conn.get_index_build_progress(
            self.name, index.index_name, timeout=remaining()
        )
        # A missing key yields -1 and is therefore treated as "nothing pending".
        return info.get("pending_index_rows", -1) > 0

    while True:
        if not has_pending_rows():
            # Compaction can produce new segments that need indexing, so the
            # pending-row check is repeated after it completes.
            self.compact(timeout=remaining())
            self.wait_for_compaction_completed(timeout=remaining())
            if not has_pending_rows():
                break

        # Never sleep past the deadline; remaining() raises when it is over.
        time.sleep(min(5, remaining()))

    # If loaded, load refresh
    state = conn.get_load_state(self.name)
    if state == LoadState.Loading:
        conn.wait_for_loading_collection(self.name, timeout=remaining())
    if state in (LoadState.Loaded, LoadState.Loading):
        self.load(_refresh=True, timeout=remaining())
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ ignore = [
"ARG002",
"E501", # black takes care of it
"ARG005", # [ruff] ARG005 Unused lambda argument: `disable` [E]
"TRY400",
"PYI024"
"TRY400", # TRY400 Use `logging.exception` instead of `logging.error` TODO
]

# Allow autofix for all enabled rules (when `--fix`) is provided.
Expand Down

0 comments on commit 2aa63e9

Please sign in to comment.