Skip to content

Commit

Permalink
WIP: ParquetTablestore having MetadataCacheFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
srh committed Aug 23, 2024
1 parent 2bfe6d4 commit 3d7348f
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 32 deletions.
28 changes: 19 additions & 9 deletions rust/cubestore/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::store::compaction::{CompactionService, CompactionServiceImpl};
use crate::store::{ChunkDataStore, ChunkStore, WALDataStore, WALStore};

Check warning on line 36 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs

Check warning on line 36 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs
use crate::streaming::kafka::{KafkaClientService, KafkaClientServiceImpl};
use crate::streaming::{KsqlClient, KsqlClientImpl, StreamingService, StreamingServiceImpl};
use crate::table::parquet::{CubestoreParquetMetadataCache, CubestoreParquetMetadataCacheImpl};
use crate::table::parquet::{CubestoreMetadataCacheFactory, CubestoreMetadataCacheFactoryImpl, CubestoreParquetMetadataCache, CubestoreParquetMetadataCacheImpl};
use crate::telemetry::tracing::{TracingHelper, TracingHelperImpl};
use crate::telemetry::{
start_agent_event_loop, start_track_event_loop, stop_agent_event_loop, stop_track_event_loop,
Expand All @@ -45,7 +45,7 @@ use crate::util::memory::{MemoryHandler, MemoryHandlerImpl};
use crate::CubeError;

Check warning on line 45 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs

Check warning on line 45 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs
use cuberockstore::rocksdb::{Options, DB};
use datafusion::cube_ext;
use datafusion::physical_plan::parquet::{LruParquetMetadataCache, NoopParquetMetadataCache};
use datafusion::physical_plan::parquet::{BasicMetadataCacheFactory, LruParquetMetadataCache, NoopParquetMetadataCache};
use futures::future::join_all;
use log::Level;
use log::{debug, error};
Expand Down Expand Up @@ -534,8 +534,8 @@ pub trait ConfigObj: DIService {

Check warning on line 534 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs

Check warning on line 534 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs
fn remote_files_cleanup_interval_secs(&self) -> u64;

fn local_files_cleanup_size_threshold(&self) -> u64;

fn local_files_cleanup_size_threshold(&self) -> u64
;
fn local_files_cleanup_delay_secs(&self) -> u64;

fn remote_files_cleanup_delay_secs(&self) -> u64;
Expand Down Expand Up @@ -2000,13 +2000,21 @@ impl Config {
})

Check warning on line 2000 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs

Check warning on line 2000 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs
.await;

self.injector
.register_typed::<dyn CubestoreMetadataCacheFactory, _, _, _>(async move |i| {
CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new()))
})
.await;

Check warning on line 2008 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs

Check warning on line 2008 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs
self.injector
.register_typed::<dyn ChunkDataStore, _, _, _>(async move |i| {
let metadata_cache_factory = i.get_service_typed::<dyn CubestoreMetadataCacheFactory>().await.cache_factory();
ChunkStore::new(
i.get_service_typed().await,
i.get_service_typed().await,
i.get_service_typed().await,
i.get_service_typed().await,
metadata_cache_factory,
i.get_service_typed::<dyn ConfigObj>()
.await
.wal_split_threshold() as usize,
Expand All @@ -2017,25 +2025,25 @@ impl Config {
self.injector

Check warning on line 2025 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs

Check warning on line 2025 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs
.register_typed::<dyn CubestoreParquetMetadataCache, _, _, _>(async move |i| {
let c = i.get_service_typed::<dyn ConfigObj>().await;
let metadata_cache_factory = i.get_service_typed::<dyn CubestoreMetadataCacheFactory>().await.cache_factory();
CubestoreParquetMetadataCacheImpl::new(
match c.metadata_cache_max_capacity_bytes() {
0 => NoopParquetMetadataCache::new(),
max_cached_metadata => LruParquetMetadataCache::new(
max_cached_metadata,
Duration::from_secs(c.metadata_cache_time_to_idle_secs()),
),
0 => metadata_cache_factory.make_noop_cache(),
max_cached_metadata => metadata_cache_factory.make_lru_cache(max_cached_metadata, Duration::from_secs(c.metadata_cache_time_to_idle_secs())),

Check warning on line 2032 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs

Check warning on line 2032 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs
},
)
})
.await;

Check warning on line 2037 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs

Check warning on line 2037 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs
self.injector
.register_typed::<dyn CompactionService, _, _, _>(async move |i| {
let metadata_cache_factory = i.get_service_typed::<dyn CubestoreMetadataCacheFactory>().await.cache_factory();
CompactionServiceImpl::new(
i.get_service_typed().await,
i.get_service_typed().await,
i.get_service_typed().await,
i.get_service_typed().await,
metadata_cache_factory,
)
})
.await;
Expand Down Expand Up @@ -2160,11 +2168,13 @@ impl Config {
let query_cache_to_move = query_cache.clone();

Check warning on line 2168 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs

Check warning on line 2168 in rust/cubestore/cubestore/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/config/mod.rs
self.injector
.register_typed::<dyn QueryPlanner, _, _, _>(async move |i| {
let metadata_cache_factory = i.get_service_typed::<dyn CubestoreMetadataCacheFactory>().await.cache_factory();
QueryPlannerImpl::new(
i.get_service_typed().await,
i.get_service_typed().await,
i.get_service_typed().await,
query_cache_to_move,
metadata_cache_factory,
)
})
.await;
Expand Down
5 changes: 5 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod optimizations;
pub mod panic;
mod partition_filter;
mod planning;
use datafusion::physical_plan::parquet::MetadataCacheFactory;
pub use planning::PlanningMeta;
mod check_memory;
pub mod physical_plan_flags;
Expand Down Expand Up @@ -98,6 +99,7 @@ pub struct QueryPlannerImpl {
cache_store: Arc<dyn CacheStore>,
config: Arc<dyn ConfigObj>,
cache: Arc<SqlResultCache>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
}

crate::di_service!(QueryPlannerImpl, [QueryPlanner]);
Expand Down Expand Up @@ -179,12 +181,14 @@ impl QueryPlannerImpl {
cache_store: Arc<dyn CacheStore>,
config: Arc<dyn ConfigObj>,
cache: Arc<SqlResultCache>,
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
) -> Arc<QueryPlannerImpl> {
Arc::new(QueryPlannerImpl {
meta_store,
cache_store,
config,
cache,
metadata_cache_factory,
})
}
}
Expand All @@ -193,6 +197,7 @@ impl QueryPlannerImpl {
async fn execution_context(&self) -> Result<Arc<ExecutionContext>, CubeError> {
Ok(Arc::new(ExecutionContext::with_config(
ExecutionConfig::new()
.with_metadata_cache_factory(self.metadata_cache_factory.clone())
.add_optimizer_rule(Arc::new(MaterializeNow {}))
.add_optimizer_rule(Arc::new(FlattenUnion {})),
)))
Expand Down
6 changes: 3 additions & 3 deletions rust/cubestore/cubestore/src/queryplanner/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl QueryExecutorImpl {
serialized_plan: Arc<SerializedPlan>,

Check warning on line 327 in rust/cubestore/cubestore/src/queryplanner/query_executor.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Check warning on line 327 in rust/cubestore/cubestore/src/queryplanner/query_executor.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/queryplanner/query_executor.rs
) -> Result<Arc<ExecutionContext>, CubeError> {
Ok(Arc::new(ExecutionContext::with_config(
ExecutionConfig::new()
ExecutionConfig::new() // TODO: Set encryption param
.with_batch_size(4096)
.with_concurrency(1)
.with_query_planner(Arc::new(CubeQueryPlanner::new_on_router(
Expand All @@ -344,7 +344,7 @@ impl QueryExecutorImpl {
data_loaded_size: Option<Arc<DataLoadedSize>>,
) -> Result<Arc<ExecutionContext>, CubeError> {
Ok(Arc::new(ExecutionContext::with_config(
ExecutionConfig::new()
ExecutionConfig::new() // TODO: Set encryption param
.with_batch_size(4096)
.with_concurrency(1)
.with_query_planner(Arc::new(CubeQueryPlanner::new_on_worker(
Expand All @@ -365,7 +365,7 @@ pub struct CubeTable {
worker_partition_ids: Vec<(u64, RowFilter)>,
#[serde(skip, default)]
chunk_id_to_record_batches: HashMap<u64, Vec<RecordBatch>>,
#[serde(skip, default = "NoopParquetMetadataCache::new")]
#[serde(skip, default = "NoopParquetMetadataCache::new")] // TODO: Triple check the initialization of this field.
parquet_metadata_cache: Arc<dyn ParquetMetadataCache>,
}

Expand Down
2 changes: 2 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/topk/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ mod tests {
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::ExecutionPlan;
use futures::StreamExt;
Expand Down Expand Up @@ -1240,6 +1241,7 @@ mod tests {
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
metadata_cache_factory: Arc::new(BasicMetadataCacheFactory::new()),
};
let agg_exprs = aggs
.iter()
Expand Down
4 changes: 4 additions & 0 deletions rust/cubestore/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1659,6 +1659,7 @@ mod tests {
use crate::store::compaction::CompactionService;
use async_compression::tokio::write::GzipEncoder;
use cuberockstore::rocksdb::{Options, DB};
use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
use futures_timer::Delay;
use itertools::Itertools;
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -1723,6 +1724,7 @@ mod tests {
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
rows_per_chunk,
);
let limits = Arc::new(ConcurrencyLimits::new(4));
Expand Down Expand Up @@ -1800,6 +1802,7 @@ mod tests {
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
rows_per_chunk,
);
let limits = Arc::new(ConcurrencyLimits::new(4));
Expand Down Expand Up @@ -1907,6 +1910,7 @@ mod tests {
remote_fs.clone(),
Arc::new(MockCluster::new()),
config.config_obj(),
Arc::new(BasicMetadataCacheFactory::new()),
rows_per_chunk,
);
let limits = Arc::new(ConcurrencyLimits::new(4));
Expand Down
Loading

0 comments on commit 3d7348f

Please sign in to comment.