feat: use dictionary type to store column (#993)
## Rationale
close #1039

## Detailed Changes
Add dictionary type support for columns.
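For context, here is a minimal sketch (not code from this commit) of what a dictionary-encoded column looks like with the Rust `arrow` crate: each distinct value is stored once in a dictionary and every row keeps only a small integer key, which is what makes the type attractive for low-cardinality tag columns. The builder choice and the tag values below are illustrative assumptions.

```rust
use arrow::array::{Array, StringDictionaryBuilder};
use arrow::datatypes::{DataType, Int32Type};

fn main() {
    // Dictionary-encode a string column: distinct values are stored once,
    // and each row only stores an Int32 key pointing into that dictionary.
    let mut builder = StringDictionaryBuilder::<Int32Type>::new();
    for v in ["tagv1", "tagv2", "tagv1", "tagv1"] {
        builder.append(v).unwrap();
    }
    let array = builder.finish();

    // Four rows, but only two distinct strings in the dictionary values.
    assert_eq!(array.len(), 4);
    assert_eq!(array.values().len(), 2);

    // The logical Arrow type is Dictionary(Int32 keys, Utf8 values).
    assert_eq!(
        array.data_type(),
        &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
    );
}
```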

## Test Plan
Unit tests.
tanruixiang authored Jul 5, 2023
1 parent 9bd8ae1 commit 4a7c6e4
Showing 20 changed files with 913 additions and 82 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -68,7 +68,7 @@ bytes = "1.1.0"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.5"
ceresdbproto = "1.0"
chrono = "0.4"
clap = "3.0"
clru = "0.6.1"
20 changes: 19 additions & 1 deletion analytic_engine/src/sst/parquet/encoding.rs
@@ -526,7 +526,25 @@ impl HybridRecordDecoder {
             .iter()
             .map(|f| {
                 if let DataType::List(nested_field) = f.data_type() {
-                    Arc::new(Field::new(f.name(), nested_field.data_type().clone(), true))
+                    match f.data_type() {
+                        DataType::Dictionary(_, _) => {
+                            assert!(f.dict_id().is_some(), "Dictionary must have dict_id");
+                            assert!(
+                                f.dict_is_ordered().is_some(),
+                                "Dictionary must have dict_is_ordered"
+                            );
+                            let dict_id = f.dict_id().unwrap();
+                            let dict_is_ordered = f.dict_is_ordered().unwrap();
+                            Arc::new(Field::new_dict(
+                                f.name(),
+                                nested_field.data_type().clone(),
+                                true,
+                                dict_id,
+                                dict_is_ordered,
+                            ))
+                        }
+                        _ => Arc::new(Field::new(f.name(), nested_field.data_type().clone(), true)),
+                    }
                 } else {
                     f.clone()
                 }
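As a side note on the hunk above, here is a hedged sketch (not taken from this repository) of the arrow `Field` API it relies on: `Field::new_dict` carries the dictionary id and ordering flag that a plain `Field::new` would drop, which is why the decoder reads them back via `dict_id()` / `dict_is_ordered()` before rebuilding the field. The field names and the id below are made up for illustration.

```rust
use arrow::datatypes::{DataType, Field};

fn main() {
    // A dictionary field: Int32 keys mapping into Utf8 values.
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    // new_dict records the dictionary id and whether the dictionary is ordered;
    // Field::new alone would leave both unset.
    let field = Field::new_dict("tag", dict_type, true, 1, false);
    assert_eq!(field.dict_id(), Some(1));
    assert_eq!(field.dict_is_ordered(), Some(false));

    // A non-dictionary field reports no dictionary metadata.
    let plain = Field::new("tsid", DataType::UInt64, false);
    assert_eq!(plain.dict_id(), None);
}
```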
2 changes: 2 additions & 0 deletions analytic_engine/src/sst/parquet/hybrid.rs
@@ -127,6 +127,7 @@ pub fn build_hybrid_arrow_schema(schema: &Schema) -> ArrowSchemaRef {
                 field.data_type().clone(),
                 true,
             )));
+            // TODO(tanruixiang): is there a need to use new_dict here?
             Arc::new(Field::new(field.name(), field_type, true))
         } else {
             field.clone()
@@ -418,6 +419,7 @@ impl ListArrayBuilder {
         let array_len = self.multi_row_arrays.len();
         let mut offsets = MutableBuffer::new(array_len * std::mem::size_of::<i32>());
         let child_data = self.build_child_data(&mut offsets)?;
+        // TODO(tanruixiang): is there a need to use new_dict here?
         let field = Arc::new(Field::new(
             LIST_ITEM_NAME,
             self.datum_kind.to_arrow_data_type(),
94 changes: 80 additions & 14 deletions analytic_engine/src/sst/parquet/writer.rs
@@ -333,7 +333,7 @@ mod tests {
     use common_types::{
         bytes::Bytes,
         projected_schema::ProjectedSchema,
-        tests::{build_row, build_schema},
+        tests::{build_row, build_row_for_dictionary, build_schema, build_schema_with_dictionary},
         time::{TimeRange, Timestamp},
     };
     use common_util::{
@@ -365,9 +365,10 @@ mod tests {
         init_log_for_test();

         let runtime = Arc::new(runtime::Builder::default().build().unwrap());
-        parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3]);
-        parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 3]);
-        parquet_write_and_then_read_back(runtime, 5, vec![5, 5, 5]);
+        parquet_write_and_then_read_back(runtime.clone(), 2, vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2]);
+        parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3, 3, 2]);
+        parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 4, 4]);
+        parquet_write_and_then_read_back(runtime, 5, vec![5, 5, 5, 5]);
     }

     fn parquet_write_and_then_read_back(
@@ -390,8 +391,8 @@
         let store_picker: ObjectStorePickerRef = Arc::new(store);
         let sst_file_path = Path::from("data.par");

-        let schema = build_schema();
-        let projected_schema = ProjectedSchema::no_projection(schema.clone());
+        let schema = build_schema_with_dictionary();
+        let reader_projected_schema = ProjectedSchema::no_projection(schema.clone());
         let sst_meta = MetaData {
             min_key: Bytes::from_static(b"100"),
             max_key: Bytes::from_static(b"200"),
@@ -410,9 +411,37 @@
                 // reach here when counter is 9 7 5 3 1
                 let ts = 100 + counter;
                 let rows = vec![
-                    build_row(b"a", ts, 10.0, "v4", 1000, 1_000_000),
-                    build_row(b"b", ts, 10.0, "v4", 1000, 1_000_000),
-                    build_row(b"c", ts, 10.0, "v4", 1000, 1_000_000),
+                    build_row_for_dictionary(
+                        b"a",
+                        ts,
+                        10.0,
+                        "v4",
+                        1000,
+                        1_000_000,
+                        Some("tagv1"),
+                        "tagv2",
+                    ),
+                    build_row_for_dictionary(
+                        b"b",
+                        ts,
+                        10.0,
+                        "v4",
+                        1000,
+                        1_000_000,
+                        Some("tagv2"),
+                        "tagv4",
+                    ),
+                    build_row_for_dictionary(b"c", ts, 10.0, "v4", 1000, 1_000_000, None, "tagv2"),
+                    build_row_for_dictionary(
+                        b"d",
+                        ts,
+                        10.0,
+                        "v4",
+                        1000,
+                        1_000_000,
+                        Some("tagv3"),
+                        "tagv2",
+                    ),
                 ];
                 let batch = build_record_batch_with_key(schema.clone(), rows);
                 Poll::Ready(Some(Ok(batch)))
@@ -432,15 +461,15 @@
             .await
             .unwrap();

-        assert_eq!(15, sst_info.row_num);
+        assert_eq!(20, sst_info.row_num);

         let scan_options = ScanOptions::default();
         // read sst back to test
         let sst_read_options = SstReadOptions {
             reverse: false,
             frequency: ReadFrequency::Frequent,
             num_rows_per_row_group: 5,
-            projected_schema,
+            projected_schema: reader_projected_schema,
             predicate: Arc::new(Predicate::empty()),
             meta_cache: None,
             scan_options,
@@ -483,9 +512,46 @@
             let mut stream = reader.read().await.unwrap();
             let mut expect_rows = vec![];
             for counter in &[4, 3, 2, 1, 0] {
-                expect_rows.push(build_row(b"a", 100 + counter, 10.0, "v4", 1000, 1_000_000));
-                expect_rows.push(build_row(b"b", 100 + counter, 10.0, "v4", 1000, 1_000_000));
-                expect_rows.push(build_row(b"c", 100 + counter, 10.0, "v4", 1000, 1_000_000));
+                expect_rows.push(build_row_for_dictionary(
+                    b"a",
+                    100 + counter,
+                    10.0,
+                    "v4",
+                    1000,
+                    1_000_000,
+                    Some("tagv1"),
+                    "tagv2",
+                ));
+                expect_rows.push(build_row_for_dictionary(
+                    b"b",
+                    100 + counter,
+                    10.0,
+                    "v4",
+                    1000,
+                    1_000_000,
+                    Some("tagv2"),
+                    "tagv4",
+                ));
+                expect_rows.push(build_row_for_dictionary(
+                    b"c",
+                    100 + counter,
+                    10.0,
+                    "v4",
+                    1000,
+                    1_000_000,
+                    None,
+                    "tagv2",
+                ));
+                expect_rows.push(build_row_for_dictionary(
+                    b"d",
+                    100 + counter,
+                    10.0,
+                    "v4",
+                    1000,
+                    1_000_000,
+                    Some("tagv3"),
+                    "tagv2",
+                ));
             }
             check_stream(&mut stream, expect_rows).await;
         });
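To make the Some(...)/None tag arguments in the test above concrete, here is a small illustrative sketch of how a nullable dictionary-encoded tag column could be built with arrow. The repo's build_row_for_dictionary helper is not shown in this commit view, so everything below other than the tag values is an assumption, not the project's actual test code.

```rust
use arrow::array::{Array, StringDictionaryBuilder};
use arrow::datatypes::Int32Type;

fn main() {
    // Mirror the nullable dictionary tag in the test rows: Some(...) for rows
    // a, b and d, None for row c.
    let tags = [Some("tagv1"), Some("tagv2"), None, Some("tagv3")];

    let mut builder = StringDictionaryBuilder::<Int32Type>::new();
    for tag in tags {
        match tag {
            // A present tag becomes a dictionary key for its string value.
            Some(v) => {
                builder.append(v).unwrap();
            }
            // A missing tag becomes a null slot in the dictionary array.
            None => builder.append_null(),
        }
    }
    let array = builder.finish();

    assert_eq!(array.len(), 4);
    assert_eq!(array.null_count(), 1);
}
```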
(Diffs for the remaining changed files are not shown here.)
