refactor(cubestore): Move build_writer_props to CubestoreMetadataCacheFactory #8756

Merged 2 commits on Sep 30, 2024
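
This PR stops unwrapping the inner MetadataCacheFactory at dependency-registration time: services now receive the CubestoreMetadataCacheFactory wrapper itself and call cache_factory() (and, per the title, build_writer_props) only where they need it. The call sites in the diffs below imply a trait roughly like the following sketch. This is inferred from the changed code, not the verbatim Cube Store API; in particular the build_writer_props signature is an assumption:

    // Minimal sketch, assuming the usual cubestore imports (IdRow, Table, CubeError)
    // and DataFusion's MetadataCacheFactory/WriterProperties are in scope.
    #[async_trait::async_trait]
    pub trait CubestoreMetadataCacheFactory: Send + Sync {
        // The diff both borrows and clones the result, so returning &Arc<...> fits the call sites.
        fn cache_factory(&self) -> &Arc<dyn MetadataCacheFactory>;

        // Hypothetical signature: per-table Parquet writer properties, presumably what
        // ParquetTableStore::writer_props(table) now delegates to.
        async fn build_writer_props(&self, table: &IdRow<Table>) -> Result<WriterProperties, CubeError>;
    }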
2 changes: 1 addition & 1 deletion rust/cubestore/Cargo.lock

Some generated files are not rendered by default.

21 changes: 11 additions & 10 deletions rust/cubestore/cubestore/src/config/mod.rs
@@ -2007,8 +2007,7 @@ impl Config {
.register_typed::<dyn ChunkDataStore, _, _, _>(async move |i| {
let metadata_cache_factory = i
.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
-.await
-.cache_factory();
+.await;
ChunkStore::new(
i.get_service_typed().await,
i.get_service_typed().await,
@@ -2025,10 +2024,10 @@ impl Config {
self.injector
.register_typed::<dyn CubestoreParquetMetadataCache, _, _, _>(async move |i| {
let c = i.get_service_typed::<dyn ConfigObj>().await;
-let metadata_cache_factory = i
+let cubestore_metadata_cache_factory = i
.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
-.await
-.cache_factory();
+.await;
+let metadata_cache_factory: &_ = cubestore_metadata_cache_factory.cache_factory();
CubestoreParquetMetadataCacheImpl::new(
match c.metadata_cache_max_capacity_bytes() {
0 => metadata_cache_factory.make_noop_cache(),
@@ -2045,8 +2044,7 @@ impl Config {
.register_typed::<dyn CompactionService, _, _, _>(async move |i| {
let metadata_cache_factory = i
.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
-.await
-.cache_factory();
+.await;
CompactionServiceImpl::new(
i.get_service_typed().await,
i.get_service_typed().await,
@@ -2093,7 +2091,8 @@ impl Config {
i.get_service_typed().await,
i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
.await
-.cache_factory(),
+.cache_factory()
+.clone(),
)
})
.await;
@@ -2195,7 +2194,8 @@ impl Config {
let metadata_cache_factory = i
.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
.await
-.cache_factory();
+.cache_factory()
+.clone();
QueryPlannerImpl::new(
i.get_service_typed().await,
i.get_service_typed().await,
@@ -2211,7 +2211,8 @@ impl Config {
QueryExecutorImpl::new(
i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
.await
-.cache_factory(),
+.cache_factory()
+.clone(),
i.get_service_typed().await,
i.get_service_typed().await,
)
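
In config/mod.rs the injector closures now resolve the wrapper once and either hand it over whole (ChunkStore, CompactionServiceImpl) or borrow/clone the inner DataFusion factory where only that is needed (CubestoreParquetMetadataCacheImpl, QueryPlannerImpl, QueryExecutorImpl). Condensed from the hunks above, the pattern is:

    // Resolve the Cube Store wrapper from the injector...
    let cubestore_metadata_cache_factory = i
        .get_service_typed::<dyn CubestoreMetadataCacheFactory>()
        .await;
    // ...and clone the inner DataFusion factory only where the old Arc<dyn MetadataCacheFactory> is still required.
    let metadata_cache_factory: Arc<dyn MetadataCacheFactory> =
        cubestore_metadata_cache_factory.cache_factory().clone();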
7 changes: 4 additions & 3 deletions rust/cubestore/cubestore/src/sql/mod.rs
@@ -1660,6 +1660,7 @@ mod tests {

use crate::metastore::job::JobType;
use crate::store::compaction::CompactionService;
+use crate::table::parquet::CubestoreMetadataCacheFactoryImpl;
use async_compression::tokio::write::GzipEncoder;
use cuberockstore::rocksdb::{Options, DB};
use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
@@ -1728,7 +1729,7 @@
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
-Arc::new(BasicMetadataCacheFactory::new()),
+CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
rows_per_chunk,
);
let limits = Arc::new(ConcurrencyLimits::new(4));
@@ -1807,7 +1808,7 @@
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
-Arc::new(BasicMetadataCacheFactory::new()),
+CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
rows_per_chunk,
);
let limits = Arc::new(ConcurrencyLimits::new(4));
@@ -1917,7 +1918,7 @@
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
-Arc::new(BasicMetadataCacheFactory::new()),
+CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
rows_per_chunk,
);
let limits = Arc::new(ConcurrencyLimits::new(4));
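
The SQL-layer tests construct ChunkStore with the wrapper instead of the bare factory; the only change at each call site is wrapping the existing BasicMetadataCacheFactory:

    // Test setup, as in the hunks above (the other ChunkStore::new arguments are unchanged).
    let metadata_cache_factory =
        CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new()));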
67 changes: 47 additions & 20 deletions rust/cubestore/cubestore/src/store/compaction.rs
@@ -13,7 +13,7 @@ use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec
use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
use crate::store::{min_max_values_from_data, ChunkDataStore, ChunkStore, ROW_GROUP_SIZE};
use crate::table::data::{cmp_min_rows, cmp_partition_key};
-use crate::table::parquet::{arrow_schema, ParquetTableStore};
+use crate::table::parquet::{arrow_schema, CubestoreMetadataCacheFactory, ParquetTableStore};
use crate::table::redistribute::redistribute;
use crate::table::{Row, TableValue};
use crate::util::batch_memory::record_batch_buffer_size;
@@ -75,7 +75,7 @@ pub struct CompactionServiceImpl {
chunk_store: Arc<dyn ChunkDataStore>,
remote_fs: Arc<dyn RemoteFs>,
config: Arc<dyn ConfigObj>,
-metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
}

crate::di_service!(CompactionServiceImpl, [CompactionService]);
@@ -86,7 +86,7 @@ impl CompactionServiceImpl {
chunk_store: Arc<dyn ChunkDataStore>,
remote_fs: Arc<dyn RemoteFs>,
config: Arc<dyn ConfigObj>,
-metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
) -> Arc<CompactionServiceImpl> {
Arc::new(CompactionServiceImpl {
meta_store,
@@ -658,7 +658,9 @@ impl CompactionService for CompactionServiceImpl {
ROW_GROUP_SIZE,
1,
None,
-self.metadata_cache_factory.make_noop_cache(),
+self.metadata_cache_factory
+.cache_factory()
+.make_noop_cache(),
)?);

Arc::new(TraceDataLoadedExec::new(
@@ -680,8 +682,14 @@ impl CompactionService for CompactionServiceImpl {
};
let records =
merge_chunks(key_size, main_table, new, unique_key, aggregate_columns).await?;
-let count_and_min =
-write_to_files(records, total_rows as usize, store, new_local_files2).await?;
+let count_and_min = write_to_files(
+records,
+total_rows as usize,
+store,
+&table,
+new_local_files2,
+)
+.await?;

if let Some(c) = &new_chunk {
assert_eq!(new_local_files.len(), 1);
@@ -862,7 +870,12 @@ impl CompactionService for CompactionServiceImpl {
// TODO deactivate corrupt tables
let files = download_files(&partitions, self.remote_fs.clone()).await?;
let keys = find_partition_keys(
-keys_with_counts(&files, self.metadata_cache_factory.as_ref(), key_len).await?,
+keys_with_counts(
+&files,
+self.metadata_cache_factory.cache_factory().as_ref(),
+key_len,
+)
+.await?,
key_len,
// TODO should it respect table partition_split_threshold?
self.config.partition_split_threshold() as usize,
@@ -1108,6 +1121,7 @@ pub(crate) async fn write_to_files(
records: SendableRecordBatchStream,
num_rows: usize,
store: ParquetTableStore,
+table: &IdRow<Table>,
files: Vec<String>,
) -> Result<Vec<(usize, Vec<TableValue>, Vec<TableValue>)>, CubeError> {
let rows_per_file = div_ceil(num_rows as usize, files.len());
@@ -1165,7 +1179,7 @@
};
};

-write_to_files_impl(records, store, files, pick_writer).await?;
+write_to_files_impl(records, store, files, table, pick_writer).await?;

let mut stats = take(stats.lock().unwrap().deref_mut());
if stats.last().unwrap().0 == 0 {
@@ -1185,10 +1199,11 @@ async fn write_to_files_impl(
records: SendableRecordBatchStream,
store: ParquetTableStore,
files: Vec<String>,
+table: &IdRow<Table>,
mut pick_writer: impl FnMut(&RecordBatch) -> WriteBatchTo,
) -> Result<(), CubeError> {
let schema = Arc::new(store.arrow_schema());
-let writer_props = store.writer_props()?;
+let writer_props = store.writer_props(table).await?;
let mut writers = files.into_iter().map(move |f| -> Result<_, CubeError> {
Ok(ArrowWriter::try_new(
File::create(f)?,
Expand Down Expand Up @@ -1254,6 +1269,7 @@ async fn write_to_files_impl(
async fn write_to_files_by_keys(
records: SendableRecordBatchStream,
store: ParquetTableStore,
+table: &IdRow<Table>,
files: Vec<String>,
keys: Vec<Row>,
) -> Result<Vec<usize>, CubeError> {
@@ -1297,7 +1313,7 @@
panic!("impossible")
};
let num_files = files.len();
-write_to_files_impl(records, store, files, pick_writer).await?;
+write_to_files_impl(records, store, files, table, pick_writer).await?;

let mut row_counts: Vec<usize> = take(row_counts.lock().unwrap().as_mut());
assert!(
@@ -1418,6 +1434,7 @@ mod tests {
use crate::remotefs::LocalDirRemoteFs;
use crate::store::MockChunkDataStore;
use crate::table::data::rows_to_columns;
+use crate::table::parquet::CubestoreMetadataCacheFactoryImpl;
use crate::table::{cmp_same_types, Row, TableValue};
use cuberockstore::rocksdb::{Options, DB};
use datafusion::arrow::array::{Int64Array, StringArray};
@@ -1540,7 +1557,7 @@
Arc::new(chunk_store),
remote_fs,
Arc::new(config),
-Arc::new(BasicMetadataCacheFactory::new()),
+CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
);
compaction_service
.compact(1, DataLoadedSize::new())
@@ -1680,7 +1697,7 @@
remote_fs.clone(),
Arc::new(cluster),
config.config_obj(),
-Arc::new(BasicMetadataCacheFactory::new()),
+CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
10,
);
metastore
@@ -1768,7 +1785,7 @@
chunk_store.clone(),
remote_fs,
config.config_obj(),
-Arc::new(BasicMetadataCacheFactory::new()),
+CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
);
compaction_service
.compact_in_memory_chunks(partition.get_id())
@@ -1856,7 +1873,7 @@
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
-Arc::new(BasicMetadataCacheFactory::new()),
+CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
50,
);

@@ -1959,7 +1976,7 @@
chunk_store.clone(),
remote_fs.clone(),
config.config_obj(),
-Arc::new(BasicMetadataCacheFactory::new()),
+CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())),
);
compaction_service
.compact(partition.get_id(), DataLoadedSize::new())
@@ -2190,7 +2207,7 @@
struct MultiSplit {
meta: Arc<dyn MetaStore>,
fs: Arc<dyn RemoteFs>,
-metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
keys: Vec<Row>,
key_len: usize,
multi_partition_id: u64,
@@ -2206,7 +2223,7 @@ impl MultiSplit {
fn new(
meta: Arc<dyn MetaStore>,
fs: Arc<dyn RemoteFs>,
-metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+metadata_cache_factory: Arc<dyn CubestoreMetadataCacheFactory>,
keys: Vec<Row>,
key_len: usize,
multi_partition_id: u64,
@@ -2270,6 +2287,10 @@ impl MultiSplit {
}
});

+let table = self
+.meta
+.get_table_by_id(p.index.get_row().table_id())
+.await?;
let store = ParquetTableStore::new(
p.index.get_row().clone(),
ROW_GROUP_SIZE,
@@ -2278,7 +2299,7 @@ impl MultiSplit {
let records = if !in_files.is_empty() {
read_files(
&in_files.into_iter().map(|(f, _)| f).collect::<Vec<_>>(),
-self.metadata_cache_factory.as_ref(),
+self.metadata_cache_factory.cache_factory().as_ref(),
self.key_len,
None,
)
@@ -2290,8 +2311,14 @@ impl MultiSplit {
.execute(0)
.await?
};
-let row_counts =
-write_to_files_by_keys(records, store, out_files.to_vec(), self.keys.clone()).await?;
+let row_counts = write_to_files_by_keys(
+records,
+store,
+&table,
+out_files.to_vec(),
+self.keys.clone(),
+)
+.await?;

for i in 0..row_counts.len() {
mrow_counts[i] += row_counts[i] as u64;
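
Putting the compaction.rs hunks together: callers that end up writing Parquet now look up the table row from the metastore and thread it down to the writer helpers, which is what allows writer properties to be built per table. Condensed from the MultiSplit hunks above; names and argument order follow the diff:

    // Fetch the table row for the partition's index...
    let table = self
        .meta
        .get_table_by_id(p.index.get_row().table_id())
        .await?;
    // ...and pass it through to the key-partitioned writer.
    let row_counts = write_to_files_by_keys(
        records,
        store,
        &table,
        out_files.to_vec(),
        self.keys.clone(),
    )
    .await?;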