Skip to content

Commit

Permalink
Remove SegmentFlusher trait
Browse files (browse the repository at this point in the history)
  • Loading branch information
codetheweb committed Jan 3, 2025
1 parent 8ff398b commit be300a3
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 94 deletions.
1 change: 0 additions & 1 deletion rust/worker/src/execution/operators/count_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ impl Operator<CountRecordsInput, CountRecordsOutput> for CountRecordsOperator {
mod tests {
use crate::segment::materialize_logs;
use crate::segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError};
use crate::segment::types::SegmentFlusher;
use crate::{
execution::{
operator::Operator,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::execution::operator::Operator;
use crate::segment::types::SegmentFlusher;
use crate::segment::ChromaSegmentFlusher;
use async_trait::async_trait;
use chroma_error::ChromaError;
Expand Down
1 change: 0 additions & 1 deletion rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use crate::segment::record_segment::RecordSegmentWriter;
use crate::segment::ChromaSegmentFlusher;
use crate::segment::ChromaSegmentWriter;
use crate::segment::MaterializeLogsResult;
use crate::segment::SegmentFlusher;
use crate::sysdb::sysdb::GetCollectionsError;
use crate::sysdb::sysdb::GetSegmentsError;
use crate::sysdb::sysdb::SysDb;
Expand Down
16 changes: 2 additions & 14 deletions rust/worker/src/segment/distributed_hnsw_segment.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::record_segment::{ApplyMaterializedLogError, RecordSegmentReader};
use super::utils::hnsw_params_from_segment;
use super::{MaterializeLogsResult, SegmentFlusher};
use super::MaterializeLogsResult;
use crate::segment::utils::distance_function_from_segment;
use async_trait::async_trait;
use chroma_distance::DistanceFunctionError;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_index::hnsw_provider::{
Expand Down Expand Up @@ -261,19 +260,8 @@ impl DistributedHNSWSegmentWriter {
Err(e) => Err(e),
}
}
}

#[async_trait]
impl SegmentFlusher for DistributedHNSWSegmentWriter {
fn get_id(&self) -> SegmentUuid {
self.id
}

fn get_name(&self) -> &'static str {
"DistributedHNSWSegmentWriter"
}

async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
pub async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
let hnsw_index_id = self.index.inner.read().id;
match self.hnsw_index_provider.flush(&hnsw_index_id).await {
Ok(_) => {}
Expand Down
18 changes: 3 additions & 15 deletions rust/worker/src/segment/metadata_segment.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use super::super::execution::operators::filter::MetadataProvider;
use super::record_segment::ApplyMaterializedLogError;
use super::SegmentFlusher;
use crate::execution::operators::filter::RoaringMetadataFilter;
use crate::segment::record_segment::RecordSegmentReader;
use crate::segment::MaterializeLogsResult;
use async_trait::async_trait;
use chroma_blockstore::provider::{BlockfileProvider, CreateError, OpenError};
use chroma_blockstore::BlockfileWriterOptions;
use chroma_error::{ChromaError, ErrorCodes};
Expand Down Expand Up @@ -761,7 +759,7 @@ impl<'me> MetadataSegmentWriter<'me> {
}

pub struct MetadataSegmentFlusher {
id: SegmentUuid,
pub id: SegmentUuid,
pub(crate) full_text_index_flusher: FullTextIndexFlusher,
pub(crate) string_metadata_index_flusher: MetadataIndexFlusher,
pub(crate) bool_metadata_index_flusher: MetadataIndexFlusher,
Expand All @@ -777,17 +775,8 @@ impl Debug for MetadataSegmentFlusher {
}
}

#[async_trait]
impl SegmentFlusher for MetadataSegmentFlusher {
fn get_id(&self) -> SegmentUuid {
self.id
}

fn get_name(&self) -> &'static str {
"MetadataSegmentFlusher"
}

async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
impl MetadataSegmentFlusher {
pub async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
let full_text_pls_id = self.full_text_index_flusher.pls_id();
let string_metadata_id = self.string_metadata_index_flusher.id();
let bool_metadata_id = self.bool_metadata_index_flusher.id();
Expand Down Expand Up @@ -1114,7 +1103,6 @@ mod test {
record_segment::{
RecordSegmentReader, RecordSegmentReaderCreationError, RecordSegmentWriter,
},
SegmentFlusher,
};
use chroma_blockstore::{
arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider},
Expand Down
20 changes: 4 additions & 16 deletions rust/worker/src/segment/record_segment.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use super::spann_segment::SpannSegmentWriterError;
use super::{
HydratedMaterializedLogRecord, LogMaterializerError, MaterializeLogsResult, SegmentFlusher,
};
use async_trait::async_trait;
use super::{HydratedMaterializedLogRecord, LogMaterializerError, MaterializeLogsResult};
use chroma_blockstore::provider::{BlockfileProvider, CreateError, OpenError};
use chroma_blockstore::{
BlockfileFlusher, BlockfileReader, BlockfileWriter, BlockfileWriterOptions,
Expand Down Expand Up @@ -559,7 +556,7 @@ impl ChromaError for ApplyMaterializedLogError {
}

pub struct RecordSegmentFlusher {
id: SegmentUuid,
pub id: SegmentUuid,
user_id_to_id_flusher: BlockfileFlusher,
id_to_user_id_flusher: BlockfileFlusher,
id_to_data_flusher: BlockfileFlusher,
Expand All @@ -572,17 +569,8 @@ impl Debug for RecordSegmentFlusher {
}
}

#[async_trait]
impl SegmentFlusher for RecordSegmentFlusher {
fn get_name(&self) -> &'static str {
"RecordSegmentFlusher"
}

fn get_id(&self) -> SegmentUuid {
self.id
}

async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
impl RecordSegmentFlusher {
pub async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
let user_id_to_id_bf_id = self.user_id_to_id_flusher.id();
let id_to_user_id_bf_id = self.id_to_user_id_flusher.id();
let id_to_data_bf_id = self.id_to_data_flusher.id();
Expand Down
32 changes: 10 additions & 22 deletions rust/worker/src/segment/spann_segment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::collections::HashMap;

use async_trait::async_trait;
use super::record_segment::RecordSegmentReader;
use super::{
record_segment::ApplyMaterializedLogError,
utils::{distance_function_from_segment, hnsw_params_from_segment},
};
use super::{BorrowedMaterializedLogRecord, HydratedMaterializedLogRecord, MaterializeLogsResult};
use chroma_blockstore::provider::BlockfileProvider;
use chroma_distance::DistanceFunctionError;
use chroma_error::{ChromaError, ErrorCodes};
Expand All @@ -11,17 +14,10 @@ use chroma_index::IndexUuid;
use chroma_index::{hnsw_provider::HnswIndexProvider, spann::types::SpannIndexWriter};
use chroma_types::SegmentUuid;
use chroma_types::{MaterializedLogOperation, Segment, SegmentScope, SegmentType};
use std::collections::HashMap;
use thiserror::Error;
use uuid::Uuid;

use super::record_segment::RecordSegmentReader;
use super::{
record_segment::ApplyMaterializedLogError,
utils::{distance_function_from_segment, hnsw_params_from_segment},
SegmentFlusher,
};
use super::{BorrowedMaterializedLogRecord, HydratedMaterializedLogRecord, MaterializeLogsResult};

const HNSW_PATH: &str = "hnsw_path";
const VERSION_MAP_PATH: &str = "version_map_path";
const POSTING_LIST_PATH: &str = "posting_list_path";
Expand Down Expand Up @@ -292,20 +288,13 @@ impl SpannSegmentWriter {
}

pub struct SpannSegmentFlusher {
#[allow(dead_code)]
id: SegmentUuid,
index_flusher: SpannIndexFlusher,
}

#[async_trait]
impl SegmentFlusher for SpannSegmentFlusher {
fn get_id(&self) -> SegmentUuid {
self.id
}

fn get_name(&self) -> &'static str {
"SpannSegmentFlusher"
}

impl SpannSegmentFlusher {
#[allow(dead_code)]
async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
let index_flusher_res = self
.index_flusher
Expand Down Expand Up @@ -517,7 +506,6 @@ mod test {
use crate::segment::{
materialize_logs,
spann_segment::{SpannSegmentReader, SpannSegmentWriter},
SegmentFlusher,
};

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/segment/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::log::test::{LogGenerator, TEST_EMBEDDING_DIMENSION};

use super::{
distributed_hnsw_segment::DistributedHNSWSegmentWriter, materialize_logs,
metadata_segment::MetadataSegmentWriter, record_segment::RecordSegmentWriter, SegmentFlusher,
metadata_segment::MetadataSegmentWriter, record_segment::RecordSegmentWriter,
};

#[derive(Clone)]
Expand Down
34 changes: 11 additions & 23 deletions rust/worker/src/segment/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_types::{
Chunk, DataRecord, DeletedMetadata, LogRecord, MaterializedLogOperation, Metadata,
Expand Down Expand Up @@ -863,14 +862,6 @@ pub async fn materialize_logs(
})
}

// This needs to be public for testing
#[async_trait]
pub trait SegmentFlusher {
fn get_id(&self) -> SegmentUuid;
fn get_name(&self) -> &'static str;
async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>>;
}

#[derive(Clone, Debug)]
pub enum ChromaSegmentWriter<'bf> {
RecordSegment(RecordSegmentWriter),
Expand All @@ -883,7 +874,7 @@ impl<'a> ChromaSegmentWriter<'a> {
match self {
ChromaSegmentWriter::RecordSegment(writer) => writer.id,
ChromaSegmentWriter::MetadataSegment(writer) => writer.id,
ChromaSegmentWriter::DistributedHNSWSegment(writer) => writer.get_id(),
ChromaSegmentWriter::DistributedHNSWSegment(writer) => writer.id,
}
}

Expand Down Expand Up @@ -952,27 +943,24 @@ pub enum ChromaSegmentFlusher {
DistributedHNSWSegment(Box<DistributedHNSWSegmentWriter>),
}

#[async_trait]
impl SegmentFlusher for ChromaSegmentFlusher {
fn get_id(&self) -> SegmentUuid {
impl ChromaSegmentFlusher {
pub fn get_id(&self) -> SegmentUuid {
match self {
ChromaSegmentFlusher::RecordSegment(flusher) => flusher.get_id(),
ChromaSegmentFlusher::MetadataSegment(flusher) => flusher.get_id(),
ChromaSegmentFlusher::DistributedHNSWSegment(flusher) => flusher.get_id(),
ChromaSegmentFlusher::RecordSegment(flusher) => flusher.id,
ChromaSegmentFlusher::MetadataSegment(flusher) => flusher.id,
ChromaSegmentFlusher::DistributedHNSWSegment(flusher) => flusher.id,
}
}

fn get_name(&self) -> &'static str {
pub fn get_name(&self) -> &'static str {
match self {
ChromaSegmentFlusher::RecordSegment(flusher) => flusher.get_name(),
ChromaSegmentFlusher::MetadataSegment(flusher) => flusher.get_name(),
ChromaSegmentFlusher::DistributedHNSWSegment(flusher) => {
SegmentFlusher::get_name(flusher.as_ref())
}
ChromaSegmentFlusher::RecordSegment(_) => "RecordSegmentFlusher",
ChromaSegmentFlusher::MetadataSegment(_) => "MetadataSegmentFlusher",
ChromaSegmentFlusher::DistributedHNSWSegment(_) => "DistributedHNSWSegmentFlusher",
}
}

async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
pub async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
match self {
ChromaSegmentFlusher::RecordSegment(flusher) => flusher.flush().await,
ChromaSegmentFlusher::MetadataSegment(flusher) => flusher.flush().await,
Expand Down

0 comments on commit be300a3

Please sign in to comment.