Skip to content

Commit

Permalink
Use a make_insert_query function in test_sql.py (#3983)
Browse files Browse the repository at this point in the history
* added make_insert_query function

* used make_insert_query function in first occurance of load_data_query

* added types and docstring

* refactored all remanining places to use the new make_insert_query function

* edited docstring to please linter

---------

Co-authored-by: kuephi <[email protected]>
  • Loading branch information
kuephi and philipp-kuebler authored Mar 28, 2024
1 parent abfefc0 commit 3c00cb4
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 67 deletions.
100 changes: 34 additions & 66 deletions catalog/tests/dags/common/loader/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,7 @@ def test_upsert_records_inserts_one_record_to_empty_image_table(
col.HEIGHT.db_name: HEIGHT,
}
)
load_data_query = f"""INSERT INTO {load_table} VALUES(
{query_values}
);"""
load_data_query = utils.make_insert_query(load_table, query_values)

_set_up_std_popularity_func(
postgres_with_load_and_image_table,
Expand Down Expand Up @@ -686,9 +684,8 @@ def test_upsert_records_replaces_data(
col.HEIGHT.db_name: HEIGHT_A,
}
)
load_data_query_a = f"""INSERT INTO {load_table} VALUES(
{query_values}
);"""
load_data_query_a = utils.make_insert_query(load_table, query_values)

_set_up_std_popularity_func(
postgres_with_load_and_image_table,
load_data_query_a,
Expand Down Expand Up @@ -728,9 +725,8 @@ def test_upsert_records_replaces_data(
col.HEIGHT.db_name: HEIGHT_B,
}
)
load_data_query_b = f"""INSERT INTO {load_table} VALUES(
{query_values}
);"""
load_data_query_b = utils.make_insert_query(load_table, query_values)

postgres_with_load_and_image_table.cursor.execute(f"DELETE FROM {load_table};")
postgres_with_load_and_image_table.connection.commit()
postgres_with_load_and_image_table.cursor.execute(load_data_query_b)
Expand Down Expand Up @@ -817,9 +813,8 @@ def test_upsert_records_does_not_replace_with_nulls(
col.HEIGHT.db_name: HEIGHT_A,
}
)
load_data_query_a = f"""INSERT INTO {load_table} VALUES(
{query_values_a}
);"""
load_data_query_a = utils.make_insert_query(load_table, query_values_a)

_set_up_std_popularity_func(
postgres_with_load_and_image_table,
load_data_query_a,
Expand Down Expand Up @@ -849,9 +844,8 @@ def test_upsert_records_does_not_replace_with_nulls(
col.SOURCE.db_name: SOURCE,
}
)
load_data_query_b = f"""INSERT INTO {load_table} VALUES(
{query_values_b}
);"""
load_data_query_b = utils.make_insert_query(load_table, query_values_b)

postgres_with_load_and_image_table.cursor.execute(f"DELETE FROM {load_table};")
postgres_with_load_and_image_table.connection.commit()
postgres_with_load_and_image_table.cursor.execute(load_data_query_b)
Expand Down Expand Up @@ -910,9 +904,7 @@ def test_upsert_records_merges_meta_data(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_a = f"""INSERT INTO {load_table} VALUES(
{query_values_a}
);"""
load_data_query_a = utils.make_insert_query(load_table, query_values_a)

query_values_b = utils.create_query_values(
{
Expand All @@ -923,9 +915,8 @@ def test_upsert_records_merges_meta_data(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_b = f"""INSERT INTO {load_table} VALUES(
{query_values_b}
);"""
load_data_query_b = utils.make_insert_query(load_table, query_values_b)

_set_up_std_popularity_func(
postgres_with_load_and_image_table,
load_data_query_a,
Expand Down Expand Up @@ -991,9 +982,7 @@ def test_upsert_records_does_not_replace_with_null_values_in_meta_data(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_a = f"""INSERT INTO {load_table} VALUES(
{query_values_a}
);"""
load_data_query_a = utils.make_insert_query(load_table, query_values_a)

query_values_b = utils.create_query_values(
{
Expand All @@ -1004,9 +993,8 @@ def test_upsert_records_does_not_replace_with_null_values_in_meta_data(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_b = f"""INSERT INTO {load_table} VALUES(
{query_values_b}
);"""
load_data_query_b = utils.make_insert_query(load_table, query_values_b)

_set_up_std_popularity_func(
postgres_with_load_and_image_table,
load_data_query_a,
Expand Down Expand Up @@ -1081,9 +1069,7 @@ def test_upsert_records_merges_tags(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_a = f"""INSERT INTO {load_table} VALUES(
{query_values_a}
);"""
load_data_query_a = utils.make_insert_query(load_table, query_values_a)

query_values_b = utils.create_query_values(
{
Expand All @@ -1094,9 +1080,7 @@ def test_upsert_records_merges_tags(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_b = f"""INSERT INTO {load_table} VALUES(
{query_values_b}
);"""
load_data_query_b = utils.make_insert_query(load_table, query_values_b)
_set_up_std_popularity_func(
postgres_with_load_and_image_table,
load_data_query_a,
Expand Down Expand Up @@ -1169,9 +1153,7 @@ def test_upsert_records_does_not_replace_tags_with_null(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_a = f"""INSERT INTO {load_table} VALUES(
{query_values_a}
);"""
load_data_query_a = utils.make_insert_query(load_table, query_values_a)

query_values_b = utils.create_query_values(
{
Expand All @@ -1181,9 +1163,8 @@ def test_upsert_records_does_not_replace_tags_with_null(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_b = f"""INSERT INTO {load_table} VALUES(
{query_values_b}
);"""
load_data_query_b = utils.make_insert_query(load_table, query_values_b)

_set_up_std_popularity_func(
postgres_with_load_and_image_table,
load_data_query_a,
Expand Down Expand Up @@ -1253,9 +1234,8 @@ def test_upsert_records_replaces_null_tags(
}
)
logging.info(f"Query values a: {query_values_a}")
load_data_query_a = f"""INSERT INTO {load_table} VALUES(
{query_values_a}
);"""
load_data_query_a = utils.make_insert_query(load_table, query_values_a)

query_values_b = utils.create_query_values(
{
col.FOREIGN_ID.db_name: FID,
Expand All @@ -1265,9 +1245,7 @@ def test_upsert_records_replaces_null_tags(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_b = f"""INSERT INTO {load_table} VALUES(
{query_values_b}
);"""
load_data_query_b = utils.make_insert_query(load_table, query_values_b)

_set_up_std_popularity_func(
postgres_with_load_and_image_table,
Expand Down Expand Up @@ -1342,9 +1320,7 @@ def test_upsert_records_handles_duplicate_url_and_does_not_merge(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_a = f"""INSERT INTO {load_table} VALUES(
{query_values_a}
);"""
load_data_query_a = utils.make_insert_query(load_table, query_values_a)

query_values_b = utils.create_query_values(
{
Expand All @@ -1355,9 +1331,7 @@ def test_upsert_records_handles_duplicate_url_and_does_not_merge(
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_b = f"""INSERT INTO {load_table} VALUES(
{query_values_b}
);"""
load_data_query_b = utils.make_insert_query(load_table, query_values_b)

# Simulate a DAG run where A is ingested into the loading table, upserted into
# the image table, and finally the loading table is cleared for the next DAG run.
Expand Down Expand Up @@ -1438,9 +1412,7 @@ def test_upsert_records_handles_duplicate_urls_in_a_single_batch_and_does_not_me
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_a = f"""INSERT INTO {load_table} VALUES(
{query_values_a}
);"""
load_data_query_a = utils.make_insert_query(load_table, query_values_a)

query_values_b = utils.create_query_values(
{
Expand All @@ -1451,9 +1423,7 @@ def test_upsert_records_handles_duplicate_urls_in_a_single_batch_and_does_not_me
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_b = f"""INSERT INTO {load_table} VALUES(
{query_values_b}
);"""
load_data_query_b = utils.make_insert_query(load_table, query_values_b)

# C is not a duplicate of anything, just a normal image
query_values_c = utils.create_query_values(
Expand All @@ -1465,9 +1435,7 @@ def test_upsert_records_handles_duplicate_urls_in_a_single_batch_and_does_not_me
col.PROVIDER.db_name: PROVIDER,
}
)
load_data_query_c = f"""INSERT INTO {load_table} VALUES(
{query_values_c}
);"""
load_data_query_c = utils.make_insert_query(load_table, query_values_c)

# Simulate a DAG run where duplicates (A and B) and a non-duplicate (C) are all
# ingested in a single batch from the provider script, and we attempt to upsert
Expand Down Expand Up @@ -1595,13 +1563,13 @@ def test_upsert_records_calculates_standardized_popularity(
)

# Queries to insert the two records into the load table.
load_data_query_old_record = f"""INSERT INTO {load_table} VALUES(
{query_values_update_old_record}
);"""
load_data_query_old_record = utils.make_insert_query(
load_table, query_values_update_old_record
)

load_data_query_new_record = f"""INSERT INTO {load_table} VALUES(
{query_values_update_new_record}
);"""
load_data_query_new_record = utils.make_insert_query(
load_table, query_values_update_new_record
)

# Now actually insert the records into the load table. This simulates a DagRun which ingests both
# records.
Expand Down
7 changes: 6 additions & 1 deletion catalog/tests/test_utils/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,18 @@ def create_query_values(
return ",".join(result)


def make_insert_query(table: str, values: str) -> str:
"""Return an SQL insert statement for the given table with the given values"""
return f"INSERT INTO {table} VALUES({values});"


def _get_insert_query(image_table, values: dict):
# Append the required identifier
values[col.IDENTIFIER.db_name] = uuid.uuid4()

query_values = create_query_values(values, columns=IMAGE_TABLE_COLUMNS)

return f"INSERT INTO {image_table} VALUES({query_values});"
return make_insert_query(image_table, query_values)


def load_sample_data_into_image_table(image_table, postgres, records):
Expand Down

0 comments on commit 3c00cb4

Please sign in to comment.