diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs
index 7e4597a5a0e49..32d7f80ecc8b5 100644
--- a/rust/cubestore/cubestore/src/config/mod.rs
+++ b/rust/cubestore/cubestore/src/config/mod.rs
@@ -36,7 +36,7 @@ use crate::store::compaction::{CompactionService, CompactionServiceImpl};
 use crate::store::{ChunkDataStore, ChunkStore, WALDataStore, WALStore};
 use crate::streaming::kafka::{KafkaClientService, KafkaClientServiceImpl};
 use crate::streaming::{KsqlClient, KsqlClientImpl, StreamingService, StreamingServiceImpl};
-use crate::table::parquet::{CubestoreParquetMetadataCache, CubestoreParquetMetadataCacheImpl};
+use crate::table::parquet::{CubestoreMetadataCacheFactory, CubestoreMetadataCacheFactoryImpl, CubestoreParquetMetadataCache, CubestoreParquetMetadataCacheImpl};
 use crate::telemetry::tracing::{TracingHelper, TracingHelperImpl};
 use crate::telemetry::{
     start_agent_event_loop, start_track_event_loop, stop_agent_event_loop, stop_track_event_loop,
@@ -45,7 +45,7 @@ use crate::util::memory::{MemoryHandler, MemoryHandlerImpl};
 use crate::CubeError;
 use cuberockstore::rocksdb::{Options, DB};
 use datafusion::cube_ext;
-use datafusion::physical_plan::parquet::{LruParquetMetadataCache, NoopParquetMetadataCache};
+use datafusion::physical_plan::parquet::{BasicMetadataCacheFactory, LruParquetMetadataCache, NoopParquetMetadataCache};
 use futures::future::join_all;
 use log::Level;
 use log::{debug, error};
@@ -2000,13 +2000,21 @@ impl Config {
             })
             .await;
 
+        self.injector
+            .register_typed::(async move |i| {
+                CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new()))
+            })
+            .await;
+
         self.injector
             .register_typed::(async move |i| {
+                let metadata_cache_factory = i.get_service_typed::().await.cache_factory();
                 ChunkStore::new(
                     i.get_service_typed().await,
                     i.get_service_typed().await,
                     i.get_service_typed().await,
                     i.get_service_typed().await,
+                    metadata_cache_factory,
                     i.get_service_typed::()
                         .await
                         .wal_split_threshold() as usize,
                 )
             })
             .await;
@@ -2017,13 +2025,11 @@ impl Config {
         self.injector
             .register_typed::(async move |i| {
                 let c = i.get_service_typed::().await;
+                let metadata_cache_factory = i.get_service_typed::().await.cache_factory();
                 CubestoreParquetMetadataCacheImpl::new(
                     match c.metadata_cache_max_capacity_bytes() {
-                        0 => NoopParquetMetadataCache::new(),
-                        max_cached_metadata => LruParquetMetadataCache::new(
-                            max_cached_metadata,
-                            Duration::from_secs(c.metadata_cache_time_to_idle_secs()),
-                        ),
+                        0 => metadata_cache_factory.make_noop_cache(),
+                        max_cached_metadata => metadata_cache_factory.make_lru_cache(max_cached_metadata, Duration::from_secs(c.metadata_cache_time_to_idle_secs())),
                     },
                 )
             })
             .await;
@@ -2031,11 +2037,13 @@ impl Config {
         self.injector
             .register_typed::(async move |i| {
+                let metadata_cache_factory = i.get_service_typed::().await.cache_factory();
                 CompactionServiceImpl::new(
                     i.get_service_typed().await,
                     i.get_service_typed().await,
                     i.get_service_typed().await,
                     i.get_service_typed().await,
+                    metadata_cache_factory,
                 )
             })
             .await;
@@ -2160,11 +2168,13 @@ impl Config {
         let query_cache_to_move = query_cache.clone();
         self.injector
             .register_typed::(async move |i| {
+                let metadata_cache_factory = i.get_service_typed::().await.cache_factory();
                 QueryPlannerImpl::new(
                     i.get_service_typed().await,
                     i.get_service_typed().await,
                     i.get_service_typed().await,
                     query_cache_to_move,
+                    metadata_cache_factory,
                 )
             })
             .await;
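Note on the registrations above: the injector now holds a single CubestoreMetadataCacheFactory (BasicMetadataCacheFactory by default), and each consumer pulls the inner MetadataCacheFactory out of it with cache_factory() instead of constructing NoopParquetMetadataCache or LruParquetMetadataCache directly. Below is a minimal sketch of the cache-selection logic that the CubestoreParquetMetadataCache registration now delegates to the factory. It assumes that make_noop_cache() and make_lru_cache() return Arc<dyn ParquetMetadataCache> (which is what CubestoreParquetMetadataCacheImpl::new appears to accept) and that datafusion refers to Cube's DataFusion fork:

    use datafusion::physical_plan::parquet::{MetadataCacheFactory, ParquetMetadataCache};
    use std::sync::Arc;
    use std::time::Duration;

    // Sketch only: mirrors the match expression in the registration above.
    fn select_parquet_metadata_cache(
        factory: &dyn MetadataCacheFactory,
        max_capacity_bytes: u64,
        time_to_idle_secs: u64,
    ) -> Arc<dyn ParquetMetadataCache> {
        match max_capacity_bytes {
            // Zero capacity keeps metadata caching disabled, as before.
            0 => factory.make_noop_cache(),
            // Otherwise cache Parquet footer metadata up to the configured size,
            // expiring entries that stay idle for the configured duration.
            max => factory.make_lru_cache(max, Duration::from_secs(time_to_idle_secs)),
        }
    }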
diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs
index 50139d8b1df9e..cbc9c8f871e46 100644
--- a/rust/cubestore/cubestore/src/queryplanner/mod.rs
+++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs
@@ -3,6 +3,7 @@ mod optimizations;
 pub mod panic;
 mod partition_filter;
 mod planning;
+use datafusion::physical_plan::parquet::MetadataCacheFactory;
 pub use planning::PlanningMeta;
 mod check_memory;
 pub mod physical_plan_flags;
@@ -98,6 +99,7 @@ pub struct QueryPlannerImpl {
     cache_store: Arc,
     config: Arc,
     cache: Arc,
+    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
 }
 
 crate::di_service!(QueryPlannerImpl, [QueryPlanner]);
@@ -179,12 +181,14 @@ impl QueryPlannerImpl {
         cache_store: Arc,
         config: Arc,
         cache: Arc,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     ) -> Arc {
         Arc::new(QueryPlannerImpl {
             meta_store,
             cache_store,
             config,
             cache,
+            metadata_cache_factory,
         })
     }
 }
@@ -193,6 +197,7 @@ impl QueryPlannerImpl {
     async fn execution_context(&self) -> Result, CubeError> {
         Ok(Arc::new(ExecutionContext::with_config(
             ExecutionConfig::new()
+                .with_metadata_cache_factory(self.metadata_cache_factory.clone())
                 .add_optimizer_rule(Arc::new(MaterializeNow {}))
                 .add_optimizer_rule(Arc::new(FlattenUnion {})),
         )))
diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs
index 6db62961e9842..7e8f90829e0ce 100644
--- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs
+++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs
@@ -327,7 +327,7 @@ impl QueryExecutorImpl {
         serialized_plan: Arc,
     ) -> Result, CubeError> {
         Ok(Arc::new(ExecutionContext::with_config(
-            ExecutionConfig::new()
+            ExecutionConfig::new() // TODO: Set encryption param
                 .with_batch_size(4096)
                 .with_concurrency(1)
                 .with_query_planner(Arc::new(CubeQueryPlanner::new_on_router(
@@ -344,7 +344,7 @@ impl QueryExecutorImpl {
         data_loaded_size: Option>,
     ) -> Result, CubeError> {
         Ok(Arc::new(ExecutionContext::with_config(
-            ExecutionConfig::new()
+            ExecutionConfig::new() // TODO: Set encryption param
                 .with_batch_size(4096)
                 .with_concurrency(1)
                 .with_query_planner(Arc::new(CubeQueryPlanner::new_on_worker(
@@ -365,7 +365,7 @@ pub struct CubeTable {
     worker_partition_ids: Vec<(u64, RowFilter)>,
     #[serde(skip, default)]
     chunk_id_to_record_batches: HashMap>,
-    #[serde(skip, default = "NoopParquetMetadataCache::new")]
+    #[serde(skip, default = "NoopParquetMetadataCache::new")] // TODO: Triple check the initialization of this field.
     parquet_metadata_cache: Arc,
 }
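The planner changes only thread the factory into DataFusion's ExecutionConfig; the TODO comments in query_executor.rs mark the router and worker contexts that would also need the factory once encryption parameters are added. A minimal sketch of building a context this way, assuming the fork keeps DataFusion's usual execution::context paths (the helper name is ours, not part of the PR):

    use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
    use datafusion::physical_plan::parquet::{BasicMetadataCacheFactory, MetadataCacheFactory};
    use std::sync::Arc;

    // Build an ExecutionContext whose Parquet scans take their metadata cache
    // settings from the injected factory rather than hard-coded defaults.
    fn planner_context(metadata_cache_factory: Arc<dyn MetadataCacheFactory>) -> ExecutionContext {
        ExecutionContext::with_config(
            ExecutionConfig::new().with_metadata_cache_factory(metadata_cache_factory),
        )
    }

    // Default wiring, matching what the DI container registers:
    // let ctx = planner_context(Arc::new(BasicMetadataCacheFactory::new()));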
diff --git a/rust/cubestore/cubestore/src/queryplanner/topk/execute.rs b/rust/cubestore/cubestore/src/queryplanner/topk/execute.rs
index c3afbee4646a7..8f7f94719f5bc 100644
--- a/rust/cubestore/cubestore/src/queryplanner/topk/execute.rs
+++ b/rust/cubestore/cubestore/src/queryplanner/topk/execute.rs
@@ -868,6 +868,7 @@ mod tests {
     use datafusion::physical_plan::aggregates::AggregateFunction;
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::memory::MemoryExec;
+    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
     use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
     use datafusion::physical_plan::ExecutionPlan;
     use futures::StreamExt;
@@ -1240,6 +1241,7 @@ mod tests {
             aggregate_functions: Default::default(),
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
+            metadata_cache_factory: Arc::new(BasicMetadataCacheFactory::new()),
         };
         let agg_exprs = aggs
             .iter()
diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs
index f4e54deb3cc98..17e4be0067cfc 100644
--- a/rust/cubestore/cubestore/src/sql/mod.rs
+++ b/rust/cubestore/cubestore/src/sql/mod.rs
@@ -1659,6 +1659,7 @@ mod tests {
     use crate::store::compaction::CompactionService;
     use async_compression::tokio::write::GzipEncoder;
     use cuberockstore::rocksdb::{Options, DB};
+    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
     use futures_timer::Delay;
     use itertools::Itertools;
     use pretty_assertions::assert_eq;
@@ -1723,6 +1724,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
+            Arc::new(BasicMetadataCacheFactory::new()),
             rows_per_chunk,
         );
         let limits = Arc::new(ConcurrencyLimits::new(4));
@@ -1800,6 +1802,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
+            Arc::new(BasicMetadataCacheFactory::new()),
             rows_per_chunk,
         );
         let limits = Arc::new(ConcurrencyLimits::new(4));
@@ -1907,6 +1910,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
+            Arc::new(BasicMetadataCacheFactory::new()),
             rows_per_chunk,
         );
         let limits = Arc::new(ConcurrencyLimits::new(4));
diff --git a/rust/cubestore/cubestore/src/store/compaction.rs b/rust/cubestore/cubestore/src/store/compaction.rs
index 95eabcf9d352f..69e720e938e14 100644
--- a/rust/cubestore/cubestore/src/store/compaction.rs
+++ b/rust/cubestore/cubestore/src/store/compaction.rs
@@ -33,7 +33,7 @@ use datafusion::physical_plan::hash_aggregate::{
 };
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::merge_sort::{LastRowByUniqueKeyExec, MergeSortExec};
-use datafusion::physical_plan::parquet::ParquetExec;
+use datafusion::physical_plan::parquet::{MetadataCacheFactory, ParquetExec};
 use datafusion::physical_plan::union::UnionExec;
 use datafusion::physical_plan::{
     AggregateExpr, ExecutionPlan, PhysicalExpr, SendableRecordBatchStream,
@@ -75,6 +75,7 @@ pub struct CompactionServiceImpl {
     chunk_store: Arc,
     remote_fs: Arc,
     config: Arc,
+    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
 }
 
 crate::di_service!(CompactionServiceImpl, [CompactionService]);
@@ -85,12 +86,14 @@ impl CompactionServiceImpl {
         chunk_store: Arc,
         remote_fs: Arc,
         config: Arc,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     ) -> Arc {
         Arc::new(CompactionServiceImpl {
            meta_store,
             chunk_store,
             remote_fs,
             config,
+            metadata_cache_factory,
         })
     }
 
@@ -571,7 +574,7 @@ impl CompactionService for CompactionServiceImpl {
             }
         }
-        let store = ParquetTableStore::new(index.get_row().clone(), ROW_GROUP_SIZE);
+        let store = ParquetTableStore::new(index.get_row().clone(), ROW_GROUP_SIZE, self.metadata_cache_factory.clone());
         let old_partition_remote = match &new_chunk {
             Some(_) => None,
             None => partition.get_row().get_full_name(partition.get_id()),
         };
@@ -640,13 +643,14 @@ impl CompactionService for CompactionServiceImpl {
         let schema = Arc::new(arrow_schema(index.get_row()));
         let main_table: Arc = match old_partition_local {
             Some(file) => {
-                let parquet_exec = Arc::new(ParquetExec::try_from_path(
+                let parquet_exec = Arc::new(ParquetExec::try_from_path_with_cache(
                     file.as_str(),
                     None,
                     None,
                     ROW_GROUP_SIZE,
                     1,
                     None,
+                    self.metadata_cache_factory.make_noop_cache(),
                 )?);
 
                 Arc::new(TraceDataLoadedExec::new(
@@ -850,7 +854,7 @@ impl CompactionService for CompactionServiceImpl {
         // TODO deactivate corrupt tables
         let files = download_files(&partitions, self.remote_fs.clone()).await?;
         let keys = find_partition_keys(
-            keys_with_counts(&files, key_len).await?,
+            keys_with_counts(&files, self.metadata_cache_factory.as_ref(), key_len).await?,
             key_len,
             // TODO should it respect table partition_split_threshold?
             self.config.partition_split_threshold() as usize,
@@ -893,6 +897,7 @@ impl CompactionService for CompactionServiceImpl {
         let mut s = MultiSplit::new(
             self.meta_store.clone(),
             self.remote_fs.clone(),
+            self.metadata_cache_factory.clone(),
             keys,
             key_len,
             multi_partition_id,
@@ -935,6 +940,7 @@ impl CompactionService for CompactionServiceImpl {
         let mut s = MultiSplit::new(
             self.meta_store.clone(),
             self.remote_fs.clone(),
+            self.metadata_cache_factory.clone(),
             keys,
             key_len,
             multi_partition_id,
@@ -979,19 +985,21 @@ async fn find_partition_keys(
 
 async fn read_files(
     files: &[String],
+    metadata_cache_factory: &dyn MetadataCacheFactory,
     key_len: usize,
     projection: Option>,
 ) -> Result, CubeError> {
     assert!(!files.is_empty());
     let mut inputs = Vec::>::with_capacity(files.len());
     for f in files {
-        inputs.push(Arc::new(ParquetExec::try_from_files(
+        inputs.push(Arc::new(ParquetExec::try_from_files_with_cache(
             &[f.as_str()],
             projection.clone(),
             None,
             ROW_GROUP_SIZE,
             1,
             None,
+            metadata_cache_factory.make_noop_cache(),
         )?));
     }
     let plan = Arc::new(UnionExec::new(inputs));
@@ -1008,10 +1016,11 @@ async fn read_files(
 /// this key in the input files.
 async fn keys_with_counts(
     files: &[String],
+    metadata_cache_factory: &dyn MetadataCacheFactory,
     key_len: usize,
 ) -> Result {
     let projection = (0..key_len).collect_vec();
-    let plan = read_files(files, key_len, Some(projection.clone())).await?;
+    let plan = read_files(files, metadata_cache_factory, key_len, Some(projection.clone())).await?;
 
     let fields = plan.schema();
     let fields = fields.fields();
@@ -1400,6 +1409,8 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use cuberockstore::rocksdb::{Options, DB};
     use datafusion::physical_plan::collect;
+    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
+    use datafusion::physical_plan::parquet::NoopParquetMetadataCache;
     use std::fs;
     use std::path::{Path, PathBuf};
 
@@ -1513,6 +1524,7 @@ mod tests {
             Arc::new(chunk_store),
             remote_fs,
             Arc::new(config),
+            Arc::new(BasicMetadataCacheFactory::new()),
         );
         compaction_service
             .compact(1, DataLoadedSize::new())
@@ -1652,6 +1664,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(cluster),
             config.config_obj(),
+            Arc::new(BasicMetadataCacheFactory::new()),
             10,
         );
         metastore
@@ -1738,6 +1751,7 @@ mod tests {
             chunk_store.clone(),
             remote_fs,
             config.config_obj(),
+            Arc::new(BasicMetadataCacheFactory::new()),
         );
         compaction_service
             .compact_in_memory_chunks(partition.get_id())
@@ -1825,6 +1839,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
+            Arc::new(BasicMetadataCacheFactory::new()),
             50,
         );
 
@@ -1926,6 +1941,7 @@ mod tests {
             chunk_store.clone(),
             remote_fs.clone(),
             config.config_obj(),
+            Arc::new(BasicMetadataCacheFactory::new()),
         );
         compaction_service
             .compact(partition.get_id(), DataLoadedSize::new())
@@ -1949,7 +1965,7 @@ mod tests {
             .await
             .unwrap();
         let reader = Arc::new(
-            ParquetExec::try_from_path(local.as_str(), None, None, ROW_GROUP_SIZE, 1, None)
+            ParquetExec::try_from_path_with_cache(local.as_str(), None, None, ROW_GROUP_SIZE, 1, None, NoopParquetMetadataCache::new())
                 .unwrap(),
         );
         let res_data = &collect(reader).await.unwrap()[0];
@@ -2148,6 +2164,7 @@
 struct MultiSplit {
     meta: Arc,
     fs: Arc,
+    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     keys: Vec,
     key_len: usize,
     multi_partition_id: u64,
@@ -2163,6 +2180,7 @@ impl MultiSplit {
     fn new(
         meta: Arc,
         fs: Arc,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
         keys: Vec,
         key_len: usize,
         multi_partition_id: u64,
@@ -2172,6 +2190,7 @@
         MultiSplit {
             meta,
             fs,
+            metadata_cache_factory,
             keys,
             key_len,
             multi_partition_id,
@@ -2225,10 +2244,11 @@ impl MultiSplit {
             }
         });
 
-        let store = ParquetTableStore::new(p.index.get_row().clone(), ROW_GROUP_SIZE);
+        let store = ParquetTableStore::new(p.index.get_row().clone(), ROW_GROUP_SIZE, self.metadata_cache_factory.clone());
         let records = if !in_files.is_empty() {
             read_files(
                 &in_files.into_iter().map(|(f, _)| f).collect::>(),
+                self.metadata_cache_factory.as_ref(),
                 self.key_len,
                 None,
             )
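In compaction the factory reaches the low-level readers in two ways: ParquetTableStore::new carries it for writes and rewrites, and the module-private helpers read_files and keys_with_counts take a &dyn MetadataCacheFactory so that every ParquetExec::try_from_files_with_cache call obtains its cache from the factory instead of a hard-coded NoopParquetMetadataCache. A hedged sketch of a caller inside store/compaction.rs (the file list and key length are placeholders, and read_files is private to this module):

    use crate::CubeError;
    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;

    // Assumes this sits next to read_files in store/compaction.rs.
    async fn merge_files_example(files: &[String], key_len: usize) -> Result<(), CubeError> {
        let factory = BasicMetadataCacheFactory::new();
        // One factory instance feeds every ParquetExec the plan creates, so swapping
        // in a different MetadataCacheFactory changes all readers at once.
        let _plan = read_files(files, &factory, key_len, None).await?;
        Ok(())
    }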
diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs
index 5c203a6c68261..96796587e3c98 100644
--- a/rust/cubestore/cubestore/src/store/mod.rs
+++ b/rust/cubestore/cubestore/src/store/mod.rs
@@ -10,6 +10,7 @@ use datafusion::physical_plan::hash_aggregate::{
     AggregateMode, AggregateStrategy, HashAggregateExec,
 };
 use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::parquet::MetadataCacheFactory;
 use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};
 use serde::{de, Deserialize, Serialize};
 extern crate bincode;
@@ -182,6 +183,7 @@ pub struct ChunkStore {
     remote_fs: Arc,
     cluster: Arc,
     config: Arc,
+    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     memory_chunks: RwLock>,
     chunk_size: usize,
 }
@@ -342,6 +344,7 @@ impl ChunkStore {
         remote_fs: Arc,
         cluster: Arc,
         config: Arc,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
         chunk_size: usize,
     ) -> Arc {
         let store = ChunkStore {
@@ -349,6 +352,7 @@ impl ChunkStore {
             remote_fs,
             cluster,
             config,
+            metadata_cache_factory,
             memory_chunks: RwLock::new(HashMap::new()),
             chunk_size,
         };
@@ -588,8 +592,9 @@ impl ChunkDataStore for ChunkStore {
             )))])
         } else {
             let (local_file, index) = self.download_chunk(chunk, partition, index).await?;
+            let metadata_cache_factory: Arc<dyn MetadataCacheFactory> = self.metadata_cache_factory.clone();
             Ok(cube_ext::spawn_blocking(move || -> Result<_, CubeError> {
-                let parquet = ParquetTableStore::new(index, ROW_GROUP_SIZE);
+                let parquet = ParquetTableStore::new(index, ROW_GROUP_SIZE, metadata_cache_factory);
                 Ok(parquet.read_columns(&local_file)?)
             })
             .await??)
@@ -800,6 +805,7 @@ mod tests {
     use crate::{metastore::ColumnType, table::TableValue};
     use arrow::array::{Int64Array, StringArray};
     use cuberockstore::rocksdb::{Options, DB};
+    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
     use std::fs;
     use std::path::{Path, PathBuf};
 
@@ -938,6 +944,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
+            Arc::new(BasicMetadataCacheFactory::new()),
             10,
         );
 
@@ -1040,6 +1047,7 @@ mod tests {
             remote_fs.clone(),
             Arc::new(MockCluster::new()),
             config.config_obj(),
+            Arc::new(BasicMetadataCacheFactory::new()),
             10,
         );
 
@@ -1368,8 +1376,9 @@ impl ChunkStore {
         let local_file = self.remote_fs.temp_upload_path(remote_path.clone()).await?;
         let local_file = scopeguard::guard(local_file, ensure_temp_file_is_dropped);
         let local_file_copy = local_file.clone();
+        let metadata_cache_factory: Arc<dyn MetadataCacheFactory> = self.metadata_cache_factory.clone();
         cube_ext::spawn_blocking(move || -> Result<(), CubeError> {
-            let parquet = ParquetTableStore::new(index.get_row().clone(), ROW_GROUP_SIZE);
+            let parquet = ParquetTableStore::new(index.get_row().clone(), ROW_GROUP_SIZE, metadata_cache_factory);
             parquet.write_data(&local_file_copy, data)?;
             Ok(())
         })
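ChunkStore clones the factory handle into a local before calling cube_ext::spawn_blocking because the closure must own everything it captures to satisfy the 'static bound; cloning an Arc only bumps a reference count. A self-contained illustration of that pattern with plain std threads:

    use std::sync::Arc;

    fn run_blocking_with_handle<T: Send + Sync + 'static>(shared: &Arc<T>) {
        // Clone before the move: the spawned closure gets its own owned handle
        // while the caller keeps using the original.
        let owned: Arc<T> = shared.clone();
        std::thread::spawn(move || {
            let _handle = owned;
        });
    }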
diff --git a/rust/cubestore/cubestore/src/table/parquet.rs b/rust/cubestore/cubestore/src/table/parquet.rs
index fcf0be8396054..2b2be01fe02a0 100644
--- a/rust/cubestore/cubestore/src/table/parquet.rs
+++ b/rust/cubestore/cubestore/src/table/parquet.rs
@@ -4,7 +4,7 @@ use crate::CubeError;
 use arrow::array::ArrayRef;
 use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::parquet::{NoopParquetMetadataCache, ParquetMetadataCache};
+use datafusion::physical_plan::parquet::{MetadataCacheFactory, ParquetMetadataCache};
 use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
 use parquet::file::properties::{WriterProperties, WriterVersion};
 use std::fs::File;
@@ -36,15 +36,41 @@ impl CubestoreParquetMetadataCache for CubestoreParquetMetadataCacheImpl {
     }
 }
 
+pub trait CubestoreMetadataCacheFactory: DIService + Send + Sync {
+    fn cache_factory(&self) -> Arc<dyn MetadataCacheFactory>;
+}
+
+pub struct CubestoreMetadataCacheFactoryImpl {
+    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
+}
+
+crate::di_service!(
+    CubestoreMetadataCacheFactoryImpl,
+    [CubestoreMetadataCacheFactory]
+);
+
+impl CubestoreMetadataCacheFactoryImpl {
+    pub fn new(metadata_cache_factory: Arc<dyn MetadataCacheFactory>) -> Arc<CubestoreMetadataCacheFactoryImpl> {
+        Arc::new(CubestoreMetadataCacheFactoryImpl { metadata_cache_factory })
+    }
+}
+
+impl CubestoreMetadataCacheFactory for CubestoreMetadataCacheFactoryImpl {
+    fn cache_factory(&self) -> Arc<dyn MetadataCacheFactory> {
+        self.metadata_cache_factory.clone()
+    }
+}
+
 pub struct ParquetTableStore {
     table: Index,
     row_group_size: usize,
+    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
 }
 
 impl ParquetTableStore {
     pub fn read_columns(&self, path: &str) -> Result, CubeError> {
         let mut r = ParquetFileArrowReader::new(Arc::new(
-            NoopParquetMetadataCache::new().file_reader(path)?,
+            self.metadata_cache_factory.make_noop_cache().file_reader(path)?,
         ));
         let mut batches = Vec::new();
         for b in r.get_record_reader(self.row_group_size)? {
@@ -55,10 +81,11 @@ impl ParquetTableStore {
     }
 }
 
 impl ParquetTableStore {
-    pub fn new(table: Index, row_group_size: usize) -> ParquetTableStore {
+    pub fn new(table: Index, row_group_size: usize, metadata_cache_factory: Arc<dyn MetadataCacheFactory>) -> ParquetTableStore {
         ParquetTableStore {
             table,
             row_group_size,
+            metadata_cache_factory,
         }
     }
 
@@ -77,16 +104,16 @@ impl ParquetTableStore {
     }
 
     pub fn writer_props(&self) -> WriterProperties {
-        WriterProperties::builder()
+        self.metadata_cache_factory.build_writer_props(WriterProperties::builder()
             .set_max_row_group_size(self.row_group_size)
-            .set_writer_version(WriterVersion::PARQUET_2_0)
-            .build()
+            .set_writer_version(WriterVersion::PARQUET_2_0))
     }
 
     pub fn write_data(&self, dest_file: &str, columns: Vec) -> Result<(), CubeError> {
         let schema = Arc::new(arrow_schema(&self.table));
         let batch = RecordBatch::try_new(schema.clone(), columns.to_vec())?;
 
+        // TODO: Just look for every place SerializedFileWriter is constructed and see if we missed one.
         let mut w =
             ArrowWriter::try_new(File::create(dest_file)?, schema, Some(self.writer_props()))?;
         w.write(&batch)?;
@@ -116,6 +143,7 @@ mod tests {
         TimestampMicrosecondArray,
     };
     use arrow::record_batch::RecordBatch;
+    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
     use itertools::Itertools;
     use parquet::data_type::DataType;
     use parquet::file::reader::FileReader;
@@ -153,7 +181,7 @@ mod tests {
         .unwrap();
 
         let dest_file = NamedTempFile::new().unwrap();
-        let store = ParquetTableStore::new(index, ROW_GROUP_SIZE);
+        let store = ParquetTableStore::new(index, ROW_GROUP_SIZE, Arc::new(BasicMetadataCacheFactory::new()));
 
         let data: Vec = vec![
             Arc::new(StringArray::from(vec![
@@ -243,6 +271,7 @@ mod tests {
             )
             .unwrap(),
             row_group_size: 10,
+            metadata_cache_factory: Arc::new(BasicMetadataCacheFactory::new()),
         };
         let file = NamedTempFile::new().unwrap();
         let file_name = file.path().to_str().unwrap();
@@ -302,7 +331,7 @@ mod tests {
         let count_min = compaction::write_to_files(
             to_stream(to_split_batch).await,
             to_split.len(),
-            ParquetTableStore::new(store.table.clone(), store.row_group_size),
+            ParquetTableStore::new(store.table.clone(), store.row_group_size, Arc::new(BasicMetadataCacheFactory::new())),
             vec![split_1.to_string(), split_2.to_string()],
         )
         .await
@@ -364,7 +393,7 @@ mod tests {
         )
         .unwrap();
         let tmp_file = NamedTempFile::new().unwrap();
-        let store = ParquetTableStore::new(index.clone(), NUM_ROWS);
+        let store = ParquetTableStore::new(index.clone(), NUM_ROWS, Arc::new(BasicMetadataCacheFactory::new()));
         store
             .write_data(
                 tmp_file.path().to_str().unwrap(),
@@ -421,7 +450,7 @@ mod tests {
         let data = rows_to_columns(&index.columns(), &rows);
 
-        let w = ParquetTableStore::new(index.clone(), NUM_ROWS);
+        let w = ParquetTableStore::new(index.clone(), NUM_ROWS, Arc::new(BasicMetadataCacheFactory::new()));
         w.write_data(file, data.clone()).unwrap();
         let r = concat_record_batches(&w.read_columns(file).unwrap());
         assert_eq_columns!(r.columns(), &data);
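ParquetTableStore now needs the factory both to open readers (make_noop_cache().file_reader(path)) and to derive writer properties through build_writer_props, which is what the TODO about SerializedFileWriter is guarding: a writer created without going through writer_props() would bypass whatever an alternative factory (for example, an encrypting one) injects. A usage sketch mirroring the tests above; the import paths assume cubestore's module layout, the row group size is a placeholder, and BasicMetadataCacheFactory appears to reproduce the previous plain WriterProperties behaviour:

    use crate::metastore::Index;
    use crate::table::parquet::ParquetTableStore;
    use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
    use std::sync::Arc;

    const ROW_GROUP_SIZE: usize = 16384; // placeholder; cubestore defines its own constant

    fn open_store(index: Index) -> ParquetTableStore {
        // BasicMetadataCacheFactory is the default wiring registered in config/mod.rs.
        ParquetTableStore::new(index, ROW_GROUP_SIZE, Arc::new(BasicMetadataCacheFactory::new()))
    }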