Add an example for building a flex template for a python pipeline with extra dependencies and a custom container. (#11252)

* Add an example for building a flex template for a python pipeline with dependencies and a custom container

* Support projects with a colon in conftest. Also make metadata.json optional when building templates

* Add a test.

* Apply suggestions from code review

Co-authored-by: Rebecca Szper <[email protected]>

* Clarify the workaround for missing gcloud functionality.

* Misc style edits in commented code.

* Apply suggestions from code review

* Simplify the example: reuse the Dockerfile for the Flex Template

* Show how to inspect docker image.

* Add Cloud Shell instructions

* fix test.

* Apply suggestions from code review

Co-authored-by: Rebecca Szper <[email protected]>

* Update dataflow/flex-templates/pipeline_with_dependencies/README.md

* Update dataflow/flex-templates/pipeline_with_dependencies/setup.py

* Add metadata.json

* Fix tests.

* Apply typing best practices

* lint

* Minor comment clarifications.

* Exclude the new example from renovate bot.

* Make exclusion a glob

---------

Co-authored-by: Rebecca Szper <[email protected]>
tvalentyn and rszper authored Mar 21, 2024
1 parent 6b55606 commit be27f1a
Showing 17 changed files with 961 additions and 9 deletions.
22 changes: 14 additions & 8 deletions dataflow/conftest.py
@@ -527,6 +527,7 @@ def cloud_build_submit(
cmd = ["gcloud", "auth", "configure-docker"]
logging.info(f"{cmd}")
subprocess.check_call(cmd)
gcr_project = project.replace(':', '/')

if substitutions:
cmd_substitutions = [
@@ -561,13 +562,14 @@ def cloud_build_submit(
"builds",
"submit",
f"--project={project}",
f"--tag=gcr.io/{project}/{image_name}:{UUID}",
f"--tag=gcr.io/{gcr_project}/{image_name}:{UUID}",
*cmd_substitutions,
source,
]
logging.info(f"{cmd}")
subprocess.check_call(cmd)
logging.info(f"Created image: gcr.io/{project}/{image_name}:{UUID}")
logging.info(
f"Created image: gcr.io/{gcr_project}/{image_name}:{UUID}")
yield f"{image_name}:{UUID}"
else:
raise ValueError("must specify either `config` or `image_name`")
@@ -578,14 +580,15 @@ def cloud_build_submit(
"container",
"images",
"delete",
f"gcr.io/{project}/{image_name}:{UUID}",
f"gcr.io/{gcr_project}/{image_name}:{UUID}",
f"--project={project}",
"--force-delete-tags",
"--quiet",
]
logging.info(f"{cmd}")
subprocess.check_call(cmd)
logging.info(f"Deleted image: gcr.io/{project}/{image_name}:{UUID}")
logging.info(
f"Deleted image: gcr.io/{gcr_project}/{image_name}:{UUID}")

@staticmethod
def dataflow_job_url(
@@ -756,23 +759,26 @@ def dataflow_jobs_cancel(
def dataflow_flex_template_build(
bucket_name: str,
image_name: str,
metadata_file: str = "metadata.json",
metadata_file: str | None = "metadata.json",
template_file: str = "template.json",
project: str = PROJECT,
) -> str:
# https://cloud.google.com/sdk/gcloud/reference/dataflow/flex-template/build
template_gcs_path = f"gs://{bucket_name}/{template_file}"
gcr_project = project.replace(':', '/')
cmd = [
"gcloud",
"dataflow",
"flex-template",
"build",
template_gcs_path,
f"--project={project}",
f"--image=gcr.io/{project}/{image_name}",
"--sdk-language=PYTHON",
f"--metadata-file={metadata_file}",
f"--image=gcr.io/{gcr_project}/{image_name}",
"--sdk-language=PYTHON"
]
if metadata_file:
cmd.append(f"--metadata-file={metadata_file}")

logging.info(f"{cmd}")
subprocess.check_call(cmd)

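The `gcr_project = project.replace(':', '/')` change above accounts for domain-scoped project IDs, which contain a colon that appears as a slash in Container Registry image paths. A minimal sketch of the mapping (the helper below is illustrative and not part of the change):

```python
# Illustrative only: how a project ID, including a domain-scoped one such as
# "example.com:my-project", maps to a gcr.io image path.
def gcr_image_path(project: str, image_name: str, tag: str) -> str:
    gcr_project = project.replace(":", "/")
    return f"gcr.io/{gcr_project}/{image_name}:{tag}"


assert gcr_image_path("my-project", "beam-image", "v1") == "gcr.io/my-project/beam-image:v1"
assert (gcr_image_path("example.com:my-project", "beam-image", "v1")
        == "gcr.io/example.com/my-project/beam-image:v1")
```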
80 changes: 80 additions & 0 deletions dataflow/flex-templates/pipeline_with_dependencies/Dockerfile
@@ -0,0 +1,80 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# This Dockerfile defines a container image that serves as both the SDK container image
# (runtime environment) and the base image for the Dataflow Flex Template (launch environment).
#
# For more information, see:
# - https://cloud.google.com/dataflow/docs/reference/flex-templates-base-images
# - https://cloud.google.com/dataflow/docs/guides/using-custom-containers


# This Dockerfile illustrates how to use a custom base image when building
# a custom container image for Dataflow. A 'slim' base image is smaller,
# but does not include some preinstalled libraries, such as google-cloud-debugger.
# To use a standard image, use apache/beam_python3.11_sdk:2.54.0 instead.
# Use a consistent version of the Python interpreter throughout the project.
FROM python:3.11-slim

# Copy the SDK entrypoint binary from the Apache Beam image, which makes it possible
# to use this image as an SDK container image. If you explicitly depend on
# apache-beam in setup.py, use the same version of Beam in both files.
COPY --from=apache/beam_python3.11_sdk:2.54.0 /opt/apache/beam /opt/apache/beam

# Copy Flex Template launcher binary from the launcher image, which makes it
# possible to use the image as a Flex Template base image.
COPY --from=gcr.io/dataflow-templates-base/python311-template-launcher-base:20230622_RC00 /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher

# Location to store the pipeline artifacts.
ARG WORKDIR=/template
WORKDIR ${WORKDIR}

COPY requirements.txt .
COPY setup.py .
COPY main.py .
COPY my_package my_package

# Installing an exhaustive list of dependencies from requirements.txt
# helps ensure that the Python dependencies stay the same every time the
# Docker container image is built. Using `--no-cache-dir` reduces the image size.
RUN pip install --no-cache-dir -r requirements.txt

# Installing the pipeline package makes all modules that make up the pipeline
# available via import statements and installs the necessary dependencies.
# An editable installation picks up later changes to the pipeline code,
# for example, during local experimentation within the container.
RUN pip install -e .

# For more information, see: https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"

# To reduce pipeline submission time, do not use the
# FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE directive here to specify pipeline
# dependencies: this image is used as the custom SDK container image, and it
# already installs the dependencies from requirements.txt.

# Optionally, verify that dependencies are not conflicting.
# A conflict may or may not be significant for your pipeline.
RUN pip check

# Optionally, list all installed dependencies.
# The output can be used to seed requirements.txt for reproducible builds.
RUN pip freeze

# Set the entrypoint to Apache Beam SDK launcher, which allows this image
# to be used as an SDK container image.
ENTRYPOINT ["/opt/apache/beam/boot"]
180 changes: 180 additions & 0 deletions dataflow/flex-templates/pipeline_with_dependencies/README.md
@@ -0,0 +1,180 @@
# Dataflow Flex Template: a pipeline with dependencies and a custom container image.

[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=dataflow/flex-templates/pipeline_with_dependencies/README.md)

This project illustrates the following Dataflow Python pipeline setup:
- The pipeline is a package that consists of [multiple files](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#multiple-file-dependencies).
- The pipeline has at least one dependency that is not provided in the default Dataflow runtime environment.
- The workflow uses a [custom container image](https://cloud.google.com/dataflow/docs/guides/using-custom-containers) to preinstall dependencies and to define the pipeline runtime environment.
- The workflow uses a [Dataflow Flex Template](https://cloud.google.com/dataflow/docs/concepts/dataflow-templates) to control the pipeline submission environment.
- The runtime and submission environments use the same set of Python dependencies and can be created in a reproducible manner.

To illustrate this setup, we use a pipeline that does the following (a minimal sketch follows the list):

1. Finds the longest word in an input file
2. Creates a [FIGLet text banner](https://en.wikipedia.org/wiki/FIGlet) from it using [pyfiglet](https://pypi.org/project/pyfiglet/)
3. Outputs the text banner to another file
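
The actual pipeline code lives in `my_package` and is not shown in this excerpt. The following is a minimal sketch of what the core logic can look like; the transform layout and options class are assumptions, and only the `input` and `output` parameter names come from the run command later in this README.

```python
# A minimal sketch, not the actual contents of my_package.
import apache_beam as beam
import pyfiglet
from apache_beam.options.pipeline_options import PipelineOptions


class BannerOptions(PipelineOptions):
    # Hypothetical custom options; the sample's real option class may differ.
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--input", help="Text file to read.")
        parser.add_argument("--output", help="Output path prefix to write to.")


def run(argv=None):
    options = PipelineOptions(argv)
    banner_options = options.view_as(BannerOptions)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Read lines" >> beam.io.ReadFromText(banner_options.input)
            | "Split into words" >> beam.FlatMap(str.split)
            | "Find longest word" >> beam.combiners.Top.Of(1, key=len)
            | "Unwrap singleton" >> beam.FlatMap(lambda words: words)
            | "Make FIGlet banner" >> beam.Map(pyfiglet.figlet_format)
            | "Write banner" >> beam.io.WriteToText(banner_options.output)
        )
```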


## The structure of the example

The pipeline package consists of the `my_package` directory and the `setup.py` file. The package defines the pipeline, the pipeline dependencies, and the input parameters. You can define multiple pipelines in the same package. The `my_package.launcher` module is used to submit the pipeline to a runner.

The `main.py` file provides a top-level entrypoint to trigger the pipeline launcher from a
launch environment.
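
A minimal sketch of such an entrypoint, assuming the launcher module exposes a `run()` function (an assumption; the real module is not shown in this excerpt):

```python
# main.py sketch; assumes my_package.launcher exposes a run() function.
import logging

from my_package import launcher

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    launcher.run()
```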

The `Dockerfile` defines the runtime environment for the pipeline. It also configures the Flex Template, which lets you reuse the runtime image to build the Flex Template.

The `requirements.txt` file defines all Python packages in the dependency chain of the pipeline package. Use it to create reproducible Python environments in the Docker image.

The `metadata.json` file defines Flex Template parameters and their validation rules. It is optional.
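
The sample's `metadata.json` is not shown in this excerpt, but a Flex Template metadata file generally has the shape sketched below. The sketch is expressed as a Python dict only to keep all sketches in one language; the name, description, labels, and help text are assumptions.

```python
# Illustrative sketch of Flex Template metadata, written out as metadata.json.
# The parameter names match the run command later in this README.
import json

metadata = {
    "name": "Longest word FIGlet banner",
    "description": "Finds the longest word in a file and writes it as a FIGlet banner.",
    "parameters": [
        {
            "name": "input",
            "label": "Input file",
            "helpText": "Path of the text file to read.",
        },
        {
            "name": "output",
            "label": "Output path",
            "helpText": "Path prefix for the output file.",
        },
    ],
}

with open("metadata.json", "w") as f:
    json.dump(metadata, f, indent=2)
```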

## Before you begin

1. Follow the
[Dataflow setup instructions](../../README.md).

1. [Enable the Cloud Build API](https://console.cloud.google.com/flows/enableapi?apiid=cloudbuild.googleapis.com).

1. Clone the [`python-docs-samples` repository](https://github.com/GoogleCloudPlatform/python-docs-samples)
and navigate to the code sample.

```sh
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/dataflow/flex-templates/pipeline_with_dependencies
```

## Create a Cloud Storage bucket

```sh
export PROJECT="project-id"
export BUCKET="your-bucket"
export REGION="us-central1"
gsutil mb -p $PROJECT gs://$BUCKET
```

## Create an Artifact Registry repository

```sh
export REPOSITORY="your-repository"
gcloud artifacts repositories create $REPOSITORY \
    --repository-format=docker \
    --location=$REGION \
    --project $PROJECT

gcloud auth configure-docker $REGION-docker.pkg.dev
```

## Build a Docker image for the pipeline runtime environment

Using a [custom SDK container image](https://cloud.google.com/dataflow/docs/guides/using-custom-containers)
allows flexible customizations of the runtime environment.

This example uses the custom container image both to preinstall all of the pipeline dependencies before job submission and to create a reproducible runtime environment.

To illustrate customizations, a [custom base image](https://cloud.google.com/dataflow/docs/guides/build-container-image#use_a_custom_base_image) is used to build the SDK container image.

The Flex Template launcher is included in the SDK container image, which makes it possible to [use the SDK container image to build a Flex Template](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates#use_custom_container_images).

```sh
# Use a unique tag to version the artifacts that are built.
export TAG=`date +%Y%m%d-%H%M%S`
export SDK_CONTAINER_IMAGE="$REGION-docker.pkg.dev/$PROJECT/$REPOSITORY/my_base_image:$TAG"

gcloud builds submit . --tag $SDK_CONTAINER_IMAGE --project $PROJECT
```

## Optional: Inspect the Docker image

If you have a local installation of Docker, you can inspect the image and run the pipeline by using the Direct Runner:
```sh
docker run --rm -it --entrypoint=/bin/bash $SDK_CONTAINER_IMAGE
# Once the container is created, run:
pip list
python main.py --input requirements.txt --output=/tmp/output
cat /tmp/output*
```

## Build the Flex Template

Build the Flex Template [from the SDK container image](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates#use_custom_container_images).
Using the runtime image as the Flex Template image reduces the number of Docker images that need to be maintained.
It also ensures that the pipeline uses the same dependencies at submission and at runtime.

```sh
export TEMPLATE_FILE=gs://$BUCKET/longest-word-$TAG.json
export TEMPLATE_IMAGE=$REGION-docker.pkg.dev/$PROJECT/$REPOSITORY/my_template_image:$TAG
```

```sh
gcloud dataflow flex-template build $TEMPLATE_FILE \
--image $SDK_CONTAINER_IMAGE \
--sdk-language "PYTHON" \
--metadata-file=metadata.json \
--project $PROJECT
```

## Run the template

```sh
gcloud dataflow flex-template run "flex-`date +%Y%m%d-%H%M%S`" \
--template-file-gcs-location $TEMPLATE_FILE \
--region $REGION \
--staging-location "gs://$BUCKET/staging" \
--parameters input="gs://dataflow-samples/shakespeare/hamlet.txt" \
--parameters output="gs://$BUCKET/output" \
--parameters sdk_container_image=$SDK_CONTAINER_IMAGE \
--project $PROJECT
```

After the pipeline finishes, use the following command to inspect the output:
```sh
gsutil cat gs://$BUCKET/output*
```

## Optional: Update the dependencies in the requirements file and rebuild the Docker images

The top-level pipeline dependencies are defined in the `install_requires` section of the `setup.py` file.
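
A minimal `setup.py` along those lines might look like the following sketch; the package name comes from this README, the Beam version matches the Dockerfile, and everything else is an assumption.

```python
# Illustrative setup.py sketch; the pinned versions are assumptions.
import setuptools

setuptools.setup(
    name="my_package",
    version="0.1.0",
    packages=setuptools.find_packages(),
    install_requires=[
        # Top-level dependencies only; transitive dependencies are pinned
        # separately in requirements.txt.
        "apache-beam[gcp]==2.54.0",
        "pyfiglet",
    ],
)
```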

The `requirements.txt` file pins all Python dependencies that must be installed in the Docker container image, including the transitive dependencies. Listing all packages produces reproducible Python environments every time the image is built.
Version-control the `requirements.txt` file together with the rest of the pipeline code.

When the dependencies of your pipeline change or when you want to use the latest available versions of packages in the pipeline's dependency chain, regenerate the `requirements.txt` file:

```sh
python3.11 -m pip install pip-tools # Use a consistent minor version of Python throughout the project.
pip-compile ./setup.py
```

If you base your custom container image on the standard Apache Beam base image, use a constraints file to reduce the image size and to give preference to the versions already installed in the Apache Beam base image:

```sh
wget https://raw.githubusercontent.com/apache/beam/release-2.54.0/sdks/python/container/py311/base_image_requirements.txt
pip-compile --constraint=base_image_requirements.txt ./setup.py
```

Alternatively, take the following steps:

1. Use an empty `requirements.txt` file.
1. Build the SDK container Docker image from the Docker file.
1. Collect the output of `pip freeze` at the last stage of the Docker build.
1. Seed the `requirements.txt` file with that content.

For more information, see the Apache Beam [reproducible environments](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#create-reproducible-environments) documentation.


## What's next?

For more information about building and running Flex Templates, see
📝 [Use Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates).

For more information about building and using custom containers, see
📝 [Use custom containers in Dataflow](https://cloud.google.com/dataflow/docs/guides/using-custom-containers).

To reduce Docker image build time, see:
📝 [Using Kaniko Cache](https://cloud.google.com/build/docs/optimize-builds/kaniko-cache).
