An HTTP POST to the Airflow endpoint from an on-prem system triggers the workflow.
At a high level the Cloud Composer workflow performs the following steps:
- Extracts some metadata from the HTTP POST that triggered the workflow.
- Spins up a Dataproc Cluster.
- Submits a Spark job that performs the following:
  - Reads newline delimited JSON data generated by an export from the nyc-tlc:yellow.trips public BigQuery table.
  - Enhances the data with an average_speed column.
  - Writes the enhanced data in CSV format to a temporary location in Google Cloud Storage.
- Tears down the Dataproc Cluster.
- Loads these files to BigQuery.
- Cleans up the temporary path of enhanced data in GCS.
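The enhancement performed by the Spark job can be illustrated without Spark. The following is a minimal pure-Python sketch, not the code in spark_avg_speed.py. It assumes each record carries `pickup_datetime`, `dropoff_datetime`, and `trip_distance` fields and that timestamps look like `2015-01-01 10:00:00`; both the field names and the timestamp format are assumptions that should be checked against the actual export.

```python
import csv
import io
import json
from datetime import datetime

# Assumed timestamp format; adjust it to match the real export.
TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def average_speed(record):
    """Return distance per hour for one trip, or None if the duration is not positive."""
    pickup = datetime.strptime(record["pickup_datetime"], TS_FORMAT)
    dropoff = datetime.strptime(record["dropoff_datetime"], TS_FORMAT)
    hours = (dropoff - pickup).total_seconds() / 3600.0
    if hours <= 0:
        return None
    return float(record["trip_distance"]) / hours


def enhance(ndjson_text):
    """Convert newline delimited JSON text into CSV text with an average_speed column."""
    rows = [json.loads(line) for line in ndjson_text.splitlines() if line.strip()]
    if not rows:
        return ""
    # Column order follows the first record, with the new column appended.
    fieldnames = list(rows[0].keys()) + ["average_speed"]
    out = io.StringIO()
    writer = csv.DictWriter(
        out, fieldnames=fieldnames, lineterminator="\n", extrasaction="ignore"
    )
    writer.writeheader()
    for row in rows:
        row["average_speed"] = average_speed(row)
        writer.writerow(row)
    return out.getvalue()
```

Trips with a zero or negative duration get an empty average_speed value rather than raising a division error.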
The HTTP POST to the Airflow endpoint should contain a payload of the following structure:
payload = {
'run_id': 'post-triggered-run-%s' % datetime.now().strftime('%Y%m%d%H%M%S'),
'conf': "{'raw_path': raw_path, 'transformed_path': transformed_path}"
}
Here raw_path is a timestamped path to the existing raw files in GCS, in newline delimited JSON format, and transformed_path is a path with a matching timestamp used to stage the enhanced files before loading them to BigQuery.
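As an illustration of how such a payload might be assembled, here is a minimal sketch. The `build_payload` helper is hypothetical, and so is the convention of deriving transformed_path by swapping a `raw-` prefix for `transformed-`; the conf value is serialised with `json.dumps` here, which is also an assumption rather than what dag_trigger.py necessarily does.

```python
import json
from datetime import datetime


def build_payload(raw_path, now=None):
    """Build a trigger payload for the given raw_path (hypothetical helper)."""
    now = now or datetime.now()
    # Assumed convention: the staging path reuses the timestamp of the raw path.
    transformed_path = raw_path.replace("raw-", "transformed-", 1)
    return {
        # %S is seconds as a zero-padded two-digit number.
        "run_id": "post-triggered-run-%s" % now.strftime("%Y%m%d%H%M%S"),
        "conf": json.dumps(
            {"raw_path": raw_path, "transformed_path": transformed_path}
        ),
    }
```

Passing `now` explicitly makes the run_id reproducible, which is convenient when testing.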
The workflow then provisions a Dataproc Cluster and submits a spark job to enhance the data.
Based on the status of the Spark job, the workflow then moves the processed files to a Cloud Storage bucket set up to store processed data. A separate folder, named with a processed date field, is created in this bucket to hold the files.
Ready to dive deeper? Check out the complete code
It is recommended that virtualenv be used to keep everything tidy. The requirements.txt describes the dependencies needed for the code used in this repo.
virtualenv composer-env
source composer-env/bin/activate
The POST will need to be authenticated with Identity-Aware Proxy (IAP). We recommend doing this by copying the latest version of make_iap_request.py from the Google Cloud python-docs-samples repo and using the provided dag_trigger.py.
pip install -r ~/professional-services/examples/cloud-composer-examples/requirements.txt
wget https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/requirements.txt -O ~/professional-services/examples/cloud-composer-examples/iap_requirements.txt
pip install -r iap_requirements.txt
wget https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/make_iap_request.py -O ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/make_iap_request.py
(Or if you are on a Mac you can use curl.)
# From the cloud-composer-examples directory
pip install -r ~/professional-services/examples/cloud-composer-examples/requirements.txt
curl https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/requirements.txt -o ~/professional-services/examples/cloud-composer-examples/iap_requirements.txt
pip install -r iap_requirements.txt
curl https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/make_iap_request.py -o ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/make_iap_request.py
Note that we skipped installing pyspark to keep this example lightweight to stand up. If you need to test pyspark locally, you should additionally run:
pip install "pyspark>=2.3.1"
The following high-level steps describe the setup needed to run this example:
- Set your project information.
export PROJECT=<REPLACE-THIS-WITH-YOUR-PROJECT-ID>
gcloud config set project $PROJECT
- Create a Cloud Storage (GCS) bucket for receiving input files (input-gcs-bucket).
gsutil mb -c regional -l us-central1 gs://$PROJECT
- Create a new BigQuery dataset and export the public BigQuery table to the GCS bucket.
bq mk ComposerDemo
export EXPORT_TS=$(date "+%Y-%m-%dT%H%M%S") && bq extract \
--destination_format=NEWLINE_DELIMITED_JSON \
nyc-tlc:yellow.trips \
gs://$PROJECT/cloud-composer-lab/raw-$EXPORT_TS/nyc-tlc-yellow-*.json
- Create a Cloud Composer environment. Follow these steps to create a Cloud Composer environment if needed (cloud-composer-env). We will set the following variables in the Composer environment.
Key | Value | Example |
---|---|---|
gcp_project | your-gcp-project-id | cloud-comp-http-demo |
gcs_bucket | gcs-bucket-with-raw-files | cloud-comp-http-demo |
gce_zone | compute-engine-zone | us-central1-b |
gcloud beta composer environments create demo-ephemeral-dataproc \
--location us-central1 \
--zone us-central1-b \
--machine-type n1-standard-2 \
--disk-size 20
# Set Airflow Variables in the Composer Environment we just created.
gcloud composer environments run \
demo-ephemeral-dataproc \
--location=us-central1 variables -- \
--set gcp_project $PROJECT
gcloud composer environments run demo-ephemeral-dataproc \
--location=us-central1 variables -- \
--set gce_zone us-central1-b
gcloud composer environments run demo-ephemeral-dataproc \
--location=us-central1 variables -- \
--set gcs_bucket $PROJECT
gcloud composer environments run demo-ephemeral-dataproc \
--location=us-central1 variables -- \
--set bq_output_table $PROJECT:ComposerDemo.nyc-tlc-yellow-trips
- Browse to the Cloud Composer widget in Cloud Console and click on the DAG folder icon.
- Upload the PySpark code spark_avg_speed.py into a spark-jobs folder in GCS.
gsutil cp ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/spark_avg_speed.py gs://$PROJECT/spark-jobs/
- The DAG folder is essentially a Cloud Storage bucket. Upload the ephemeral_dataproc_spark_dag.py file into the folder:
gsutil cp ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/ephemeral_dataproc_spark_dag.py gs://<dag-folder>/dags
Make sure that you have installed the packages listed in iap_requirements.txt in the steps above.
To use the dag_trigger.py script, you will need to create a service account with the permissions necessary to create an IAP request and to use Cloud Composer. This can be achieved with the following commands:
gcloud iam service-accounts create dag-trigger
# Give service account permissions to create tokens for
# iap requests.
gcloud projects add-iam-policy-binding $PROJECT \
--member \
serviceAccount:dag-trigger@$PROJECT.iam.gserviceaccount.com \
--role roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding $PROJECT \
--member \
serviceAccount:dag-trigger@$PROJECT.iam.gserviceaccount.com \
--role roles/iam.serviceAccountActor
# Service account also needs to be authorized to use Composer.
gcloud projects add-iam-policy-binding $PROJECT \
--member \
serviceAccount:dag-trigger@$PROJECT.iam.gserviceaccount.com \
--role roles/composer.user
# We need a service account key to trigger the dag.
gcloud iam service-accounts keys create ~/$PROJECT-dag-trigger-key.json \
--iam-account=dag-trigger@$PROJECT.iam.gserviceaccount.com
# Finally use this as your application credentials by setting the environment variable on the machine you will run `dag_trigger.py`
export GOOGLE_APPLICATION_CREDENTIALS=~/$PROJECT-dag-trigger-key.json
To trigger this workflow, use dag_trigger.py, which takes 3 arguments as shown below:
python dag_trigger.py \
--url=<airflow endpoint url> \
--iapClientId=<client id> \
--raw_path=<path to raw files for enhancement in GCS>
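For readers writing their own trigger script, the three arguments above could be parsed as in the following sketch. It only mirrors the flag names shown in the command; the `parse_args` function, the help strings, and the choice to make every flag required are assumptions, not a copy of dag_trigger.py.

```python
import argparse


def parse_args(argv=None):
    """Parse the three command-line flags used to trigger the DAG."""
    parser = argparse.ArgumentParser(
        description="Trigger the DAG with an authenticated HTTP POST."
    )
    parser.add_argument(
        "--url", required=True, help="Airflow endpoint URL for the DAG runs."
    )
    parser.add_argument(
        "--iapClientId", required=True, help="Client ID of the IAP in front of Airflow."
    )
    parser.add_argument(
        "--raw_path", required=True, help="GCS path to the raw files to enhance."
    )
    # argv=None makes argparse fall back to sys.argv[1:].
    return parser.parse_args(argv)
```

The parsed values are then available as `args.url`, `args.iapClientId`, and `args.raw_path`.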
The endpoint for triggering the DAG has the following structure: https://<airflow web server url>/api/experimental/dags/<dag-id>/dag_runs
In this case our dag-id is average-speed.
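The endpoint structure described above can be assembled programmatically. The helper below is a hypothetical sketch; the web server URL in the usage note is a placeholder, not a real environment.

```python
from urllib.parse import quote


def dag_runs_endpoint(webserver_url, dag_id):
    """Return the experimental API endpoint that creates runs of dag_id."""
    # Strip a trailing slash so the result never contains a double slash.
    base = webserver_url.rstrip("/")
    return "%s/api/experimental/dags/%s/dag_runs" % (base, quote(dag_id, safe=""))
```

For example, `dag_runs_endpoint("https://YOUR_UNIQUE_ID.appspot.com", "average-speed")` yields the value to pass as `--url`.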
The Airflow web server URL can be found once your Composer environment is set up by clicking on your environment in the console and checking the environment details.
In order to obtain your --iapClientId, visit the Airflow URL https://YOUR_UNIQUE_ID.appspot.com (which you noted in the last step) in an incognito window. Don't authenticate or log in; the first landing page for IAP auth has the client ID in the URL in the address bar:
https://accounts.google.com/signin/oauth/identifier?**client_id=00000000000-xxxx0x0xx0xx00xxxx0x00xxx0xxxxx.apps.googleusercontent.com**&as=a6VGEPwFpCL1qIwusi49IQ&destination=https%3A%2F%2Fh0b798498b93687a6-tp.appspot.com&approval_state=!ChRKSmd1TVc1VlQzMDB3MHI2UGI4SxIfWXhaRjJLcWdwcndRVUU3MWpGWk5XazFEbUp6N05SWQ%E2%88%99AB8iHBUAAAAAWvsaqTGCmRazWx9NqQtnYVOllz0r2x_i&xsrfsig=AHgIfE_o0kxXt6N3ch1JH4Fb19CB7wdbMg&flowName=GeneralOAuthFlow
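Rather than copying the client ID out of the address bar by hand, the sign-in URL can be pasted into a small helper. This is a sketch using only the standard library; `extract_client_id` is a hypothetical name, and it assumes the URL is pasted without any surrounding emphasis markers.

```python
from urllib.parse import parse_qs, urlparse


def extract_client_id(signin_url):
    """Return the client_id query parameter from an IAP sign-in URL."""
    query = parse_qs(urlparse(signin_url).query)
    values = query.get("client_id")
    if not values:
        raise ValueError("no client_id parameter found in URL")
    return values[0]
```

The returned string is the value to pass as `--iapClientId`.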