
Commit

Fix ancestor_nulls.
kaka11chen committed Dec 10, 2024
1 parent 03df851 commit 815442d
Showing 7 changed files with 386 additions and 22 deletions.
30 changes: 25 additions & 5 deletions be/src/vec/exec/format/parquet/parquet_common.h
@@ -24,6 +24,7 @@
#include <ostream>
#include <regex>
#include <string>
#include <unordered_set>
#include <vector>

#include "vec/columns/column_nullable.h"
@@ -102,13 +103,15 @@ class ColumnSelectVector {
ColumnSelectVector() = default;

Status init(const std::vector<uint16_t>& run_length_null_map, size_t num_values,
NullMap* null_map, FilterMap* filter_map, size_t filter_map_index) {
NullMap* null_map, FilterMap* filter_map, size_t filter_map_index,
const std::unordered_set<size_t>* skipped_indices = nullptr) {
_num_values = num_values;
_num_nulls = 0;
_read_index = 0;
size_t map_index = 0;
bool is_null = false;
_has_filter = filter_map->has_filter();

if (filter_map->has_filter()) {
// No run length null map is generated when _filter_all = true
DCHECK(!filter_map->filter_all());
@@ -126,19 +129,36 @@ class ColumnSelectVector {
}
is_null = !is_null;
}

size_t num_read = 0;
DCHECK_LE(filter_map_index + num_values, filter_map->filter_map_size());
for (size_t i = 0; i < num_values; ++i) {
if (filter_map->filter_map_data()[filter_map_index++]) {
_data_map[i] = _data_map[i] == FILTERED_NULL ? NULL_DATA : CONTENT;
size_t i = 0;
size_t valid_count = 0;

while (valid_count < num_values) {
DCHECK_LT(filter_map_index + i, filter_map->filter_map_size());

if (skipped_indices != nullptr &&
skipped_indices->count(filter_map_index + i) > 0) {
++i;
continue;
}

if (filter_map->filter_map_data()[filter_map_index + i]) {
_data_map[valid_count] =
_data_map[valid_count] == FILTERED_NULL ? NULL_DATA : CONTENT;
num_read++;
}
++valid_count;
++i;
}

_num_filtered = num_values - num_read;

if (null_map != nullptr && num_read > 0) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_read);

if (_num_nulls == 0) {
memset(map_data_column.data() + null_map_index, 0, num_read);
} else if (_num_nulls == num_values) {
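
Note on the ColumnSelectVector::init change above: the old loop stepped through exactly num_values filter-map slots, so positions that belong to ancestor-null rows could consume value slots they do not own. The new loop keeps two counters, an offset i into the filter map and a valid_count of real values, and passes over any position listed in skipped_indices without consuming a value. The Python sketch below is a simplified model of that walk (plain lists and a set instead of the BE's FilterMap/NullMap types, and without the _data_map bookkeeping); it illustrates the counting only, not the actual implementation.

def count_selected(filter_map, start, num_values, skipped_indices=None):
    """Simplified model of the filter walk in ColumnSelectVector::init.

    filter_map: list of bools, one per filter-map slot.
    start: first filter-map index for this batch (filter_map_index).
    num_values: number of real values to account for.
    skipped_indices: filter-map positions to pass over (ancestor-null rows).
    """
    skipped_indices = skipped_indices or set()
    num_read = 0      # values that survive the filter
    i = 0             # offset into the filter map
    valid_count = 0   # real values consumed so far
    while valid_count < num_values:
        pos = start + i
        if pos in skipped_indices:
            i += 1    # ancestor-null slot: skip it, consume no value
            continue
        if filter_map[pos]:
            num_read += 1
        valid_count += 1
        i += 1
    return num_read, num_values - num_read  # (kept, filtered out)

# Example: 5 values, filter map has an ancestor-null slot at position 2.
print(count_selected([True, False, True, True, False, True], 0, 5, {2}))  # (3, 2)
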
7 changes: 6 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -411,6 +411,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
size_t nonnull_size = 0;
null_map.emplace_back(0);
bool prev_is_null = false;
std::unordered_set<size_t> ancestor_null_indices;

while (has_read < origin_size + parsed_values) {
level_t def_level = _def_levels[has_read++];
@@ -421,6 +422,9 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
}

if (def_level < _field_schema->repeated_parent_def_level) {
for (size_t i = 0; i < loop_read; i++) {
ancestor_null_indices.insert(has_read - loop_read + i);
}
ancestor_nulls += loop_read;
continue;
}
@@ -469,7 +473,8 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
SCOPED_RAW_TIMER(&_decode_null_map_time);
RETURN_IF_ERROR(
select_vector.init(null_map, num_values, map_data_column, current_filter_map,
_nested_filter_map_data ? origin_size : _filter_map_index));
_nested_filter_map_data ? origin_size : _filter_map_index,
&ancestor_null_indices));
}

RETURN_IF_ERROR(
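
Note on the reader change above: while _read_nested_column walks the definition levels, a def_level below _field_schema->repeated_parent_def_level means an ancestor of that position (the row or the enclosing collection) is null, so no value slot exists there; those positions are counted as ancestor_nulls. The hunk additionally records them in ancestor_null_indices and hands the set to select_vector.init(), which now skips them during its filter-map walk (see the header change above). The sketch below is an assumed, simplified model of the classification step only; the real reader also groups equal levels into runs and handles repetition levels.

def collect_ancestor_null_indices(def_levels, repeated_parent_def_level, base_index=0):
    """Classify definition-level positions (simplified model).

    Positions whose def_level is below the repeated parent's definition
    level have a null ancestor and own no element slot; every other
    position owns a slot, whose value may itself still be NULL.
    """
    ancestor_null_indices = set()
    value_positions = []
    for offset, def_level in enumerate(def_levels):
        pos = base_index + offset
        if def_level < repeated_parent_def_level:
            ancestor_null_indices.add(pos)   # no slot: skip in the select vector
        else:
            value_positions.append(pos)      # slot exists (value or NULL value)
    return ancestor_null_indices, value_positions

# Hypothetical levels for an ARRAY<INT> column; exact level values depend on the schema.
print(collect_ancestor_null_indices([3, 2, 3, 0, 3, 3], repeated_parent_def_level=2))
# -> ({3}, [0, 1, 2, 4, 5])
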
@@ -40,3 +40,19 @@ LOCATION
'/user/doris/suites/multi_catalog/nested_cross_page2_parquet';

msck repair table nested_cross_page2_parquet;

CREATE TABLE `nested_cross_page3_parquet`(
`id` int,
`array_col` array<int>,
`description` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'/user/doris/suites/multi_catalog/nested_cross_page3_parquet';

msck repair table nested_cross_page3_parquet;

Binary file not shown.
@@ -0,0 +1,196 @@
import pyarrow as pa
import pyarrow.parquet as pq
import subprocess
import argparse
import json

# Define the output file path as a constant
OUTPUT_PARQUET_FILE = 'nested_cross_page_test3.parquet'

def generate_cross_page_test_data(output_file):
# Create test data
data = {
# id column (INT32)
'id': [1, None, 3],

# array column (ARRAY<INT>)
'array_col': [
# Row 1 - Large array to force cross-page
[1, None, 3, 4, 5] * 200, # 1000 elements

# Row 2 - Null array
None,

# Row 3 - Another large array with nulls
[6, None, 8, None, 10] * 200 # 1000 elements
],

# description column (STRING)
'description': [
'This is a large array with repeated sequence [1,null,3,4,5]',
None,
'This is another large array with repeated sequence [6,null,8,null,10]'
]
}

# Create table structure
table = pa.Table.from_pydict({
'id': pa.array(data['id'], type=pa.int32()),
'array_col': pa.array(data['array_col'], type=pa.list_(pa.int32())),
'description': pa.array(data['description'], type=pa.string())
})

# Write to parquet file
pq.write_table(
table,
output_file,
compression=None, # No compression for predictable size
version='2.6',
write_statistics=True,
row_group_size=3, # All data in one row group
data_page_size=100, # Very small page size
write_batch_size=10 # Small batch size
)

def inspect_parquet_file(file_path):
"""Inspect the structure of generated parquet file"""
pf = pq.ParquetFile(file_path)
print(f"\nFile: {file_path}")
print(f"Number of row groups: {pf.num_row_groups}")

metadata = pf.metadata
schema = pf.schema
print(f"\nSchema: {schema}")
print(f"\nDetailed schema:")
for i in range(len(schema)):
print(f"Column {i}: {schema[i]}")

for i in range(metadata.num_row_groups):
rg = metadata.row_group(i)
print(f"\nRow Group {i}:")
print(f"Num rows: {rg.num_rows}")

for j in range(rg.num_columns):
col = rg.column(j)
print(f"\nColumn {j}:")
print(f"Path: {schema[j].name}")
print(f"Type: {col.physical_type}")
print(f"Encodings: {col.encodings}")
print(f"Total compressed size: {col.total_compressed_size}")
print(f"Total uncompressed size: {col.total_uncompressed_size}")
print(f"Number of values: {col.num_values}")
print(f"Data page offset: {col.data_page_offset}")
if col.dictionary_page_offset is not None:
print(f"Dictionary page offset: {col.dictionary_page_offset}")

def read_and_print_file(file_path):
"""Read and print file content"""
table = pq.read_table(file_path)
df = table.to_pandas()
print("\nFile content:")
for i in range(len(df)):
print(f"\nRow {i}:")
print(f"ID: {df.iloc[i]['id']}")
arr = df.iloc[i]['array_col']
if arr is not None:
print(f"Array length: {len(arr)}")
print(f"First few elements: {arr[:5]}...")
print(f"Last few elements: ...{arr[-5:]}")
else:
print("Array: None")
print(f"Description: {df.iloc[i]['description']}")

def inspect_pages_with_cli(file_path, parquet_cli_path=None):
"""
Inspect page information using parquet-cli
Args:
file_path: Path to the parquet file
parquet_cli_path: Optional path to parquet-cli jar file
"""
if not parquet_cli_path:
print("\nSkipping parquet-cli inspection: No parquet-cli path provided")
return

print("\nParquet CLI Output:")
try:
cmd = f"java -jar {parquet_cli_path} pages {file_path}"
result = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
print(result.stdout)
except subprocess.CalledProcessError as e:
print(f"Error running parquet-cli: {e}")
if e.output:
print(f"Error output: {e.output}")
except Exception as e:
print(f"Unexpected error running parquet-cli: {e}")

def save_test_data_info(output_file):
"""Save detailed test data information to text file"""
info = {
"file_format": "Parquet",
"version": "2.6",
"compression": "None",
"row_group_size": 3,
"data_page_size": 100,
"write_batch_size": 10,
"output_file": output_file,
"schema": {
"id": "INT32",
"array_col": "ARRAY<INT32>",
"description": "STRING"
},
"test_cases": [
{
"row": 1,
"description": "Large array with nulls",
"characteristics": [
"1000 elements",
"Repeated sequence [1,null,3,4,5]",
"Forces cross-page scenario"
]
},
{
"row": 2,
"description": "Null array and values",
"characteristics": [
"Entire array is null",
"ID is null",
"Description is null"
]
},
{
"row": 3,
"description": "Another large array with nulls",
"characteristics": [
"1000 elements",
"Repeated sequence [6,null,8,null,10]",
"Forces cross-page scenario"
]
}
]
}

info_file = output_file.replace('.parquet', '_info.json')
with open(info_file, 'w') as f:
json.dump(info, f, indent=2)

if __name__ == '__main__':
# Add command line argument parsing
parser = argparse.ArgumentParser(description='Generate and inspect parquet test data')
parser.add_argument('--parquet-cli',
help='Path to parquet-cli jar file',
default=None)
parser.add_argument('--output',
help='Output parquet file path',
default=OUTPUT_PARQUET_FILE)
args = parser.parse_args()

# Use the output file path from command line or default
output_file = args.output

generate_cross_page_test_data(output_file)
inspect_parquet_file(output_file)
read_and_print_file(output_file)
inspect_pages_with_cli(output_file, args.parquet_cli)
save_test_data_info(output_file)
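
One way to sanity-check the generated file (an assumed verification step, not part of the commit): read it back with pyarrow and confirm the row pattern the regression test relies on, a 1000-element array with nulls, a fully null row, and another 1000-element array, so that with data_page_size=100 each large array has to span several data pages.

import pyarrow.parquet as pq

# Read the generated file back and check the expected layout:
# big array with nulls / all-null row / big array with nulls.
table = pq.read_table('nested_cross_page_test3.parquet')
rows = table.to_pylist()

assert len(rows) == 3
assert len(rows[0]['array_col']) == 1000 and rows[0]['array_col'][1] is None
assert rows[1]['id'] is None and rows[1]['array_col'] is None and rows[1]['description'] is None
assert len(rows[2]['array_col']) == 1000
print('layout looks as expected')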
