From ad0eb646868d33519a11755cdd0a53152bdca04b Mon Sep 17 00:00:00 2001 From: Max Isom Date: Mon, 29 Jul 2024 12:45:26 -0700 Subject: [PATCH] Review comments --- chromadb/db/mixins/embeddings_queue.py | 2 +- chromadb/ingest/impl/utils.py | 3 ++- chromadb/segment/impl/vector/local_hnsw.py | 2 +- chromadb/test/property/invariants.py | 13 ++++++++++--- .../test/property/test_cross_version_persist.py | 1 + 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/chromadb/db/mixins/embeddings_queue.py b/chromadb/db/mixins/embeddings_queue.py index dd820f892705..20ff95f47fcd 100644 --- a/chromadb/db/mixins/embeddings_queue.py +++ b/chromadb/db/mixins/embeddings_queue.py @@ -146,7 +146,7 @@ def purge_log(self, collection_id: UUID) -> None: if results: min_seq_id = min(self.decode_seq_id(row[0]) for row in results) else: - min_seq_id = -1 + return t = Table("embeddings_queue") q = ( diff --git a/chromadb/ingest/impl/utils.py b/chromadb/ingest/impl/utils.py index b6bc9227c842..53f1738c0ed5 100644 --- a/chromadb/ingest/impl/utils.py +++ b/chromadb/ingest/impl/utils.py @@ -33,7 +33,8 @@ def trigger_vector_segments_max_seq_id_migration( """ SELECT collection FROM "segments" - WHERE "id" NOT IN (SELECT "segment_id" FROM "max_seq_id") + WHERE "id" NOT IN (SELECT "segment_id" FROM "max_seq_id") AND + "type" = 'urn:chroma:segment/vector/hnsw-local-persisted' """ ) collection_ids_with_unmigrated_segments = [row[0] for row in cur.fetchall()] diff --git a/chromadb/segment/impl/vector/local_hnsw.py b/chromadb/segment/impl/vector/local_hnsw.py index 2eaafca74e3e..af282536126d 100644 --- a/chromadb/segment/impl/vector/local_hnsw.py +++ b/chromadb/segment/impl/vector/local_hnsw.py @@ -278,7 +278,7 @@ def _apply_batch(self, batch: Batch) -> None: self._total_elements_added += batch.add_count # If that succeeds, finally the seq ID - self._max_seq_id = max(self._max_seq_id, batch.max_seq_id) + self._max_seq_id = batch.max_seq_id @trace_method("LocalHnswSegment._write_records", OpenTelemetryGranularity.ALL) def _write_records(self, records: Sequence[LogRecord]) -> None: diff --git a/chromadb/test/property/invariants.py b/chromadb/test/property/invariants.py index 7df070d3a295..69987234a843 100644 --- a/chromadb/test/property/invariants.py +++ b/chromadb/test/property/invariants.py @@ -1,5 +1,6 @@ import gc import math +from chromadb.api.configuration import HNSWConfigurationInternal from chromadb.config import System from chromadb.db.base import get_sql from chromadb.db.impl.sqlite import SqliteDB @@ -341,9 +342,15 @@ def log_size_below_max( # Must always keep one entry to avoid reusing seq_ids assert _total_embedding_queue_log_size(sqlite) >= 1 - # todo: use new collection config API when available - sync_threshold = collection.metadata.get("hnsw:sync_threshold", 1000) - batch_size = collection.metadata.get("hnsw:batch_size", 100) + hnsw_config = cast( + HNSWConfigurationInternal, + collection.get_model() + .get_configuration() + .get_parameter("hnsw_configuration") + .value, + ) + sync_threshold = cast(int, hnsw_config.get_parameter("sync_threshold").value) + batch_size = cast(int, hnsw_config.get_parameter("batch_size").value) # -1 is used because the queue is always at least 1 entry long, so deletion stops before the max ack'ed sequence ID. # And if the batch_size != sync_threshold, the queue can have up to batch_size - 1 more entries. diff --git a/chromadb/test/property/test_cross_version_persist.py b/chromadb/test/property/test_cross_version_persist.py index 621d75672e26..d29676c1922f 100644 --- a/chromadb/test/property/test_cross_version_persist.py +++ b/chromadb/test/property/test_cross_version_persist.py @@ -351,6 +351,7 @@ def test_cycle_versions( # Should be able to clean log immediately after updating embeddings_queue = system.instance(SqliteDB) + # 07/29/24: the max_seq_id for vector segments was moved from the pickled metadata file to SQLite. # Cleaning the log is dependent on vector segments migrating their max_seq_id from the pickled metadata file to SQLite. # Vector segments migrate this field automatically on init, but at this point the segment has not been loaded yet. trigger_vector_segments_max_seq_id_migration(