Skip to content

Commit

Permalink
fix(mito): pruning for mito2 (GreptimeTeam#2525)
Browse files Browse the repository at this point in the history
* fix: pruning for mito2

* chore: refactor projection parameters; add some tests; customize row group size for each flush task.

* chore: pass whole RegionFlushRequest

---------

Co-authored-by: Lei, HUANG <[email protected]>
  • Loading branch information
evenyag and v0y4g3r authored Oct 8, 2023
1 parent 0292445 commit 0593c3b
Show file tree
Hide file tree
Showing 18 changed files with 531 additions and 49 deletions.
2 changes: 2 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ mod open_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod truncate_test;

use std::sync::Arc;
Expand Down
14 changes: 12 additions & 2 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ async fn put_and_flush(
put_rows(engine, region_id, rows).await;

let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap()
else {
Expand Down Expand Up @@ -79,7 +84,12 @@ async fn delete_and_flush(
assert_eq!(row_cnt, rows_affected);

let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap()
else {
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine/drop_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn test_engine_drop_region() {
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id).await;
flush_region(&engine, region_id, None).await;

// drop the created region.
engine
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn test_manual_flush() {
};
put_rows(&engine, region_id, rows).await;

flush_region(&engine, region_id).await;
flush_region(&engine, region_id, None).await;

let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
Expand Down Expand Up @@ -164,7 +164,7 @@ async fn test_write_stall() {
tokio::spawn(async move {
listener.wait().await;

flush_region(&engine_cloned, region_id).await;
flush_region(&engine_cloned, region_id, None).await;
});

// Triggers write stall.
Expand Down Expand Up @@ -212,7 +212,7 @@ async fn test_flush_empty() {
.await
.unwrap();

flush_region(&engine, region_id).await;
flush_region(&engine, region_id, None).await;

let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
Expand Down Expand Up @@ -247,7 +247,7 @@ async fn test_flush_reopen_region() {
};
put_rows(&engine, region_id, rows).await;

flush_region(&engine, region_id).await;
flush_region(&engine, region_id, None).await;
let check_region = || {
let region = engine.get_region(region_id).unwrap();
let version_data = region.version_control.current();
Expand Down
102 changes: 102 additions & 0 deletions src/mito2/src/engine/prune_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::Rows;
use common_query::logical_plan::DfExpr;
use common_query::prelude::Expr;
use common_recordbatch::RecordBatches;
use datafusion_common::ScalarValue;
use datafusion_expr::lit;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};

async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);

engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

put_rows(
&engine,
region_id,
Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 10),
},
)
.await;
flush_region(&engine, region_id, Some(5)).await;

let stream = engine
.handle_query(
region_id,
ScanRequest {
filters: vec![Expr::from(expr)],
..Default::default()
},
)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_read_parquet_stats() {
common_telemetry::init_default_ut_logging();

check_prune_row_groups(
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;

check_prune_row_groups(
datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;
}
14 changes: 12 additions & 2 deletions src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,12 @@ async fn test_engine_truncate_after_flush() {

// Flush the region.
engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();

Expand Down Expand Up @@ -304,7 +309,12 @@ async fn test_engine_truncate_during_flush() {
let flush_task = tokio::spawn(async move {
info!("do flush task!!!!");
engine_cloned
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
});

Expand Down
7 changes: 6 additions & 1 deletion src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ pub(crate) struct RegionFlushTask {
pub(crate) memtable_builder: MemtableBuilderRef,
pub(crate) file_purger: FilePurgerRef,
pub(crate) listener: WorkerListener,
pub(crate) row_group_size: Option<usize>,
}

impl RegionFlushTask {
Expand Down Expand Up @@ -272,7 +273,10 @@ impl RegionFlushTask {
/// Flushes memtables to level 0 SSTs.
async fn flush_memtables(&self, version: &VersionRef) -> Result<Vec<FileMeta>> {
// TODO(yingwen): Make it configurable.
let write_opts = WriteOptions::default();
let mut write_opts = WriteOptions::default();
if let Some(row_group_size) = self.row_group_size {
write_opts.row_group_size = row_group_size;
}
let memtables = version.memtables.immutables();
let mut file_metas = Vec::with_capacity(memtables.len());

Expand Down Expand Up @@ -689,6 +693,7 @@ mod tests {
memtable_builder: builder.memtable_builder(),
file_purger: builder.file_purger(),
listener: WorkerListener::default(),
row_group_size: None,
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ impl ScanRegion {
.collect();

debug!(
"Seq scan region {}, memtables: {}, ssts_to_read: {}, total_ssts: {}",
"Seq scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, total_ssts: {}",
self.version.metadata.region_id,
self.request,
memtables.len(),
files.len(),
total_ssts
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
mod format;
pub mod reader;
mod stats;
pub mod writer;

use common_base::readable_size::ReadableSize;
Expand Down
Loading

0 comments on commit 0593c3b

Please sign in to comment.