Add integration tests for glue/Athena
This covers a few use cases, including updating the schema and reading
real data back from Athena.
mgmarino committed Jan 22, 2024
1 parent defdac4 commit 40ab6e6
Showing 1 changed file with 159 additions and 3 deletions.
162 changes: 159 additions & 3 deletions tests/catalog/integration_test_glue.py
@@ -15,9 +15,12 @@
# specific language governing permissions and limitations
# under the License.

from typing import Generator, List
import time
from typing import Any, Dict, Generator, List
from uuid import uuid4

import boto3
import pyarrow as pa
import pytest
from botocore.exceptions import ClientError

@@ -30,6 +33,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import clean_up, get_bucket_name, get_s3_path
@@ -52,8 +56,62 @@ def fixture_test_catalog() -> Generator[Catalog, None, None]:
clean_up(test_catalog)


class AthenaQueryHelper:

nicor88 commented on Jan 22, 2024:

    Did you consider using PyAthena? It is a Python wrapper around Athena that saves you from reinventing the wheel; query result fetching, for example, is already implemented.

mgmarino (Author) replied on Jan 22, 2024:

    I didn't want to add a dependency; awswrangler would have been another option here.

    Happy to do so if @HonahX would rather go that route, but I'm always a little hesitant to add another (even test-only) dependency to a public project. :-)

nicor88 replied on Jan 22, 2024:

    I think awswrangler adds too much complexity, but PyAthena is good and well maintained enough to justify using it.
    Worth having @Fokko chime in too.
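For reference, a minimal sketch of what the helper below could look like on top of PyAthena, per the suggestion above. This is hypothetical and not part of this commit; it assumes PyAthena's standard connect/cursor API, configured AWS credentials, and placeholder bucket, region, database, and table names:

    # Hypothetical PyAthena-based alternative to the boto3 helper below.
    from pyathena import connect

    cursor = connect(
        s3_staging_dir="s3://my-test-bucket/athena_results/",  # assumed bucket/prefix
        region_name="eu-central-1",  # assumed region
    ).cursor()
    cursor.execute('SELECT * FROM "my_database"."my_table"')  # assumed identifiers
    rows = cursor.fetchall()  # PyAthena handles polling and result fetching internally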

_athena_client: boto3.client
_s3_resource: boto3.resource
_output_bucket: str
_output_path: str

def __init__(self) -> None:
self._s3_resource = boto3.resource("s3")
self._athena_client = boto3.client("athena")
self._output_bucket = get_bucket_name()
self._output_path = f"athena_results_{uuid4()}"

def get_query_results(self, query: str) -> List[Dict[str, Any]]:
query_execution_id = self._athena_client.start_query_execution(
QueryString=query, ResultConfiguration={"OutputLocation": f"s3://{self._output_bucket}/{self._output_path}"}
)["QueryExecutionId"]

while True:
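            # Check the execution status on each iteration; fail loudly if the query failed or was cancelled.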
result = self._athena_client.get_query_execution(QueryExecutionId=query_execution_id)["QueryExecution"]["Status"]
query_status = result["State"]
assert query_status not in [
"FAILED",
"CANCELLED",
], f"""
Athena query failed or was cancelled:
Query: {query}
Status: {query_status}
Reason: {result["StateChangeReason"]}"""

if query_status not in ["QUEUED", "RUNNING"]:
break
time.sleep(0.5)

# No pagination for now, assume that we are not doing large queries
return self._athena_client.get_query_results(QueryExecutionId=query_execution_id)["ResultSet"]["Rows"]

def clean_up(self) -> None:
bucket = self._s3_resource.Bucket(self._output_bucket)
for obj in bucket.objects.filter(Prefix=f"{self._output_path}/"):
self._s3_resource.Object(bucket.name, obj.key).delete()
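
The helper deliberately fetches only a single page of results (see the comment in get_query_results). If larger result sets were ever needed, boto3 ships a paginator for GetQueryResults; a sketch with a hypothetical helper, not part of this commit:

    # Sketch only: gather all result rows, letting boto3's paginator
    # handle NextToken continuation across pages.
    def fetch_all_rows(athena_client: boto3.client, query_execution_id: str) -> List[Dict[str, Any]]:
        rows: List[Dict[str, Any]] = []
        paginator = athena_client.get_paginator("get_query_results")
        for page in paginator.paginate(QueryExecutionId=query_execution_id):
            rows.extend(page["ResultSet"]["Rows"])
        return rows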


@pytest.fixture(name="athena", scope="module")
def fixture_athena_helper() -> Generator[AthenaQueryHelper, None, None]:
query_helper = AthenaQueryHelper()
yield query_helper
query_helper.clean_up()


def test_create_table(
test_catalog: Catalog,
s3: boto3.client,
table_schema_nested: Schema,
table_name: str,
database_name: str,
athena: AthenaQueryHelper,
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
Expand All @@ -64,6 +122,48 @@ def test_create_table(
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0

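    # Write a single row through pyiceberg, converting the Iceberg schema to pyarrow.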
table.append(
pa.Table.from_pylist(
[
{
"foo": "foo_val",
"bar": 1,
"baz": False,
"qux": ["x", "y"],
"quux": {"key": {"subkey": 2}},
"location": [{"latitude": 1.1}],
"person": {"name": "some_name", "age": 23},
}
],
schema=schema_to_pyarrow(table.schema()),
),
)

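    # Athena returns the header row first, then the data row with all values rendered as strings.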
assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [
{
"Data": [
{"VarCharValue": "foo"},
{"VarCharValue": "bar"},
{"VarCharValue": "baz"},
{"VarCharValue": "qux"},
{"VarCharValue": "quux"},
{"VarCharValue": "location"},
{"VarCharValue": "person"},
]
},
{
"Data": [
{"VarCharValue": "foo_val"},
{"VarCharValue": "1"},
{"VarCharValue": "false"},
{"VarCharValue": "[x, y]"},
{"VarCharValue": "{key={subkey=2}}"},
{"VarCharValue": "[{latitude=1.1, longitude=null}]"},
{"VarCharValue": "{name=some_name, age=23}"},
]
},
]


def test_create_table_with_invalid_location(table_schema_nested: Schema, table_name: str, database_name: str) -> None:
identifier = (database_name, table_name)
@@ -269,7 +369,7 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str)


def test_commit_table_update_schema(
test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str, athena: AthenaQueryHelper
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(namespace=database_name)
@@ -279,6 +379,20 @@ def test_commit_table_update_schema(
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
assert original_table_metadata.current_schema_id == 0

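    # The table is still empty at this point, so Athena returns only the header row.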
assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [
{
"Data": [
{"VarCharValue": "foo"},
{"VarCharValue": "bar"},
{"VarCharValue": "baz"},
{"VarCharValue": "qux"},
{"VarCharValue": "quux"},
{"VarCharValue": "location"},
{"VarCharValue": "person"},
]
}
]

transaction = table.transaction()
update = transaction.update_schema()
update.add_column(path="b", field_type=IntegerType())
@@ -295,6 +409,48 @@ def test_commit_table_update_schema(
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()

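    # Append a row that populates the new column "b" using the evolved schema.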
table.append(
pa.Table.from_pylist(
[
{
"foo": "foo_val",
"bar": 1,
"location": [{"latitude": 1.1}],
"person": {"name": "some_name", "age": 23},
"b": 2,
}
],
schema=schema_to_pyarrow(new_schema),
),
)

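    # Fields left unset in the appended row come back from Athena as null or empty values.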
assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [
{
"Data": [
{"VarCharValue": "foo"},
{"VarCharValue": "bar"},
{"VarCharValue": "baz"},
{"VarCharValue": "qux"},
{"VarCharValue": "quux"},
{"VarCharValue": "location"},
{"VarCharValue": "person"},
{"VarCharValue": "b"},
]
},
{
"Data": [
{"VarCharValue": "foo_val"},
{"VarCharValue": "1"},
{},
{"VarCharValue": "[]"},
{"VarCharValue": "{}"},
{"VarCharValue": "[{latitude=1.1, longitude=null}]"},
{"VarCharValue": "{name=some_name, age=23}"},
{"VarCharValue": "2"},
]
},
]


def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
identifier = (database_name, table_name)
