Skip to content

Commit

Permalink
SNOW-933566 Make test_async_job_suite runnable for stored proc (#1083)
Browse files Browse the repository at this point in the history
Description

There are certain limitations in stored proc testing, and this
change makes the necessary adjustments, which include:

- Skip stored proc testing if not supported
- Skip localfs testing if stage access is required
- Adapt temp object name

Testing

integ test
  • Loading branch information
sfc-gh-sfan authored Oct 11, 2023
1 parent 3bd6378 commit 697294d
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions tests/integ/scala/test_async_job_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
StructField,
StructType,
)
from tests.utils import TestFiles, Utils
from tests.utils import IS_IN_STORED_PROC, IS_IN_STORED_PROC_LOCALFS, TestFiles, Utils

test_file_csv = "testCSV.csv"
tmp_stage_name1 = Utils.random_stage_name()
Expand Down Expand Up @@ -107,6 +107,7 @@ def test_async_to_pandas_common(session):
)


@pytest.mark.skipif(IS_IN_STORED_PROC_LOCALFS, reason="Requires large result")
@pytest.mark.skipif(not is_pandas_available, reason="Pandas is not available")
def test_async_to_pandas_batches(session):
df = session.range(100000).cache_result()
Expand Down Expand Up @@ -176,15 +177,16 @@ def test_async_first(session):


def test_async_table_operations(session):
table_name = Utils.random_table_name()
# merge operation
schema = StructType(
[StructField("key", IntegerType()), StructField("value", StringType())]
)
target_df = session.create_dataframe(
[(10, "old"), (10, "too_old"), (11, "old")], schema=schema
)
target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
target = session.table("my_table")
target_df.write.save_as_table(table_name, mode="overwrite", table_type="temporary")
target = session.table(table_name)
source = session.create_dataframe(
[(10, "new"), (12, "new"), (13, "old")], schema=schema
)
Expand Down Expand Up @@ -212,8 +214,8 @@ def test_async_table_operations(session):
target_df = session.create_dataframe(
[(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)], schema=["a", "b"]
)
target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
target = session.table("my_table")
target_df.write.save_as_table(table_name, mode="overwrite", table_type="temporary")
target = session.table(table_name)
res = target.delete(target["a"] == 1, block=False)
assert res.result() == DeleteResult(rows_deleted=2)
Utils.check_answer(
Expand All @@ -224,8 +226,8 @@ def test_async_table_operations(session):
target_df = session.create_dataframe(
[(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)], schema=["a", "b"]
)
target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
target = session.table("my_table")
target_df.write.save_as_table(table_name, mode="overwrite", table_type="temporary")
target = session.table(table_name)
res = target.update({"b": 0, "a": target.a + target.b}, block=False)
assert res.result() == UpdateResult(rows_updated=6, multi_joined_rows_updated=0)
Utils.check_answer(
Expand Down Expand Up @@ -255,6 +257,7 @@ def test_async_save_as_table(session):
Utils.check_answer(table_df, df)


@pytest.mark.skipif(IS_IN_STORED_PROC_LOCALFS, reason="Requires stage access")
def test_async_copy_into_location(session):
remote_location = f"{session.get_session_stage()}/{random_name_for_temp_object(TempObjectType.TABLE)}.csv"
df = session.create_dataframe(
Expand All @@ -278,6 +281,11 @@ def test_async_copy_into_location(session):
Utils.check_answer(res, df)


@pytest.mark.skipif(
IS_IN_STORED_PROC,
reason="TODO(SNOW-933567): result_scan for child job of multistmt is not supported",
)
@pytest.mark.skipif(IS_IN_STORED_PROC_LOCALFS, reason="Requires stage access")
@pytest.mark.skipif(not is_pandas_available, reason="to_pandas requires pandas")
def test_multiple_queries(session, resources_path):
user_schema = StructType(
Expand Down Expand Up @@ -332,6 +340,10 @@ def test_async_batch_insert(session):
analyzer.ARRAY_BIND_THRESHOLD = original_value


@pytest.mark.skipif(
IS_IN_STORED_PROC,
reason="TODO(SNOW-932722): Cancel query is not allowed in stored proc",
)
def test_async_is_running_and_cancel(session):
async_job = session.sql("select SYSTEM$WAIT(3)").collect_nowait()
while not async_job.is_done():
Expand All @@ -350,6 +362,11 @@ def test_async_is_running_and_cancel(session):
assert async_job2.is_done()


@pytest.mark.skipif(
IS_IN_STORED_PROC,
reason="TODO(SNOW-933569): large result in multi-stmt is not supported in stored proc",
)
@pytest.mark.skipif(IS_IN_STORED_PROC_LOCALFS, reason="Requires large result")
def test_async_place_holder(session):
exp = session.sql("show functions").where("1=1").collect()
async_job = session.sql("show functions").where("1=1").collect_nowait()
Expand Down Expand Up @@ -416,6 +433,7 @@ def test_create_async_job_negative(session):
async_job.result()


@pytest.mark.skipif(IS_IN_STORED_PROC, reason="caplog is not supported")
@pytest.mark.parametrize("create_async_job_from_query_id", [True, False])
def test_get_query_from_async_job(session, create_async_job_from_query_id, caplog):
query_text = "select 1, 2, 3"
Expand All @@ -433,6 +451,7 @@ def test_get_query_from_async_job(session, create_async_job_from_query_id, caplo
assert "result is empty" in caplog.text


@pytest.mark.skipif(IS_IN_STORED_PROC, reason="caplog is not supported")
def test_get_query_from_async_job_negative(session, caplog):
invalid_query_id = "negative_test_invalid_query_id"
async_job = session.create_async_job(invalid_query_id)
Expand Down

0 comments on commit 697294d

Please sign in to comment.