diff --git a/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs b/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs new file mode 100644 index 00000000000..877c5980c85 --- /dev/null +++ b/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs @@ -0,0 +1,119 @@ +use crate::execution::operator::Operator; +use crate::segment::metadata_segment::MetadataSegmentError; +use crate::segment::record_segment::ApplyMaterializedLogError; +use crate::segment::record_segment::RecordSegmentReader; +use crate::segment::record_segment::RecordSegmentReaderCreationError; +use crate::segment::LogMaterializerError; +use crate::segment::MaterializeLogsResult; +use crate::segment::SegmentWriter; +use async_trait::async_trait; +use chroma_error::ChromaError; +use chroma_error::ErrorCodes; +use thiserror::Error; +use tracing::Instrument; + +#[derive(Error, Debug)] +pub enum ApplyLogToSegmentWriterOperatorError { + #[error("Log materialization result is empty")] + LogMaterializationResultEmpty, + #[error("Preparation for log materialization failed {0}")] + LogMaterializationPreparationError(#[from] RecordSegmentReaderCreationError), + #[error("Log materialization failed {0}")] + LogMaterializationError(#[from] LogMaterializerError), + #[error("Materialized logs failed to apply {0}")] + ApplyMaterializedLogsError(#[from] ApplyMaterializedLogError), + #[error("Materialized logs failed to apply {0}")] + ApplyMaterializedLogsErrorMetadataSegment(#[from] MetadataSegmentError), +} + +impl ChromaError for ApplyLogToSegmentWriterOperatorError { + fn code(&self) -> ErrorCodes { + match self { + ApplyLogToSegmentWriterOperatorError::LogMaterializationResultEmpty => { + ErrorCodes::Internal + } + ApplyLogToSegmentWriterOperatorError::LogMaterializationPreparationError(e) => e.code(), + ApplyLogToSegmentWriterOperatorError::LogMaterializationError(e) => e.code(), + ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsError(e) => e.code(), + ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsErrorMetadataSegment(e) => { + e.code() + } + } + } +} + +#[derive(Debug)] +pub struct ApplyLogToSegmentWriterOperator {} + +impl ApplyLogToSegmentWriterOperator { + pub fn new() -> Box { + Box::new(ApplyLogToSegmentWriterOperator {}) + } +} + +#[derive(Debug)] +pub struct ApplyLogToSegmentWriterInput<'bf, Writer: SegmentWriter> { + segment_writer: Writer, + materialized_logs: MaterializeLogsResult, + record_segment_reader: Option>, +} + +impl<'bf, Writer: SegmentWriter> ApplyLogToSegmentWriterInput<'bf, Writer> { + pub fn new( + segment_writer: Writer, + materialized_logs: MaterializeLogsResult, + record_segment_reader: Option>, + ) -> Self { + ApplyLogToSegmentWriterInput { + segment_writer, + materialized_logs, + record_segment_reader, + } + } +} + +#[derive(Debug)] +pub struct ApplyLogToSegmentWriterOutput {} + +#[async_trait] +impl + Operator, ApplyLogToSegmentWriterOutput> + for ApplyLogToSegmentWriterOperator +{ + type Error = ApplyLogToSegmentWriterOperatorError; + + fn get_name(&self) -> &'static str { + "ApplyLogToSegmentWriterOperator" + } + + async fn run( + &self, + input: &ApplyLogToSegmentWriterInput, + ) -> Result { + if input.materialized_logs.is_empty() { + return Err(ApplyLogToSegmentWriterOperatorError::LogMaterializationResultEmpty); + } + + // Apply materialized records. + match input + .segment_writer + .apply_materialized_log_chunk(&input.record_segment_reader, &input.materialized_logs) + .instrument(tracing::trace_span!( + "Apply materialized logs", + otel.name = format!( + "Apply materialized logs to segment writer {}", + input.segment_writer.get_name() + ), + segment = input.segment_writer.get_name() + )) + .await + { + Ok(()) => (), + Err(e) => { + return Err(ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsError(e)); + } + } + + Ok(ApplyLogToSegmentWriterOutput {}) + } +} diff --git a/rust/worker/src/execution/operators/mod.rs b/rust/worker/src/execution/operators/mod.rs index d7f959db448..980903feb8a 100644 --- a/rust/worker/src/execution/operators/mod.rs +++ b/rust/worker/src/execution/operators/mod.rs @@ -1,3 +1,4 @@ +pub mod apply_log_to_segment_writer; pub(super) mod count_records; pub(super) mod flush_s3; pub mod materialize_logs; @@ -7,7 +8,6 @@ pub mod spann_bf_pl; pub(super) mod spann_centers_search; pub(super) mod spann_fetch_pl; pub mod spann_knn_merge; -pub(super) mod write_segments; // Required for benchmark pub mod fetch_log; diff --git a/rust/worker/src/execution/operators/write_segments.rs b/rust/worker/src/execution/operators/write_segments.rs deleted file mode 100644 index fa0d041955a..00000000000 --- a/rust/worker/src/execution/operators/write_segments.rs +++ /dev/null @@ -1,184 +0,0 @@ -use crate::execution::operator::Operator; -use crate::execution::orchestration::CompactWriters; -use crate::segment::metadata_segment::MetadataSegmentError; -use crate::segment::record_segment::ApplyMaterializedLogError; -use crate::segment::record_segment::RecordSegmentReader; -use crate::segment::record_segment::RecordSegmentReaderCreationError; -use crate::segment::LogMaterializerError; -use crate::segment::MaterializeLogsResult; -use crate::segment::SegmentWriter; -use async_trait::async_trait; -use chroma_blockstore::provider::BlockfileProvider; -use chroma_error::ChromaError; -use chroma_error::ErrorCodes; -use chroma_types::Segment; -use thiserror::Error; -use tracing::Instrument; - -#[derive(Error, Debug)] -pub enum WriteSegmentsOperatorError { - #[error("Log materialization result empty")] - LogMaterializationResultEmpty, - #[error("Preparation for log materialization failed {0}")] - LogMaterializationPreparationError(#[from] RecordSegmentReaderCreationError), - #[error("Log materialization failed {0}")] - LogMaterializationError(#[from] LogMaterializerError), - #[error("Materialized logs failed to apply {0}")] - ApplyMaterializedLogsError(#[from] ApplyMaterializedLogError), - #[error("Materialized logs failed to apply {0}")] - ApplyMaterializedLogsErrorMetadataSegment(#[from] MetadataSegmentError), -} - -impl ChromaError for WriteSegmentsOperatorError { - fn code(&self) -> ErrorCodes { - match self { - WriteSegmentsOperatorError::LogMaterializationResultEmpty => ErrorCodes::Internal, - WriteSegmentsOperatorError::LogMaterializationPreparationError(e) => e.code(), - WriteSegmentsOperatorError::LogMaterializationError(e) => e.code(), - WriteSegmentsOperatorError::ApplyMaterializedLogsError(e) => e.code(), - WriteSegmentsOperatorError::ApplyMaterializedLogsErrorMetadataSegment(e) => e.code(), - } - } -} - -#[derive(Debug)] -pub struct WriteSegmentsOperator {} - -impl WriteSegmentsOperator { - pub fn new() -> Box { - Box::new(WriteSegmentsOperator {}) - } -} - -#[derive(Debug)] -pub struct WriteSegmentsInput { - writers: CompactWriters, - provider: BlockfileProvider, - record_segment: Segment, - materialized_logs: MaterializeLogsResult, -} - -impl WriteSegmentsInput { - pub fn new( - writers: CompactWriters, - provider: BlockfileProvider, - record_segment: Segment, - materialized_logs: MaterializeLogsResult, - ) -> Self { - WriteSegmentsInput { - writers, - provider, - record_segment, - materialized_logs, - } - } -} - -#[derive(Debug)] -pub struct WriteSegmentsOutput { - pub(crate) writers: CompactWriters, -} - -#[async_trait] -impl Operator for WriteSegmentsOperator { - type Error = WriteSegmentsOperatorError; - - fn get_name(&self) -> &'static str { - "WriteSegmentsOperator" - } - - async fn run(&self, input: &WriteSegmentsInput) -> Result { - if input.materialized_logs.is_empty() { - return Err(WriteSegmentsOperatorError::LogMaterializationResultEmpty); - } - - // Prepare for log materialization. - let record_segment_reader: Option; - match RecordSegmentReader::from_segment(&input.record_segment, &input.provider).await { - Ok(reader) => { - record_segment_reader = Some(reader); - } - Err(e) => { - match *e { - // Uninitialized segment is fine and means that the record - // segment is not yet initialized in storage. - RecordSegmentReaderCreationError::UninitializedSegment => { - record_segment_reader = None; - } - RecordSegmentReaderCreationError::BlockfileOpenError(e) => { - tracing::error!("Error creating record segment reader {}", e); - return Err( - WriteSegmentsOperatorError::LogMaterializationPreparationError( - RecordSegmentReaderCreationError::BlockfileOpenError(e), - ), - ); - } - RecordSegmentReaderCreationError::InvalidNumberOfFiles => { - tracing::error!("Error creating record segment reader {}", e); - return Err( - WriteSegmentsOperatorError::LogMaterializationPreparationError( - RecordSegmentReaderCreationError::InvalidNumberOfFiles, - ), - ); - } - RecordSegmentReaderCreationError::DataRecordNotFound(c) => { - tracing::error!( - "Error creating record segment reader: offset {} not found", - c - ); - return Err( - WriteSegmentsOperatorError::LogMaterializationPreparationError(*e), - ); - } - RecordSegmentReaderCreationError::UserRecordNotFound(ref c) => { - tracing::error!( - "Error creating record segment reader: user {} not found", - c - ); - return Err( - WriteSegmentsOperatorError::LogMaterializationPreparationError(*e), - ); - } - }; - } - }; - - // Apply materialized records. - input - .writers - .record - .apply_materialized_log_chunk(&record_segment_reader, &input.materialized_logs) - .instrument(tracing::trace_span!( - "Apply materialized logs to record segment" - )) - .await - .map_err(WriteSegmentsOperatorError::ApplyMaterializedLogsError)?; - tracing::debug!("Applied materialized records to record segment"); - - input - .writers - .metadata - .apply_materialized_log_chunk(&record_segment_reader, &input.materialized_logs) - .instrument(tracing::trace_span!( - "Apply materialized logs to metadata segment" - )) - .await - .map_err(WriteSegmentsOperatorError::ApplyMaterializedLogsError)?; - tracing::debug!("Applied materialized records to metadata segment"); - - input - .writers - .vector - .apply_materialized_log_chunk(&record_segment_reader, &input.materialized_logs) - .instrument(tracing::trace_span!( - "Apply materialized logs to HNSW segment" - )) - .await - .map_err(WriteSegmentsOperatorError::ApplyMaterializedLogsError)?; - tracing::debug!("Applied Materialized Records to HNSW Segment"); - - Ok(WriteSegmentsOutput { - writers: input.writers.clone(), - }) - } -} diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index e47775f42d6..d13ae829eb7 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -5,6 +5,10 @@ use crate::execution::dispatcher::Dispatcher; use crate::execution::operator::TaskError; use crate::execution::operator::TaskMessage; use crate::execution::operator::TaskResult; +use crate::execution::operators::apply_log_to_segment_writer::ApplyLogToSegmentWriterInput; +use crate::execution::operators::apply_log_to_segment_writer::ApplyLogToSegmentWriterOperator; +use crate::execution::operators::apply_log_to_segment_writer::ApplyLogToSegmentWriterOperatorError; +use crate::execution::operators::apply_log_to_segment_writer::ApplyLogToSegmentWriterOutput; use crate::execution::operators::fetch_log::FetchLogError; use crate::execution::operators::fetch_log::FetchLogOperator; use crate::execution::operators::fetch_log::FetchLogOutput; @@ -22,10 +26,6 @@ use crate::execution::operators::register::RegisterError; use crate::execution::operators::register::RegisterInput; use crate::execution::operators::register::RegisterOperator; use crate::execution::operators::register::RegisterOutput; -use crate::execution::operators::write_segments::WriteSegmentsInput; -use crate::execution::operators::write_segments::WriteSegmentsOperator; -use crate::execution::operators::write_segments::WriteSegmentsOperatorError; -use crate::execution::operators::write_segments::WriteSegmentsOutput; use crate::log::log::Log; use crate::segment::distributed_hnsw_segment::DistributedHNSWSegmentWriter; use crate::segment::metadata_segment::MetadataSegmentWriter; @@ -151,8 +151,8 @@ pub enum CompactionError { Partition(#[from] PartitionError), #[error("MaterializeLogs error: {0}")] MaterializeLogs(#[from] MaterializeLogOperatorError), - #[error("WriteSegments error: {0}")] - WriteSegments(#[from] WriteSegmentsOperatorError), + #[error("Apply logs to segment writer error: {0}")] + ApplyLogToSegmentWriter(#[from] ApplyLogToSegmentWriterOperatorError), #[error("Could not create record segment reader: {0}")] RecordSegmentReaderCreationFailed(#[from] RecordSegmentReaderCreationError), #[error("GetSegmentWriters error: {0}")] @@ -280,7 +280,7 @@ impl CompactOrchestrator { None => return, }; - self.num_write_tasks = partitions.len() as i32; + self.num_write_tasks = partitions.len() as i32 * 3; for partition in partitions.iter() { let operator = MaterializeLogOperator::new(); let input = MaterializeLogInput::new( @@ -298,7 +298,9 @@ impl CompactOrchestrator { &mut self, materialized_logs: MaterializeLogsResult, self_address: Box< - dyn ReceiverForMessage>, + dyn ReceiverForMessage< + TaskResult, + >, >, ctx: &crate::system::ComponentContext, ) { @@ -314,15 +316,53 @@ impl CompactOrchestrator { None => return, }; - let operator = WriteSegmentsOperator::new(); - let input = WriteSegmentsInput::new( - writers, - self.blockfile_provider.clone(), - record_segment, - materialized_logs, - ); - let task = wrap(operator, input, self_address); - self.send(task, ctx).await; + let record_segment_reader: Option> = match self.ok_or_terminate( + match RecordSegmentReader::from_segment(&record_segment, &self.blockfile_provider).await + { + Ok(reader) => Ok(Some(reader)), + Err(err) => match *err { + RecordSegmentReaderCreationError::UninitializedSegment => Ok(None), + _ => Err(*err), + }, + }, + ctx, + ) { + Some(reader) => reader, + None => return, + }; + + { + let operator = ApplyLogToSegmentWriterOperator::new(); + let input = ApplyLogToSegmentWriterInput::new( + writers.metadata, + materialized_logs.clone(), + record_segment_reader.clone(), + ); + let task = wrap(operator, input, self_address.clone()); + self.send(task, ctx).await; + } + + { + let operator = ApplyLogToSegmentWriterOperator::new(); + let input = ApplyLogToSegmentWriterInput::new( + writers.record, + materialized_logs.clone(), + record_segment_reader.clone(), + ); + let task = wrap(operator, input, self_address.clone()); + self.send(task, ctx).await; + } + + { + let operator = ApplyLogToSegmentWriterOperator::new(); + let input = ApplyLogToSegmentWriterInput::new( + *writers.vector, + materialized_logs, + record_segment_reader, + ); + let task = wrap(operator, input, self_address); + self.send(task, ctx).await; + } } async fn flush_s3( @@ -595,7 +635,7 @@ impl Handler> }; if materialized_result.is_empty() { - self.num_write_tasks -= 1; + self.num_write_tasks -= 3; if self.num_write_tasks == 0 { // There is nothing to flush, proceed to register @@ -609,27 +649,30 @@ impl Handler> } #[async_trait] -impl Handler> for CompactOrchestrator { +impl Handler> + for CompactOrchestrator +{ type Result = (); async fn handle( &mut self, - message: TaskResult, + message: TaskResult, ctx: &crate::system::ComponentContext, ) { - let output = match self.ok_or_terminate(message.into_inner(), ctx) { - Some(output) => output, + match self.ok_or_terminate(message.into_inner(), ctx) { + Some(_) => (), None => return, }; self.num_write_tasks -= 1; if self.num_write_tasks == 0 { - self.flush_s3( - output.writers.record, - output.writers.vector, - output.writers.metadata, - ctx, - ) - .await; + let writers = self.get_segment_writers().await; + let writers = match self.ok_or_terminate(writers, ctx) { + Some(writers) => writers, + None => return, + }; + + self.flush_s3(writers.record, writers.vector, writers.metadata, ctx) + .await; } } } diff --git a/rust/worker/src/segment/distributed_hnsw_segment.rs b/rust/worker/src/segment/distributed_hnsw_segment.rs index d64f213091d..c1188c8d1c8 100644 --- a/rust/worker/src/segment/distributed_hnsw_segment.rs +++ b/rust/worker/src/segment/distributed_hnsw_segment.rs @@ -193,6 +193,10 @@ impl DistributedHNSWSegmentWriter { } impl SegmentWriter for DistributedHNSWSegmentWriter { + fn get_name(&self) -> &'static str { + "DistributedHNSWSegmentWriter" + } + async fn apply_materialized_log_chunk( &self, record_segment_reader: &Option>, diff --git a/rust/worker/src/segment/metadata_segment.rs b/rust/worker/src/segment/metadata_segment.rs index 92aa7d255c7..3583a2bc63b 100644 --- a/rust/worker/src/segment/metadata_segment.rs +++ b/rust/worker/src/segment/metadata_segment.rs @@ -532,6 +532,10 @@ impl<'me> MetadataSegmentWriter<'me> { } impl SegmentWriter for MetadataSegmentWriter<'_> { + fn get_name(&self) -> &'static str { + "MetadataSegmentWriter" + } + async fn apply_materialized_log_chunk( &self, record_segment_reader: &Option>, diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs index a0e9a74c5c2..611fa9fa45b 100644 --- a/rust/worker/src/segment/record_segment.rs +++ b/rust/worker/src/segment/record_segment.rs @@ -344,6 +344,10 @@ impl ChromaError for ApplyMaterializedLogError { } impl SegmentWriter for RecordSegmentWriter { + fn get_name(&self) -> &'static str { + "RecordSegmentWriter" + } + async fn apply_materialized_log_chunk( &self, record_segment_reader: &Option>, @@ -647,6 +651,12 @@ pub struct RecordSegmentReader<'me> { curr_max_offset_id: Arc, } +impl Debug for RecordSegmentReader<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("RecordSegmentReader").finish() + } +} + #[derive(Error, Debug)] pub enum RecordSegmentReaderCreationError { #[error("Segment uninitialized")] diff --git a/rust/worker/src/segment/spann_segment.rs b/rust/worker/src/segment/spann_segment.rs index aad1c2299f6..7ca939eebaf 100644 --- a/rust/worker/src/segment/spann_segment.rs +++ b/rust/worker/src/segment/spann_segment.rs @@ -240,6 +240,10 @@ struct SpannSegmentFlusher { } impl SegmentWriter for SpannSegmentWriter { + fn get_name(&self) -> &'static str { + "SpannSegmentWriter" + } + async fn apply_materialized_log_chunk( &self, record_segment_reader: &Option>, diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs index f329e3acc17..523c84f77e6 100644 --- a/rust/worker/src/segment/types.rs +++ b/rust/worker/src/segment/types.rs @@ -6,6 +6,7 @@ use chroma_types::{ UpdateMetadataValue, }; use std::collections::{HashMap, HashSet}; +use std::future::Future; use std::sync::atomic::AtomicU32; use std::sync::Arc; use thiserror::Error; @@ -863,11 +864,12 @@ pub async fn materialize_logs( // This needs to be public for testing #[allow(async_fn_in_trait)] pub trait SegmentWriter { - async fn apply_materialized_log_chunk( + fn get_name(&self) -> &'static str; + fn apply_materialized_log_chunk( &self, record_segment_reader: &Option, materialized_chunk: &MaterializeLogsResult, - ) -> Result<(), ApplyMaterializedLogError>; + ) -> impl Future> + Send; async fn commit(self) -> Result>; }