Skip to content

Commit

Permalink
[ENH]: parallelize applying materialized log to segment writers (#3359)
Browse files Browse the repository at this point in the history
## Description of changes

Applies log updates to segment types in parallel rather than sequentially:

<img width="1409" alt="Screenshot 2024-12-27 at 10 30 03" src="https://github.com/user-attachments/assets/9a521c6e-c454-49b3-8e1b-f2d50072e394" />

Pipelining flushes to S3/applying to blockfile will be in the next PR in this stack.

## Test plan
*How are these changes tested?*

- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

Also tested with SciDocs.

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*

n/a
  • Loading branch information
codetheweb authored Jan 3, 2025
1 parent 19e1971 commit 894663c
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 216 deletions.
119 changes: 119 additions & 0 deletions rust/worker/src/execution/operators/apply_log_to_segment_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use crate::execution::operator::Operator;
use crate::segment::metadata_segment::MetadataSegmentError;
use crate::segment::record_segment::ApplyMaterializedLogError;
use crate::segment::record_segment::RecordSegmentReader;
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::LogMaterializerError;
use crate::segment::MaterializeLogsResult;
use crate::segment::SegmentWriter;
use async_trait::async_trait;
use chroma_error::ChromaError;
use chroma_error::ErrorCodes;
use thiserror::Error;
use tracing::Instrument;

#[derive(Error, Debug)]
pub enum ApplyLogToSegmentWriterOperatorError {
#[error("Log materialization result is empty")]
LogMaterializationResultEmpty,
#[error("Preparation for log materialization failed {0}")]
LogMaterializationPreparationError(#[from] RecordSegmentReaderCreationError),
#[error("Log materialization failed {0}")]
LogMaterializationError(#[from] LogMaterializerError),
#[error("Materialized logs failed to apply {0}")]
ApplyMaterializedLogsError(#[from] ApplyMaterializedLogError),
#[error("Materialized logs failed to apply {0}")]
ApplyMaterializedLogsErrorMetadataSegment(#[from] MetadataSegmentError),
}

impl ChromaError for ApplyLogToSegmentWriterOperatorError {
fn code(&self) -> ErrorCodes {
match self {
ApplyLogToSegmentWriterOperatorError::LogMaterializationResultEmpty => {
ErrorCodes::Internal
}
ApplyLogToSegmentWriterOperatorError::LogMaterializationPreparationError(e) => e.code(),
ApplyLogToSegmentWriterOperatorError::LogMaterializationError(e) => e.code(),
ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsError(e) => e.code(),
ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsErrorMetadataSegment(e) => {
e.code()
}
}
}
}

#[derive(Debug)]
pub struct ApplyLogToSegmentWriterOperator {}

impl ApplyLogToSegmentWriterOperator {
pub fn new() -> Box<Self> {
Box::new(ApplyLogToSegmentWriterOperator {})
}
}

#[derive(Debug)]
pub struct ApplyLogToSegmentWriterInput<'bf, Writer: SegmentWriter> {
segment_writer: Writer,
materialized_logs: MaterializeLogsResult,
record_segment_reader: Option<RecordSegmentReader<'bf>>,
}

impl<'bf, Writer: SegmentWriter> ApplyLogToSegmentWriterInput<'bf, Writer> {
pub fn new(
segment_writer: Writer,
materialized_logs: MaterializeLogsResult,
record_segment_reader: Option<RecordSegmentReader<'bf>>,
) -> Self {
ApplyLogToSegmentWriterInput {
segment_writer,
materialized_logs,
record_segment_reader,
}
}
}

#[derive(Debug)]
pub struct ApplyLogToSegmentWriterOutput {}

#[async_trait]
impl<Writer: SegmentWriter + Send + Sync + Clone>
Operator<ApplyLogToSegmentWriterInput<'_, Writer>, ApplyLogToSegmentWriterOutput>
for ApplyLogToSegmentWriterOperator
{
type Error = ApplyLogToSegmentWriterOperatorError;

fn get_name(&self) -> &'static str {
"ApplyLogToSegmentWriterOperator"
}

async fn run(
&self,
input: &ApplyLogToSegmentWriterInput<Writer>,
) -> Result<ApplyLogToSegmentWriterOutput, Self::Error> {
if input.materialized_logs.is_empty() {
return Err(ApplyLogToSegmentWriterOperatorError::LogMaterializationResultEmpty);
}

// Apply materialized records.
match input
.segment_writer
.apply_materialized_log_chunk(&input.record_segment_reader, &input.materialized_logs)
.instrument(tracing::trace_span!(
"Apply materialized logs",
otel.name = format!(
"Apply materialized logs to segment writer {}",
input.segment_writer.get_name()
),
segment = input.segment_writer.get_name()
))
.await
{
Ok(()) => (),
Err(e) => {
return Err(ApplyLogToSegmentWriterOperatorError::ApplyMaterializedLogsError(e));
}
}

Ok(ApplyLogToSegmentWriterOutput {})
}
}
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod apply_log_to_segment_writer;
pub(super) mod count_records;
pub(super) mod flush_s3;
pub mod materialize_logs;
Expand All @@ -7,7 +8,6 @@ pub mod spann_bf_pl;
pub(super) mod spann_centers_search;
pub(super) mod spann_fetch_pl;
pub mod spann_knn_merge;
pub(super) mod write_segments;

// Required for benchmark
pub mod fetch_log;
Expand Down
184 changes: 0 additions & 184 deletions rust/worker/src/execution/operators/write_segments.rs

This file was deleted.

Loading

0 comments on commit 894663c

Please sign in to comment.