simplify parquet_hybrid_test.py to speed up the execution (#7)
Signed-off-by: sperlingxx <[email protected]>
sperlingxx authored May 30, 2024
1 parent 1e55c64 commit f344b84
Showing 1 changed file with 9 additions and 115 deletions.
integration_tests/src/main/python/parquet_hybrid_test.py
@@ -22,24 +22,6 @@
from pyspark.sql.functions import *
from spark_session import *


def read_parquet_df(data_path):
return lambda spark : spark.read.parquet(data_path)

def read_parquet_sql(data_path):
return lambda spark : spark.sql('select * from parquet.`{}`'.format(data_path))


rebase_write_corrected_conf = {
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.legacy.parquet.int96RebaseModeInWrite': 'CORRECTED'
}

rebase_write_legacy_conf = {
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY',
'spark.sql.legacy.parquet.int96RebaseModeInWrite': 'LEGACY'
}

# Like the standard map_gens_sample but with timestamps limited
parquet_map_gens = [MapGen(f(nullable=False), f()) for f in [
BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen,
@@ -62,92 +44,6 @@ def read_parquet_sql(data_path):
] + parquet_map_gens
]

pattern = "[a-z0-9A-Z]{1,10}"
array_string_debug = [
[ArrayGen(StringGen(pattern=pattern, nullable=False), nullable=False, max_length=3)],
[ArrayGen(StringGen(pattern=pattern, nullable=(True, 50.0)), nullable=False, max_length=3)],
[ArrayGen(StringGen(pattern=pattern, nullable=False), nullable=(True, 50.0), max_length=3)],
[ArrayGen(StringGen(pattern=pattern, nullable=(True, 50.0)), nullable=(True, 50.0), max_length=3)],
[ArrayGen(StringGen(pattern=pattern, nullable=False), nullable=False)],
[ArrayGen(StringGen(pattern=pattern, nullable=(True, 50.0)), nullable=False)],
[ArrayGen(StringGen(pattern=pattern, nullable=False), nullable=(True, 50.0))],
[ArrayGen(StringGen(pattern=pattern, nullable=(True, 50.0)), nullable=(True, 50.0))],
[StringGen(pattern=pattern, nullable=False)],
[StringGen(pattern=pattern, nullable=(True, 50.0))],
]

map_simple_debug = [[g] for g in all_basic_map_gens] + [
[MapGen(StringGen(pattern='key_[0-9]', nullable=False),
StringGen(nullable=(True, 50.0)),
max_length=3,
nullable=(True, 50.0))
]
]

map_nested_debug = [
[ArrayGen(ArrayGen(LongGen(), max_length=5), max_length=5)],
[ArrayGen(StringGen(pattern=pattern), max_length=5)],
[ArrayGen(ArrayGen(StringGen(pattern=pattern), max_length=3, nullable=False), max_length=3, nullable=False)],
[ArrayGen(ArrayGen(StringGen(pattern=pattern), max_length=3), max_length=3)],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(StringGen(pattern=pattern)), max_length=10)],
[MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10)],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)],
[MapGen(IntegerGen(False), ArrayGen(int_gen, max_length=3), max_length=3)],
[MapGen(ByteGen(False), MapGen(FloatGen(False), date_gen, max_length=3), max_length=3)],
]

struct_debug = [
[all_basic_struct_gen],
]



# Test with the original parquet file reader, the multi-file parallel reader for cloud, and the coalescing file reader
# for non-cloud.
original_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}
multithreaded_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED',
'spark.rapids.sql.reader.multithreaded.combine.sizeBytes': '0',
'spark.rapids.sql.reader.multithreaded.read.keepOrder': True}
coalesce_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'}
coalesce_parquet_file_reader_multithread_filter_chunked_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING',
'spark.rapids.sql.coalescing.reader.numFilterParallel': '2',
'spark.rapids.sql.reader.chunked': True}
coalesce_parquet_file_reader_multithread_filter_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING',
'spark.rapids.sql.coalescing.reader.numFilterParallel': '2',
'spark.rapids.sql.reader.chunked': False}
native_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'PERFILE',
'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE'}
native_multithreaded_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED',
'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE',
'spark.rapids.sql.reader.multithreaded.combine.sizeBytes': '0',
'spark.rapids.sql.reader.multithreaded.read.keepOrder': True}
native_coalesce_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING',
'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE'}
native_coalesce_parquet_file_reader_chunked_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING',
'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE',
'spark.rapids.sql.reader.chunked': True}
combining_multithreaded_parquet_file_reader_conf_ordered = {'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED',
'spark.rapids.sql.reader.multithreaded.combine.sizeBytes': '64m',
'spark.rapids.sql.reader.multithreaded.read.keepOrder': True}
combining_multithreaded_parquet_file_reader_conf_unordered = pytest.param({'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED',
'spark.rapids.sql.reader.multithreaded.combine.sizeBytes': '64m',
'spark.rapids.sql.reader.multithreaded.read.keepOrder': False}, marks=pytest.mark.ignore_order(local=True))
combining_multithreaded_parquet_file_reader_deprecated_conf_ordered = {
'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED',
'spark.rapids.sql.format.parquet.multithreaded.combine.sizeBytes': '64m',
'spark.rapids.sql.format.parquet.multithreaded.read.keepOrder': True}
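
Each of these reader conf dicts is merged with per-test settings before being passed to the assertion helpers, as the test body further down does with copy_and_update. Below is a rough sketch of that merge, not part of the committed file; the copy_and_update stand-in only mimics the harness helper of the same name that the test relies on via its star imports.

# Sketch only: a local stand-in for the harness's copy_and_update helper,
# which copies a conf dict and applies per-test overrides on top of it.
def copy_and_update(conf, updates):
    merged = dict(conf)
    merged.update(updates)
    return merged

# Mirrors how the test below combines a reader conf with the data source setting.
example_all_confs = copy_and_update(
    coalesce_parquet_file_reader_conf,
    {'spark.sql.sources.useV1SourceList': 'parquet'})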


# For now the native configs are not compatible with files written with spark.sql.parquet.writeLegacyFormat
# when they contain nested types.
reader_opt_confs_native = [native_multithreaded_parquet_file_reader_conf]

reader_opt_confs_no_native = [multithreaded_parquet_file_reader_conf,
combining_multithreaded_parquet_file_reader_conf_ordered,
combining_multithreaded_parquet_file_reader_deprecated_conf_ordered]

reader_opt_confs = reader_opt_confs_native + reader_opt_confs_no_native

hybrid_opt_confs = [
{
'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED',
@@ -162,27 +58,28 @@ def read_parquet_sql(data_path):
}
for unsafe_decompress in ["true", "false"]
for dict_lat_mat in ["false", "true"]
for bs in [256, 256 << 3, 256 << 6, 256 << 10, 256 << 12, 256 << 14]
for bs in [256, 256 << 4, 256 << 10]
]
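
The comprehension above emits one reader configuration per combination of the three loop variables, so trimming the batch-size list from six entries to three cuts the matrix from 2 × 2 × 6 = 24 configurations down to 2 × 2 × 3 = 12. A minimal sketch of the expansion follows; only the reader.type entry is visible in this hunk, so the 'example.*' keys are placeholders for the settings hidden in the collapsed lines above.

# Sketch only: 'example.*' keys are placeholders for the collapsed settings,
# while the reader.type entry is the one actually shown in the diff.
hybrid_opt_confs_sketch = [
    {
        'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED',
        'example.unsafe.decompress': unsafe_decompress,     # placeholder key
        'example.dict.late.materialization': dict_lat_mat,  # placeholder key
        'example.batch.size.bytes': bs,                      # placeholder key
    }
    for unsafe_decompress in ["true", "false"]
    for dict_lat_mat in ["false", "true"]
    for bs in [256, 256 << 4, 256 << 10]
]
assert len(hybrid_opt_confs_sketch) == 12  # previously 24 with the six-entry batch-size list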


@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df])
@pytest.mark.parametrize('reader_confs', hybrid_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["parquet", ""])
@pytest.mark.parametrize('length', [10, 100, 500, 1000, 2000])
@pytest.mark.parametrize('v1_enabled_list', ["parquet"])
@pytest.mark.parametrize('length', [10, 100, 500])
@pytest.mark.parametrize('parquet_codec', ["zstd", "snappy", "uncompressed"])
@pytest.mark.parametrize('parquet_block_size', [1048576 * 8, 1048576 * 32, 1048576 * 128])
@pytest.mark.parametrize('parquet_block_size', [1048576 * 8, 1048576 * 128])
@tz_sensitive_test
@allow_non_gpu(*non_utc_allow)
def test_parquet_read_round_trip(spark_tmp_path,
parquet_gens,
read_func,
reader_confs,
v1_enabled_list,
length,
parquet_codec,
parquet_block_size):
rebase_write_corrected_conf = {
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.legacy.parquet.int96RebaseModeInWrite': 'CORRECTED'
}
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
@@ -197,9 +94,6 @@ def test_parquet_read_round_trip(spark_tmp_path,
conf=rebase_write_corrected_conf)
all_confs = copy_and_update(reader_confs, {
'spark.sql.sources.useV1SourceList': v1_enabled_list,
# Set the int96 rebase mode values because it's LEGACY in Databricks, which would preclude this op from running on the GPU.
'spark.sql.legacy.parquet.int96RebaseModeInRead' : 'CORRECTED',
'spark.sql.legacy.parquet.datetimeRebaseModeInRead': 'CORRECTED'})
# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is in, we can remove the spark.sql.legacy.parquet.datetimeRebaseModeInRead config, which is a workaround
# for nested timestamp/date support.
assert_gpu_and_cpu_are_equal_collect(read_func(data_path), conf=all_confs)
assert_gpu_and_cpu_are_equal_collect(lambda spark : spark.read.parquet(data_path), conf=all_confs)
