From 42712acec7b43389980a73b50ac4ffe272621cc8 Mon Sep 17 00:00:00 2001 From: Sameer Raheja Date: Wed, 7 Aug 2024 06:17:20 -0700 Subject: [PATCH 1/3] remove conditions for spark 3.1.x in tests Signed-off-by: Sameer Raheja --- integration_tests/src/main/python/array_test.py | 9 +-------- integration_tests/src/main/python/cmp_test.py | 10 ++-------- integration_tests/src/main/python/dpp_test.py | 9 ++------- integration_tests/src/main/python/spark_session.py | 9 --------- 4 files changed, 5 insertions(+), 32 deletions(-) diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index 906f74567dc..eb460e4f7f6 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -530,7 +530,7 @@ def q1(spark): @incompat @pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens + decimal_gens, ids=idfn) -@pytest.mark.skipif(is_before_spark_313() or is_spark_330() or is_spark_330cdh(), reason="NaN equality is only handled in Spark 3.1.3+ and SPARK-39976 issue with null and ArrayIntersect in Spark 3.3.0") +@pytest.mark.skipif(is_spark_330() or is_spark_330cdh(), reason="SPARK-39976 issue with null and ArrayIntersect in Spark 3.3.0") def test_array_intersect(data_gen): gen = StructGen( [('a', ArrayGen(data_gen, nullable=True)), @@ -570,7 +570,6 @@ def test_array_intersect_spark330(data_gen): @incompat @pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens_no_nans + decimal_gens, ids=idfn) -@pytest.mark.skipif(not is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+") def test_array_intersect_before_spark313(data_gen): gen = StructGen( [('a', ArrayGen(data_gen, nullable=True)), @@ -590,7 +589,6 @@ def test_array_intersect_before_spark313(data_gen): @incompat @pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens + decimal_gens, ids=idfn) -@pytest.mark.skipif(is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+") def test_array_union(data_gen): gen = StructGen( [('a', ArrayGen(data_gen, nullable=True)), @@ -610,7 +608,6 @@ def test_array_union(data_gen): @incompat @pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens_no_nans + decimal_gens, ids=idfn) -@pytest.mark.skipif(not is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+") def test_array_union_before_spark313(data_gen): gen = StructGen( [('a', ArrayGen(data_gen, nullable=True)), @@ -630,7 +627,6 @@ def test_array_union_before_spark313(data_gen): @incompat @pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens + decimal_gens, ids=idfn) -@pytest.mark.skipif(is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+") def test_array_except(data_gen): gen = StructGen( [('a', ArrayGen(data_gen, nullable=True)), @@ -650,7 +646,6 @@ def test_array_except(data_gen): @incompat @pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens_no_nans + decimal_gens, ids=idfn) -@pytest.mark.skipif(not is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+") def test_array_except_before_spark313(data_gen): gen = StructGen( [('a', ArrayGen(data_gen, nullable=True)), @@ -670,7 +665,6 @@ def test_array_except_before_spark313(data_gen): @incompat @pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens + decimal_gens, ids=idfn) -@pytest.mark.skipif(is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+") def test_arrays_overlap(data_gen): gen = StructGen( [('a', ArrayGen(data_gen, 
nullable=True)), @@ -691,7 +685,6 @@ def test_arrays_overlap(data_gen): @incompat @pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens_no_nans + decimal_gens, ids=idfn) -@pytest.mark.skipif(not is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+") def test_arrays_overlap_before_spark313(data_gen): gen = StructGen( [('a', ArrayGen(data_gen, nullable=True)), diff --git a/integration_tests/src/main/python/cmp_test.py b/integration_tests/src/main/python/cmp_test.py index 1e1549f28be..391fbbef2d9 100644 --- a/integration_tests/src/main/python/cmp_test.py +++ b/integration_tests/src/main/python/cmp_test.py @@ -17,7 +17,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect from conftest import is_not_utc from data_gen import * -from spark_session import with_cpu_session, is_before_spark_313, is_before_spark_330 +from spark_session import with_cpu_session, is_before_spark_330 from pyspark.sql.types import * from marks import datagen_overrides, allow_non_gpu import pyspark.sql.functions as f @@ -335,16 +335,10 @@ def test_in(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars))) -# We avoid testing inset with NaN in Spark < 3.1.3 since it has issue with NaN comparisons. -# See https://github.com/NVIDIA/spark-rapids/issues/9687. -test_inset_data_gen = [gen for gen in eq_gens_with_decimal_gen if gen != float_gen if gen != double_gen] + \ - [FloatGen(no_nans=True), DoubleGen(no_nans=True)] \ - if is_before_spark_313() else eq_gens_with_decimal_gen - # Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf # This is to test entries over that value. @allow_non_gpu(*non_utc_allow) -@pytest.mark.parametrize('data_gen', test_inset_data_gen, ids=idfn) +@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn) def test_in_set(data_gen): # nulls are not supported for in on the GPU yet num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) + 1 diff --git a/integration_tests/src/main/python/dpp_test.py b/integration_tests/src/main/python/dpp_test.py index b362a4175f3..5cb1cb2cf58 100644 --- a/integration_tests/src/main/python/dpp_test.py +++ b/integration_tests/src/main/python/dpp_test.py @@ -20,7 +20,7 @@ from conftest import spark_tmp_table_factory from data_gen import * from marks import ignore_order, allow_non_gpu, datagen_overrides, disable_ansi_mode -from spark_session import is_before_spark_320, with_cpu_session, is_before_spark_312, is_databricks_runtime, is_databricks113_or_later +from spark_session import is_before_spark_320, with_cpu_session, is_databricks_runtime, is_databricks113_or_later # non-positive values here can produce a degenerative join, so here we ensure that most values are # positive to ensure the join will produce rows. 
See https://github.com/NVIDIA/spark-rapids/issues/10147
@@ -174,11 +174,7 @@ def fn(spark):
 @datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/10147")
 @pytest.mark.parametrize('store_format', ['parquet', 'orc'], ids=idfn)
 @pytest.mark.parametrize('s_index', list(range(len(_statements))), ids=idfn)
-@pytest.mark.parametrize('aqe_enabled', [
-    'false',
-    pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(),
-                                                  reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled'))
-], ids=idfn)
+@pytest.mark.parametrize('aqe_enabled', ['false', pytest.param('true')], ids=idfn)
 def test_dpp_reuse_broadcast_exchange(spark_tmp_table_factory, store_format, s_index, aqe_enabled):
     fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get()
     create_fact_table(fact_table, store_format, length=10000)
@@ -295,7 +291,6 @@ def test_dpp_skip(spark_tmp_table_factory, store_format, s_index, aqe_enabled):
     pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(),
                                                   reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled'))
 ], ids=idfn)
-@pytest.mark.skipif(is_before_spark_312(), reason="DPP over LikeAny/LikeAll filter not enabled until Spark 3.1.2")
 def test_dpp_like_any(spark_tmp_table_factory, store_format, aqe_enabled):
     fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get()
     create_fact_table(fact_table, store_format)
diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py
index 26388617fff..4473767fe65 100644
--- a/integration_tests/src/main/python/spark_session.py
+++ b/integration_tests/src/main/python/spark_session.py
@@ -165,15 +165,6 @@ def with_gpu_session(func, conf={}):
         copy['spark.rapids.sql.test.validateExecsInGpuPlan'] = ','.join(get_validate_execs_in_gpu_plan())
     return with_spark_session(func, conf=copy)
 
-def is_before_spark_312():
-    return spark_version() < "3.1.2"
-
-def is_before_spark_313():
-    return spark_version() < "3.1.3"
-
-def is_before_spark_314():
-    return spark_version() < "3.1.4"
-
 def is_before_spark_320():
     return spark_version() < "3.2.0"

From ca2a12e578cf949e921f8edec45d56ecda7b8abc Mon Sep 17 00:00:00 2001
From: Sameer Raheja
Date: Fri, 9 Aug 2024 20:49:19 -0700
Subject: [PATCH 3/3] Remove is_before_spark_320

Signed-off-by: Sameer Raheja
---
 .../src/main/python/arithmetic_ops_test.py | 12 +----
 .../src/main/python/cast_test.py | 17 +------
 .../src/main/python/conditionals_test.py | 3 +-
 .../src/main/python/delta_lake_delete_test.py | 7 +--
 .../src/main/python/delta_lake_merge_test.py | 11 +----
 .../src/main/python/delta_lake_update_test.py | 9 +---
 .../src/main/python/delta_lake_write_test.py | 47 ++-----------------
 integration_tests/src/main/python/dpp_test.py | 25 +++++-----
 .../src/main/python/grouping_sets_test.py | 3 --
 .../src/main/python/hashing_test.py | 10 ++--
 .../main/python/hive_parquet_write_test.py | 7 +--
 .../src/main/python/iceberg_test.py | 13 ++---
 .../src/main/python/json_test.py | 11 ++---
 integration_tests/src/main/python/map_test.py | 3 +-
 integration_tests/src/main/python/orc_test.py | 3 +-
 .../src/main/python/orc_write_test.py | 6 +--
 .../src/main/python/parquet_test.py | 12 ++---
 .../src/main/python/parquet_write_test.py | 7 +--
 .../python/prune_partition_column_test.py | 5 +-
 .../src/main/python/regexp_test.py | 4 +-
 .../src/main/python/repart_test.py | 2 +-
 .../src/main/python/spark_session.py | 3 --
 .../src/main/python/string_test.py | 9 ++--
 .../src/main/python/window_function_test.py | 5 +-
 24 files changed, 52 insertions(+), 182 deletions(-)

diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py
index d7fd941b97b..9c917b1b48a 100644
--- a/integration_tests/src/main/python/arithmetic_ops_test.py
+++ b/integration_tests/src/main/python/arithmetic_ops_test.py
@@ -371,9 +371,7 @@ def test_mod_pmod_long_min_value():
     'cast(-12 as {}) % cast(0 as {})'], ids=idfn)
 def test_mod_pmod_by_zero(data_gen, overflow_exp):
     string_type = to_cast_string(data_gen.data_type)
-    if is_before_spark_320():
-        exception_str = 'java.lang.ArithmeticException: divide by zero'
-    elif is_before_spark_330():
+    if is_before_spark_330():
         exception_str = 'SparkArithmeticException: divide by zero'
     elif is_before_spark_340() and not is_databricks113_or_later():
         exception_str = 'SparkArithmeticException: Division by zero'
@@ -571,7 +569,6 @@ def test_abs_ansi_no_overflow_decimal128(data_gen):
 # Only run this test for Spark v3.2.0 and later to verify abs will
 # throw exceptions for overflow when ANSI mode is enabled.
-@pytest.mark.skipif(is_before_spark_320(), reason='SPARK-33275') @pytest.mark.parametrize('data_type,value', [ (LongType(), LONG_MIN), (IntegerType(), INT_MIN), @@ -1049,9 +1046,7 @@ def _test_div_by_zero(ansi_mode, expr, is_lit=False): ansi_conf = {'spark.sql.ansi.enabled': ansi_mode == 'ansi'} data_gen = lambda spark: two_col_df(spark, IntegerGen(), IntegerGen(min_val=0, max_val=0), length=1) div_by_zero_func = lambda spark: data_gen(spark).selectExpr(expr) - if is_before_spark_320(): - err_message = 'java.lang.ArithmeticException: divide by zero' - elif is_before_spark_330(): + if is_before_spark_330(): err_message = 'SparkArithmeticException: divide by zero' elif is_before_spark_340() and not is_databricks113_or_later(): err_message = 'SparkArithmeticException: Division by zero' @@ -1105,7 +1100,6 @@ def _div_overflow_exception_when(expr, ansi_enabled, is_lit=False): # Only run this test for Spark v3.2.0 and later to verify IntegralDivide will # throw exceptions for overflow when ANSI mode is enabled. -@pytest.mark.skipif(is_before_spark_320(), reason='https://github.com/apache/spark/pull/32260') @pytest.mark.parametrize('expr', ['a DIV CAST(-1 AS INT)', 'a DIV b']) @pytest.mark.parametrize('ansi_enabled', [False, True]) def test_div_overflow_exception_when_ansi(expr, ansi_enabled): @@ -1115,7 +1109,6 @@ def test_div_overflow_exception_when_ansi(expr, ansi_enabled): # throw exceptions for overflow when ANSI mode is enabled. # We have split this test from test_div_overflow_exception_when_ansi because Spark 3.4 # throws a different exception for literals -@pytest.mark.skipif(is_before_spark_320(), reason='https://github.com/apache/spark/pull/32260') @pytest.mark.parametrize('expr', ['CAST(-9223372036854775808L as LONG) DIV -1']) @pytest.mark.parametrize('ansi_enabled', [False, True]) def test_div_overflow_exception_when_ansi_literal(expr, ansi_enabled): @@ -1123,7 +1116,6 @@ def test_div_overflow_exception_when_ansi_literal(expr, ansi_enabled): # Only run this test before Spark v3.2.0 to verify IntegralDivide will NOT # throw exceptions for overflow even ANSI mode is enabled. 
-@pytest.mark.skipif(not is_before_spark_320(), reason='https://github.com/apache/spark/pull/32260') @pytest.mark.parametrize('expr', ['CAST(-9223372036854775808L as LONG) DIV -1', 'a DIV CAST(-1 AS INT)', 'a DIV b']) @pytest.mark.parametrize('ansi_enabled', ['false', 'true']) def test_div_overflow_no_exception_when_ansi(expr, ansi_enabled): diff --git a/integration_tests/src/main/python/cast_test.py b/integration_tests/src/main/python/cast_test.py index f7784178182..344117f92dd 100644 --- a/integration_tests/src/main/python/cast_test.py +++ b/integration_tests/src/main/python/cast_test.py @@ -86,19 +86,7 @@ def test_cast_string_date_valid_format(): ] values_string_to_data = invalid_values_string_to_date + valid_values_string_to_date -# Spark 320+ and databricks support Ansi mode when casting string to date -# This means an exception will be thrown when casting invalid string to date on Spark 320+ or databricks -# test Spark versions < 3.2.0 and non databricks, ANSI mode -@pytest.mark.skipif(not is_before_spark_320(), reason="ansi cast(string as date) throws exception only in 3.2.0+ or db") -def test_cast_string_date_invalid_ansi_before_320(): - data_rows = [(v,) for v in values_string_to_data] - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.createDataFrame(data_rows, "a string").select(f.col('a').cast(DateType())), - conf={'spark.rapids.sql.hasExtendedYearValues': 'false', - 'spark.sql.ansi.enabled': 'true'}, ) - # test Spark versions >= 320 and databricks, ANSI mode, valid values -@pytest.mark.skipif(is_before_spark_320(), reason="Spark versions(< 320) not support Ansi mode when casting string to date") def test_cast_string_date_valid_ansi(): data_rows = [(v,) for v in valid_values_string_to_date] assert_gpu_and_cpu_are_equal_collect( @@ -107,7 +95,6 @@ def test_cast_string_date_valid_ansi(): 'spark.sql.ansi.enabled': 'true'}) # test Spark versions >= 320, ANSI mode -@pytest.mark.skipif(is_before_spark_320(), reason="ansi cast(string as date) throws exception only in 3.2.0+") @pytest.mark.parametrize('invalid', invalid_values_string_to_date) def test_cast_string_date_invalid_ansi(invalid): assert_gpu_and_cpu_error( @@ -118,7 +105,7 @@ def test_cast_string_date_invalid_ansi(invalid): # test try_cast in Spark versions >= 320 and < 340 -@pytest.mark.skipif(is_before_spark_320() or is_spark_340_or_later() or is_databricks113_or_later(), reason="try_cast only in Spark 3.2+") +@pytest.mark.skipif(is_before_spark_340() or is_databricks113_or_later(), reason="try_cast only in Spark 3.2+") @allow_non_gpu('ProjectExec', 'TryCast') @pytest.mark.parametrize('invalid', invalid_values_string_to_date) def test_try_cast_fallback(invalid): @@ -162,7 +149,6 @@ def test_cast_string_ts_valid_format(data_gen): 'spark.rapids.sql.castStringToTimestamp.enabled': 'true'}) @allow_non_gpu('ProjectExec', 'Cast', 'Alias') -@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2.0+ do we have issues with extended years") def test_cast_string_date_fallback(): assert_gpu_fallback_collect( # Cast back to String because this goes beyond what python can support for years @@ -170,7 +156,6 @@ def test_cast_string_date_fallback(): 'Cast') @allow_non_gpu('ProjectExec', 'Cast', 'Alias') -@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2.0+ do we have issues with extended years") def test_cast_string_timestamp_fallback(): assert_gpu_fallback_collect( # Cast back to String because this goes beyond what python can support for years diff --git 
a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index b95ed53f398..6af69eb7b87 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -16,7 +16,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql from data_gen import * -from spark_session import is_before_spark_320, is_jvm_charset_utf8 +from spark_session import is_jvm_charset_utf8 from pyspark.sql.types import * from marks import datagen_overrides, allow_non_gpu import pyspark.sql.functions as f @@ -242,7 +242,6 @@ def test_conditional_with_side_effects_sequence(data_gen): ELSE null END'), conf = ansi_enabled_conf) -@pytest.mark.skipif(is_before_spark_320(), reason='Earlier versions of Spark cannot cast sequence to string') @pytest.mark.parametrize('data_gen', [mk_str_gen('[a-z]{0,3}')], ids=idfn) @allow_non_gpu(*non_utc_allow) def test_conditional_with_side_effects_sequence_cast(data_gen): diff --git a/integration_tests/src/main/python/delta_lake_delete_test.py b/integration_tests/src/main/python/delta_lake_delete_test.py index e45ab926de3..9c33b46fe54 100644 --- a/integration_tests/src/main/python/delta_lake_delete_test.py +++ b/integration_tests/src/main/python/delta_lake_delete_test.py @@ -18,7 +18,7 @@ from data_gen import * from delta_lake_utils import * from marks import * -from spark_session import is_before_spark_320, is_databricks_runtime, supports_delta_lake_deletion_vectors, \ +from spark_session import is_databricks_runtime, supports_delta_lake_deletion_vectors, \ with_cpu_session, with_gpu_session delta_delete_enabled_conf = copy_and_update(delta_writes_enabled_conf, @@ -72,7 +72,6 @@ def checker(data_path, do_delete): {"spark.rapids.sql.command.DeleteCommand": "false"}, delta_writes_enabled_conf # Test disabled by default ], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_delete_disabled_fallback(spark_tmp_path, disable_conf): data_path = spark_tmp_path + "/DELTA_DATA" def setup_tables(spark): @@ -113,7 +112,6 @@ def write_func(spark, path): @ignore_order @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_delete_entire_table(spark_tmp_path, use_cdf, partition_columns): def generate_dest_data(spark): return three_col_df(spark, @@ -134,7 +132,6 @@ def generate_dest_data(spark): @ignore_order @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("partition_columns", [["a"], ["a", "b"]], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_delete_partitions(spark_tmp_path, use_cdf, partition_columns): def generate_dest_data(spark): return three_col_df(spark, @@ -155,7 +152,6 @@ def generate_dest_data(spark): @ignore_order @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884') def test_delta_delete_rows(spark_tmp_path, use_cdf, partition_columns): # Databricks 
changes the number of files being written, so we cannot compare logs unless there's only one slice @@ -174,7 +170,6 @@ def generate_dest_data(spark): @ignore_order @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884') def test_delta_delete_dataframe_api(spark_tmp_path, use_cdf, partition_columns): from delta.tables import DeltaTable diff --git a/integration_tests/src/main/python/delta_lake_merge_test.py b/integration_tests/src/main/python/delta_lake_merge_test.py index 5c3bb915ddb..c012a9d3200 100644 --- a/integration_tests/src/main/python/delta_lake_merge_test.py +++ b/integration_tests/src/main/python/delta_lake_merge_test.py @@ -18,7 +18,7 @@ from delta_lake_merge_common import * from marks import * from pyspark.sql.types import * -from spark_session import is_before_spark_320, is_databricks_runtime, spark_version +from spark_session import is_databricks_runtime, spark_version delta_merge_enabled_conf = copy_and_update(delta_writes_enabled_conf, @@ -36,7 +36,6 @@ {"spark.rapids.sql.command.MergeIntoCommand": "false"}, delta_writes_enabled_conf # Test disabled by default ], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_merge_disabled_fallback(spark_tmp_path, spark_tmp_table_factory, disable_conf): def checker(data_path, do_merge): assert_gpu_fallback_write(do_merge, read_delta_path, data_path, @@ -77,7 +76,6 @@ def checker(data_path, do_merge): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("partition_columns", [None, ["a"], ["b"], ["a", "b"]], ids=idfn) @pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn) @@ -103,7 +101,6 @@ def test_delta_merge_partial_fallback_via_conf(spark_tmp_path, spark_tmp_table_f @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("table_ranges", [(range(20), range(10)), # partial insert of source (range(5), range(5)), # no-op insert (range(10), range(20, 30)) # full insert of source @@ -120,7 +117,6 @@ def test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_facto @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("table_ranges", [(range(10), range(20)), # partial delete of target (range(5), range(5)), # full delete of target (range(10), range(20, 30)) # no-op delete @@ -137,7 +133,6 @@ def test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory, @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn) def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices): @@ -148,7 +143,6 
@@ def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, us @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("merge_sql", [ "MERGE INTO {dest_table} d USING {src_table} s ON d.a == s.a" \ @@ -171,7 +165,6 @@ def test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_facto @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn) def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices): @@ -183,7 +176,6 @@ def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spa @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf): do_test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf, @@ -192,7 +184,6 @@ def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_fac @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.xfail(not is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/7573") @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn) diff --git a/integration_tests/src/main/python/delta_lake_update_test.py b/integration_tests/src/main/python/delta_lake_update_test.py index 5954e0c6217..f4248d0805a 100644 --- a/integration_tests/src/main/python/delta_lake_update_test.py +++ b/integration_tests/src/main/python/delta_lake_update_test.py @@ -18,8 +18,8 @@ from data_gen import * from delta_lake_utils import * from marks import * -from spark_session import is_before_spark_320, is_databricks_runtime, \ - supports_delta_lake_deletion_vectors, with_cpu_session, with_gpu_session +from spark_session import is_databricks_runtime, supports_delta_lake_deletion_vectors, \ + with_cpu_session, with_gpu_session delta_update_enabled_conf = copy_and_update(delta_writes_enabled_conf, {"spark.rapids.sql.command.UpdateCommand": "true", @@ -71,7 +71,6 @@ def checker(data_path, do_update): {"spark.rapids.sql.command.UpdateCommand": "false"}, delta_writes_enabled_conf # Test disabled by default ], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_update_disabled_fallback(spark_tmp_path, disable_conf): data_path = spark_tmp_path + "/DELTA_DATA" def setup_tables(spark): @@ -90,7 +89,6 @@ def write_func(spark, path): @ignore_order @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_update_entire_table(spark_tmp_path, use_cdf, 
partition_columns): def generate_dest_data(spark): return three_col_df(spark, @@ -106,7 +104,6 @@ def generate_dest_data(spark): @ignore_order @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("partition_columns", [["a"], ["a", "b"]], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_update_partitions(spark_tmp_path, use_cdf, partition_columns): def generate_dest_data(spark): return three_col_df(spark, @@ -122,7 +119,6 @@ def generate_dest_data(spark): @ignore_order @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884') def test_delta_update_rows(spark_tmp_path, use_cdf, partition_columns): # Databricks changes the number of files being written, so we cannot compare logs unless there's only one slice @@ -161,7 +157,6 @@ def generate_dest_data(spark): @ignore_order @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) @pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/10025') def test_delta_update_dataframe_api(spark_tmp_path, use_cdf, partition_columns): from delta.tables import DeltaTable diff --git a/integration_tests/src/main/python/delta_lake_write_test.py b/integration_tests/src/main/python/delta_lake_write_test.py index c2292fe7c15..0fc61000cc0 100644 --- a/integration_tests/src/main/python/delta_lake_write_test.py +++ b/integration_tests/src/main/python/delta_lake_write_test.py @@ -23,7 +23,7 @@ from marks import * from parquet_write_test import parquet_write_gens_list, writer_confs from pyspark.sql.types import * -from spark_session import is_before_spark_320, is_before_spark_330, is_spark_340_or_later, with_cpu_session +from spark_session import is_before_spark_330, is_spark_340_or_later, with_cpu_session delta_write_gens = [x for sublist in parquet_write_gens_list for x in sublist] @@ -76,7 +76,6 @@ def do_sql(spark, q): spark.sql(q) [{"spark.rapids.sql.format.delta.write.enabled": "false"}, {"spark.rapids.sql.format.parquet.enabled": "false"}, {"spark.rapids.sql.format.parquet.write.enabled": "false"}], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_disabled_fallback(spark_tmp_path, disable_conf): data_path = spark_tmp_path + "/DELTA_DATA" assert_gpu_fallback_write( @@ -89,7 +88,6 @@ def test_delta_write_disabled_fallback(spark_tmp_path, disable_conf): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_round_trip_unmanaged(spark_tmp_path): gen_list = [("c" + str(i), gen) for i, gen in enumerate(delta_write_gens)] data_path = spark_tmp_path + "/DELTA_DATA" @@ -104,7 +102,6 @@ def test_delta_write_round_trip_unmanaged(spark_tmp_path): @delta_lake @ignore_order @pytest.mark.parametrize("gens", delta_part_write_gens, ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 
3.2.x") def test_delta_part_write_round_trip_unmanaged(spark_tmp_path, gens): gen_list = [("a", RepeatSeqGen(gens, 10)), ("b", gens)] data_path = spark_tmp_path + "/DELTA_DATA" @@ -122,7 +119,6 @@ def test_delta_part_write_round_trip_unmanaged(spark_tmp_path, gens): @delta_lake @ignore_order @pytest.mark.parametrize("gens", delta_part_write_gens, ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_multi_part_write_round_trip_unmanaged(spark_tmp_path, gens): gen_list = [("a", RepeatSeqGen(gens, 10)), ("b", gens), ("c", SetValuesGen(StringType(), ["x", "y", "z"]))] data_path = spark_tmp_path + "/DELTA_DATA" @@ -159,14 +155,12 @@ def do_update_round_trip_managed(spark_tmp_path, mode): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_overwrite_round_trip_unmanaged(spark_tmp_path): do_update_round_trip_managed(spark_tmp_path, "overwrite") @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_append_round_trip_unmanaged(spark_tmp_path): do_update_round_trip_managed(spark_tmp_path, "append") @@ -191,21 +185,18 @@ def do_write(spark, path): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_atomic_create_table_as_select(spark_tmp_table_factory, spark_tmp_path): _atomic_write_table_as_select(delta_write_gens, spark_tmp_table_factory, spark_tmp_path, overwrite=False) @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_atomic_replace_table_as_select(spark_tmp_table_factory, spark_tmp_path): _atomic_write_table_as_select(delta_write_gens, spark_tmp_table_factory, spark_tmp_path, overwrite=True) @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) def test_delta_append_data_exec_v1(spark_tmp_path, use_cdf): gen_list = [("c" + str(i), gen) for i, gen in enumerate(delta_write_gens)] @@ -225,7 +216,6 @@ def setup_tables(spark): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("use_cdf", [True, False], ids=idfn) def test_delta_overwrite_by_expression_exec_v1(spark_tmp_table_factory, spark_tmp_path, use_cdf): gen_list = [("c" + str(i), gen) for i, gen in enumerate(delta_write_gens)] @@ -252,7 +242,6 @@ def overwrite_table(spark, path): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_overwrite_dynamic_by_name(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" schema = "id bigint, data string, data2 string" @@ -293,7 +282,6 @@ def setup_tables(spark): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes 
are not supported before Spark 3.2.x") @pytest.mark.parametrize("mode", [ "STATIC", pytest.param("DYNAMIC", marks=pytest.mark.xfail(is_databricks_runtime(), @@ -316,7 +304,6 @@ def setup(spark): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.parametrize("mode", [ "STATIC", pytest.param("DYNAMIC", marks=pytest.mark.xfail(is_databricks_runtime(), @@ -341,7 +328,6 @@ def setup(spark): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(is_databricks_runtime() and is_before_spark_330(), reason="Databricks 10.4 does not properly handle options passed during DataFrame API write") def test_delta_write_round_trip_cdf_write_opt(spark_tmp_path): @@ -376,7 +362,6 @@ def test_delta_write_round_trip_cdf_write_opt(spark_tmp_path): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_round_trip_cdf_table_prop(spark_tmp_path): gen_list = [("ints", int_gen)] data_path = spark_tmp_path + "/DELTA_DATA" @@ -416,7 +401,6 @@ def setup_tables(spark): @delta_lake @ignore_order @pytest.mark.parametrize("ts_write", ["INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_legacy_timestamp(spark_tmp_path, ts_write): gen = TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(2000, 1, 1, tzinfo=timezone.utc)).with_special_case( @@ -439,7 +423,6 @@ def test_delta_write_legacy_timestamp(spark_tmp_path, ts_write): @pytest.mark.parametrize("write_options", [{"parquet.encryption.footer.key": "k1"}, {"parquet.encryption.column.keys": "k2:a"}, {"parquet.encryption.footer.key": "k1", "parquet.encryption.column.keys": "k2:a"}]) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_encryption_option_fallback(spark_tmp_path, write_options): def write_func(spark, path): writer = unary_op_df(spark, int_gen).coalesce(1).write.format("delta") @@ -460,7 +443,6 @@ def write_func(spark, path): @pytest.mark.parametrize("write_options", [{"parquet.encryption.footer.key": "k1"}, {"parquet.encryption.column.keys": "k2:a"}, {"parquet.encryption.footer.key": "k1", "parquet.encryption.column.keys": "k2:a"}]) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_encryption_runtimeconfig_fallback(spark_tmp_path, write_options): data_path = spark_tmp_path + "/DELTA_DATA" assert_gpu_fallback_write( @@ -476,7 +458,6 @@ def test_delta_write_encryption_runtimeconfig_fallback(spark_tmp_path, write_opt @pytest.mark.parametrize("write_options", [{"parquet.encryption.footer.key": "k1"}, {"parquet.encryption.column.keys": "k2:a"}, {"parquet.encryption.footer.key": "k1", "parquet.encryption.column.keys": "k2:a"}]) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_encryption_hadoopconfig_fallback(spark_tmp_path, write_options): data_path = spark_tmp_path + "/DELTA_DATA" def setup_hadoop_confs(spark): @@ -500,7 +481,6 @@ def reset_hadoop_confs(spark): @delta_lake 
@ignore_order @pytest.mark.parametrize('codec', ['gzip']) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_compression_fallback(spark_tmp_path, codec): data_path = spark_tmp_path + "/DELTA_DATA" confs=copy_and_update(delta_writes_enabled_conf, {"spark.sql.parquet.compression.codec": codec}) @@ -514,7 +494,6 @@ def test_delta_write_compression_fallback(spark_tmp_path, codec): @allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_legacy_format_fallback(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" confs=copy_and_update(delta_writes_enabled_conf, {"spark.sql.parquet.writeLegacyFormat": "true"}) @@ -527,7 +506,6 @@ def test_delta_write_legacy_format_fallback(spark_tmp_path): @allow_non_gpu(*delta_meta_allow) @delta_lake -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_append_only(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" gen = int_gen @@ -545,7 +523,6 @@ def test_delta_write_append_only(spark_tmp_path): @allow_non_gpu(*delta_meta_allow) @delta_lake -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_constraint_not_null(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" not_null_gen = StringGen(nullable=False) @@ -570,7 +547,6 @@ def setup_table(spark): @allow_non_gpu(*delta_meta_allow) @delta_lake -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_constraint_check(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" @@ -600,7 +576,6 @@ def gen_bad_data(spark): @allow_non_gpu(*delta_meta_allow) @delta_lake -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_constraint_check_fallback(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" # create table with check constraint @@ -629,7 +604,6 @@ def gen_bad_data(spark): @delta_lake @ignore_order @pytest.mark.parametrize("num_cols", [-1, 0, 1, 2, 3 ], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_stat_column_limits(num_cols, spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" confs = copy_and_update(delta_writes_enabled_conf, {"spark.databricks.io.skipping.stringPrefixLength": 8}) @@ -653,7 +627,6 @@ def test_delta_write_stat_column_limits(num_cols, spark_tmp_path): @allow_non_gpu("CreateTableExec", *delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_generated_columns(spark_tmp_table_factory, spark_tmp_path): from delta.tables import DeltaTable def write_data(spark, path): @@ -679,8 +652,7 @@ def write_data(spark, path): @allow_non_gpu("CreateTableExec", *delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320() or not is_databricks_runtime(), - reason="Delta Lake identity columns are currently only supported on Databricks") +@pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake identity columns are currently only supported on Databricks") def 
test_delta_write_identity_columns(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" def create_data(spark, path): @@ -705,8 +677,7 @@ def append_data(spark, path): @allow_non_gpu("CreateTableExec", *delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320() or not is_databricks_runtime(), - reason="Delta Lake identity columns are currently only supported on Databricks") +@pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake identity columns are currently only supported on Databricks") def test_delta_write_multiple_identity_columns(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" def create_data(spark, path): @@ -738,7 +709,6 @@ def append_data(spark, path): @delta_lake @ignore_order @pytest.mark.parametrize("confkey", ["optimizeWrite"], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(is_databricks_runtime(), reason="Optimized write is supported on Databricks") def test_delta_write_auto_optimize_write_opts_fallback(confkey, spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" @@ -757,7 +727,6 @@ def test_delta_write_auto_optimize_write_opts_fallback(confkey, spark_tmp_path): is_databricks_runtime(), reason="Optimize write is supported on Databricks")), pytest.param("delta.autoOptimize.optimizeWrite", marks=pytest.mark.skipif( is_databricks_runtime(), reason="Optimize write is supported on Databricks"))], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(not is_databricks_runtime(), reason="Auto optimize only supported on Databricks") def test_delta_write_auto_optimize_table_props_fallback(confkey, spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" @@ -780,7 +749,6 @@ def setup_tables(spark): is_databricks_runtime(), reason="Optimize write is supported on Databricks")), pytest.param("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", marks=pytest.mark.skipif( is_databricks_runtime(), reason="Optimize write is supported on Databricks"))], ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_auto_optimize_sql_conf_fallback(confkey, spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" confs=copy_and_update(delta_writes_enabled_conf, {confkey: "true"}) @@ -794,7 +762,6 @@ def test_delta_write_auto_optimize_sql_conf_fallback(confkey, spark_tmp_path): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_aqe_join(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" confs=copy_and_update(delta_writes_enabled_conf, {"spark.sql.adaptive.enabled": "true"}) @@ -811,7 +778,6 @@ def do_join(spark, path): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake optimized writes are only supported on Databricks") @pytest.mark.parametrize("enable_conf_key", [ "spark.databricks.delta.optimizeWrite.enabled", @@ -842,7 +808,6 @@ def do_write(data_path, is_optimize_write): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not 
supported before Spark 3.2.x") @pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake optimized writes are only supported on Databricks") def test_delta_write_optimized_supported_types(spark_tmp_path): num_chunks = 20 @@ -869,7 +834,6 @@ def test_delta_write_optimized_supported_types(spark_tmp_path): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake optimized writes are only supported on Databricks") def test_delta_write_optimized_supported_types_partitioned(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" @@ -889,7 +853,6 @@ def test_delta_write_optimized_supported_types_partitioned(spark_tmp_path): @allow_non_gpu(delta_optimized_write_fallback_allow, *delta_meta_allow) @delta_lake @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake optimized writes are only supported on Databricks") @pytest.mark.parametrize("gen", [ simple_string_to_string_map_gen, @@ -911,7 +874,6 @@ def test_delta_write_optimized_unsupported_sort_fallback(spark_tmp_path, gen): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake optimized writes are only supported on Databricks") def test_delta_write_optimized_table_confs(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" @@ -952,7 +914,6 @@ def do_prop_update(spark): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake optimized writes are only supported on Databricks") def test_delta_write_optimized_partitioned(spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" @@ -983,7 +944,6 @@ def do_write(confs): @allow_non_gpu(*delta_meta_allow) @delta_lake @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_partial_overwrite_replace_where(spark_tmp_path): gen_list = [("a", int_gen), ("b", SetValuesGen(StringType(), ["x", "y", "z"])), @@ -1028,7 +988,6 @@ def test_delta_write_partial_overwrite_replace_where(spark_tmp_path): @delta_lake @ignore_order @pytest.mark.parametrize("mapping", column_mappings) -@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_column_name_mapping(spark_tmp_path, mapping): gen_list = [("a", int_gen), ("b", SetValuesGen(StringType(), ["x", "y", "z"])), diff --git a/integration_tests/src/main/python/dpp_test.py b/integration_tests/src/main/python/dpp_test.py index 5cb1cb2cf58..d8bed5ba030 100644 --- a/integration_tests/src/main/python/dpp_test.py +++ b/integration_tests/src/main/python/dpp_test.py @@ -20,7 +20,7 @@ from conftest import spark_tmp_table_factory from data_gen import * from marks import ignore_order, allow_non_gpu, datagen_overrides, disable_ansi_mode -from spark_session import is_before_spark_320, with_cpu_session, is_databricks_runtime, is_databricks113_or_later +from spark_session import with_cpu_session, is_databricks_runtime, 
is_databricks113_or_later # non-positive values here can produce a degenerative join, so here we ensure that most values are # positive to ensure the join will produce rows. See https://github.com/NVIDIA/spark-rapids/issues/10147 @@ -220,8 +220,7 @@ def test_dpp_reuse_broadcast_exchange_cpu_scan(spark_tmp_table_factory): @pytest.mark.parametrize('s_index', list(range(len(_statements))), ids=idfn) @pytest.mark.parametrize('aqe_enabled', [ 'false', - pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(), - reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled')) + 'true' ], ids=idfn) def test_dpp_bypass(spark_tmp_table_factory, store_format, s_index, aqe_enabled): fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get() @@ -245,8 +244,8 @@ def test_dpp_bypass(spark_tmp_table_factory, store_format, s_index, aqe_enabled) @pytest.mark.parametrize('s_index', list(range(len(_statements))), ids=idfn) @pytest.mark.parametrize('aqe_enabled', [ 'false', - pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(), - reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled')) + # AQE and DPP can both be enabled on Spark 3.2.0 and later + 'true' ], ids=idfn) def test_dpp_via_aggregate_subquery(spark_tmp_table_factory, store_format, s_index, aqe_enabled): fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get() @@ -267,8 +266,8 @@ def test_dpp_via_aggregate_subquery(spark_tmp_table_factory, store_format, s_ind @pytest.mark.parametrize('s_index', list(range(len(_statements))), ids=idfn) @pytest.mark.parametrize('aqe_enabled', [ 'false', - pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(), - reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled')) + # AQE and DPP can both be enabled on Spark 3.2.0 and later + 'true' ], ids=idfn) def test_dpp_skip(spark_tmp_table_factory, store_format, s_index, aqe_enabled): fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get() @@ -288,8 +287,8 @@ def test_dpp_skip(spark_tmp_table_factory, store_format, s_index, aqe_enabled): @pytest.mark.parametrize('store_format', ['parquet', 'orc'], ids=idfn) @pytest.mark.parametrize('aqe_enabled', [ 'false', - pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(), - reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled')) + # AQE and DPP can both be enabled on Spark 3.2.0 and later + 'true' ], ids=idfn) def test_dpp_like_any(spark_tmp_table_factory, store_format, aqe_enabled): fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get() @@ -325,8 +324,8 @@ def create_dim_table_for_like(spark): # Test handling DPP expressions from a HashedRelation that rearranges columns @pytest.mark.parametrize('aqe_enabled', [ 'false', - pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(), - reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled')) + # AQE and DPP can both be enabled on Spark 3.2.0 and later + 'true' ], ids=idfn) def 
test_dpp_from_swizzled_hash_keys(spark_tmp_table_factory, aqe_enabled): dim_table = spark_tmp_table_factory.get() @@ -357,8 +356,8 @@ def setup_tables(spark): # Test handling DPP subquery that could broadcast EmptyRelation rather than a GPU serialized batch @pytest.mark.parametrize('aqe_enabled', [ 'false', - pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(), - reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled')) + # AQE and DPP can both be enabled on Spark 3.2.0 and later + 'true' ], ids=idfn) def test_dpp_empty_relation(spark_tmp_table_factory, aqe_enabled): dim_table = spark_tmp_table_factory.get() diff --git a/integration_tests/src/main/python/grouping_sets_test.py b/integration_tests/src/main/python/grouping_sets_test.py index 1821efa5c7a..48ca15dfff2 100644 --- a/integration_tests/src/main/python/grouping_sets_test.py +++ b/integration_tests/src/main/python/grouping_sets_test.py @@ -14,7 +14,6 @@ import pytest -from spark_session import is_before_spark_320 from asserts import assert_gpu_and_cpu_are_equal_sql from data_gen import * from pyspark.sql.types import * @@ -47,8 +46,6 @@ @ignore_order @pytest.mark.parametrize('data_gen', [_grouping_set_gen], ids=idfn) @pytest.mark.parametrize('sql', _grouping_set_sqls, ids=idfn) -@pytest.mark.skipif(is_before_spark_320(), - reason='Nested grouping sets is not supported before spark 3.2.0') def test_nested_grouping_sets_rollup_cube(data_gen, sql): assert_gpu_and_cpu_are_equal_sql( lambda spark: gen_df(spark, data_gen, length=2048), diff --git a/integration_tests/src/main/python/hashing_test.py b/integration_tests/src/main/python/hashing_test.py index 6bd56da933d..f099a45975f 100644 --- a/integration_tests/src/main/python/hashing_test.py +++ b/integration_tests/src/main/python/hashing_test.py @@ -17,9 +17,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect from data_gen import * from marks import allow_non_gpu, ignore_order -from spark_session import is_before_spark_320 -# Spark 3.1.x does not normalize -0.0 and 0.0 but GPU version does _xxhash_gens = [ null_gen, boolean_gen, @@ -31,9 +29,9 @@ timestamp_gen, decimal_gen_32bit, decimal_gen_64bit, - decimal_gen_128bit] -if not is_before_spark_320(): - _xxhash_gens += [float_gen, double_gen] + decimal_gen_128bit, + float_gen, + double_gen] _struct_of_xxhash_gens = StructGen([(f"c{i}", g) for i, g in enumerate(_xxhash_gens)]) @@ -41,8 +39,6 @@ all_basic_struct_gen, struct_array_gen, _struct_of_xxhash_gens] -if is_before_spark_320(): - _xxhash_fallback_gens += [float_gen, double_gen] @ignore_order(local=True) @pytest.mark.parametrize("gen", _xxhash_gens, ids=idfn) diff --git a/integration_tests/src/main/python/hive_parquet_write_test.py b/integration_tests/src/main/python/hive_parquet_write_test.py index e66b889a986..2388a490c5d 100644 --- a/integration_tests/src/main/python/hive_parquet_write_test.py +++ b/integration_tests/src/main/python/hive_parquet_write_test.py @@ -19,7 +19,7 @@ from data_gen import * from hive_write_test import _restricted_timestamp from marks import allow_non_gpu, ignore_order -from spark_session import with_cpu_session, with_gpu_session, is_before_spark_320, is_spark_350_or_later, is_before_spark_330, is_spark_330_or_later, is_databricks122_or_later +from spark_session import with_cpu_session, with_gpu_session, is_spark_350_or_later, is_before_spark_330, is_spark_330_or_later, is_databricks122_or_later # Disable the 
meta conversion from Hive write to FrameData write in Spark, to test # "GpuInsertIntoHiveTable" for Parquet write. @@ -154,12 +154,9 @@ def partitioned_write_to_hive_sql(spark, output_table): all_confs) -zstd_param = pytest.param('ZSTD', - marks=pytest.mark.skipif(is_before_spark_320(), reason="zstd is not supported before 320")) - @allow_non_gpu(*(non_utc_allow)) @ignore_order(local=True) -@pytest.mark.parametrize("comp_type", ['UNCOMPRESSED', 'SNAPPY', zstd_param]) +@pytest.mark.parametrize("comp_type", ['UNCOMPRESSED', 'SNAPPY', 'ZSTD']) def test_write_compressed_parquet_into_hive_table(spark_tmp_table_factory, comp_type): # Generate hive table in Parquet format def gen_table(spark): diff --git a/integration_tests/src/main/python/iceberg_test.py b/integration_tests/src/main/python/iceberg_test.py index 3b3f83c6deb..1b2ea7a9355 100644 --- a/integration_tests/src/main/python/iceberg_test.py +++ b/integration_tests/src/main/python/iceberg_test.py @@ -17,7 +17,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_row_counts_equal, assert_gpu_fallback_collect, assert_spark_exception from data_gen import * from marks import allow_non_gpu, iceberg, ignore_order -from spark_session import is_before_spark_320, is_databricks_runtime, with_cpu_session, with_gpu_session +from spark_session import is_databricks_runtime, with_cpu_session, with_gpu_session iceberg_map_gens = [MapGen(f(nullable=False), f()) for f in [ BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen, TimestampGen ]] + \ @@ -54,8 +54,8 @@ def setup_iceberg_table(spark): @iceberg @ignore_order(local=True) -@pytest.mark.skipif(is_before_spark_320() or is_databricks_runtime(), - reason="AQE+DPP not supported until Spark 3.2.0+ and AQE+DPP not supported on Databricks") +@pytest.mark.skipif(is_databricks_runtime(), + reason="AQE+DPP not supported on Databricks") @pytest.mark.parametrize('reader_type', rapids_reader_types) def test_iceberg_aqe_dpp(spark_tmp_table_factory, reader_type): table = spark_tmp_table_factory.get() @@ -153,9 +153,7 @@ def setup_iceberg_table(spark): ("uncompressed", None), ("snappy", None), ("gzip", None), - pytest.param(("lz4", "Unsupported compression type"), - marks=pytest.mark.skipif(is_before_spark_320(), - reason="Hadoop with Spark 3.1.x does not support lz4 by default")), + pytest.param(("lz4", "Unsupported compression type")), ("zstd", None)], ids=idfn) @pytest.mark.parametrize('reader_type', rapids_reader_types) def test_iceberg_read_parquet_compression_codec(spark_tmp_table_factory, codec_info, reader_type): @@ -294,7 +292,6 @@ def setup_iceberg_table(spark): @iceberg @ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering -@pytest.mark.skipif(is_before_spark_320(), reason="Spark 3.1.x has a catalog bug precluding scope prefix in table names") @pytest.mark.parametrize('reader_type', rapids_reader_types) def test_iceberg_read_timetravel(spark_tmp_table_factory, reader_type): table = spark_tmp_table_factory.get() @@ -318,7 +315,6 @@ def setup_snapshots(spark): @iceberg @ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering -@pytest.mark.skipif(is_before_spark_320(), reason="Spark 3.1.x has a catalog bug precluding scope prefix in table names") @pytest.mark.parametrize('reader_type', rapids_reader_types) def test_iceberg_incremental_read(spark_tmp_table_factory, reader_type): table = spark_tmp_table_factory.get() @@ -537,7 +533,6 @@ def 
setup_iceberg_table(spark): conf={'spark.rapids.sql.format.parquet.reader.type': reader_type}) @iceberg -@pytest.mark.skipif(is_before_spark_320(), reason="merge-on-read not supported on Spark 3.1.x") @pytest.mark.parametrize('reader_type', rapids_reader_types) def test_iceberg_v2_delete_unsupported(spark_tmp_table_factory, reader_type): table = spark_tmp_table_factory.get() diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index d20f947737a..549a139bd4b 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -185,7 +185,7 @@ def test_json_input_meta(spark_tmp_path, v1_enabled_list): conf=updated_conf) allow_non_gpu_for_json_scan = ['FileSourceScanExec', 'BatchScanExec'] if is_not_utc() else [] -@pytest.mark.parametrize('date_format', [None, 'yyyy-MM-dd'] if is_before_spark_320 else json_supported_date_formats, ids=idfn) +@pytest.mark.parametrize('date_format', json_supported_date_formats, ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "json"]) @allow_non_gpu(*allow_non_gpu_for_json_scan) def test_json_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list): @@ -475,7 +475,7 @@ def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi '[1-3]{1,2}/[1-3]{1,2}/[1-9]{4}', ]) @pytest.mark.parametrize('schema', [StructType([StructField('value', DateType())])]) -@pytest.mark.parametrize('date_format', [None, 'yyyy-MM-dd'] if is_before_spark_320 else json_supported_date_formats) +@pytest.mark.parametrize('date_format', json_supported_date_formats) @pytest.mark.parametrize('ansi_enabled', [True, False]) @pytest.mark.parametrize('allow_numeric_leading_zeros', [True, False]) @allow_non_gpu(*allow_non_gpu_for_json_scan) @@ -507,7 +507,7 @@ def test_json_read_generated_dates(spark_tmp_table_factory, spark_tmp_path, date @pytest.mark.parametrize('schema', [_date_schema]) @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('ansi_enabled', ["true", "false"]) -@pytest.mark.parametrize('date_format', [None, 'yyyy-MM-dd'] if is_before_spark_320 else json_supported_date_formats) +@pytest.mark.parametrize('date_format', json_supported_date_formats) @pytest.mark.parametrize('time_parser_policy', [ pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')), pytest.param('CORRECTED', marks=pytest.mark.allow_non_gpu(*not_utc_json_scan_allow)), @@ -718,7 +718,7 @@ def test_from_json_struct_decimal(): # boolean "(true|false)" ]) -@pytest.mark.parametrize('date_format', [None, 'yyyy-MM-dd'] if is_before_spark_320 else json_supported_date_formats) +@pytest.mark.parametrize('date_format', json_supported_date_formats) @allow_non_gpu(*non_utc_project_allow) @pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10535') def test_from_json_struct_date(date_gen, date_format): @@ -783,8 +783,6 @@ def test_from_json_struct_date_fallback_non_default_format(date_gen, date_format "\"" + optional_whitespace_regex + yyyy_start_0001 + optional_whitespace_regex + "\"", # "dd/MM/yyyy" "\"" + optional_whitespace_regex + "[0-9]{2}/[0-9]{2}/[1-8]{1}[0-9]{3}" + optional_whitespace_regex + "\"", - # special constant values - pytest.param("\"" + optional_whitespace_regex + "(now|today|tomorrow|epoch)" + optional_whitespace_regex + "\"", marks=pytest.mark.xfail(condition=is_before_spark_320(), reason="https://github.com/NVIDIA/spark-rapids/issues/9724")), # "nnnnn" (number of days since epoch prior 
to Spark 3.4, throws exception from 3.4) pytest.param("\"" + optional_whitespace_regex + "[0-9]{5}" + optional_whitespace_regex + "\"", marks=pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9664")), # integral @@ -1381,7 +1379,6 @@ def test_spark_from_json_date_with_locale(data, locale): conf =_enable_all_types_conf) @allow_non_gpu(*non_utc_allow) -@pytest.mark.skipif(is_before_spark_320(), reason="dd/MM/yyyy is supported in 3.2.0 and after") def test_spark_from_json_date_with_format(): data = [["""{"time": "26/08/2015"}"""], ["""{"time": "01/01/2024"}"""]] diff --git a/integration_tests/src/main/python/map_test.py b/integration_tests/src/main/python/map_test.py index fa647761b62..bd01ea5ce10 100644 --- a/integration_tests/src/main/python/map_test.py +++ b/integration_tests/src/main/python/map_test.py @@ -192,8 +192,7 @@ def query_map_scalar(spark): @allow_non_gpu('WindowLocalExec') -@datagen_overrides(seed=0, condition=is_before_spark_314() - or (not is_before_spark_320() and is_before_spark_323()) +@datagen_overrides(seed=0, condition=is_before_spark_323() or (not is_before_spark_330() and is_before_spark_331()), reason="https://issues.apache.org/jira/browse/SPARK-40089") @pytest.mark.parametrize('data_gen', supported_key_map_gens, ids=idfn) @allow_non_gpu(*non_utc_allow) diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index 789a261b52b..d4001a09ca5 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -234,7 +234,7 @@ def test_pred_push_round_trip(spark_tmp_path, orc_gen, read_func, v1_enabled_lis orc_compress_options = ['none', 'uncompressed', 'snappy', 'zlib'] # zstd is available in spark 3.2.0 and later. -if not is_before_spark_320() and not is_spark_cdh(): +if not is_spark_cdh(): orc_compress_options.append('zstd') # The following need extra jars 'lzo' @@ -301,7 +301,6 @@ def setup_external_table_with_forced_positions(spark, table_name, data_path): rename_cols_query = "CREATE EXTERNAL TABLE `{}` (`col10` INT, `_c1` STRING, `col30` DOUBLE) STORED AS orc LOCATION '{}'".format(table_name, data_path) spark.sql(rename_cols_query).collect -@pytest.mark.skipif(is_before_spark_320(), reason='ORC forced positional evolution support is added in Spark-3.2') @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) @pytest.mark.parametrize('forced_position', ["true", "false"]) @pytest.mark.parametrize('orc_impl', ["native", "hive"]) diff --git a/integration_tests/src/main/python/orc_write_test.py b/integration_tests/src/main/python/orc_write_test.py index f4928196c82..352e755f373 100644 --- a/integration_tests/src/main/python/orc_write_test.py +++ b/integration_tests/src/main/python/orc_write_test.py @@ -15,7 +15,7 @@ import pytest from asserts import assert_gpu_and_cpu_writes_are_equal_collect, assert_gpu_fallback_write -from spark_session import is_before_spark_320, is_before_spark_400, is_spark_321cdh, is_spark_cdh, with_cpu_session, with_gpu_session +from spark_session import is_before_spark_400, is_spark_321cdh, is_spark_cdh, with_cpu_session, with_gpu_session from conftest import is_not_utc from datetime import date, datetime, timezone from data_gen import * @@ -155,7 +155,7 @@ def do_writes(spark, path): orc_write_compress_options = ['none', 'uncompressed', 'snappy'] # zstd is available in spark 3.2.0 and later. 
-if not is_before_spark_320() and not is_spark_cdh(): +if not is_spark_cdh(): orc_write_compress_options.append('zstd') @pytest.mark.parametrize('compress', orc_write_compress_options) @@ -305,7 +305,6 @@ def create_empty_df(spark, path): @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="is only supported in Spark 320+") def test_concurrent_writer(spark_tmp_path): data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_writes_are_equal_collect( @@ -321,7 +320,6 @@ def test_concurrent_writer(spark_tmp_path): @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="is only supported in Spark 320+") def test_fallback_to_single_writer_from_concurrent_writer(spark_tmp_path): data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_writes_are_equal_collect( diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index e21ba622f46..08af37626d2 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -284,10 +284,8 @@ def test_parquet_read_round_trip_binary_as_string(std_input_path, read_func, rea assert_gpu_and_cpu_are_equal_collect(read_func(data_path), conf=all_confs) -parquet_compress_options = ['none', 'uncompressed', 'snappy', 'gzip'] -# zstd is available in spark 3.2.0 and later. -if not is_before_spark_320(): - parquet_compress_options.append('zstd') +parquet_compress_options = ['none', 'uncompressed', 'snappy', 'gzip', 'zstd'] + # The following need extra jars 'lzo', 'lz4', 'brotli', 'zstd' # https://github.com/NVIDIA/spark-rapids/issues/143 @@ -747,7 +745,6 @@ def test_spark_32639(std_input_path): lambda spark: spark.read.schema(schema_str).parquet(data_path), conf=original_parquet_file_reader_conf) -@pytest.mark.skipif(not is_before_spark_320(), reason='Spark 3.1.x does not need special handling') @pytest.mark.skipif(is_dataproc_runtime(), reason='https://github.com/NVIDIA/spark-rapids/issues/8074') def test_parquet_read_nano_as_longs_31x(std_input_path): data_path = "%s/timestamp-nanos.parquet" % (std_input_path) @@ -755,7 +752,6 @@ def test_parquet_read_nano_as_longs_31x(std_input_path): assert_gpu_and_cpu_are_equal_collect( lambda spark: spark.read.parquet(data_path)) -@pytest.mark.skipif(is_before_spark_320(), reason='Spark 3.1.x supports reading timestamps in nanos') def test_parquet_read_nano_as_longs_false(std_input_path): data_path = "%s/timestamp-nanos.parquet" % (std_input_path) conf = copy_and_update(original_parquet_file_reader_conf, { @@ -767,7 +763,6 @@ def read_timestamp_nano_parquet(spark): conf, error_message="Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))") -@pytest.mark.skipif(is_before_spark_320(), reason='Spark 3.1.x supports reading timestamps in nanos') def test_parquet_read_nano_as_longs_not_configured(std_input_path): data_path = "%s/timestamp-nanos.parquet" % (std_input_path) def read_timestamp_nano_parquet(spark): @@ -777,7 +772,6 @@ def read_timestamp_nano_parquet(spark): conf=original_parquet_file_reader_conf, error_message="Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))") -@pytest.mark.skipif(is_before_spark_320(), reason='Spark 3.1.x supports reading timestamps in nanos') @pytest.mark.skipif(spark_version() >= '3.2.0' and spark_version() < '3.2.4', reason='New config added in 3.2.4') @pytest.mark.skipif(spark_version() >= '3.3.0' and spark_version() < '3.3.2', reason='New config added in 3.3.2') @pytest.mark.skipif(is_databricks_runtime() and spark_version() == '3.3.2', reason='Config 
not in DB 12.2') @@ -1399,7 +1393,7 @@ def test_parquet_check_schema_compatibility_nested_types(spark_tmp_path): lambda spark: spark.read.schema(read_map_str_str_as_str_int).parquet(data_path).collect()), error_message='Parquet column cannot be converted') -@pytest.mark.skipif(is_before_spark_320() or is_spark_321cdh(), reason='Encryption is not supported before Spark 3.2.0 or Parquet < 1.12') +@pytest.mark.skipif(is_spark_321cdh(), reason='Encryption is not supported with Parquet < 1.12') @pytest.mark.skipif(os.environ.get('INCLUDE_PARQUET_HADOOP_TEST_JAR', 'false') == 'false', reason='INCLUDE_PARQUET_HADOOP_TEST_JAR is disabled') @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) @pytest.mark.parametrize('reader_confs', reader_opt_confs) diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py index 805a0b8137c..47c898dc138 100644 --- a/integration_tests/src/main/python/parquet_write_test.py +++ b/integration_tests/src/main/python/parquet_write_test.py @@ -236,10 +236,7 @@ def _cache_repr(self): data_path, conf=confs) -parquet_write_compress_options = ['none', 'uncompressed', 'snappy'] -# zstd is available in spark 3.2.0 and later. -if not is_before_spark_320(): - parquet_write_compress_options.append('zstd') +parquet_write_compress_options = ['none', 'uncompressed', 'snappy', 'zstd'] + @pytest.mark.parametrize('compress', parquet_write_compress_options) def test_compress_write_round_trip(spark_tmp_path, compress): @@ -671,7 +668,6 @@ def test_write_daytime_interval(spark_tmp_path): conf=writer_confs) @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="is only supported in Spark 320+") def test_concurrent_writer(spark_tmp_path): data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_writes_are_equal_collect( @@ -687,7 +683,6 @@ def test_concurrent_writer(spark_tmp_path): @ignore_order -@pytest.mark.skipif(is_before_spark_320(), reason="is only supported in Spark 320+") @allow_non_gpu(any=True) @pytest.mark.parametrize('aqe_enabled', [True, False]) def test_fallback_to_single_writer_from_concurrent_writer(spark_tmp_path, aqe_enabled): diff --git a/integration_tests/src/main/python/prune_partition_column_test.py b/integration_tests/src/main/python/prune_partition_column_test.py index 63d3d83055e..c4e7f336280 100644 --- a/integration_tests/src/main/python/prune_partition_column_test.py +++ b/integration_tests/src/main/python/prune_partition_column_test.py @@ -19,7 +19,7 @@ from data_gen import * from marks import * from pyspark.sql.types import IntegerType -from spark_session import with_cpu_session, is_before_spark_320 +from spark_session import with_cpu_session from conftest import spark_jvm # Several values to avoid generating too many folders for partitions. 
@@ -192,7 +192,6 @@ def do_it(spark): # https://github.com/NVIDIA/spark-rapids/issues/8715 @pytest.mark.parametrize('query, expected_schemata', [("friend.First", "struct>>"), ("friend.MIDDLE", "struct>>")]) -@pytest.mark.skipif(is_before_spark_320(), reason='https://issues.apache.org/jira/browse/SPARK-34638') @pytest.mark.parametrize('is_partitioned', [True, False]) @pytest.mark.parametrize('format', ["parquet", "orc"]) def test_nested_column_prune_on_generator_output(format, spark_tmp_path, query, expected_schemata, is_partitioned, spark_tmp_table_factory): @@ -205,4 +204,4 @@ def do_it(spark): return do_it conf = {"spark.sql.caseSensitive": "false", "spark.sql.parquet.enableVectorizedReader": "true"} - create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf, table_name) \ No newline at end of file + create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf, table_name) diff --git a/integration_tests/src/main/python/regexp_test.py b/integration_tests/src/main/python/regexp_test.py index c2062605ca1..c4927d125b4 100644 --- a/integration_tests/src/main/python/regexp_test.py +++ b/integration_tests/src/main/python/regexp_test.py @@ -20,7 +20,7 @@ from data_gen import * from marks import * from pyspark.sql.types import * -from spark_session import is_before_spark_320, is_before_spark_350, is_jvm_charset_utf8, is_databricks_runtime, spark_version +from spark_session import is_before_spark_350, is_jvm_charset_utf8, is_databricks_runtime, spark_version if not is_jvm_charset_utf8(): pytestmark = [pytest.mark.regexp, pytest.mark.skip(reason=str("Current locale doesn't support UTF-8, regexp support is disabled"))] @@ -422,7 +422,6 @@ def test_regexp_replace(): 'regexp_replace(a, "a|b|c", "A")'), conf=_regexp_conf) -@pytest.mark.skipif(is_before_spark_320(), reason='regexp is synonym for RLike starting in Spark 3.2.0') def test_regexp(): gen = mk_str_gen('[abcd]{1,3}') assert_gpu_and_cpu_are_equal_collect( @@ -433,7 +432,6 @@ def test_regexp(): 'regexp(a, "a[bc]d")'), conf=_regexp_conf) -@pytest.mark.skipif(is_before_spark_320(), reason='regexp_like is synonym for RLike starting in Spark 3.2.0') def test_regexp_like(): gen = mk_str_gen('[abcd]{1,3}') assert_gpu_and_cpu_are_equal_collect( diff --git a/integration_tests/src/main/python/repart_test.py b/integration_tests/src/main/python/repart_test.py index 7f299373ff6..6d901df3da3 100644 --- a/integration_tests/src/main/python/repart_test.py +++ b/integration_tests/src/main/python/repart_test.py @@ -15,7 +15,7 @@ import pytest from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect -from spark_session import is_before_spark_320, is_before_spark_330 +from spark_session import is_before_spark_330 from conftest import is_not_utc from data_gen import * from marks import ignore_order, allow_non_gpu diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index 4473767fe65..5b4c6d4444a 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -165,9 +165,6 @@ def with_gpu_session(func, conf={}): copy['spark.rapids.sql.test.validateExecsInGpuPlan'] = ','.join(get_validate_execs_in_gpu_plan()) return with_spark_session(func, conf=copy) -def is_before_spark_320(): - return spark_version() < "3.2.0" - def is_before_spark_322(): return spark_version() < "3.2.2" diff --git 
a/integration_tests/src/main/python/string_test.py b/integration_tests/src/main/python/string_test.py index 4ae4a827aa0..66efb34695c 100644 --- a/integration_tests/src/main/python/string_test.py +++ b/integration_tests/src/main/python/string_test.py @@ -23,7 +23,8 @@ from pyspark.sql.types import * import pyspark.sql.utils import pyspark.sql.functions as f -from spark_session import with_cpu_session, with_gpu_session, is_databricks104_or_later, is_before_spark_320, is_before_spark_400 +from spark_session import with_cpu_session, with_gpu_session, is_databricks104_or_later, \ + is_before_spark_400 _regexp_conf = { 'spark.rapids.sql.regexp.enabled': 'true' } @@ -663,8 +664,6 @@ def assert_gpu_did_fallback(sql_text): @incompat -@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2+ does translate() support unicode \ - characters with code point >= U+10000. See https://issues.apache.org/jira/browse/SPARK-34094") def test_translate_large_codepoints(): gen = mk_str_gen('.{0,5}TEST[\ud720 \U0010FFFF A]{0,5}') assert_gpu_and_cpu_are_equal_collect( @@ -833,9 +832,7 @@ def test_like_complex_escape(): # to_base can be positive and negative @pytest.mark.parametrize('to_base', [10, 16], ids=['to_plus10', 'to_plus16']) def test_conv_dec_to_from_hex(from_base, to_base, pattern): - # before 3.2 leading space are deem the string non-numeric and the result is 0 - if not is_before_spark_320: - pattern = r' ?' + pattern + pattern = r' ?' + pattern gen = mk_str_gen(pattern) assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, gen).select('a', f.conv(f.col('a'), from_base, to_base)), diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 653eaffa940..174790f7077 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -21,7 +21,7 @@ from pyspark.sql.types import DateType, TimestampType, NumericType from pyspark.sql.window import Window import pyspark.sql.functions as f -from spark_session import is_before_spark_320, is_databricks113_or_later, is_databricks133_or_later, is_spark_350_or_later, spark_version, with_cpu_session +from spark_session import is_databricks113_or_later, is_databricks133_or_later, is_spark_350_or_later, spark_version, with_cpu_session import warnings _grpkey_longs_with_no_nulls = [ @@ -1083,7 +1083,6 @@ def do_it(spark): .withColumn('percent_rank_val', f.percent_rank().over(baseWindowSpec)) assert_gpu_and_cpu_are_equal_collect(do_it, conf = {'spark.rapids.sql.batchSizeBytes': '100'}) -@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2.0 is IGNORE NULLS supported for lead and lag by Spark") @allow_non_gpu('WindowExec', 'Alias', 'WindowExpression', 'Lead', 'Literal', 'WindowSpecDefinition', 'SpecifiedWindowFrame', *non_utc_allow) @ignore_order(local=True) @pytest.mark.parametrize('d_gen', all_basic_gens, ids=meta_idfn('agg:')) @@ -1107,7 +1106,6 @@ def test_window_aggs_lead_ignore_nulls_fallback(a_gen, b_gen, c_gen, d_gen): FROM window_agg_table ''') -@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2.0 is IGNORE NULLS supported for lead and lag by Spark") @allow_non_gpu('WindowExec', 'Alias', 'WindowExpression', 'Lag', 'Literal', 'WindowSpecDefinition', 'SpecifiedWindowFrame', *non_utc_allow) @ignore_order(local=True) @pytest.mark.parametrize('d_gen', all_basic_gens, ids=meta_idfn('agg:')) @@ -1808,7 +1806,6 @@ def test_window_first_last_nth(data_gen): 'SELECT a, 
b, c, ' + exprs_for_nth_first_last + 'FROM window_agg_table') -@pytest.mark.skipif(is_before_spark_320(), reason='IGNORE NULLS clause is not supported for FIRST(), LAST() and NTH_VALUE in Spark 3.1.x') @pytest.mark.parametrize('data_gen', all_basic_gens_no_null + decimal_gens + _nested_gens, ids=idfn) def test_window_first_last_nth_ignore_nulls(data_gen): assert_gpu_and_cpu_are_equal_sql(