From 697294d8ee89396deab4309dea384b1eadb75d6b Mon Sep 17 00:00:00 2001 From: Shixuan Fan Date: Wed, 11 Oct 2023 09:12:57 -0700 Subject: [PATCH] SNOW-933566 Make test_async_job_suite runnable for stored proc (#1083) Description There are certain limitations in stored proc testing, and this change adds the necessary changes, which include: - Skip stored proc testing if not supported - Skip localfs testing if stage access is required - Adapt temp object name Testing integ test --- tests/integ/scala/test_async_job_suite.py | 33 ++++++++++++++++++----- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/tests/integ/scala/test_async_job_suite.py b/tests/integ/scala/test_async_job_suite.py index b7edf450225..955fb3034d2 100644 --- a/tests/integ/scala/test_async_job_suite.py +++ b/tests/integ/scala/test_async_job_suite.py @@ -32,7 +32,7 @@ StructField, StructType, ) -from tests.utils import TestFiles, Utils +from tests.utils import IS_IN_STORED_PROC, IS_IN_STORED_PROC_LOCALFS, TestFiles, Utils test_file_csv = "testCSV.csv" tmp_stage_name1 = Utils.random_stage_name() @@ -107,6 +107,7 @@ def test_async_to_pandas_common(session): ) +@pytest.mark.skipif(IS_IN_STORED_PROC_LOCALFS, reason="Requires large result") @pytest.mark.skipif(not is_pandas_available, reason="Pandas is not available") def test_async_to_pandas_batches(session): df = session.range(100000).cache_result() @@ -176,6 +177,7 @@ def test_async_first(session): def test_async_table_operations(session): + table_name = Utils.random_table_name() # merge operation schema = StructType( [StructField("key", IntegerType()), StructField("value", StringType())] @@ -183,8 +185,8 @@ def test_async_table_operations(session): target_df = session.create_dataframe( [(10, "old"), (10, "too_old"), (11, "old")], schema=schema ) - target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary") - target = session.table("my_table") + target_df.write.save_as_table(table_name, mode="overwrite", 
table_type="temporary") + target = session.table(table_name) source = session.create_dataframe( [(10, "new"), (12, "new"), (13, "old")], schema=schema ) @@ -212,8 +214,8 @@ def test_async_table_operations(session): target_df = session.create_dataframe( [(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)], schema=["a", "b"] ) - target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary") - target = session.table("my_table") + target_df.write.save_as_table(table_name, mode="overwrite", table_type="temporary") + target = session.table(table_name) res = target.delete(target["a"] == 1, block=False) assert res.result() == DeleteResult(rows_deleted=2) Utils.check_answer( @@ -224,8 +226,8 @@ def test_async_table_operations(session): target_df = session.create_dataframe( [(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)], schema=["a", "b"] ) - target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary") - target = session.table("my_table") + target_df.write.save_as_table(table_name, mode="overwrite", table_type="temporary") + target = session.table(table_name) res = target.update({"b": 0, "a": target.a + target.b}, block=False) assert res.result() == UpdateResult(rows_updated=6, multi_joined_rows_updated=0) Utils.check_answer( @@ -255,6 +257,7 @@ def test_async_save_as_table(session): Utils.check_answer(table_df, df) +@pytest.mark.skipif(IS_IN_STORED_PROC_LOCALFS, reason="Requires stage access") def test_async_copy_into_location(session): remote_location = f"{session.get_session_stage()}/{random_name_for_temp_object(TempObjectType.TABLE)}.csv" df = session.create_dataframe( @@ -278,6 +281,11 @@ def test_async_copy_into_location(session): Utils.check_answer(res, df) +@pytest.mark.skipif( + IS_IN_STORED_PROC, + reason="TODO(SNOW-933567): result_scan for child job of multistmt is not supported", +) +@pytest.mark.skipif(IS_IN_STORED_PROC_LOCALFS, reason="Requires stage access") @pytest.mark.skipif(not is_pandas_available, 
reason="to_pandas requires pandas") def test_multiple_queries(session, resources_path): user_schema = StructType( @@ -332,6 +340,10 @@ def test_async_batch_insert(session): analyzer.ARRAY_BIND_THRESHOLD = original_value +@pytest.mark.skipif( + IS_IN_STORED_PROC, + reason="TODO(SNOW-932722): Cancel query is not allowed in stored proc", +) def test_async_is_running_and_cancel(session): async_job = session.sql("select SYSTEM$WAIT(3)").collect_nowait() while not async_job.is_done(): @@ -350,6 +362,11 @@ def test_async_is_running_and_cancel(session): assert async_job2.is_done() +@pytest.mark.skipif( + IS_IN_STORED_PROC, + reason="TODO(SNOW-933569): large result in multi-stmt is not supported in stored proc", +) +@pytest.mark.skipif(IS_IN_STORED_PROC_LOCALFS, reason="Requires large result") def test_async_place_holder(session): exp = session.sql("show functions").where("1=1").collect() async_job = session.sql("show functions").where("1=1").collect_nowait() @@ -416,6 +433,7 @@ def test_create_async_job_negative(session): async_job.result() +@pytest.mark.skipif(IS_IN_STORED_PROC, reason="caplog is not supported") @pytest.mark.parametrize("create_async_job_from_query_id", [True, False]) def test_get_query_from_async_job(session, create_async_job_from_query_id, caplog): query_text = "select 1, 2, 3" @@ -433,6 +451,7 @@ def test_get_query_from_async_job(session, create_async_job_from_query_id, caplo assert "result is empty" in caplog.text +@pytest.mark.skipif(IS_IN_STORED_PROC, reason="caplog is not supported") def test_get_query_from_async_job_negative(session, caplog): invalid_query_id = "negative_test_invalid_query_id" async_job = session.create_async_job(invalid_query_id)