diff --git a/Cargo.lock b/Cargo.lock index 8cc4fa7bd8d..5baa5bbafbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1365,7 +1365,7 @@ dependencies = [ [[package]] name = "chromadb" version = "2.0.0" -source = "git+https://github.com/rescrv/chromadb-rs?rev=4017daf7c5c601a095b5c509bfddabc6125e6616#4017daf7c5c601a095b5c509bfddabc6125e6616" +source = "git+https://github.com/rescrv/chromadb-rs?rev=b78e0ad1281b0ae7427dbe3dd8a68408af4bb323#b78e0ad1281b0ae7427dbe3dd8a68408af4bb323" dependencies = [ "anyhow", "async-trait", @@ -1374,6 +1374,7 @@ dependencies = [ "reqwest 0.11.27", "serde", "serde_json", + "tokio", ] [[package]] diff --git a/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs b/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs index 3b6c7891bdc..1652bb7b99b 100644 --- a/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs +++ b/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs @@ -3,9 +3,9 @@ use crate::segment::metadata_segment::MetadataSegmentError; use crate::segment::record_segment::ApplyMaterializedLogError; use crate::segment::record_segment::RecordSegmentReader; use crate::segment::record_segment::RecordSegmentReaderCreationError; +use crate::segment::ChromaSegmentWriter; use crate::segment::LogMaterializerError; use crate::segment::MaterializeLogsResult; -use crate::segment::SegmentWriter; use async_trait::async_trait; use chroma_error::ChromaError; use chroma_error::ErrorCodes; @@ -53,15 +53,15 @@ impl ApplyLogToSegmentWriterOperator { } #[derive(Debug)] -pub struct ApplyLogToSegmentWriterInput<'bf, Writer: SegmentWriter> { - segment_writer: Writer, +pub struct ApplyLogToSegmentWriterInput<'bf> { + segment_writer: ChromaSegmentWriter<'bf>, materialized_logs: MaterializeLogsResult, record_segment_reader: Option>, } -impl<'bf, Writer: SegmentWriter> ApplyLogToSegmentWriterInput<'bf, Writer> { +impl<'bf> ApplyLogToSegmentWriterInput<'bf> { pub fn new( - segment_writer: Writer, + segment_writer: ChromaSegmentWriter<'bf>, materialized_logs: MaterializeLogsResult, record_segment_reader: Option>, ) -> Self { @@ -79,8 +79,7 @@ pub struct ApplyLogToSegmentWriterOutput { } #[async_trait] -impl - Operator, ApplyLogToSegmentWriterOutput> +impl Operator, ApplyLogToSegmentWriterOutput> for ApplyLogToSegmentWriterOperator { type Error = ApplyLogToSegmentWriterOperatorError; @@ -91,7 +90,7 @@ impl async fn run( &self, - input: &ApplyLogToSegmentWriterInput, + input: &ApplyLogToSegmentWriterInput, ) -> Result { if input.materialized_logs.is_empty() { return Err(ApplyLogToSegmentWriterOperatorError::LogMaterializationResultEmpty); diff --git a/rust/worker/src/execution/operators/commit_segment_writer.rs b/rust/worker/src/execution/operators/commit_segment_writer.rs index 87d023912f4..e6e15a60ccf 100644 --- a/rust/worker/src/execution/operators/commit_segment_writer.rs +++ b/rust/worker/src/execution/operators/commit_segment_writer.rs @@ -1,7 +1,6 @@ use crate::execution::operator::Operator; use crate::segment::ChromaSegmentFlusher; use crate::segment::ChromaSegmentWriter; -use crate::segment::SegmentWriter; use async_trait::async_trait; use chroma_error::ChromaError; use chroma_error::ErrorCodes; diff --git a/rust/worker/src/execution/operators/count_records.rs b/rust/worker/src/execution/operators/count_records.rs index 243ac44c23f..ca866f8b2ef 100644 --- a/rust/worker/src/execution/operators/count_records.rs +++ b/rust/worker/src/execution/operators/count_records.rs @@ -207,7 +207,7 @@ mod tests { operator::Operator, operators::count_records::{CountRecordsInput, CountRecordsOperator}, }, - segment::{record_segment::RecordSegmentWriter, SegmentWriter}, + segment::record_segment::RecordSegmentWriter, }; use chroma_blockstore::provider::BlockfileProvider; use chroma_types::{Chunk, CollectionUuid, LogRecord, Operation, OperationRecord, SegmentUuid}; diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index d5eb311d62b..4762493d289 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -41,7 +41,6 @@ use crate::segment::ChromaSegmentFlusher; use crate::segment::ChromaSegmentWriter; use crate::segment::MaterializeLogsResult; use crate::segment::SegmentFlusher; -use crate::segment::SegmentWriter; use crate::sysdb::sysdb::GetCollectionsError; use crate::sysdb::sysdb::GetSegmentsError; use crate::sysdb::sysdb::SysDb; @@ -361,16 +360,17 @@ impl CompactOrchestrator { { self.num_uncompleted_tasks_by_segment - .entry(writers.metadata.get_id()) + .entry(writers.metadata.id) .and_modify(|v| { *v += 1; }) .or_insert(1); - let span = self.get_segment_writer_span(&writers.metadata); + let writer = ChromaSegmentWriter::MetadataSegment(writers.metadata); + let span = self.get_segment_writer_span(&writer); let operator = ApplyLogToSegmentWriterOperator::new(); let input = ApplyLogToSegmentWriterInput::new( - writers.metadata, + writer, materialized_logs.clone(), record_segment_reader.clone(), ); @@ -384,16 +384,17 @@ impl CompactOrchestrator { { self.num_uncompleted_tasks_by_segment - .entry(writers.record.get_id()) + .entry(writers.record.id) .and_modify(|v| { *v += 1; }) .or_insert(1); - let span = self.get_segment_writer_span(&writers.record); + let writer = ChromaSegmentWriter::RecordSegment(writers.record); + let span = self.get_segment_writer_span(&writer); let operator = ApplyLogToSegmentWriterOperator::new(); let input = ApplyLogToSegmentWriterInput::new( - writers.record, + writer, materialized_logs.clone(), record_segment_reader.clone(), ); @@ -413,13 +414,11 @@ impl CompactOrchestrator { }) .or_insert(1); - let span = self.get_segment_writer_span(writers.vector.as_ref()); + let writer = ChromaSegmentWriter::DistributedHNSWSegment(writers.vector); + let span = self.get_segment_writer_span(&writer); let operator = ApplyLogToSegmentWriterOperator::new(); - let input = ApplyLogToSegmentWriterInput::new( - *writers.vector, - materialized_logs, - record_segment_reader, - ); + let input = + ApplyLogToSegmentWriterInput::new(writer, materialized_logs, record_segment_reader); let task = wrap(operator, input, self_address); let res = self.dispatcher().send(task, Some(span)).await; self.ok_or_terminate(res, ctx); @@ -612,11 +611,11 @@ impl CompactOrchestrator { ) -> Result, GetSegmentWritersError> { let writers = self.get_segment_writers().await?; - if writers.metadata.get_id() == segment_id { + if writers.metadata.id == segment_id { return Ok(ChromaSegmentWriter::MetadataSegment(writers.metadata)); } - if writers.record.get_id() == segment_id { + if writers.record.id == segment_id { return Ok(ChromaSegmentWriter::RecordSegment(writers.record)); } @@ -627,7 +626,7 @@ impl CompactOrchestrator { Err(GetSegmentWritersError::NoSegmentsFound) } - fn get_segment_writer_span(&mut self, writer: &W) -> Span { + fn get_segment_writer_span(&mut self, writer: &ChromaSegmentWriter) -> Span { let span = self .segment_spans .entry(writer.get_id()) diff --git a/rust/worker/src/segment/distributed_hnsw_segment.rs b/rust/worker/src/segment/distributed_hnsw_segment.rs index 1788fb2e8a1..da0d24b01f2 100644 --- a/rust/worker/src/segment/distributed_hnsw_segment.rs +++ b/rust/worker/src/segment/distributed_hnsw_segment.rs @@ -1,6 +1,6 @@ use super::record_segment::{ApplyMaterializedLogError, RecordSegmentReader}; use super::utils::hnsw_params_from_segment; -use super::{MaterializeLogsResult, SegmentFlusher, SegmentWriter}; +use super::{MaterializeLogsResult, SegmentFlusher}; use crate::segment::utils::distance_function_from_segment; use async_trait::async_trait; use chroma_distance::DistanceFunctionError; @@ -190,20 +190,8 @@ impl DistributedHNSWSegmentWriter { ))) } } -} - -impl SegmentWriter for DistributedHNSWSegmentWriter { - type Flusher = DistributedHNSWSegmentWriter; - - fn get_id(&self) -> SegmentUuid { - self.id - } - - fn get_name(&self) -> &'static str { - "DistributedHNSWSegmentWriter" - } - async fn apply_materialized_log_chunk( + pub async fn apply_materialized_log_chunk( &self, record_segment_reader: &Option>, materialized: &MaterializeLogsResult, @@ -266,7 +254,7 @@ impl SegmentWriter for DistributedHNSWSegmentWriter { Ok(()) } - async fn commit(self) -> Result> { + pub async fn commit(self) -> Result> { let res = self.hnsw_index_provider.commit(self.index.clone()); match res { Ok(_) => Ok(self), diff --git a/rust/worker/src/segment/metadata_segment.rs b/rust/worker/src/segment/metadata_segment.rs index a945de85bbd..2f3a390847b 100644 --- a/rust/worker/src/segment/metadata_segment.rs +++ b/rust/worker/src/segment/metadata_segment.rs @@ -1,6 +1,5 @@ use super::super::execution::operators::filter::MetadataProvider; use super::record_segment::ApplyMaterializedLogError; -use super::types::SegmentWriter; use super::SegmentFlusher; use crate::execution::operators::filter::RoaringMetadataFilter; use crate::segment::record_segment::RecordSegmentReader; @@ -470,20 +469,8 @@ impl<'me> MetadataSegmentWriter<'me> { // Insert new value. Ok(self.set_metadata(key, new_value, offset_id).await?) } -} - -impl SegmentWriter for MetadataSegmentWriter<'_> { - type Flusher = MetadataSegmentFlusher; - - fn get_id(&self) -> SegmentUuid { - self.id - } - - fn get_name(&self) -> &'static str { - "MetadataSegmentWriter" - } - async fn apply_materialized_log_chunk( + pub async fn apply_materialized_log_chunk( &self, record_segment_reader: &Option>, materialized: &MaterializeLogsResult, @@ -662,7 +649,7 @@ impl SegmentWriter for MetadataSegmentWriter<'_> { Ok(()) } - async fn finish(&mut self) -> Result<(), Box> { + pub async fn finish(&mut self) -> Result<(), Box> { let mut full_text_index_writer = match self.full_text_index_writer.take() { Some(writer) => writer, None => return Err(Box::new(MetadataSegmentError::NoWriter)), @@ -721,7 +708,7 @@ impl SegmentWriter for MetadataSegmentWriter<'_> { Ok(()) } - async fn commit(self) -> Result> { + pub async fn commit(self) -> Result> { let full_text_flusher = match self.full_text_index_writer { Some(flusher) => match flusher.commit().await { Ok(flusher) => flusher, @@ -1127,7 +1114,7 @@ mod test { record_segment::{ RecordSegmentReader, RecordSegmentReaderCreationError, RecordSegmentWriter, }, - SegmentFlusher, SegmentWriter, + SegmentFlusher, }; use chroma_blockstore::{ arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider}, diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs index 055ef839552..f36d7b764b0 100644 --- a/rust/worker/src/segment/record_segment.rs +++ b/rust/worker/src/segment/record_segment.rs @@ -1,5 +1,4 @@ use super::spann_segment::SpannSegmentWriterError; -use super::types::SegmentWriter; use super::{ HydratedMaterializedLogRecord, LogMaterializerError, MaterializeLogsResult, SegmentFlusher, }; @@ -302,59 +301,8 @@ impl RecordSegmentWriter { id: segment.id, }) } -} - -#[derive(Error, Debug)] -// TODO(Sanket): Should compose errors here but can't currently because -// of Box. -// Since blockfile does not support read then write semantics natively -// all write operations to it are either set or delete. -pub enum ApplyMaterializedLogError { - #[error("Error setting to blockfile")] - BlockfileSet, - #[error("Error deleting from blockfile")] - BlockfileDelete, - #[error("Error updating blockfile")] - BlockfileUpdate, - #[error("Allocation error")] - Allocation, - #[error("Error writing to the full text index: {0}")] - FullTextIndex(#[from] FullTextIndexError), - #[error("Error writing to hnsw index")] - HnswIndex(#[from] Box), - #[error("Log materialization error: {0}")] - Materialization(#[from] LogMaterializerError), - #[error("Error applying materialized records to spann segment: {0}")] - SpannSegmentError(#[from] SpannSegmentWriterError), -} - -impl ChromaError for ApplyMaterializedLogError { - fn code(&self) -> ErrorCodes { - match self { - ApplyMaterializedLogError::BlockfileSet => ErrorCodes::Internal, - ApplyMaterializedLogError::BlockfileDelete => ErrorCodes::Internal, - ApplyMaterializedLogError::BlockfileUpdate => ErrorCodes::Internal, - ApplyMaterializedLogError::Allocation => ErrorCodes::Internal, - ApplyMaterializedLogError::FullTextIndex(e) => e.code(), - ApplyMaterializedLogError::HnswIndex(_) => ErrorCodes::Internal, - ApplyMaterializedLogError::Materialization(e) => e.code(), - ApplyMaterializedLogError::SpannSegmentError(e) => e.code(), - } - } -} - -impl SegmentWriter for RecordSegmentWriter { - type Flusher = RecordSegmentFlusher; - fn get_id(&self) -> SegmentUuid { - self.id - } - - fn get_name(&self) -> &'static str { - "RecordSegmentWriter" - } - - async fn apply_materialized_log_chunk( + pub async fn apply_materialized_log_chunk( &self, record_segment_reader: &Option>, materialized: &MaterializeLogsResult, @@ -499,7 +447,7 @@ impl SegmentWriter for RecordSegmentWriter { Ok(()) } - async fn commit(mut self) -> Result> { + pub async fn commit(mut self) -> Result> { // Commit all the blockfiles let flusher_user_id_to_id = self .user_id_to_id @@ -571,6 +519,45 @@ impl SegmentWriter for RecordSegmentWriter { } } +#[derive(Error, Debug)] +// TODO(Sanket): Should compose errors here but can't currently because +// of Box. +// Since blockfile does not support read then write semantics natively +// all write operations to it are either set or delete. +pub enum ApplyMaterializedLogError { + #[error("Error setting to blockfile")] + BlockfileSet, + #[error("Error deleting from blockfile")] + BlockfileDelete, + #[error("Error updating blockfile")] + BlockfileUpdate, + #[error("Allocation error")] + Allocation, + #[error("Error writing to the full text index: {0}")] + FullTextIndex(#[from] FullTextIndexError), + #[error("Error writing to hnsw index")] + HnswIndex(#[from] Box), + #[error("Log materialization error: {0}")] + Materialization(#[from] LogMaterializerError), + #[error("Error applying materialized records to spann segment: {0}")] + SpannSegmentError(#[from] SpannSegmentWriterError), +} + +impl ChromaError for ApplyMaterializedLogError { + fn code(&self) -> ErrorCodes { + match self { + ApplyMaterializedLogError::BlockfileSet => ErrorCodes::Internal, + ApplyMaterializedLogError::BlockfileDelete => ErrorCodes::Internal, + ApplyMaterializedLogError::BlockfileUpdate => ErrorCodes::Internal, + ApplyMaterializedLogError::Allocation => ErrorCodes::Internal, + ApplyMaterializedLogError::FullTextIndex(e) => e.code(), + ApplyMaterializedLogError::HnswIndex(_) => ErrorCodes::Internal, + ApplyMaterializedLogError::Materialization(e) => e.code(), + ApplyMaterializedLogError::SpannSegmentError(e) => e.code(), + } + } +} + pub struct RecordSegmentFlusher { id: SegmentUuid, user_id_to_id_flusher: BlockfileFlusher, @@ -900,9 +887,7 @@ mod tests { use crate::{ log::test::{upsert_generator, LogGenerator}, - segment::{ - materialize_logs, record_segment::MAX_OFFSET_ID, test::TestSegment, SegmentWriter, - }, + segment::{materialize_logs, record_segment::MAX_OFFSET_ID, test::TestSegment}, }; use super::RecordSegmentWriter; diff --git a/rust/worker/src/segment/spann_segment.rs b/rust/worker/src/segment/spann_segment.rs index 469d4d482d1..74f2b99f296 100644 --- a/rust/worker/src/segment/spann_segment.rs +++ b/rust/worker/src/segment/spann_segment.rs @@ -18,7 +18,7 @@ use super::record_segment::RecordSegmentReader; use super::{ record_segment::ApplyMaterializedLogError, utils::{distance_function_from_segment, hnsw_params_from_segment}, - SegmentFlusher, SegmentWriter, + SegmentFlusher, }; use super::{BorrowedMaterializedLogRecord, HydratedMaterializedLogRecord, MaterializeLogsResult}; @@ -233,25 +233,9 @@ impl SpannSegmentWriter { .await .map_err(SpannSegmentWriterError::SpannSegmentWriterAddRecordError) } -} - -pub struct SpannSegmentFlusher { - id: SegmentUuid, - index_flusher: SpannIndexFlusher, -} - -impl SegmentWriter for SpannSegmentWriter { - type Flusher = SpannSegmentFlusher; - - fn get_id(&self) -> SegmentUuid { - self.id - } - fn get_name(&self) -> &'static str { - "SpannSegmentWriter" - } - - async fn apply_materialized_log_chunk( + #[allow(dead_code)] + pub async fn apply_materialized_log_chunk( &self, record_segment_reader: &Option>, materialized_chunk: &MaterializeLogsResult, @@ -290,7 +274,8 @@ impl SegmentWriter for SpannSegmentWriter { Ok(()) } - async fn commit(self) -> Result> { + #[allow(dead_code)] + pub async fn commit(self) -> Result> { let index_flusher = self .index .commit() @@ -306,6 +291,11 @@ impl SegmentWriter for SpannSegmentWriter { } } +pub struct SpannSegmentFlusher { + id: SegmentUuid, + index_flusher: SpannIndexFlusher, +} + #[async_trait] impl SegmentFlusher for SpannSegmentFlusher { fn get_id(&self) -> SegmentUuid { @@ -527,7 +517,7 @@ mod test { use crate::segment::{ materialize_logs, spann_segment::{SpannSegmentReader, SpannSegmentWriter}, - SegmentFlusher, SegmentWriter, + SegmentFlusher, }; #[tokio::test] diff --git a/rust/worker/src/segment/test.rs b/rust/worker/src/segment/test.rs index 571e6619ed2..149f3a4dac2 100644 --- a/rust/worker/src/segment/test.rs +++ b/rust/worker/src/segment/test.rs @@ -12,7 +12,6 @@ use crate::log::test::{LogGenerator, TEST_EMBEDDING_DIMENSION}; use super::{ distributed_hnsw_segment::DistributedHNSWSegmentWriter, materialize_logs, metadata_segment::MetadataSegmentWriter, record_segment::RecordSegmentWriter, SegmentFlusher, - SegmentWriter, }; #[derive(Clone)] diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs index 48c6bdcf6cf..0cb29f626e3 100644 --- a/rust/worker/src/segment/types.rs +++ b/rust/worker/src/segment/types.rs @@ -6,7 +6,6 @@ use chroma_types::{ UpdateMetadata, UpdateMetadataValue, }; use std::collections::{HashMap, HashSet}; -use std::future::Future; use std::sync::atomic::AtomicU32; use std::sync::Arc; use thiserror::Error; @@ -864,24 +863,6 @@ pub async fn materialize_logs( }) } -// This needs to be public for testing -#[allow(async_fn_in_trait)] -pub trait SegmentWriter { - type Flusher: SegmentFlusher + Sync + Send; - - fn get_id(&self) -> SegmentUuid; - fn get_name(&self) -> &'static str; - fn apply_materialized_log_chunk( - &self, - record_segment_reader: &Option, - materialized_chunk: &MaterializeLogsResult, - ) -> impl Future> + Send; - fn finish(&mut self) -> impl Future>> + Send { - async { Ok(()) } - } - fn commit(self) -> impl Future>> + Send; -} - // This needs to be public for testing #[async_trait] pub trait SegmentFlusher { @@ -897,30 +878,24 @@ pub enum ChromaSegmentWriter<'bf> { DistributedHNSWSegment(Box), } -impl<'a> SegmentWriter for ChromaSegmentWriter<'a> { - type Flusher = ChromaSegmentFlusher; - - fn get_id(&self) -> SegmentUuid { +impl<'a> ChromaSegmentWriter<'a> { + pub fn get_id(&self) -> SegmentUuid { match self { - ChromaSegmentWriter::RecordSegment(writer) => writer.get_id(), - ChromaSegmentWriter::MetadataSegment(writer) => writer.get_id(), - ChromaSegmentWriter::DistributedHNSWSegment(writer) => { - SegmentWriter::get_id(writer.as_ref()) - } + ChromaSegmentWriter::RecordSegment(writer) => writer.id, + ChromaSegmentWriter::MetadataSegment(writer) => writer.id, + ChromaSegmentWriter::DistributedHNSWSegment(writer) => writer.get_id(), } } - fn get_name(&self) -> &'static str { + pub fn get_name(&self) -> &'static str { match self { - ChromaSegmentWriter::RecordSegment(writer) => writer.get_name(), - ChromaSegmentWriter::MetadataSegment(writer) => writer.get_name(), - ChromaSegmentWriter::DistributedHNSWSegment(writer) => { - SegmentWriter::get_name(writer.as_ref()) - } + ChromaSegmentWriter::RecordSegment(_) => "RecordSegmentWriter", + ChromaSegmentWriter::MetadataSegment(_) => "MetadataSegmentWriter", + ChromaSegmentWriter::DistributedHNSWSegment(_) => "DistributedHNSWSegmentWriter", } } - async fn apply_materialized_log_chunk( + pub async fn apply_materialized_log_chunk( &self, record_segment_reader: &Option>, materialized: &MaterializeLogsResult, @@ -944,15 +919,15 @@ impl<'a> SegmentWriter for ChromaSegmentWriter<'a> { } } - async fn finish(&mut self) -> Result<(), Box> { + pub async fn finish(&mut self) -> Result<(), Box> { match self { - ChromaSegmentWriter::RecordSegment(writer) => writer.finish().await, + ChromaSegmentWriter::RecordSegment(_) => Ok(()), ChromaSegmentWriter::MetadataSegment(writer) => writer.finish().await, - ChromaSegmentWriter::DistributedHNSWSegment(writer) => writer.finish().await, + ChromaSegmentWriter::DistributedHNSWSegment(_) => Ok(()), } } - async fn commit(self) -> Result> { + pub async fn commit(self) -> Result> { match self { ChromaSegmentWriter::RecordSegment(writer) => writer .commit() @@ -983,9 +958,7 @@ impl SegmentFlusher for ChromaSegmentFlusher { match self { ChromaSegmentFlusher::RecordSegment(flusher) => flusher.get_id(), ChromaSegmentFlusher::MetadataSegment(flusher) => flusher.get_id(), - ChromaSegmentFlusher::DistributedHNSWSegment(flusher) => { - SegmentWriter::get_id(flusher.as_ref()) - } + ChromaSegmentFlusher::DistributedHNSWSegment(flusher) => flusher.get_id(), } }