refactor(cubestore): Move build_writer_props to CubestoreMetadataCacheFactory
srh committed Sep 28, 2024
1 parent 9ac3a1f commit c6234ab
Showing 5 changed files with 189 additions and 77 deletions.
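
For orientation, here is a minimal, self-contained sketch of the wrapper pattern the diff applies, inferred from the call sites shown below. The stub types and synchronous signatures are illustrative only, not the actual Cubestore API.

// Sketch only: a standalone illustration of the wrapper pattern applied in this
// commit, inferred from the call sites in the diff. The stubs used here
// (make_noop_cache returning String, WriterProps, a plain Table struct, a
// synchronous build_writer_props) are placeholders; the real Cubestore code
// works with DataFusion's MetadataCacheFactory, IdRow<Table>, parquet
// WriterProperties, CubeError, and async methods.
use std::sync::Arc;

// Stub standing in for DataFusion's MetadataCacheFactory.
trait MetadataCacheFactory: Send + Sync {
    fn make_noop_cache(&self) -> String;
}

struct BasicMetadataCacheFactory;

impl MetadataCacheFactory for BasicMetadataCacheFactory {
    fn make_noop_cache(&self) -> String {
        "noop metadata cache".to_string()
    }
}

// Stub table metadata; the real argument is an IdRow<Table>.
struct Table {
    name: String,
}

struct WriterProps {
    label: String,
}

// Cubestore-level wrapper: services that only need the DataFusion factory
// unwrap it via cache_factory(); writer-property construction lives on the
// wrapper so it can consult table metadata.
trait CubestoreMetadataCacheFactory: Send + Sync {
    fn cache_factory(&self) -> &Arc<dyn MetadataCacheFactory>;
    fn build_writer_props(&self, table: &Table) -> WriterProps;
}

struct CubestoreMetadataCacheFactoryImpl {
    inner: Arc<dyn MetadataCacheFactory>,
}

impl CubestoreMetadataCacheFactoryImpl {
    fn new(inner: Arc<dyn MetadataCacheFactory>) -> Arc<Self> {
        Arc::new(Self { inner })
    }
}

impl CubestoreMetadataCacheFactory for CubestoreMetadataCacheFactoryImpl {
    fn cache_factory(&self) -> &Arc<dyn MetadataCacheFactory> {
        &self.inner
    }

    fn build_writer_props(&self, table: &Table) -> WriterProps {
        WriterProps {
            label: format!("writer props for table `{}`", table.name),
        }
    }
}

fn main() {
    let factory = CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory));
    // Callers that only need a temporary borrow of the inner factory:
    println!("{}", factory.cache_factory().make_noop_cache());
    // Callers that need an owned handle clone the Arc, mirroring the
    // `.cache_factory().clone()` calls added in config/mod.rs below:
    let owned: Arc<dyn MetadataCacheFactory> = factory.cache_factory().clone();
    drop(owned);
    // Writer properties are built through the wrapper, per table:
    let table = Table { name: "orders".to_string() };
    println!("{}", factory.build_writer_props(&table).label);
}
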
21 changes: 11 additions & 10 deletions rust/cubestore/cubestore/src/config/mod.rs
@@ -2007,8 +2007,7 @@ impl Config {
.register_typed::<dyn ChunkDataStore, _, _, _>(async move |i| {
let metadata_cache_factory = i
.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
.await
.cache_factory();
.await;
ChunkStore::new(
i.get_service_typed().await,
i.get_service_typed().await,
@@ -2025,10 +2024,10 @@ impl Config {
self.injector
.register_typed::<dyn CubestoreParquetMetadataCache, _, _, _>(async move |i| {
let c = i.get_service_typed::<dyn ConfigObj>().await;
let metadata_cache_factory = i
let cubestore_metadata_cache_factory = i
.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
.await
.cache_factory();
.await;
let metadata_cache_factory: &_ = cubestore_metadata_cache_factory.cache_factory();
CubestoreParquetMetadataCacheImpl::new(
match c.metadata_cache_max_capacity_bytes() {
0 => metadata_cache_factory.make_noop_cache(),
@@ -2045,8 +2044,7 @@ impl Config {
.register_typed::<dyn CompactionService, _, _, _>(async move |i| {
let metadata_cache_factory = i
.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
.await
.cache_factory();
.await;
CompactionServiceImpl::new(
i.get_service_typed().await,
i.get_service_typed().await,
@@ -2093,7 +2091,8 @@ impl Config {
i.get_service_typed().await,
i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
.await
.cache_factory(),
.cache_factory()
.clone(),
)
})
.await;
@@ -2195,7 +2194,8 @@ impl Config {
let metadata_cache_factory = i
.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
.await
.cache_factory();
.cache_factory()
.clone();
QueryPlannerImpl::new(
i.get_service_typed().await,
i.get_service_typed().await,
@@ -2211,7 +2211,8 @@ impl Config {
QueryExecutorImpl::new(
i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
.await
.cache_factory(),
.cache_factory()
.clone(),
i.get_service_typed().await,
i.get_service_typed().await,
)
7 changes: 4 additions & 3 deletions rust/cubestore/cubestore/src/sql/mod.rs
@@ -1660,6 +1660,7 @@ mod tests {

use crate::metastore::job::JobType;
use crate::store::compaction::CompactionService;
use crate::table::parquet::CubestoreMetadataCacheFactoryImpl;
use async_compression::tokio::write::GzipEncoder;
use cuberockstore::rocksdb::{Options, DB};
use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
@@ -1728,7 +1729,7 @@
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
rows_per_chunk,
);
let limits = Arc::new(ConcurrencyLimits::new(4));
@@ -1807,7 +1808,7 @@
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
rows_per_chunk,
);
let limits = Arc::new(ConcurrencyLimits::new(4));
@@ -1917,7 +1918,7 @@
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
rows_per_chunk,
);
let limits = Arc::new(ConcurrencyLimits::new(4));
67 changes: 47 additions & 20 deletions rust/cubestore/cubestore/src/store/compaction.rs
@@ -13,7 +13,7 @@ use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec
use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
use crate::store::{min_max_values_from_data, ChunkDataStore, ChunkStore, ROW_GROUP_SIZE};
use crate::table::data::{cmp_min_rows, cmp_partition_key};
use crate::table::parquet::{arrow_schema, ParquetTableStore};
use crate::table::parquet::{arrow_schema, CubestoreMetadataCacheFactory, ParquetTableStore};
use crate::table::redistribute::redistribute;
use crate::table::{Row, TableValue};
use crate::util::batch_memory::record_batch_buffer_size;
@@ -75,7 +75,7 @@ pub struct CompactionServiceImpl {
chunk_store: Arc<dyn ChunkDataStore>,
remote_fs: Arc<dyn RemoteFs>,
config: Arc<dyn ConfigObj>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
}

crate::di_service!(CompactionServiceImpl, [CompactionService]);
@@ -86,7 +86,7 @@ impl CompactionServiceImpl {
chunk_store: Arc<dyn ChunkDataStore>,
remote_fs: Arc<dyn RemoteFs>,
config: Arc<dyn ConfigObj>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
) -> Arc<CompactionServiceImpl> {
Arc::new(CompactionServiceImpl {
meta_store,
@@ -658,7 +658,9 @@ impl CompactionService for CompactionServiceImpl {
ROW_GROUP_SIZE,
1,
None,
self.metadata_cache_factory.make_noop_cache(),
self.metadata_cache_factory
.cache_factory()
.make_noop_cache(),
)?);

Arc::new(TraceDataLoadedExec::new(
Expand All @@ -680,8 +682,14 @@ impl CompactionService for CompactionServiceImpl {
};
let records =
merge_chunks(key_size, main_table, new, unique_key, aggregate_columns).await?;
let count_and_min =
write_to_files(records, total_rows as usize, store, new_local_files2).await?;
let count_and_min = write_to_files(
records,
total_rows as usize,
store,
&table,
new_local_files2,
)
.await?;

if let Some(c) = &new_chunk {
assert_eq!(new_local_files.len(), 1);
@@ -862,7 +870,12 @@ impl CompactionService for CompactionServiceImpl {
// TODO deactivate corrupt tables
let files = download_files(&partitions, self.remote_fs.clone()).await?;
let keys = find_partition_keys(
keys_with_counts(&files, self.metadata_cache_factory.as_ref(), key_len).await?,
keys_with_counts(
&files,
self.metadata_cache_factory.cache_factory().as_ref(),
key_len,
)
.await?,
key_len,
// TODO should it respect table partition_split_threshold?
self.config.partition_split_threshold() as usize,
@@ -1108,6 +1121,7 @@ pub(crate) async fn write_to_files(
records: SendableRecordBatchStream,
num_rows: usize,
store: ParquetTableStore,
table: &IdRow<Table>,
files: Vec<String>,
) -> Result<Vec<(usize, Vec<TableValue>, Vec<TableValue>)>, CubeError> {
let rows_per_file = div_ceil(num_rows as usize, files.len());
@@ -1165,7 +1179,7 @@
};
};

write_to_files_impl(records, store, files, pick_writer).await?;
write_to_files_impl(records, store, files, table, pick_writer).await?;

let mut stats = take(stats.lock().unwrap().deref_mut());
if stats.last().unwrap().0 == 0 {
@@ -1185,10 +1199,11 @@ async fn write_to_files_impl(
records: SendableRecordBatchStream,
store: ParquetTableStore,
files: Vec<String>,
table: &IdRow<Table>,
mut pick_writer: impl FnMut(&RecordBatch) -> WriteBatchTo,
) -> Result<(), CubeError> {
let schema = Arc::new(store.arrow_schema());
let writer_props = store.writer_props()?;
let writer_props = store.writer_props(table).await?;
let mut writers = files.into_iter().map(move |f| -> Result<_, CubeError> {
Ok(ArrowWriter::try_new(
File::create(f)?,
@@ -1254,6 +1269,7 @@ async fn write_to_files_impl(
async fn write_to_files_by_keys(
records: SendableRecordBatchStream,
store: ParquetTableStore,
table: &IdRow<Table>,
files: Vec<String>,
keys: Vec<Row>,
) -> Result<Vec<usize>, CubeError> {
@@ -1297,7 +1313,7 @@ async fn write_to_files_by_keys(
panic!("impossible")
};
let num_files = files.len();
write_to_files_impl(records, store, files, pick_writer).await?;
write_to_files_impl(records, store, files, table, pick_writer).await?;

let mut row_counts: Vec<usize> = take(row_counts.lock().unwrap().as_mut());
assert!(
@@ -1418,6 +1434,7 @@ mod tests {
use crate::remotefs::LocalDirRemoteFs;
use crate::store::MockChunkDataStore;
use crate::table::data::rows_to_columns;
use crate::table::parquet::CubestoreMetadataCacheFactoryImpl;
use crate::table::{cmp_same_types, Row, TableValue};
use cuberockstore::rocksdb::{Options, DB};
use datafusion::arrow::array::{Int64Array, StringArray};
@@ -1540,7 +1557,7 @@ mod tests {
Arc::new(chunk_store),
remote_fs,
Arc::new(config),
Arc::new(BasicMetadataCacheFactory::new()),
CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
);
compaction_service
.compact(1, DataLoadedSize::new())
@@ -1680,7 +1697,7 @@ mod tests {
remote_fs.clone(),
Arc::new(cluster),
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
10,
);
metastore
@@ -1768,7 +1785,7 @@ mod tests {
chunk_store.clone(),
remote_fs,
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
);
compaction_service
.compact_in_memory_chunks(partition.get_id())
@@ -1856,7 +1873,7 @@ mod tests {
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
50,
);

@@ -1959,7 +1976,7 @@ mod tests {
chunk_store.clone(),
remote_fs.clone(),
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
);
compaction_service
.compact(partition.get_id(), DataLoadedSize::new())
@@ -2190,7 +2207,7 @@ mod tests {
struct MultiSplit {
meta: Arc<dyn MetaStore>,
fs: Arc<dyn RemoteFs>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
keys: Vec<Row>,
key_len: usize,
multi_partition_id: u64,
@@ -2206,7 +2223,7 @@ impl MultiSplit {
fn new(
meta: Arc<dyn MetaStore>,
fs: Arc<dyn RemoteFs>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
keys: Vec<Row>,
key_len: usize,
multi_partition_id: u64,
@@ -2270,6 +2287,10 @@ impl MultiSplit {
}
});

let table = self
.meta
.get_table_by_id(p.index.get_row().table_id())
.await?;
let store = ParquetTableStore::new(
p.index.get_row().clone(),
ROW_GROUP_SIZE,
@@ -2278,7 +2299,7 @@
let records = if !in_files.is_empty() {
read_files(
&in_files.into_iter().map(|(f, _)| f).collect::<Vec<_>>(),
self.metadata_cache_factory.as_ref(),
self.metadata_cache_factory.cache_factory().as_ref(),
self.key_len,
None,
)
@@ -2290,8 +2311,14 @@
.execute(0)
.await?
};
let row_counts =
write_to_files_by_keys(records, store, out_files.to_vec(), self.keys.clone()).await?;
let row_counts = write_to_files_by_keys(
records,
store,
&table,
out_files.to_vec(),
self.keys.clone(),
)
.await?;

for i in 0..row_counts.len() {
mrow_counts[i] += row_counts[i] as u64;
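
The compaction changes above thread the table row through write_to_files, write_to_files_impl, and write_to_files_by_keys so that writer properties can be built per table at the point where the Parquet writers are created. A standalone sketch of that pattern follows (stub types, tokio assumed for the async runtime; not the real Cubestore code).

// Illustrative only: stub types stand in for Cubestore's ParquetTableStore,
// IdRow<Table>, and parquet WriterProperties; error handling is omitted.
struct Table {
    name: String,
}

#[derive(Clone)]
struct WriterProps {
    label: String,
}

struct ParquetTableStore;

impl ParquetTableStore {
    // Before this commit the call site was `store.writer_props()?`; now the
    // table row is passed in and the call is awaited.
    async fn writer_props(&self, table: &Table) -> WriterProps {
        WriterProps {
            label: format!("props for `{}`", table.name),
        }
    }
}

async fn write_to_files_impl(store: &ParquetTableStore, table: &Table, files: Vec<String>) {
    // The table must reach this point, so every caller in the chain now
    // accepts and forwards it.
    let props = store.writer_props(table).await;
    for f in files {
        println!("writing {} with {}", f, props.label);
    }
}

#[tokio::main]
async fn main() {
    let store = ParquetTableStore;
    let table = Table {
        name: "orders".to_string(),
    };
    write_to_files_impl(&store, &table, vec!["part-0.parquet".to_string()]).await;
}
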