-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #240 from cityofaustin/vz-data-model-collector-branch
DAG updates for new VZ data model
- Loading branch information
Showing
4 changed files
with
163 additions
and
208 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,31 +1,101 @@ | ||
"""Process CRIS extract zips—including CSVs and PDFs. | ||
The target DB and S3 bucket subdirectory are controlled by the Airflow `ENVIRONMENT` | ||
env var, which determines which 1pass secrets to apply to the docker runtime env. | ||
Check the 1Pass entry to understand exactly what will happen when you trigger | ||
this DAG in a given context, but the expected behavior is that you may set the | ||
Airflow `ENVIRONMENT` to `production`, `staging`, or `dev`, with the following | ||
results: | ||
- production: use <bucket-name>/prod/inbox and production hasura cluster | ||
- staging: use <bucket-name>/staging/inbox and staging hasura cluster | ||
- dev: use <bucket-name>/dev/inbox and localhost hasura cluster | ||
""" | ||
import os | ||
from pendulum import datetime | ||
from pendulum import datetime, duration | ||
|
||
from airflow.decorators import dag | ||
from airflow.models import DAG | ||
from airflow.operators.docker_operator import DockerOperator | ||
|
||
from utils.onepassword import get_env_vars_task | ||
from utils.slack_operator import task_fail_slack_alert | ||
|
||
|
||
@dag( | ||
dag_id="vz-cris-import", | ||
description="A process which will import the VZ CRIS data into the database on a daily basis using data on the SFTP endpoint", | ||
schedule="0 7 * * *", | ||
start_date=datetime(2023, 1, 1, tz="America/Chicago"), | ||
DEPLOYMENT_ENVIRONMENT = os.getenv("ENVIRONMENT") | ||
secrets_env_prefix = None | ||
|
||
if DEPLOYMENT_ENVIRONMENT == "production": | ||
secrets_env_prefix = "prod" | ||
elif DEPLOYMENT_ENVIRONMENT == "staging": | ||
secrets_env_prefix = "staging" | ||
else: | ||
secrets_env_prefix = "dev" | ||
|
||
|
||
REQUIRED_SECRETS = { | ||
"ENV": { | ||
"opitem": "Vision Zero CRIS Import - v2", | ||
"opfield": f"{secrets_env_prefix}.env", | ||
}, | ||
"AWS_ACCESS_KEY_ID": { | ||
"opitem": "Vision Zero CRIS Import - v2", | ||
"opfield": f"common.AWS_ACCESS_KEY_ID", | ||
}, | ||
"AWS_SECRET_ACCESS_KEY": { | ||
"opitem": "Vision Zero CRIS Import - v2", | ||
"opfield": f"common.AWS_SECRET_ACCESS_KEY", | ||
}, | ||
"BUCKET_NAME": { | ||
"opitem": "Vision Zero CRIS Import - v2", | ||
"opfield": f"common.BUCKET_NAME", | ||
}, | ||
"EXTRACT_PASSWORD": { | ||
"opitem": "Vision Zero CRIS Import - v2", | ||
"opfield": f"common.EXTRACT_PASSWORD", | ||
}, | ||
"HASURA_GRAPHQL_ENDPOINT": { | ||
"opitem": "Vision Zero CRIS Import - v2", | ||
"opfield": f"{secrets_env_prefix}.HASURA_GRAPHQL_ENDPOINT", | ||
}, | ||
"HASURA_GRAPHQL_ADMIN_SECRET": { | ||
"opitem": "Vision Zero CRIS Import - v2", | ||
"opfield": f"{secrets_env_prefix}.HASURA_GRAPHQL_ADMIN_SECRET", | ||
}, | ||
} | ||
|
||
docker_image = f"atddocker/vz-cris-import:{'production' if DEPLOYMENT_ENVIRONMENT == 'production' else 'latest'}" | ||
|
||
|
||
DEFAULT_ARGS = { | ||
"depends_on_past": False, | ||
"email_on_failure": False, | ||
"email_on_retry": False, | ||
"retries": 0, | ||
"execution_timeout": duration(minutes=60), | ||
"on_failure_callback": task_fail_slack_alert, | ||
} | ||
|
||
|
||
with DAG( | ||
catchup=False, | ||
dag_id="vz-cris-import", | ||
description="Import TxDOT CRIS CSVs and PDFs into the Vision Zero database", | ||
default_args=DEFAULT_ARGS, | ||
schedule="0 5 * * *" if DEPLOYMENT_ENVIRONMENT == "prod" else None, | ||
start_date=datetime(2024, 8, 1, tz="America/Chicago"), | ||
tags=["vision-zero", "cris", "repo:atd-vz-data"], | ||
on_failure_callback=task_fail_slack_alert, | ||
) | ||
def cris_import(): | ||
) as dag: | ||
env_vars = get_env_vars_task(REQUIRED_SECRETS) | ||
|
||
DockerOperator( | ||
cris_import = DockerOperator( | ||
task_id="run_cris_import", | ||
environment=dict(os.environ), | ||
docker_conn_id="docker_default", | ||
image="atddocker/vz-cris-import:production", | ||
image=docker_image, | ||
command=f"./cris_import.py --csv --pdf --s3-download --s3-upload --s3-archive --workers 2", | ||
environment=env_vars, | ||
auto_remove=True, | ||
tty=True, | ||
force_pull=True, | ||
) | ||
|
||
|
||
cris_import() | ||
cris_import |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.