-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add variable to disable removing sql source files during ingestion #4216
Add variable to disable removing sql source files during ingestion #4216
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a great start @madewithkode! You asked about testing, you should actually be able to test this by enabling the inaturalist_workflow
locally and letting it run! It will have to download a large resource as part of the execution, so just noting that it may take some time. If everything is set up appropriately (e.g. the Variable is correctly set to false
), then the last two steps of the postingestion_tasks
workflow should skip and subsequent runs of the DAG locally should not need to redownload the dataset which was downloaded on the first run!
This is also a note for maintainers: there does not seem to be a way to default a Variable retrieval from within a template, and so the task will fail if the Variable is not defined. We will need to add SQL_RM_SOURCE_DATA_AFTER_INGESTION
to the production Variables with the default of true
, I'll go ahead and do that now since it should have no affect on anything else.
python_callable=(lambda x: x), | ||
op_args=[ | ||
"{{ params.sql_rm_source_data_after_ingesting }}", | ||
"{{ var.json.AIRFLOW_VAR_SQL_RM_SOURCE_DATA_AFTER_INGESTION}}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One spacing nit, along with a fix to the variable name (AIRFLOW_VAR
is only prepended for the environment variable, it gets removed when used within Airflow).
"{{ var.json.AIRFLOW_VAR_SQL_RM_SOURCE_DATA_AFTER_INGESTION}}", | |
"{{ var.json.SQL_RM_SOURCE_DATA_AFTER_INGESTION }}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should actually be able to test this by enabling the inaturalist_workflow locally and letting it run!
Hi @AetherUnbound, Is it possible to outline actual steps to do this? Or maybe point to me to a section of the doc(if any). In the meantime, I have updated the PR with the suggested changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, apologies! You should be able to take the following steps:
- Run
just c
to start the catalog stack - Navigate to http://localhost:9090
- Use
airflow
/airflow
as the username and password respectively to log in - Scroll down to the
inaturalist_workflow
and click on it - Click the "unpause DAG" button in the top right, a run will get kicked off
For more info about how Airflow works in general, check out their documentation on the UI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much, this makes more sense.
"{{ params.sql_rm_source_data_after_ingesting }}", | ||
"{{ var.json.AIRFLOW_VAR_SQL_RM_SOURCE_DATA_AFTER_INGESTION}}", | ||
], | ||
python_callable=(lambda x, y: False if not x or not y else True), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could be simplified:
python_callable=(lambda x, y: False if not x or not y else True), | |
python_callable=(lambda *x: any(x)), |
Oh, it looks like we'll also want to add that condition here too:
Apologies for missing that! Using the Variable there as well will prevent the dataset from being redownloaded every time. |
97a5855
to
477cdc6
Compare
"remove_api_files": any( | ||
"{{ params.sql_rm_source_data_after_ingesting }}", | ||
"{{ var.json.SQL_RM_SOURCE_DATA_AFTER_INGESTION}}", | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we can call any
here because (for complicated Airflow reasons) the any
gets evaluated when the DAG is parsed, but not when it's run. When the DAG is parsed, the values passed into any
will be the strings we see here. They're only converted into the actual values when the DAG is actually run. I think this should work:
"remove_api_files": any( | |
"{{ params.sql_rm_source_data_after_ingesting }}", | |
"{{ var.json.SQL_RM_SOURCE_DATA_AFTER_INGESTION}}", | |
) | |
"remove_api_files": "{{ params.sql_rm_source_data_after_ingesting and var.json.SQL_RM_SOURCE_DATA_AFTER_INGESTION }}", |
477cdc6
to
cef7e6b
Compare
@AetherUnbound all suggested code changes have been made. |
@madewithkode thanks for making those changes! I was able to test this locally, and opted to make two other changes:
This has the effect making it so that either the parameter or the Variable being set will remove the files, but by default locally, they won't be removed. In production, since we have the Variable set to I was able to verify this behavior by:
All that to say, this looks good to me and can be taken out of draft when you're ready! |
@AetherUnbound All these makes sense. Proceeding to unmark the PR as draft now. Thank you for the extra effort! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! I ran into some issues locally with the iNaturalist pull and load TaskGroups, but I confirmed that I see this on main as well so it's probably something in my local. I was able to just manually pass those steps and only test the changes made here. Works great for me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hope it's alright to approve this one too 😁 Thanks again @madewithkode!
Fixes
Fixes #3847 by @AetherUnbound
Description
Added an Airflow Variable
AIRFLOW_VAR_SQL_RM_SOURCE_DATA_AFTER_INGESTION
which would be used to control the removal or retention of source files used for ingestion.Testing Instructions
Checklist
Update index.md
).main
) or a parent feature branch.Developer Certificate of Origin
Developer Certificate of Origin