Skip to content

Commit

Permalink
Remove SegmentWriter trait
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb committed Jan 3, 2025
1 parent 2d911d2 commit 8ff398b
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 180 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use crate::segment::metadata_segment::MetadataSegmentError;
use crate::segment::record_segment::ApplyMaterializedLogError;
use crate::segment::record_segment::RecordSegmentReader;
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::ChromaSegmentWriter;
use crate::segment::LogMaterializerError;
use crate::segment::MaterializeLogsResult;
use crate::segment::SegmentWriter;
use async_trait::async_trait;
use chroma_error::ChromaError;
use chroma_error::ErrorCodes;
Expand Down Expand Up @@ -53,15 +53,15 @@ impl ApplyLogToSegmentWriterOperator {
}

#[derive(Debug)]
pub struct ApplyLogToSegmentWriterInput<'bf, Writer: SegmentWriter> {
segment_writer: Writer,
pub struct ApplyLogToSegmentWriterInput<'bf> {
segment_writer: ChromaSegmentWriter<'bf>,
materialized_logs: MaterializeLogsResult,
record_segment_reader: Option<RecordSegmentReader<'bf>>,
}

impl<'bf, Writer: SegmentWriter> ApplyLogToSegmentWriterInput<'bf, Writer> {
impl<'bf> ApplyLogToSegmentWriterInput<'bf> {
pub fn new(
segment_writer: Writer,
segment_writer: ChromaSegmentWriter<'bf>,
materialized_logs: MaterializeLogsResult,
record_segment_reader: Option<RecordSegmentReader<'bf>>,
) -> Self {
Expand All @@ -79,8 +79,7 @@ pub struct ApplyLogToSegmentWriterOutput {
}

#[async_trait]
impl<Writer: SegmentWriter + Send + Sync + Clone>
Operator<ApplyLogToSegmentWriterInput<'_, Writer>, ApplyLogToSegmentWriterOutput>
impl Operator<ApplyLogToSegmentWriterInput<'_>, ApplyLogToSegmentWriterOutput>
for ApplyLogToSegmentWriterOperator
{
type Error = ApplyLogToSegmentWriterOperatorError;
Expand All @@ -91,7 +90,7 @@ impl<Writer: SegmentWriter + Send + Sync + Clone>

async fn run(
&self,
input: &ApplyLogToSegmentWriterInput<Writer>,
input: &ApplyLogToSegmentWriterInput,
) -> Result<ApplyLogToSegmentWriterOutput, Self::Error> {
if input.materialized_logs.is_empty() {
return Err(ApplyLogToSegmentWriterOperatorError::LogMaterializationResultEmpty);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::execution::operator::Operator;
use crate::segment::ChromaSegmentFlusher;
use crate::segment::ChromaSegmentWriter;
use crate::segment::SegmentWriter;
use async_trait::async_trait;
use chroma_error::ChromaError;
use chroma_error::ErrorCodes;
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/count_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod tests {
operator::Operator,
operators::count_records::{CountRecordsInput, CountRecordsOperator},
},
segment::{record_segment::RecordSegmentWriter, SegmentWriter},
segment::record_segment::RecordSegmentWriter,
};
use chroma_blockstore::provider::BlockfileProvider;
use chroma_types::{Chunk, CollectionUuid, LogRecord, Operation, OperationRecord, SegmentUuid};
Expand Down
31 changes: 15 additions & 16 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use crate::segment::ChromaSegmentFlusher;
use crate::segment::ChromaSegmentWriter;
use crate::segment::MaterializeLogsResult;
use crate::segment::SegmentFlusher;
use crate::segment::SegmentWriter;
use crate::sysdb::sysdb::GetCollectionsError;
use crate::sysdb::sysdb::GetSegmentsError;
use crate::sysdb::sysdb::SysDb;
Expand Down Expand Up @@ -361,16 +360,17 @@ impl CompactOrchestrator {

{
self.num_uncompleted_tasks_by_segment
.entry(writers.metadata.get_id())
.entry(writers.metadata.id)
.and_modify(|v| {
*v += 1;
})
.or_insert(1);

let span = self.get_segment_writer_span(&writers.metadata);
let writer = ChromaSegmentWriter::MetadataSegment(writers.metadata);
let span = self.get_segment_writer_span(&writer);
let operator = ApplyLogToSegmentWriterOperator::new();
let input = ApplyLogToSegmentWriterInput::new(
writers.metadata,
writer,
materialized_logs.clone(),
record_segment_reader.clone(),
);
Expand All @@ -384,16 +384,17 @@ impl CompactOrchestrator {

{
self.num_uncompleted_tasks_by_segment
.entry(writers.record.get_id())
.entry(writers.record.id)
.and_modify(|v| {
*v += 1;
})
.or_insert(1);

let span = self.get_segment_writer_span(&writers.record);
let writer = ChromaSegmentWriter::RecordSegment(writers.record);
let span = self.get_segment_writer_span(&writer);
let operator = ApplyLogToSegmentWriterOperator::new();
let input = ApplyLogToSegmentWriterInput::new(
writers.record,
writer,
materialized_logs.clone(),
record_segment_reader.clone(),
);
Expand All @@ -413,13 +414,11 @@ impl CompactOrchestrator {
})
.or_insert(1);

let span = self.get_segment_writer_span(writers.vector.as_ref());
let writer = ChromaSegmentWriter::DistributedHNSWSegment(writers.vector);
let span = self.get_segment_writer_span(&writer);
let operator = ApplyLogToSegmentWriterOperator::new();
let input = ApplyLogToSegmentWriterInput::new(
*writers.vector,
materialized_logs,
record_segment_reader,
);
let input =
ApplyLogToSegmentWriterInput::new(writer, materialized_logs, record_segment_reader);
let task = wrap(operator, input, self_address);
let res = self.dispatcher().send(task, Some(span)).await;
self.ok_or_terminate(res, ctx);
Expand Down Expand Up @@ -612,11 +611,11 @@ impl CompactOrchestrator {
) -> Result<ChromaSegmentWriter<'static>, GetSegmentWritersError> {
let writers = self.get_segment_writers().await?;

if writers.metadata.get_id() == segment_id {
if writers.metadata.id == segment_id {
return Ok(ChromaSegmentWriter::MetadataSegment(writers.metadata));
}

if writers.record.get_id() == segment_id {
if writers.record.id == segment_id {
return Ok(ChromaSegmentWriter::RecordSegment(writers.record));
}

Expand All @@ -627,7 +626,7 @@ impl CompactOrchestrator {
Err(GetSegmentWritersError::NoSegmentsFound)
}

fn get_segment_writer_span<W: SegmentWriter>(&mut self, writer: &W) -> Span {
fn get_segment_writer_span(&mut self, writer: &ChromaSegmentWriter) -> Span {
let span = self
.segment_spans
.entry(writer.get_id())
Expand Down
18 changes: 3 additions & 15 deletions rust/worker/src/segment/distributed_hnsw_segment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::record_segment::{ApplyMaterializedLogError, RecordSegmentReader};
use super::utils::hnsw_params_from_segment;
use super::{MaterializeLogsResult, SegmentFlusher, SegmentWriter};
use super::{MaterializeLogsResult, SegmentFlusher};
use crate::segment::utils::distance_function_from_segment;
use async_trait::async_trait;
use chroma_distance::DistanceFunctionError;
Expand Down Expand Up @@ -190,20 +190,8 @@ impl DistributedHNSWSegmentWriter {
)))
}
}
}

impl SegmentWriter for DistributedHNSWSegmentWriter {
type Flusher = DistributedHNSWSegmentWriter;

fn get_id(&self) -> SegmentUuid {
self.id
}

fn get_name(&self) -> &'static str {
"DistributedHNSWSegmentWriter"
}

async fn apply_materialized_log_chunk(
pub async fn apply_materialized_log_chunk(
&self,
record_segment_reader: &Option<RecordSegmentReader<'_>>,
materialized: &MaterializeLogsResult,
Expand Down Expand Up @@ -266,7 +254,7 @@ impl SegmentWriter for DistributedHNSWSegmentWriter {
Ok(())
}

async fn commit(self) -> Result<Self::Flusher, Box<dyn ChromaError>> {
pub async fn commit(self) -> Result<DistributedHNSWSegmentWriter, Box<dyn ChromaError>> {
let res = self.hnsw_index_provider.commit(self.index.clone());
match res {
Ok(_) => Ok(self),
Expand Down
21 changes: 4 additions & 17 deletions rust/worker/src/segment/metadata_segment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::super::execution::operators::filter::MetadataProvider;
use super::record_segment::ApplyMaterializedLogError;
use super::types::SegmentWriter;
use super::SegmentFlusher;
use crate::execution::operators::filter::RoaringMetadataFilter;
use crate::segment::record_segment::RecordSegmentReader;
Expand Down Expand Up @@ -470,20 +469,8 @@ impl<'me> MetadataSegmentWriter<'me> {
// Insert new value.
Ok(self.set_metadata(key, new_value, offset_id).await?)
}
}

impl SegmentWriter for MetadataSegmentWriter<'_> {
type Flusher = MetadataSegmentFlusher;

fn get_id(&self) -> SegmentUuid {
self.id
}

fn get_name(&self) -> &'static str {
"MetadataSegmentWriter"
}

async fn apply_materialized_log_chunk(
pub async fn apply_materialized_log_chunk(
&self,
record_segment_reader: &Option<RecordSegmentReader<'_>>,
materialized: &MaterializeLogsResult,
Expand Down Expand Up @@ -662,7 +649,7 @@ impl SegmentWriter for MetadataSegmentWriter<'_> {
Ok(())
}

async fn finish(&mut self) -> Result<(), Box<dyn ChromaError>> {
pub async fn finish(&mut self) -> Result<(), Box<dyn ChromaError>> {
let mut full_text_index_writer = match self.full_text_index_writer.take() {
Some(writer) => writer,
None => return Err(Box::new(MetadataSegmentError::NoWriter)),
Expand Down Expand Up @@ -721,7 +708,7 @@ impl SegmentWriter for MetadataSegmentWriter<'_> {
Ok(())
}

async fn commit(self) -> Result<Self::Flusher, Box<dyn ChromaError>> {
pub async fn commit(self) -> Result<MetadataSegmentFlusher, Box<dyn ChromaError>> {
let full_text_flusher = match self.full_text_index_writer {
Some(flusher) => match flusher.commit().await {
Ok(flusher) => flusher,
Expand Down Expand Up @@ -1127,7 +1114,7 @@ mod test {
record_segment::{
RecordSegmentReader, RecordSegmentReaderCreationError, RecordSegmentWriter,
},
SegmentFlusher, SegmentWriter,
SegmentFlusher,
};
use chroma_blockstore::{
arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider},
Expand Down
Loading

0 comments on commit 8ff398b

Please sign in to comment.