
[ETL-481] Utilize glue's resolve choice in dynamic frame input in JSON to Parquet #151

Draft. Wants to merge 8 commits into main.
Conversation

@rxu17 (Contributor) commented Aug 25, 2023

Purpose: This PR casts the fields in a dynamic frame input to the data types in the schema crawled by the Glue crawler for our Glue tables. This solves the issue of Glue splitting a field into multiple columns when it thinks the field has more than one data type.

This PR adds three new core functions that convert the Hive schema format to a JSON schema format and then parse it to get the data types from the Glue tables crawled by the Glue crawlers, so we can use them in the Glue function resolveChoice:

  • parse_hive_schema
  • convert_json_schema_to_specs
  • cast_glue_data_types (calls the other two)

A couple of caveats:

  • We only expect to convert fields in a non-array format, so something like fieldName: [1, 2, 3] won't be resolved.
  • We also won't be able to resolve a choice where a field should become an array or a struct.

You can see these conditions handled in the convert_json_schema_to_specs function; a concrete illustration follows below.
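To make the caveats concrete, here is a hedged illustration of the intended data flow. The field names and the exact return shapes are made up for illustration and may differ from the actual implementations:

```python
# Hypothetical example of the three-step flow (function names from this PR).
hive_str = "struct<recordId:string,steps:int,values:array<int>>"

# parse_hive_schema(hive_str) would yield a nested dict along these lines:
json_schema = {"recordId": "string", "steps": "int", "values": ["int"]}

# convert_json_schema_to_specs(json_schema) would then emit cast specs for
# the scalar fields only; per the caveats above, the array field is skipped:
specs = [("recordId", "cast:string"), ("steps", "cast:int")]

# cast_glue_data_types would feed these specs to resolveChoice.
```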

This is a draft. I still need to add a timeout condition for the while loops/recursive functionality. One open question is whether the cast_glue_data_types call sits in the expected spot in the main JSON-to-Parquet code.

@thomasyu888 (Member) left a comment


Nice work! Leaving some initial comments.

return df_with_index


Member

Nice one!

glue_table = dynamic_frame.resolveChoice(specs=specs)
return glue_table
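For reference, resolveChoice accepts specs as (path, action) tuples, with dotted paths addressing nested fields. A minimal sketch, assuming dynamic_frame is an awsglue DynamicFrame; the field names here are hypothetical:

```python
# "cast:<type>" forces the field to a single type; dotted paths reach
# nested fields inside structs.
resolved = dynamic_frame.resolveChoice(
    specs=[("steps", "cast:long"), ("device.model", "cast:string")]
)
```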


Member

Nit: is there a python package that does this already?

@rxu17 (Contributor, Author) commented Aug 29, 2023

Are you referring to parse_hive_schema? I did look for one, and from my research it sounds like there isn't anything out of the box that does this (converting a Hive schema string to a dictionary format); it has to be done manually this way. I tried looking into some of the custom parsing libraries like pyparsing, but they didn't seem to support this use case.
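For illustration, a minimal hand-rolled sketch of this kind of parsing, written as a recursive-descent parser rather than the regex-driven loop used in this PR (no error handling; primitive types kept as strings):

```python
def parse_hive_type(s: str, i: int = 0):
    """Parse a Hive type string like 'struct<a:int,b:array<int>>' into
    nested dicts/lists, returning (value, next_index)."""
    if s.startswith("struct<", i):
        i += len("struct<")
        fields = {}
        while s[i] != ">":
            colon = s.index(":", i)
            name = s[i:colon]
            value, i = parse_hive_type(s, colon + 1)
            fields[name] = value
            if s[i] == ",":
                i += 1
        return fields, i + 1  # skip the closing '>'
    if s.startswith("array<", i):
        elem, i = parse_hive_type(s, i + len("array<"))
        return [elem], i + 1  # skip the closing '>'
    # primitive type: read until ',' or '>' or end of string
    j = i
    while j < len(s) and s[j] not in ",>":
        j += 1
    return s[i:j], j


schema, _ = parse_hive_type("struct<recordId:string,values:array<int>>")
# schema == {"recordId": "string", "values": ["int"]}
```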


iteration = 0
while to_parse:
if iteration > MAX_ITERATIONS:
Member

How did you determine the max iterations?

Contributor (Author)

I just set it to 100 for now because I don't imagine we'll have a super nested schema to parse. I could also keep it consistent and set it to Python's default recursion limit, sys.getrecursionlimit(), which is 1000.
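For reference, tying the constant to the interpreter's limit would look like:

```python
import sys

MAX_ITERATIONS = sys.getrecursionlimit()  # 1000 by default in CPython
```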

)
left, operator, to_parse = r.match(to_parse).groups()

if operator == "struct<" or operator == "array<":
Member

Nit: If key is not None to be precise.

hive_str = f"struct<{top_level_field}:{hive_str}>"

to_parse = hive_str
parents = []
Member

Hmm, I may be missing something, but is this line required?

@rxu17 (Contributor, Author) commented Aug 30, 2023

Which line? curr_elem = root? Not sure why, but in my GitHub view of the comments I can't tell which line(s) the comments are pointing at.


if operator == "struct<" or operator == "array<":
parents.append(curr_elem)
new_elem = dict() if operator == "struct<" else list()
Member

I'm a little confused here: you set the key, and then you write over the existing dict with an empty one?

(path_to_field1 : cast:<field1_type>),
(path_to_field2 : cast:<field2_type>)
]
"""
Member

Will we only need to support json with length one?

Contributor (Author)

JSON with length one? No, this is in theory supposed to support a JSON schema that has many nested fields.

if operator == ">":
curr_elem = parents.pop()
iteration += 1
return root
Member

It's a bit risky to have an empty list as a default value: https://www.codiga.io/blog/python-avoid-empty-array-parameter/
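The usual fix is a None sentinel so the list is created fresh on each call. A sketch against this function's signature:

```python
def convert_json_schema_to_specs(json_schema: dict, path: list = None) -> list:
    # A fresh list per call avoids the shared mutable default
    # described in the linked article.
    if path is None:
        path = []
    ...
```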

@@ -1,4 +1,4 @@
FROM amazon/aws-glue-libs:glue_libs_3.0.0_image_01

-RUN pip3 install synapseclient==2.5.* pytest-datadir
+RUN pip3 install synapseclient==2.5.* pyspark==3.* pyarrow==9.* pytest-datadir
Member

Nit: may want to use the ~ syntax
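For example, pip's compatible-release operator (~=) is equivalent to the wildcard pins above while reading a bit more conventionally; the added line would become:

```
RUN pip3 install synapseclient~=2.5.0 pyspark~=3.0 pyarrow~=9.0 pytest-datadir
```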

@@ -21,19 +22,22 @@
from awsglue.utils import getResolvedOptions
from pyspark import SparkContext


# maximum iterations for any loop in order to timeout a loop
MAX_ITERATIONS = 100
Contributor

Why is this defined globally? This seems like it should be a default parameter for the parse_hive_schema function.
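A sketch of the suggested refactor, with the cap travelling as a default parameter instead of a module-level global:

```python
def parse_hive_schema(hive_str: str, max_iterations: int = 100) -> dict:
    """Parse a Hive schema string, bailing out after max_iterations loops."""
    ...
```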

return root


def convert_json_schema_to_specs(json_schema: dict, path: list = []) -> list:
Contributor

We already have the concept of a "JSON schema" in other components of BD, but the json_schema you are referring to here is a very different thing. Let's rename this (dict_schema?) so that we don't get our wires crossed in the future.

You refer to json_schema in other parts of this script so be sure to update those references as well.

curr_elem = new_elem
elif isinstance(curr_elem, list):
curr_elem.append(new_elem)
curr_elem = new_elem
Contributor

What's happening in these two conditions? It looks like you either add to a dict or list, but then you proceed to overwrite that dict or list with new_elem?

Contributor

Ohh, I think I understand what's happening. You're updating the pointer. Is there a way to make this clearer in the code?
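One option is a comment spelling out the invariant. A sketch of the pointer walk being discussed (not the PR's exact code): curr_elem always points at the container currently being filled, and parents stacks the enclosing containers:

```python
root: dict = {}
curr_elem = root
parents = []

# On "struct<" (or "array<"): descend into a new child container.
parents.append(curr_elem)
new_elem = {}
curr_elem["child"] = new_elem  # attach the child to its parent...
curr_elem = new_elem           # ...then move the pointer down into it

# On ">": climb back up to the enclosing container.
curr_elem = parents.pop()
```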

dynamic_frame=table,
glue_database_name=table_name,
glue_table_name=workflow_run_properties["database"],
)
Contributor

You should be replacing L81-83 with the cast_glue_data_types logic. Otherwise we've already resolved the types (incorrectly) in get_table above.

Contributor (Author)

Ah, that was what I wasn't sure about in the end, because I thought get_table's resolveChoice wouldn't permanently cast it.
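For what it's worth, resolveChoice returns a new DynamicFrame rather than mutating its input, so a cast only persists if the returned frame is the one carried forward. A sketch, assuming table is the frame returned by get_table:

```python
table = table.resolveChoice(specs=specs)  # reassign; don't discard the result
```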

aws configure set aws_session_token $AWS_SESSION_TOKEN
aws configure set region $AWS_REGION
```

Contributor

You can also have the credentials configured as your default profile in the credentials file and mount that file into the container:

```
docker run ... \
    -v ~/.aws:/home/glue_user/.aws
```

It's probably enough to say "configure your AWS credentials" and let developers read the AWS documentation if they don't know what that means.
