From 19e1971be1046bb7d63df27a433cbcaae6979bdd Mon Sep 17 00:00:00 2001 From: Max Isom Date: Fri, 3 Jan 2025 08:16:34 -0800 Subject: [PATCH] [ENH]: move materialization into operator (#3357) ## Description of changes Log materialization is now in its own operator. Having materialization in its own operator unlocks two main benefits: - allows us to pipeline log applications across segment types (currently unrealized) - we can easily bail for any partition with an empty set of materialized records before constructing writers ## Test plan *How are these changes tested?* - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust Tested locally with SciDocs as well. ## Documentation Changes *Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?* --- .../src/compactor/compaction_manager.rs | 4 - .../execution/operators/materialize_logs.rs | 97 ++++ rust/worker/src/execution/operators/mod.rs | 1 + .../src/execution/operators/write_segments.rs | 161 +++---- .../src/execution/orchestration/compact.rs | 418 ++++++++++-------- 5 files changed, 390 insertions(+), 291 deletions(-) create mode 100644 rust/worker/src/execution/operators/materialize_logs.rs diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index d69d29ec68d..9e6e3147c25 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -23,8 +23,6 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use std::fmt::Debug; use std::fmt::Formatter; -use std::sync::atomic::AtomicU32; -use std::sync::Arc; use std::time::Duration; use thiserror::Error; use tracing::instrument; @@ -123,8 +121,6 @@ impl CompactionManager { self.hnsw_index_provider.clone(), dispatcher, None, - None, - Arc::new(AtomicU32::new(1)), self.max_compaction_size, 
self.max_partition_size, ); diff --git a/rust/worker/src/execution/operators/materialize_logs.rs b/rust/worker/src/execution/operators/materialize_logs.rs new file mode 100644 index 00000000000..afb4ea3ac43 --- /dev/null +++ b/rust/worker/src/execution/operators/materialize_logs.rs @@ -0,0 +1,97 @@ +use crate::execution::operator::Operator; +use crate::segment::record_segment::RecordSegmentReaderCreationError; +use crate::segment::{materialize_logs, record_segment::RecordSegmentReader}; +use crate::segment::{LogMaterializerError, MaterializeLogsResult}; +use async_trait::async_trait; +use chroma_blockstore::provider::BlockfileProvider; +use chroma_error::ChromaError; +use chroma_types::{Chunk, LogRecord, Segment}; +use futures::TryFutureExt; +use std::sync::atomic::AtomicU32; +use std::sync::Arc; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum MaterializeLogOperatorError { + #[error("Could not create record segment reader: {0}")] + RecordSegmentReaderCreationFailed(#[from] RecordSegmentReaderCreationError), + #[error("Log materialization failed: {0}")] + LogMaterializationFailed(#[from] LogMaterializerError), +} + +impl ChromaError for MaterializeLogOperatorError { + fn code(&self) -> chroma_error::ErrorCodes { + match self { + MaterializeLogOperatorError::RecordSegmentReaderCreationFailed(e) => e.code(), + MaterializeLogOperatorError::LogMaterializationFailed(e) => e.code(), + } + } +} + +#[derive(Debug)] +pub struct MaterializeLogOperator {} + +impl MaterializeLogOperator { + pub fn new() -> Box { + Box::new(MaterializeLogOperator {}) + } +} + +#[derive(Debug)] +pub struct MaterializeLogInput { + logs: Chunk, + provider: BlockfileProvider, + record_segment: Segment, + offset_id: Arc, +} + +impl MaterializeLogInput { + pub fn new( + logs: Chunk, + provider: BlockfileProvider, + record_segment: Segment, + offset_id: Arc, + ) -> Self { + MaterializeLogInput { + logs, + provider, + record_segment, + offset_id, + } + } +} + +#[async_trait] +impl Operator 
for MaterializeLogOperator { + type Error = MaterializeLogOperatorError; + + async fn run(&self, input: &MaterializeLogInput) -> Result { + tracing::debug!("Materializing {} log entries", input.logs.total_len()); + + let record_segment_reader = + match RecordSegmentReader::from_segment(&input.record_segment, &input.provider).await { + Ok(reader) => Some(reader), + Err(e) => { + match *e { + // Uninitialized segment is fine and means that the record + // segment is not yet initialized in storage. + RecordSegmentReaderCreationError::UninitializedSegment => None, + err => { + tracing::error!("Error creating record segment reader: {:?}", err); + return Err( + MaterializeLogOperatorError::RecordSegmentReaderCreationFailed(err), + ); + } + } + } + }; + + materialize_logs( + &record_segment_reader, + input.logs.clone(), + Some(input.offset_id.clone()), + ) + .map_err(MaterializeLogOperatorError::LogMaterializationFailed) + .await + } +} diff --git a/rust/worker/src/execution/operators/mod.rs b/rust/worker/src/execution/operators/mod.rs index 76eccedaab9..d7f959db448 100644 --- a/rust/worker/src/execution/operators/mod.rs +++ b/rust/worker/src/execution/operators/mod.rs @@ -1,5 +1,6 @@ pub(super) mod count_records; pub(super) mod flush_s3; +pub mod materialize_logs; pub(super) mod partition; pub(super) mod register; pub mod spann_bf_pl; diff --git a/rust/worker/src/execution/operators/write_segments.rs b/rust/worker/src/execution/operators/write_segments.rs index 0af60926025..fa0d041955a 100644 --- a/rust/worker/src/execution/operators/write_segments.rs +++ b/rust/worker/src/execution/operators/write_segments.rs @@ -1,52 +1,42 @@ -use crate::segment::materialize_logs; +use crate::execution::operator::Operator; +use crate::execution::orchestration::CompactWriters; use crate::segment::metadata_segment::MetadataSegmentError; -use crate::segment::metadata_segment::MetadataSegmentWriter; use crate::segment::record_segment::ApplyMaterializedLogError; use 
crate::segment::record_segment::RecordSegmentReader; use crate::segment::record_segment::RecordSegmentReaderCreationError; use crate::segment::LogMaterializerError; +use crate::segment::MaterializeLogsResult; use crate::segment::SegmentWriter; -use crate::{ - execution::operator::Operator, - segment::{ - distributed_hnsw_segment::DistributedHNSWSegmentWriter, record_segment::RecordSegmentWriter, - }, -}; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::ChromaError; use chroma_error::ErrorCodes; -use chroma_types::Chunk; -use chroma_types::LogRecord; use chroma_types::Segment; -use std::sync::atomic::AtomicU32; -use std::sync::Arc; use thiserror::Error; use tracing::Instrument; -use tracing::Span; #[derive(Error, Debug)] pub enum WriteSegmentsOperatorError { + #[error("Log materialization result empty")] + LogMaterializationResultEmpty, #[error("Preparation for log materialization failed {0}")] LogMaterializationPreparationError(#[from] RecordSegmentReaderCreationError), #[error("Log materialization failed {0}")] LogMaterializationError(#[from] LogMaterializerError), #[error("Materialized logs failed to apply {0}")] - ApplyMaterializatedLogsError(#[from] ApplyMaterializedLogError), + ApplyMaterializedLogsError(#[from] ApplyMaterializedLogError), #[error("Materialized logs failed to apply {0}")] - ApplyMaterializatedLogsErrorMetadataSegment(#[from] MetadataSegmentError), - #[error("Unitialized writer")] - UnintializedWriter, + ApplyMaterializedLogsErrorMetadataSegment(#[from] MetadataSegmentError), } impl ChromaError for WriteSegmentsOperatorError { fn code(&self) -> ErrorCodes { match self { + WriteSegmentsOperatorError::LogMaterializationResultEmpty => ErrorCodes::Internal, WriteSegmentsOperatorError::LogMaterializationPreparationError(e) => e.code(), WriteSegmentsOperatorError::LogMaterializationError(e) => e.code(), - WriteSegmentsOperatorError::ApplyMaterializatedLogsError(e) => e.code(), - 
WriteSegmentsOperatorError::ApplyMaterializatedLogsErrorMetadataSegment(e) => e.code(), - WriteSegmentsOperatorError::UnintializedWriter => ErrorCodes::Internal, + WriteSegmentsOperatorError::ApplyMaterializedLogsError(e) => e.code(), + WriteSegmentsOperatorError::ApplyMaterializedLogsErrorMetadataSegment(e) => e.code(), } } } @@ -62,42 +52,31 @@ impl WriteSegmentsOperator { #[derive(Debug)] pub struct WriteSegmentsInput { - record_segment_writer: Option, - hnsw_segment_writer: Option>, - metadata_segment_writer: Option>, - chunk: Chunk, + writers: CompactWriters, provider: BlockfileProvider, record_segment: Segment, - next_offset_id: Arc, + materialized_logs: MaterializeLogsResult, } impl WriteSegmentsInput { pub fn new( - record_segment_writer: Option, - hnsw_segment_writer: Option>, - metadata_segment_writer: Option>, - chunk: Chunk, + writers: CompactWriters, provider: BlockfileProvider, record_segment: Segment, - next_offset_id: Arc, + materialized_logs: MaterializeLogsResult, ) -> Self { WriteSegmentsInput { - record_segment_writer, - hnsw_segment_writer, - metadata_segment_writer, - chunk, + writers, provider, record_segment, - next_offset_id, + materialized_logs, } } } #[derive(Debug)] pub struct WriteSegmentsOutput { - pub(crate) record_segment_writer: Option, - pub(crate) hnsw_segment_writer: Option>, - pub(crate) metadata_segment_writer: Option>, + pub(crate) writers: CompactWriters, } #[async_trait] @@ -109,7 +88,10 @@ impl Operator for WriteSegmentsOperator } async fn run(&self, input: &WriteSegmentsInput) -> Result { - tracing::debug!("Materializing N Records: {:?}", input.chunk.len()); + if input.materialized_logs.is_empty() { + return Err(WriteSegmentsOperatorError::LogMaterializationResultEmpty); + } + // Prepare for log materialization. 
let record_segment_reader: Option; match RecordSegmentReader::from_segment(&input.record_segment, &input.provider).await { @@ -160,78 +142,43 @@ impl Operator for WriteSegmentsOperator }; } }; - // Materialize the logs. - let res = match materialize_logs( - &record_segment_reader, - input.chunk.clone(), - Some(input.next_offset_id.clone()), - ) - .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) - .await - { - Ok(records) => records, - Err(e) => { - tracing::error!("Error materializing records {}", e); - return Err(WriteSegmentsOperatorError::LogMaterializationError(e)); - } - }; - if !res.is_empty() { - // Apply materialized records. - match input - .record_segment_writer - .as_ref() - .ok_or(WriteSegmentsOperatorError::UnintializedWriter)? - .apply_materialized_log_chunk(&record_segment_reader, &res) - .instrument(tracing::trace_span!( - "Apply materialized logs to record segment" - )) - .await - { - Ok(()) => (), - Err(e) => { - return Err(WriteSegmentsOperatorError::ApplyMaterializatedLogsError(e)); - } - } - tracing::debug!("Applied materialized records to record segment"); - match input - .metadata_segment_writer - .as_ref() - .ok_or(WriteSegmentsOperatorError::UnintializedWriter)? - .apply_materialized_log_chunk(&record_segment_reader, &res) - .instrument(tracing::trace_span!( - "Apply materialized logs to metadata segment" - )) - .await - { - Ok(()) => (), - Err(e) => { - return Err(WriteSegmentsOperatorError::ApplyMaterializatedLogsError(e)); - } - } - tracing::debug!("Applied materialized records to metadata segment"); - match input - .hnsw_segment_writer - .as_ref() - .ok_or(WriteSegmentsOperatorError::UnintializedWriter)? 
- .apply_materialized_log_chunk(&record_segment_reader, &res) - .instrument(tracing::trace_span!( - "Apply materialized logs to HNSW segment" - )) - .await - { - Ok(()) => (), - Err(e) => { - return Err(WriteSegmentsOperatorError::ApplyMaterializatedLogsError(e)); - } - } - } + // Apply materialized records. + input + .writers + .record + .apply_materialized_log_chunk(&record_segment_reader, &input.materialized_logs) + .instrument(tracing::trace_span!( + "Apply materialized logs to record segment" + )) + .await + .map_err(WriteSegmentsOperatorError::ApplyMaterializedLogsError)?; + tracing::debug!("Applied materialized records to record segment"); + input + .writers + .metadata + .apply_materialized_log_chunk(&record_segment_reader, &input.materialized_logs) + .instrument(tracing::trace_span!( + "Apply materialized logs to metadata segment" + )) + .await + .map_err(WriteSegmentsOperatorError::ApplyMaterializedLogsError)?; + tracing::debug!("Applied materialized records to metadata segment"); + + input + .writers + .vector + .apply_materialized_log_chunk(&record_segment_reader, &input.materialized_logs) + .instrument(tracing::trace_span!( + "Apply materialized logs to HNSW segment" + )) + .await + .map_err(WriteSegmentsOperatorError::ApplyMaterializedLogsError)?; tracing::debug!("Applied Materialized Records to HNSW Segment"); + Ok(WriteSegmentsOutput { - record_segment_writer: input.record_segment_writer.clone(), - hnsw_segment_writer: input.hnsw_segment_writer.clone(), - metadata_segment_writer: input.metadata_segment_writer.clone(), + writers: input.writers.clone(), }) } } diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index fb241c5f2fc..e47775f42d6 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -11,6 +11,9 @@ use crate::execution::operators::fetch_log::FetchLogOutput; use 
crate::execution::operators::flush_s3::FlushS3Input; use crate::execution::operators::flush_s3::FlushS3Operator; use crate::execution::operators::flush_s3::FlushS3Output; +use crate::execution::operators::materialize_logs::MaterializeLogInput; +use crate::execution::operators::materialize_logs::MaterializeLogOperator; +use crate::execution::operators::materialize_logs::MaterializeLogOperatorError; use crate::execution::operators::partition::PartitionError; use crate::execution::operators::partition::PartitionInput; use crate::execution::operators::partition::PartitionOperator; @@ -27,7 +30,9 @@ use crate::log::log::Log; use crate::segment::distributed_hnsw_segment::DistributedHNSWSegmentWriter; use crate::segment::metadata_segment::MetadataSegmentWriter; use crate::segment::record_segment::RecordSegmentReader; +use crate::segment::record_segment::RecordSegmentReaderCreationError; use crate::segment::record_segment::RecordSegmentWriter; +use crate::segment::MaterializeLogsResult; use crate::sysdb::sysdb::GetCollectionsError; use crate::sysdb::sysdb::GetSegmentsError; use crate::sysdb::sysdb::SysDb; @@ -35,6 +40,7 @@ use crate::system::ChannelError; use crate::system::ComponentContext; use crate::system::ComponentHandle; use crate::system::Handler; +use crate::system::ReceiverForMessage; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::ChromaError; @@ -43,12 +49,12 @@ use chroma_index::hnsw_provider::HnswIndexProvider; use chroma_types::Chunk; use chroma_types::{CollectionUuid, LogRecord, Segment, SegmentFlushInfo, SegmentType}; use core::panic; -use std::sync::atomic; use std::sync::atomic::AtomicU32; use std::sync::Arc; use thiserror::Error; use tokio::sync::oneshot::error::RecvError; use tokio::sync::oneshot::Sender; +use tokio::sync::OnceCell; use uuid::Uuid; /** The state of the orchestrator. @@ -70,16 +76,16 @@ understand. We can always add more abstraction later if we need it. 
enum ExecutionState { Pending, Partition, - Write, + MaterializeAndWrite, Flush, Register, } #[derive(Clone, Debug)] -struct CompactWriters { - metadata: MetadataSegmentWriter<'static>, - record: RecordSegmentWriter, - vector: Box, +pub(crate) struct CompactWriters { + pub(crate) metadata: MetadataSegmentWriter<'static>, + pub(crate) record: RecordSegmentWriter, + pub(crate) vector: Box, } #[derive(Debug)] @@ -96,23 +102,21 @@ pub struct CompactOrchestrator { hnsw_index_provider: HnswIndexProvider, // State we hold across the execution pulled_log_offset: Option, - record_segment: Option, // Dispatcher dispatcher: ComponentHandle, - // Shared writers - writers: Option, // number of write segments tasks num_write_tasks: i32, // Result Channel result_channel: Option>>, - // Next offset id - next_offset_id: Arc, max_compaction_size: usize, max_partition_size: usize, + // Populated during the compaction process + cached_segments: Option>, + writers: OnceCell, } #[derive(Error, Debug)] -enum GetSegmentWritersError { +pub enum GetSegmentWritersError { #[error("No segments found for collection")] NoSegmentsFound, #[error("SysDB GetSegments Error")] @@ -123,16 +127,12 @@ enum GetSegmentWritersError { MetadataSegmentWriterError, #[error("Error creating HNSW Segment Writer")] HnswSegmentWriterError, - #[error("No record segment found for collection")] - NoRecordSegmentFound, - #[error("No metadata segment found for collection")] - NoMetadataSegmentFound, #[error("Collection not found")] CollectionNotFound, #[error("Error getting collection")] GetCollectionError(#[from] GetCollectionsError), - #[error("No hnsw segment found for collection")] - NoHnswSegmentFound, + #[error("Collection is missing dimension")] + CollectionMissingDimension, } impl ChromaError for GetSegmentWritersError { @@ -149,9 +149,15 @@ pub enum CompactionError { FetchLog(#[from] FetchLogError), #[error("Partition error: {0}")] Partition(#[from] PartitionError), + #[error("MaterializeLogs error: {0}")] + 
MaterializeLogs(#[from] MaterializeLogOperatorError), #[error("WriteSegments error: {0}")] WriteSegments(#[from] WriteSegmentsOperatorError), - #[error("Regester error: {0}")] + #[error("Could not create record segment reader: {0}")] + RecordSegmentReaderCreationFailed(#[from] RecordSegmentReaderCreationError), + #[error("GetSegmentWriters error: {0}")] + GetSegmentWriters(#[from] GetSegmentWritersError), + #[error("Register error: {0}")] Register(#[from] RegisterError), #[error("Error sending message through channel: {0}")] Channel(#[from] ChannelError), @@ -200,8 +206,6 @@ impl CompactOrchestrator { hnsw_index_provider: HnswIndexProvider, dispatcher: ComponentHandle, result_channel: Option>>, - record_segment: Option, - next_offset_id: Arc, max_compaction_size: usize, max_partition_size: usize, ) -> Self { @@ -218,11 +222,10 @@ impl CompactOrchestrator { dispatcher, num_write_tasks: 0, result_channel, - record_segment, - next_offset_id, max_compaction_size, max_partition_size, - writers: None, + cached_segments: None, + writers: OnceCell::new(), } } @@ -240,47 +243,88 @@ impl CompactOrchestrator { self.send(task, ctx).await; } - async fn write( + async fn materialize_log( &mut self, partitions: Vec>, + self_address: Box< + dyn ReceiverForMessage>, + >, ctx: &crate::system::ComponentContext, ) { - self.state = ExecutionState::Write; + self.state = ExecutionState::MaterializeAndWrite; - let init_res = self.init_segment_writers().await; - if self.ok_or_terminate(init_res, ctx).is_none() { - return; - } - let (record_segment_writer, hnsw_segment_writer, metadata_segment_writer) = - match self.writers.clone() { - Some(writers) => ( - Some(writers.record), - Some(writers.vector), - Some(writers.metadata), - ), - None => (None, None, None), - }; + let record_segment_result = self.get_segment(SegmentType::BlockfileRecord).await; + let record_segment = match self.ok_or_terminate(record_segment_result, ctx) { + Some(segment) => segment, + None => return, + }; + + let 
next_max_offset_id = match self.ok_or_terminate( + match RecordSegmentReader::from_segment(&record_segment, &self.blockfile_provider).await + { + Ok(reader) => { + let current_max_offset_id = reader.get_current_max_offset_id(); + current_max_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(current_max_offset_id) + } + Err(err) => match *err { + RecordSegmentReaderCreationError::UninitializedSegment => { + Ok(Arc::new(AtomicU32::new(0))) + } + _ => Err(*err), + }, + }, + ctx, + ) { + Some(offset) => offset, + None => return, + }; self.num_write_tasks = partitions.len() as i32; - for parition in partitions.iter() { - let operator = WriteSegmentsOperator::new(); - let input = WriteSegmentsInput::new( - record_segment_writer.clone(), - hnsw_segment_writer.clone(), - metadata_segment_writer.clone(), - parition.clone(), + for partition in partitions.iter() { + let operator = MaterializeLogOperator::new(); + let input = MaterializeLogInput::new( + partition.clone(), self.blockfile_provider.clone(), - self.record_segment - .as_ref() - .expect("WriteSegmentsInput: Record segment not set in the input") - .clone(), - self.next_offset_id.clone(), + record_segment.clone(), + next_max_offset_id.clone(), ); - let task = wrap(operator, input, ctx.receiver()); + let task = wrap(operator, input, self_address.clone()); self.send(task, ctx).await; } } + async fn write( + &mut self, + materialized_logs: MaterializeLogsResult, + self_address: Box< + dyn ReceiverForMessage>, + >, + ctx: &crate::system::ComponentContext, + ) { + let writers = self.get_segment_writers().await; + let writers = match self.ok_or_terminate(writers, ctx) { + Some(writers) => writers, + None => return, + }; + + let record_segment = self.get_segment(SegmentType::BlockfileRecord).await; + let record_segment = match self.ok_or_terminate(record_segment, ctx) { + Some(segment) => segment, + None => return, + }; + + let operator = WriteSegmentsOperator::new(); + let input = WriteSegmentsInput::new( + 
writers, + self.blockfile_provider.clone(), + record_segment, + materialized_logs, + ); + let task = wrap(operator, input, self_address); + self.send(task, ctx).await; + } + async fn flush_s3( &mut self, record_segment_writer: RecordSegmentWriter, @@ -323,139 +367,127 @@ impl CompactOrchestrator { self.send(task, ctx).await; } - async fn init_segment_writers(&mut self) -> Result<(), Box> { - // Care should be taken to use the same writers across the compaction process - // Since the segment writers are stateful, we should not create new writers for each partition - // Nor should we create new writers across different tasks - // This method is for convenience to create the writers in a single place - // It is not meant to be called multiple times in the same compaction job + async fn get_all_segments(&mut self) -> Result, GetSegmentsError> { + if let Some(segments) = &self.cached_segments { + return Ok(segments.clone()); + } let segments = self .sysdb .get_segments(None, None, None, self.collection_id) - .await; - - tracing::info!("Retrived segments: {:?}", segments); + .await?; - let segments = match segments { - Ok(segments) => { - if segments.is_empty() { - return Err(Box::new(GetSegmentWritersError::NoSegmentsFound)); - } - segments - } - Err(e) => { - return Err(Box::new(GetSegmentWritersError::SysDbGetSegmentsError(e))); - } - }; - - let record_segment = segments - .iter() - .find(|segment| segment.r#type == SegmentType::BlockfileRecord); - - tracing::debug!("Found Record Segment: {:?}", record_segment); - - if record_segment.is_none() { - return Err(Box::new(GetSegmentWritersError::NoRecordSegmentFound)); - } - // Create a record segment writer - let record_segment = record_segment.unwrap(); - let record_segment_writer = - match RecordSegmentWriter::from_segment(record_segment, &self.blockfile_provider).await - { - Ok(writer) => writer, - Err(e) => { - tracing::error!("Error creating Record Segment Writer: {:?}", e); - return 
Err(Box::new(GetSegmentWritersError::RecordSegmentWriterError)); - } - }; - - tracing::debug!("Record Segment Writer created"); - match RecordSegmentReader::from_segment(record_segment, &self.blockfile_provider).await { - Ok(reader) => { - self.next_offset_id = Arc::new(AtomicU32::new( - reader - .get_current_max_offset_id() - .load(atomic::Ordering::SeqCst) - + 1, - )); - } - Err(_) => { - self.next_offset_id = Arc::new(AtomicU32::new(1)); - } - }; - self.record_segment = Some(record_segment.clone()); // auto deref. + self.cached_segments = Some(segments.clone()); + Ok(segments) + } - let metadata_segment = segments + async fn get_segment( + &mut self, + segment_type: SegmentType, + ) -> Result { + let segments = self.get_all_segments().await?; + let segment = segments .iter() - .find(|segment| segment.r#type == SegmentType::BlockfileMetadata); + .find(|segment| segment.r#type == segment_type) + .cloned(); - tracing::debug!("Found metadata segment {:?}", metadata_segment); + tracing::debug!("Found {:?} segment: {:?}", segment_type, segment); - if metadata_segment.is_none() { - return Err(Box::new(GetSegmentWritersError::NoMetadataSegmentFound)); + match segment { + Some(segment) => Ok(segment), + None => Err(GetSegmentWritersError::NoSegmentsFound), } - // Create a record segment writer - let mt_segment = metadata_segment.unwrap(); // safe to unwrap here. 
- let mt_segment_writer = - match MetadataSegmentWriter::from_segment(mt_segment, &self.blockfile_provider).await { - Ok(writer) => writer, - Err(e) => { - println!("Error creating metadata Segment Writer: {:?}", e); - return Err(Box::new(GetSegmentWritersError::MetadataSegmentWriterError)); - } - }; + } - tracing::debug!("Metadata Segment Writer created"); + async fn get_segment_writers(&mut self) -> Result { + // Care should be taken to use the same writers across the compaction process + // Since the segment writers are stateful, we should not create new writers for each partition + // Nor should we create new writers across different tasks - // Create a hnsw segment writer - let collection_res = self - .sysdb - .get_collections(Some(self.collection_id), None, None, None) - .await; + let blockfile_provider = self.blockfile_provider.clone(); + let hnsw_provider = self.hnsw_index_provider.clone(); + let mut sysdb = self.sysdb.clone(); + + let record_segment = self.get_segment(SegmentType::BlockfileRecord).await?; + let mt_segment = self.get_segment(SegmentType::BlockfileMetadata).await?; + let hnsw_segment = self.get_segment(SegmentType::HnswDistributed).await?; + + let borrowed_writers = self + .writers + .get_or_try_init::(|| async { + // Create a record segment writer + let record_segment_writer = + match RecordSegmentWriter::from_segment(&record_segment, &blockfile_provider) + .await + { + Ok(writer) => writer, + Err(e) => { + tracing::error!("Error creating Record Segment Writer: {:?}", e); + return Err(GetSegmentWritersError::RecordSegmentWriterError); + } + }; + + tracing::debug!("Record Segment Writer created"); + + // Create a metadata segment writer + let mt_segment_writer = + match MetadataSegmentWriter::from_segment(&mt_segment, &blockfile_provider) + .await + { + Ok(writer) => writer, + Err(e) => { + tracing::error!("Error creating metadata segment writer: {:?}", e); + return Err(GetSegmentWritersError::MetadataSegmentWriterError); + } + }; + + 
tracing::debug!("Metadata Segment Writer created"); + + // Create a hnsw segment writer + let collection_res = sysdb + .get_collections(Some(self.collection_id), None, None, None) + .await; - let collection_res = match collection_res { - Ok(collections) => { - if collections.is_empty() { - return Err(Box::new(GetSegmentWritersError::CollectionNotFound)); + let collection_res = match collection_res { + Ok(collections) => { + if collections.is_empty() { + return Err(GetSegmentWritersError::CollectionNotFound); + } + collections + } + Err(e) => { + return Err(GetSegmentWritersError::GetCollectionError(e)); + } + }; + let collection = &collection_res[0]; + + if let Some(dimension) = collection.dimension { + let hnsw_segment_writer = match DistributedHNSWSegmentWriter::from_segment( + &hnsw_segment, + dimension as usize, + hnsw_provider, + ) + .await + { + Ok(writer) => writer, + Err(e) => { + tracing::error!("Error creating HNSW segment writer: {:?}", e); + return Err(GetSegmentWritersError::HnswSegmentWriterError); + } + }; + + return Ok(CompactWriters { + metadata: mt_segment_writer, + record: record_segment_writer, + vector: hnsw_segment_writer, + }); } - collections - } - Err(e) => { - return Err(Box::new(GetSegmentWritersError::GetCollectionError(e))); - } - }; - let collection = &collection_res[0]; - let hnsw_segment = segments - .iter() - .find(|segment| segment.r#type == SegmentType::HnswDistributed); - if hnsw_segment.is_none() { - return Err(Box::new(GetSegmentWritersError::NoHnswSegmentFound)); - } - let hnsw_segment = hnsw_segment.unwrap(); - if let Some(dim) = collection.dimension { - let hnsw_segment_writer = match DistributedHNSWSegmentWriter::from_segment( - hnsw_segment, - dim as usize, - self.hnsw_index_provider.clone(), - ) - .await - { - Ok(writer) => writer, - Err(e) => { - println!("Error creating HNSW Segment Writer: {:?}", e); - return Err(Box::new(GetSegmentWritersError::HnswSegmentWriterError)); - } - }; - self.writers = Some(CompactWriters { 
- metadata: mt_segment_writer, - record: record_segment_writer, - vector: hnsw_segment_writer, + Err(GetSegmentWritersError::CollectionMissingDimension) }) - } + .await?; - Ok(()) + Ok(borrowed_writers.clone()) } } @@ -542,7 +574,37 @@ impl Handler> for CompactOrchestrato Some(recs) => recs.records, None => todo!(), }; - self.write(records, ctx).await; + self.materialize_log(records, ctx.receiver(), ctx).await; + } +} + +#[async_trait] +impl Handler> + for CompactOrchestrator +{ + type Result = (); + + async fn handle( + &mut self, + message: TaskResult, + ctx: &crate::system::ComponentContext, + ) { + let materialized_result = match self.ok_or_terminate(message.into_inner(), ctx) { + Some(result) => result, + None => return, + }; + + if materialized_result.is_empty() { + self.num_write_tasks -= 1; + + if self.num_write_tasks == 0 { + // There is nothing to flush, proceed to register + self.register(self.pulled_log_offset.unwrap(), Arc::new([]), ctx) + .await; + } + } else { + self.write(materialized_result, ctx.receiver(), ctx).await; + } } } @@ -561,17 +623,13 @@ impl Handler> for Co }; self.num_write_tasks -= 1; if self.num_write_tasks == 0 { - if let (Some(rec), Some(hnsw), Some(mt)) = ( - output.record_segment_writer, - output.hnsw_segment_writer, - output.metadata_segment_writer, - ) { - self.flush_s3(rec, hnsw, mt, ctx).await; - } else { - // There is nothing to flush, proceed to register - self.register(self.pulled_log_offset.unwrap(), Arc::new([]), ctx) - .await; - } + self.flush_s3( + output.writers.record, + output.writers.vector, + output.writers.metadata, + ctx, + ) + .await; } } }