Spark 4: Fix parquet_test.py [databricks] #11519

Merged
2 commits merged on Oct 8, 2024
62 changes: 55 additions & 7 deletions integration_tests/src/main/python/parquet_test.py
@@ -485,6 +485,8 @@ def test_parquet_read_buffer_allocation_empty_blocks(spark_tmp_path, v1_enabled_
        lambda spark : spark.read.parquet(data_path).filter("id < 2 or id > 990"),
        conf=all_confs)


@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/7733")
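
# --- Illustrative aside (not part of this change) -----------------------------
# @disable_ansi_mode above comes from this repo's test harness. The sketch below
# only shows the general idea: running a test with ANSI mode forced off, which
# matters on Spark 4.0 where ANSI is enabled by default. The names
# _ansi_off_sketch and extra_conf are hypothetical, not the harness's real API.
import functools

def _ansi_off_sketch(test_fn):
    @functools.wraps(test_fn)
    def wrapper(*args, extra_conf=None, **kwargs):
        merged = dict(extra_conf or {})
        merged['spark.sql.ansi.enabled'] = 'false'  # force ANSI off for this test
        return test_fn(*args, extra_conf=merged, **kwargs)
    return wrapper
# -------------------------------------------------------------------------------
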
@@ -797,6 +799,8 @@ def test_parquet_read_nano_as_longs_true(std_input_path):
        'FileSourceScanExec',
        conf=conf)


@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
def test_many_column_project():
    def _create_wide_data_frame(spark, num_cols):
        schema_dict = {}
@@ -1285,27 +1289,64 @@ def test_parquet_read_case_insensitivity(spark_tmp_path):
    )


# test read INT32 as INT8/INT16/Date
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_int32_downcast(spark_tmp_path, reader_confs, v1_enabled_list):
def run_test_parquet_int32_downcast(spark_tmp_path,
                                    reader_confs,
                                    v1_enabled_list,
                                    ansi_conf):
    """
    Tests whether Parquet files with columns written as INT32 can be read
    back as INT8, INT16 and DATE columns, with ANSI mode enabled or disabled
    as specified by ansi_conf.
    """
    data_path = spark_tmp_path + '/PARQUET_DATA'
    write_schema = [("d", date_gen), ('s', short_gen), ('b', byte_gen)]

    # For test setup, write with ANSI disabled.
    # Otherwise, CAST(d AS INT) will fail on Spark CPU.
    with_cpu_session(
        lambda spark: gen_df(spark, write_schema).selectExpr(
            "cast(d as Int) as d",
            "cast(s as Int) as s",
            "cast(b as Int) as b").write.parquet(data_path))
            "cast(b as Int) as b").write.parquet(data_path), conf=ansi_disabled_conf)

    read_schema = StructType([StructField("d", DateType()),
                              StructField("s", ShortType()),
                              StructField("b", ByteType())])
    conf = copy_and_update(reader_confs,
                           {'spark.sql.sources.useV1SourceList': v1_enabled_list})
                           {'spark.sql.sources.useV1SourceList': v1_enabled_list},
                           ansi_conf)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.schema(read_schema).parquet(data_path),
        conf=conf)

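# --- Illustrative aside (not part of this change) -----------------------------
# Why the setup above writes with ANSI disabled: with spark.sql.ansi.enabled set
# to 'true', Spark rejects CAST(<date column> AS INT), so the CPU-side data
# generation would fail before the read path is ever exercised. A standalone
# sketch of the behavior difference, assuming an existing SparkSession `spark`:
def _ansi_cast_demo(spark):
    df = spark.sql("SELECT DATE'2024-01-01' AS d")
    spark.conf.set('spark.sql.ansi.enabled', 'false')
    df.selectExpr("CAST(d AS INT)").collect()  # permitted when ANSI is off
    spark.conf.set('spark.sql.ansi.enabled', 'true')
    df.selectExpr("CAST(d AS INT)").collect()  # fails: ANSI forbids DATE-to-INT casts
# -------------------------------------------------------------------------------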

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_int32_downcast_ansi_disabled(spark_tmp_path, reader_confs, v1_enabled_list):
    """
    Tests whether Parquet files with columns written as INT32 can be read
    back as INT8, INT16 and DATE columns, with ANSI mode disabled.
    """
    run_test_parquet_int32_downcast(spark_tmp_path,
                                    reader_confs,
                                    v1_enabled_list,
                                    ansi_disabled_conf)


def test_parquet_int32_downcast_ansi_enabled(spark_tmp_path):
    """
    This is the flip side of test_parquet_int32_downcast_ansi_disabled: the
    same INT32 columns are read back as INT8, INT16 and DATE, but with ANSI
    mode enabled. Only a limited combination of test parameters is exercised
    here, in the interest of brevity.
    """
    run_test_parquet_int32_downcast(spark_tmp_path,
                                    reader_confs=native_parquet_file_reader_conf,
                                    v1_enabled_list="",
                                    ansi_conf={'spark.sql.ansi.enabled': 'true'})


@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.parametrize("types", [("byte", "short"), ("byte", "int"), ("short", "int")], ids=idfn)
@@ -1340,6 +1381,10 @@ def test_parquet_nested_column_missing(spark_tmp_path, reader_confs, v1_enabled_
        lambda spark: spark.read.schema(read_schema).parquet(data_path),
        conf=conf)

@pytest.mark.skipif(condition=is_databricks_runtime() and is_databricks_version_or_later(14,3),
                    reason="https://github.com/NVIDIA/spark-rapids/issues/11512")
@pytest.mark.skipif(condition=is_spark_400_or_later(),
                    reason="https://github.com/NVIDIA/spark-rapids/issues/11512")
def test_parquet_check_schema_compatibility(spark_tmp_path):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    gen_list = [('int', int_gen), ('long', long_gen), ('dec32', decimal_gen_32bit)]
@@ -1431,13 +1476,16 @@ def test_parquet_read_encryption(spark_tmp_path, reader_confs, v1_enabled_list):
    assert_spark_exception(
        lambda: with_gpu_session(
            lambda spark: spark.read.parquet(data_path).collect()),
        error_message='Could not read footer for file')
        error_message='Could not read footer')  # Message fragment common to all Spark versions.
                                                # Note that this isn't thrown explicitly by the plugin.

    assert_spark_exception(
        lambda: with_gpu_session(
            lambda spark: spark.read.parquet(data_path).collect(), conf=conf),
        error_message='The GPU does not support reading encrypted Parquet files')

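# --- Illustrative aside (not part of this change) -----------------------------
# The first assertion above now matches only 'Could not read footer', the part
# of the error text that all supported Spark versions share; the wording after
# that fragment differs between versions. A minimal sketch of that style of
# substring check; _assert_error_contains is a hypothetical stand-in for the
# harness's assert_spark_exception helper:
def _assert_error_contains(fn, fragment):
    try:
        fn()
    except Exception as e:
        assert fragment in str(e), f"expected '{fragment}' in: {e}"
    else:
        raise AssertionError(f"expected a failure containing '{fragment}'")
# -------------------------------------------------------------------------------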

@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
def test_parquet_read_count(spark_tmp_path):
    parquet_gens = [int_gen, string_gen, double_gen]
    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]