Skip to content

Commit

Permalink
[ENH]: move materialization into operator (#3357)
Browse files Browse the repository at this point in the history
## Description of changes

Log materialization is now in its own operator. Having materialization in its own operator unlocks two main benefits:

- allows us to pipeline log applications across segment types (currently unrealized)
- we can easily bail for any partition with an empty set of materialized records before constructing writers

## Test plan
*How are these changes tested?*

- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

Tested locally with SciDocs as well.

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
codetheweb authored Jan 3, 2025
1 parent 6145277 commit 19e1971
Show file tree
Hide file tree
Showing 5 changed files with 390 additions and 291 deletions.
4 changes: 0 additions & 4 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::instrument;
Expand Down Expand Up @@ -123,8 +121,6 @@ impl CompactionManager {
self.hnsw_index_provider.clone(),
dispatcher,
None,
None,
Arc::new(AtomicU32::new(1)),
self.max_compaction_size,
self.max_partition_size,
);
Expand Down
97 changes: 97 additions & 0 deletions rust/worker/src/execution/operators/materialize_logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use crate::execution::operator::Operator;
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::{materialize_logs, record_segment::RecordSegmentReader};
use crate::segment::{LogMaterializerError, MaterializeLogsResult};
use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_error::ChromaError;
use chroma_types::{Chunk, LogRecord, Segment};
use futures::TryFutureExt;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum MaterializeLogOperatorError {
#[error("Could not create record segment reader: {0}")]
RecordSegmentReaderCreationFailed(#[from] RecordSegmentReaderCreationError),
#[error("Log materialization failed: {0}")]
LogMaterializationFailed(#[from] LogMaterializerError),
}

impl ChromaError for MaterializeLogOperatorError {
fn code(&self) -> chroma_error::ErrorCodes {
match self {
MaterializeLogOperatorError::RecordSegmentReaderCreationFailed(e) => e.code(),
MaterializeLogOperatorError::LogMaterializationFailed(e) => e.code(),
}
}
}

#[derive(Debug)]
pub struct MaterializeLogOperator {}

impl MaterializeLogOperator {
pub fn new() -> Box<Self> {
Box::new(MaterializeLogOperator {})
}
}

#[derive(Debug)]
pub struct MaterializeLogInput {
logs: Chunk<LogRecord>,
provider: BlockfileProvider,
record_segment: Segment,
offset_id: Arc<AtomicU32>,
}

impl MaterializeLogInput {
pub fn new(
logs: Chunk<LogRecord>,
provider: BlockfileProvider,
record_segment: Segment,
offset_id: Arc<AtomicU32>,
) -> Self {
MaterializeLogInput {
logs,
provider,
record_segment,
offset_id,
}
}
}

#[async_trait]
impl Operator<MaterializeLogInput, MaterializeLogsResult> for MaterializeLogOperator {
type Error = MaterializeLogOperatorError;

async fn run(&self, input: &MaterializeLogInput) -> Result<MaterializeLogsResult, Self::Error> {
tracing::debug!("Materializing {} log entries", input.logs.total_len());

let record_segment_reader =
match RecordSegmentReader::from_segment(&input.record_segment, &input.provider).await {
Ok(reader) => Some(reader),
Err(e) => {
match *e {
// Uninitialized segment is fine and means that the record
// segment is not yet initialized in storage.
RecordSegmentReaderCreationError::UninitializedSegment => None,
err => {
tracing::error!("Error creating record segment reader: {:?}", err);
return Err(
MaterializeLogOperatorError::RecordSegmentReaderCreationFailed(err),
);
}
}
}
};

materialize_logs(
&record_segment_reader,
input.logs.clone(),
Some(input.offset_id.clone()),
)
.map_err(MaterializeLogOperatorError::LogMaterializationFailed)
.await
}
}
1 change: 1 addition & 0 deletions rust/worker/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub(super) mod count_records;
pub(super) mod flush_s3;
pub mod materialize_logs;
pub(super) mod partition;
pub(super) mod register;
pub mod spann_bf_pl;
Expand Down
161 changes: 54 additions & 107 deletions rust/worker/src/execution/operators/write_segments.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,42 @@
use crate::segment::materialize_logs;
use crate::execution::operator::Operator;
use crate::execution::orchestration::CompactWriters;
use crate::segment::metadata_segment::MetadataSegmentError;
use crate::segment::metadata_segment::MetadataSegmentWriter;
use crate::segment::record_segment::ApplyMaterializedLogError;
use crate::segment::record_segment::RecordSegmentReader;
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::LogMaterializerError;
use crate::segment::MaterializeLogsResult;
use crate::segment::SegmentWriter;
use crate::{
execution::operator::Operator,
segment::{
distributed_hnsw_segment::DistributedHNSWSegmentWriter, record_segment::RecordSegmentWriter,
},
};
use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_error::ChromaError;
use chroma_error::ErrorCodes;
use chroma_types::Chunk;
use chroma_types::LogRecord;
use chroma_types::Segment;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use thiserror::Error;
use tracing::Instrument;
use tracing::Span;

#[derive(Error, Debug)]
pub enum WriteSegmentsOperatorError {
#[error("Log materialization result empty")]
LogMaterializationResultEmpty,
#[error("Preparation for log materialization failed {0}")]
LogMaterializationPreparationError(#[from] RecordSegmentReaderCreationError),
#[error("Log materialization failed {0}")]
LogMaterializationError(#[from] LogMaterializerError),
#[error("Materialized logs failed to apply {0}")]
ApplyMaterializatedLogsError(#[from] ApplyMaterializedLogError),
ApplyMaterializedLogsError(#[from] ApplyMaterializedLogError),
#[error("Materialized logs failed to apply {0}")]
ApplyMaterializatedLogsErrorMetadataSegment(#[from] MetadataSegmentError),
#[error("Unitialized writer")]
UnintializedWriter,
ApplyMaterializedLogsErrorMetadataSegment(#[from] MetadataSegmentError),
}

impl ChromaError for WriteSegmentsOperatorError {
fn code(&self) -> ErrorCodes {
match self {
WriteSegmentsOperatorError::LogMaterializationResultEmpty => ErrorCodes::Internal,
WriteSegmentsOperatorError::LogMaterializationPreparationError(e) => e.code(),
WriteSegmentsOperatorError::LogMaterializationError(e) => e.code(),
WriteSegmentsOperatorError::ApplyMaterializatedLogsError(e) => e.code(),
WriteSegmentsOperatorError::ApplyMaterializatedLogsErrorMetadataSegment(e) => e.code(),
WriteSegmentsOperatorError::UnintializedWriter => ErrorCodes::Internal,
WriteSegmentsOperatorError::ApplyMaterializedLogsError(e) => e.code(),
WriteSegmentsOperatorError::ApplyMaterializedLogsErrorMetadataSegment(e) => e.code(),
}
}
}
Expand All @@ -62,42 +52,31 @@ impl WriteSegmentsOperator {

#[derive(Debug)]
pub struct WriteSegmentsInput {
record_segment_writer: Option<RecordSegmentWriter>,
hnsw_segment_writer: Option<Box<DistributedHNSWSegmentWriter>>,
metadata_segment_writer: Option<MetadataSegmentWriter<'static>>,
chunk: Chunk<LogRecord>,
writers: CompactWriters,
provider: BlockfileProvider,
record_segment: Segment,
next_offset_id: Arc<AtomicU32>,
materialized_logs: MaterializeLogsResult,
}

impl WriteSegmentsInput {
pub fn new(
record_segment_writer: Option<RecordSegmentWriter>,
hnsw_segment_writer: Option<Box<DistributedHNSWSegmentWriter>>,
metadata_segment_writer: Option<MetadataSegmentWriter<'static>>,
chunk: Chunk<LogRecord>,
writers: CompactWriters,
provider: BlockfileProvider,
record_segment: Segment,
next_offset_id: Arc<AtomicU32>,
materialized_logs: MaterializeLogsResult,
) -> Self {
WriteSegmentsInput {
record_segment_writer,
hnsw_segment_writer,
metadata_segment_writer,
chunk,
writers,
provider,
record_segment,
next_offset_id,
materialized_logs,
}
}
}

#[derive(Debug)]
pub struct WriteSegmentsOutput {
pub(crate) record_segment_writer: Option<RecordSegmentWriter>,
pub(crate) hnsw_segment_writer: Option<Box<DistributedHNSWSegmentWriter>>,
pub(crate) metadata_segment_writer: Option<MetadataSegmentWriter<'static>>,
pub(crate) writers: CompactWriters,
}

#[async_trait]
Expand All @@ -109,7 +88,10 @@ impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator
}

async fn run(&self, input: &WriteSegmentsInput) -> Result<WriteSegmentsOutput, Self::Error> {
tracing::debug!("Materializing N Records: {:?}", input.chunk.len());
if input.materialized_logs.is_empty() {
return Err(WriteSegmentsOperatorError::LogMaterializationResultEmpty);
}

// Prepare for log materialization.
let record_segment_reader: Option<RecordSegmentReader>;
match RecordSegmentReader::from_segment(&input.record_segment, &input.provider).await {
Expand Down Expand Up @@ -160,78 +142,43 @@ impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator
};
}
};
// Materialize the logs.
let res = match materialize_logs(
&record_segment_reader,
input.chunk.clone(),
Some(input.next_offset_id.clone()),
)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
{
Ok(records) => records,
Err(e) => {
tracing::error!("Error materializing records {}", e);
return Err(WriteSegmentsOperatorError::LogMaterializationError(e));
}
};

if !res.is_empty() {
// Apply materialized records.
match input
.record_segment_writer
.as_ref()
.ok_or(WriteSegmentsOperatorError::UnintializedWriter)?
.apply_materialized_log_chunk(&record_segment_reader, &res)
.instrument(tracing::trace_span!(
"Apply materialized logs to record segment"
))
.await
{
Ok(()) => (),
Err(e) => {
return Err(WriteSegmentsOperatorError::ApplyMaterializatedLogsError(e));
}
}
tracing::debug!("Applied materialized records to record segment");
match input
.metadata_segment_writer
.as_ref()
.ok_or(WriteSegmentsOperatorError::UnintializedWriter)?
.apply_materialized_log_chunk(&record_segment_reader, &res)
.instrument(tracing::trace_span!(
"Apply materialized logs to metadata segment"
))
.await
{
Ok(()) => (),
Err(e) => {
return Err(WriteSegmentsOperatorError::ApplyMaterializatedLogsError(e));
}
}
tracing::debug!("Applied materialized records to metadata segment");
match input
.hnsw_segment_writer
.as_ref()
.ok_or(WriteSegmentsOperatorError::UnintializedWriter)?
.apply_materialized_log_chunk(&record_segment_reader, &res)
.instrument(tracing::trace_span!(
"Apply materialized logs to HNSW segment"
))
.await
{
Ok(()) => (),
Err(e) => {
return Err(WriteSegmentsOperatorError::ApplyMaterializatedLogsError(e));
}
}
}
// Apply materialized records.
input
.writers
.record
.apply_materialized_log_chunk(&record_segment_reader, &input.materialized_logs)
.instrument(tracing::trace_span!(
"Apply materialized logs to record segment"
))
.await
.map_err(WriteSegmentsOperatorError::ApplyMaterializedLogsError)?;
tracing::debug!("Applied materialized records to record segment");

input
.writers
.metadata
.apply_materialized_log_chunk(&record_segment_reader, &input.materialized_logs)
.instrument(tracing::trace_span!(
"Apply materialized logs to metadata segment"
))
.await
.map_err(WriteSegmentsOperatorError::ApplyMaterializedLogsError)?;
tracing::debug!("Applied materialized records to metadata segment");

input
.writers
.vector
.apply_materialized_log_chunk(&record_segment_reader, &input.materialized_logs)
.instrument(tracing::trace_span!(
"Apply materialized logs to HNSW segment"
))
.await
.map_err(WriteSegmentsOperatorError::ApplyMaterializedLogsError)?;
tracing::debug!("Applied Materialized Records to HNSW Segment");

Ok(WriteSegmentsOutput {
record_segment_writer: input.record_segment_writer.clone(),
hnsw_segment_writer: input.hnsw_segment_writer.clone(),
metadata_segment_writer: input.metadata_segment_writer.clone(),
writers: input.writers.clone(),
})
}
}
Loading

0 comments on commit 19e1971

Please sign in to comment.