From be27f1a6ae2ecdda51ab736ce827e5366dce0dbf Mon Sep 17 00:00:00 2001 From: tvalentyn Date: Thu, 21 Mar 2024 14:26:54 -0700 Subject: [PATCH] Add an example for building a flex template for a python pipeline with extra dependencies and a custom container. (#11252) * Add an example for building a flex template for a python pipeline with dependencies and a custom container * Support projects with a colon in conftest. Also make metadata.json optional when building templates * Add a test. * Apply suggestions from code review Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com> * Clarify workaround gcloud missing functionality. * Misc style edits in commented code. * Apply suggestions from code review * Simplify the example: reuse the Dockerfile for the Flex Template * Show how to inspect docker image. * Add Cloud Shell instructions * fix test. * Apply suggestions from code review Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com> * Update dataflow/flex-templates/pipeline_with_dependencies/README.md * Update dataflow/flex-templates/pipeline_with_dependencies/setup.py * Add metadata.json * Fix tests. * Apply typing best practices * lint * Minor comment clarifications. * Exclude the new example from renovate bot. 
* Make exclusion a glob --------- Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com> --- dataflow/conftest.py | 22 +- .../pipeline_with_dependencies/Dockerfile | 80 +++++ .../pipeline_with_dependencies/README.md | 180 ++++++++++ .../pipeline_with_dependencies/e2e_test.py | 70 ++++ .../pipeline_with_dependencies/main.py | 35 ++ .../pipeline_with_dependencies/metadata.json | 23 ++ .../my_package/__init__.py | 15 + .../my_package/launcher.py | 35 ++ .../my_package/my_pipeline.py | 39 +++ .../my_package/my_transforms.py | 37 ++ .../my_package/utils/__init__.py | 15 + .../my_package/utils/figlet.py | 25 ++ .../noxfile_config.py | 23 ++ .../requirements-test.txt | 21 ++ .../requirements.txt | 320 ++++++++++++++++++ .../pipeline_with_dependencies/setup.py | 27 ++ renovate.json | 3 +- 17 files changed, 961 insertions(+), 9 deletions(-) create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/Dockerfile create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/README.md create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/e2e_test.py create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/main.py create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/metadata.json create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/my_package/__init__.py create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/my_package/launcher.py create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/my_package/my_pipeline.py create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/my_package/my_transforms.py create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/my_package/utils/__init__.py create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/my_package/utils/figlet.py create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/noxfile_config.py create mode 100644 
dataflow/flex-templates/pipeline_with_dependencies/requirements-test.txt create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/requirements.txt create mode 100644 dataflow/flex-templates/pipeline_with_dependencies/setup.py diff --git a/dataflow/conftest.py b/dataflow/conftest.py index 911b74dbfb28..edfd386daf1f 100644 --- a/dataflow/conftest.py +++ b/dataflow/conftest.py @@ -527,6 +527,7 @@ def cloud_build_submit( cmd = ["gcloud", "auth", "configure-docker"] logging.info(f"{cmd}") subprocess.check_call(cmd) + gcr_project = project.replace(':', '/') if substitutions: cmd_substitutions = [ @@ -561,13 +562,14 @@ def cloud_build_submit( "builds", "submit", f"--project={project}", - f"--tag=gcr.io/{project}/{image_name}:{UUID}", + f"--tag=gcr.io/{gcr_project}/{image_name}:{UUID}", *cmd_substitutions, source, ] logging.info(f"{cmd}") subprocess.check_call(cmd) - logging.info(f"Created image: gcr.io/{project}/{image_name}:{UUID}") + logging.info( + f"Created image: gcr.io/{gcr_project}/{image_name}:{UUID}") yield f"{image_name}:{UUID}" else: raise ValueError("must specify either `config` or `image_name`") @@ -578,14 +580,15 @@ def cloud_build_submit( "container", "images", "delete", - f"gcr.io/{project}/{image_name}:{UUID}", + f"gcr.io/{gcr_project}/{image_name}:{UUID}", f"--project={project}", "--force-delete-tags", "--quiet", ] logging.info(f"{cmd}") subprocess.check_call(cmd) - logging.info(f"Deleted image: gcr.io/{project}/{image_name}:{UUID}") + logging.info( + f"Deleted image: gcr.io/{gcr_project}/{image_name}:{UUID}") @staticmethod def dataflow_job_url( @@ -756,12 +759,13 @@ def dataflow_jobs_cancel( def dataflow_flex_template_build( bucket_name: str, image_name: str, - metadata_file: str = "metadata.json", + metadata_file: str | None = "metadata.json", template_file: str = "template.json", project: str = PROJECT, ) -> str: # https://cloud.google.com/sdk/gcloud/reference/dataflow/flex-template/build template_gcs_path = 
f"gs://{bucket_name}/{template_file}" + gcr_project = project.replace(':', '/') cmd = [ "gcloud", "dataflow", @@ -769,10 +773,12 @@ def dataflow_flex_template_build( "build", template_gcs_path, f"--project={project}", - f"--image=gcr.io/{project}/{image_name}", - "--sdk-language=PYTHON", - f"--metadata-file={metadata_file}", + f"--image=gcr.io/{gcr_project}/{image_name}", + "--sdk-language=PYTHON" ] + if metadata_file: + cmd.append(f"--metadata-file={metadata_file}") + logging.info(f"{cmd}") subprocess.check_call(cmd) diff --git a/dataflow/flex-templates/pipeline_with_dependencies/Dockerfile b/dataflow/flex-templates/pipeline_with_dependencies/Dockerfile new file mode 100644 index 000000000000..8232175732eb --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/Dockerfile @@ -0,0 +1,80 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# This Dockerfile defines a container image that will serve as both SDK container image +# (launch environment) and the base image for Dataflow Flex Template (launch environment). +# +# For more information, see: +# - https://cloud.google.com/dataflow/docs/reference/flex-templates-base-images +# - https://cloud.google.com/dataflow/docs/guides/using-custom-containers + + +# This Dockerfile illustrates how to use a custom base image when building +# a custom contaier images for Dataflow. 
A 'slim' base image is smaller in size, +# but does not include some preinstalled libraries, like google-cloud-debugger. +# To use a standard image, use apache/beam_python3.11_sdk:2.54.0 instead. +# Use consistent versions of Python interpreter in the project. +FROM python:3.11-slim + +# Copy SDK entrypoint binary from Apache Beam image, which makes it possible to +# use the image as SDK container image. If you explicitly depend on +# apache-beam in setup.py, use the same version of Beam in both files. +COPY --from=apache/beam_python3.11_sdk:2.54.0 /opt/apache/beam /opt/apache/beam + +# Copy Flex Template launcher binary from the launcher image, which makes it +# possible to use the image as a Flex Template base image. +COPY --from=gcr.io/dataflow-templates-base/python311-template-launcher-base:20230622_RC00 /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher + +# Location to store the pipeline artifacts. +ARG WORKDIR=/template +WORKDIR ${WORKDIR} + +COPY requirements.txt . +COPY setup.py . +COPY main.py . +COPY my_package my_package + +# Installing exhaustive list of dependencies from a requirements.txt +# helps to ensure that every time Docker container image is built, +# the Python dependencies stay the same. Using `--no-cache-dir` reduces image size. +RUN pip install --no-cache-dir -r requirements.txt + +# Installing the pipeline package makes all modules encompassing the pipeline +# available via import statements and installs necessary dependencies. +# Editable installation allows picking up later changes to the pipeline code +# for example during local experimentation within the container. +RUN pip install -e . 
+ +# For more information, see: https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates +ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py" +ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py" + +# To reduce pipeline submission time +# do not use FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE directive here +# to specify pipeline dependencies, because this image will be used +# as custom sdk container image and it already installs the dependencies +# from the requirements.txt. + +# Optionally, verify that dependencies are not conflicting. +# A conflict may or may not be significant for your pipeline. +RUN pip check + +# Optionally, list all installed dependencies. +# The output can be used to seed requirements.txt for reproducible builds. +RUN pip freeze + +# Set the entrypoint to Apache Beam SDK launcher, which allows this image +# to be used as an SDK container image. +ENTRYPOINT ["/opt/apache/beam/boot"] diff --git a/dataflow/flex-templates/pipeline_with_dependencies/README.md b/dataflow/flex-templates/pipeline_with_dependencies/README.md new file mode 100644 index 000000000000..d0265b531fd9 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/README.md @@ -0,0 +1,180 @@ +# Dataflow Flex Template: a pipeline with dependencies and a custom container image. + +[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=dataflow/flex-templates/streaming_beam/README.md) + +This project illustrates the following Dataflow Python pipeline setup: +- The pipeline is a package that consists of [multiple files](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#multiple-file-dependencies). +- The pipeline has at least one dependency that is not provided in the default Dataflow runtime environment. 
+- The workflow uses a [custom container image](https://cloud.google.com/dataflow/docs/guides/using-custom-containers) to preinstall dependencies and to define the pipeline runtime environment. +- The workflow uses a [Dataflow Flex Template](https://cloud.google.com/dataflow/docs/concepts/dataflow-templates) to control the pipeline submission environment. +- The runtime and submission environment use the same set of Python dependencies and can be created in a reproducible manner. + +To illustrate this setup, we use a pipeline that does the following: + +1. Finds the longest word in an input file +2. Creates a [FIGLet text banner](https://en.wikipedia.org/wiki/FIGlet) out of it using [pyfiglet](https://pypi.org/project/pyfiglet/) +3. Outputs the text banner in another file + + +## The structure of the example + +The pipeline package is comprised of the `my_package` directory and the `setup.py` file. The package defines the pipeline, the pipeline dependencies, and the input parameters. You can define multiple pipelines in the same package. The `my_package.launcher` module is used to submit the pipeline to a runner. + +The `main.py` file provides a top-level entrypoint to trigger the pipeline launcher from a +launch environment. + +The `Dockerfile` defines the runtime environment for the pipeline. It also configures the Flex Template, which lets you reuse the runtime image to build the Flex Template. + +The `requirements.txt` file defines all Python packages in the dependency chain of the pipeline package. Use it to create reproducible Python environments in the Docker image. + +The `metadata.json` file defines Flex Template parameters and their validation rules. It is optional. + +## Before you begin + +1. Follow the + [Dataflow setup instructions](../../README.md). + +1. [Enable the Cloud Build API](https://console.cloud.google.com/flows/enableapi?apiid=cloudbuild.googleapis.com). + +1. 
Clone the [`python-docs-samples` repository](https://github.com/GoogleCloudPlatform/python-docs-samples) +and navigate to the code sample. + + ```sh + git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git + cd python-docs-samples/dataflow/flex-templates/streaming_beam + ``` + +## Create a Cloud Storage bucket + +```sh +export PROJECT="project-id" +export BUCKET="your-bucket" +export REGION="us-central1" +gsutil mb -p $PROJECT gs://$BUCKET +``` + +## Create an Artifact Registry repository + +```sh +export REPOSITORY="your-repository" +gcloud artifacts repositories create $REPOSITORY \ + --repository-format=docker \ + --location=$REGION + +gcloud artifacts repositories create $REPOSITORY \ + --repository-format=docker \ + --location=$REGION \ + --project $PROJECT + +gcloud auth configure-docker $REGION-docker.pkg.dev +``` + +## Build a Docker image for the pipeline runtime environment + +Using a [custom SDK container image](https://cloud.google.com/dataflow/docs/guides/using-custom-containers) +allows flexible customizations of the runtime environment. + +This example uses the custom container image both to preinstall all of the pipeline dependencies before job submission and to create a reproducible runtime environment. + +To illustrate customizations, a [custom base image](https://cloud.google.com/dataflow/docs/guides/build-container-image#use_a_custom_base_image) is used to build the SDK container image. + +The Flex Template launcher is included in the SDK container image, which makes it possible to [use the SDK container image to build a Flex Template](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates#use_custom_container_images). + +```sh +# Use a unique tag to version the artifacts that are built. +export TAG=`date +%Y%m%d-%H%M%S` +export SDK_CONTAINER_IMAGE="$REGION-docker.pkg.dev/$PROJECT/$REPOSITORY/my_base_image:$TAG" + +gcloud builds submit . 
--tag $SDK_CONTAINER_IMAGE --project $PROJECT +``` + +## Optional: Inspect the Docker image + +If you have a local installation of Docker, you can inspect the image and run the pipeline by using the Direct Runner: +``` +docker run --rm -it --entrypoint=/bin/bash $SDK_CONTAINER_IMAGE + +# Once the container is created, run: +pip list +python main.py --input requirements.txt --output=/tmp/output +cat /tmp/output* +``` + +## Build the Flex Template + +Build the Flex Template [from the SDK container image](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates#use_custom_container_images). +Using the runtime image as the Flex Template image reduces the number of Docker images that need to be maintained. +It also ensures that the pipeline uses the same dependencies at submission and at runtime. + +```sh +export TEMPLATE_FILE=gs://$BUCKET/longest-word-$TAG.json +export TEMPLATE_IMAGE=$REGION-docker.pkg.dev/$PROJECT/$REPOSITORY/my_template_image:$TAG +``` + +```sh +gcloud dataflow flex-template build $TEMPLATE_FILE \ + --image $SDK_CONTAINER_IMAGE \ + --sdk-language "PYTHON" \ + --metadata-file=metadata.json \ + --project $PROJECT +``` + +## Run the template + +```sh +gcloud dataflow flex-template run "flex-`date +%Y%m%d-%H%M%S`" \ + --template-file-gcs-location $TEMPLATE_FILE \ + --region $REGION \ + --staging-location "gs://$BUCKET/staging" \ + --parameters input="gs://dataflow-samples/shakespeare/hamlet.txt" \ + --parameters output="gs://$BUCKET/output" \ + --parameters sdk_container_image=$SDK_CONTAINER_IMAGE \ + --project $PROJECT +``` + +After the pipeline finishes, use the following command to inspect the output: +``` +gsutil cat gs://$BUCKET/output* +``` + +## Optional: Update the dependencies in the requirements file and rebuild the Docker images + +The top-level pipeline dependencies are defined in the `install_requires` section of the `setup.py` file. 
+ +The `requirements.txt` file pins all Python dependencies, that must be installed in the Docker container image, including the transitive dependencies. Listing all packages produces reproducible Python environments every time the image is built. +Version control the `requirements.txt` file together with the rest of pipeline code. + +When the dependencies of your pipeline change or when you want to use the latest available versions of packages in the pipeline's dependency chain, regenerate the `requirements.txt` file: + +``` + python3.11 -m pip install pip-tools # Use a consistent minor version of Python throughout the project. + pip-compile ./setup.py +``` + +If you base your custom container image on the standard Apache Beam base image, to reduce the image size and to give preference to the versions already installed in the Apache Beam base image, use a constraints file: + +``` + wget https://raw.githubusercontent.com/apache/beam/release-2.54.0/sdks/python/container/py311/base_image_requirements.txt + pip-compile --constraint=base_image_requirements.txt ./setup.py +``` + +Alternatively, take the following steps: + +1. Use an empty `requirements.txt` file. +1. Build the SDK container Docker image from the Docker file. +1. Collect the output of `pip freeze` at the last stage of the Docker build. +1. Seed the `requirements.txt` file with that content. + +For more information, see the Apache Beam [reproducible environments](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#create-reproducible-environments) documentation. + + +## What's next? + +For more information about building and running Flex Templates, see +📝 [Use Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates). + +For more information about building and using custom containers, see +📝 [Use custom containers in Dataflow](https://cloud.google.com/dataflow/docs/guides/using-custom-containers). 
+ +To reduce Docker image build time, see: +📝 [Using Kaniko Cache](https://cloud.google.com/build/docs/optimize-builds/kaniko-cache). \ No newline at end of file diff --git a/dataflow/flex-templates/pipeline_with_dependencies/e2e_test.py b/dataflow/flex-templates/pipeline_with_dependencies/e2e_test.py new file mode 100644 index 000000000000..d06ffafa02b0 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/e2e_test.py @@ -0,0 +1,70 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +# This is a test that exercises the example. It is not a part of the example. + +try: + # `conftest` cannot be imported when running in `nox`, but we still + # try to import it for the autocomplete when writing the tests. 
+ from conftest import Utils +except ModuleNotFoundError: + Utils = None +import pytest + +NAME = "dataflow/flex-templates/pipeline-with-dependencies" +SDK_IMAGE_NAME = NAME + "/sdk_container_image" +TEMPLATE_IMAGE_NAME = NAME + "/template_image" + + +@pytest.fixture(scope="session") +def bucket_name(utils: Utils) -> str: + yield from utils.storage_bucket(NAME) + + +def _include_repo(utils: Utils, image: str) -> str: + project = utils.project + gcr_project = project.replace(':', '/') + return f"gcr.io/{gcr_project}/{image}" + + +@pytest.fixture(scope="session") +def sdk_container_image(utils: Utils) -> str: + yield from utils.cloud_build_submit(SDK_IMAGE_NAME) + + +@pytest.fixture(scope="session") +def flex_template_path(utils: Utils, bucket_name: str, sdk_container_image: str) -> str: + yield from utils.dataflow_flex_template_build( + bucket_name, sdk_container_image) + + +@pytest.fixture(scope="session") +def dataflow_job_id( + utils: Utils, + bucket_name: str, + flex_template_path: str, + sdk_container_image: str +) -> str: + yield from utils.dataflow_flex_template_run( + job_name=NAME, + template_path=flex_template_path, + bucket_name=bucket_name, + parameters={ + "input": "gs://dataflow-samples/shakespeare/hamlet.txt", + "output": f"gs://{bucket_name}/output", + "sdk_container_image": _include_repo(utils, sdk_container_image), + }, + ) + + +def test_flex_template_with_dependencies_and_custom_container(utils: Utils, dataflow_job_id: str) -> None: + utils.dataflow_jobs_wait(dataflow_job_id, target_states={"JOB_STATE_DONE"}) diff --git a/dataflow/flex-templates/pipeline_with_dependencies/main.py b/dataflow/flex-templates/pipeline_with_dependencies/main.py new file mode 100644 index 000000000000..c06cc4d14f20 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/main.py @@ -0,0 +1,35 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the 
License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Top-level entry point that launches the pipeline. + +In this example, the Python pipeline is defined in a package, consisting of +several modules. This file provides the entrypoint that launches the +workflow defined in the package. + +This entrypoint will be called when the Flex Template starts. + +The my_package package should be installed in the Flex Template image, and +in the runtime environment. The latter could be accomplished with the +--setup_file pipeline option or by supplying a custom container image. +""" + +import logging + +from my_package import launcher + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + launcher.run() diff --git a/dataflow/flex-templates/pipeline_with_dependencies/metadata.json b/dataflow/flex-templates/pipeline_with_dependencies/metadata.json new file mode 100644 index 000000000000..f94e958e3181 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/metadata.json @@ -0,0 +1,23 @@ +{ + "_comment": "This file allows you to optionally add additional metadata for the Flex Template, its parameters and their validation rules.", + "name": "Longest Word Finder.", + "description": "A Flex Template that finds the longest word in the input, and makes a FIGlet-style banner out of it.", + "parameters": [ + { + "name": "input", + "label": "Input path", + "helpText": "The path and filename prefix for input files. 
Example: gs://dataflow-samples/shakespeare/kinglear.txt", + "regexes": [ + "^gs:\\/\\/[^\\n\\r]+$" + ] + }, + { + "name": "output", + "label": "Output destination", + "helpText": "The path and filename prefix for writing the output. Example: gs://your-bucket/longest-word", + "regexes": [ + "^gs:\\/\\/[^\\n\\r]+$" + ] + } + ] +} diff --git a/dataflow/flex-templates/pipeline_with_dependencies/my_package/__init__.py b/dataflow/flex-templates/pipeline_with_dependencies/my_package/__init__.py new file mode 100644 index 000000000000..8844a4ced72f --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/my_package/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A package that has a Beam pipeline that is split across multiple modules.""" diff --git a/dataflow/flex-templates/pipeline_with_dependencies/my_package/launcher.py b/dataflow/flex-templates/pipeline_with_dependencies/my_package/launcher.py new file mode 100644 index 000000000000..2f3ce5dc1994 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/my_package/launcher.py @@ -0,0 +1,35 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines command line arguments for the pipeline defined in the package.""" + +import argparse + +from my_package import my_pipeline + + +def run(argv: list[str] | None = None): + """Parses the parameters provided on the command line and runs the pipeline. + """ + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', required=True, help='Input file(s) to process') + parser.add_argument('--output', required=True, help='Output file') + + pipeline_args, other_args = parser.parse_known_args(argv) + + pipeline = my_pipeline.longest_word_pipeline( + pipeline_args.input, pipeline_args.output, other_args) + + pipeline.run() diff --git a/dataflow/flex-templates/pipeline_with_dependencies/my_package/my_pipeline.py b/dataflow/flex-templates/pipeline_with_dependencies/my_package/my_pipeline.py new file mode 100644 index 000000000000..7c4d3ce2009a --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/my_package/my_pipeline.py @@ -0,0 +1,39 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Defines a pipeline to create a banner from the longest word in the input.""" + +import apache_beam as beam + +from my_package import my_transforms +from my_package.utils import figlet + + +def longest_word_pipeline( + input_path: str, output_path: str, + pipeline_options_args: list[str]) -> beam.Pipeline: + """Instantiates and returns a Beam pipeline object""" + + pipeline_options = beam.options.pipeline_options.PipelineOptions( + pipeline_options_args) + + pipeline = beam.Pipeline(options=pipeline_options) + _ = ( + pipeline + | 'Read Input' >> beam.io.ReadFromText(input_path) + | 'Find the Longest Word' >> my_transforms.FindLongestWord() + | 'Create a Banner' >> beam.Map(figlet.render) + | 'Write Output' >> beam.io.WriteToText(output_path)) + + return pipeline diff --git a/dataflow/flex-templates/pipeline_with_dependencies/my_package/my_transforms.py b/dataflow/flex-templates/pipeline_with_dependencies/my_package/my_transforms.py new file mode 100644 index 000000000000..da83ddbaa290 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/my_package/my_transforms.py @@ -0,0 +1,37 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Defines custom PTransforms and DoFns used in the pipelines.""" + +from collections.abc import Iterable +import re + +import apache_beam as beam + + +class WordExtractingDoFn(beam.DoFn): + """Parses each line of input text into words.""" + + def process(self, element: str) -> Iterable[str]: + return re.findall(r'[\w\']+', element, re.UNICODE) + + +class FindLongestWord(beam.PTransform): + """Extracts words from text and finds the longest one.""" + + def expand(self, pcoll): + return ( + pcoll + | "Extract words" >> beam.ParDo(WordExtractingDoFn()) + | "Find longest" >> beam.combiners.Top.Largest(n=1, key=len)) diff --git a/dataflow/flex-templates/pipeline_with_dependencies/my_package/utils/__init__.py b/dataflow/flex-templates/pipeline_with_dependencies/my_package/utils/__init__.py new file mode 100644 index 000000000000..7da70b202c47 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/my_package/utils/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""A sample subpackage that might contain utilities and helpers.""" diff --git a/dataflow/flex-templates/pipeline_with_dependencies/my_package/utils/figlet.py b/dataflow/flex-templates/pipeline_with_dependencies/my_package/utils/figlet.py new file mode 100644 index 000000000000..6e5b84ae844a --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/my_package/utils/figlet.py @@ -0,0 +1,25 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A sample module storing helper functions to interface with pyfiglet.""" + +from typing import List + +import pyfiglet + + +def render(words: List[str]) -> str: + """Renders a FIGlet banner for each string in the input list of words.""" + + return "\n".join([pyfiglet.figlet_format(w, width=150) for w in words]) diff --git a/dataflow/flex-templates/pipeline_with_dependencies/noxfile_config.py b/dataflow/flex-templates/pipeline_with_dependencies/noxfile_config.py new file mode 100644 index 000000000000..460411fffee7 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/noxfile_config.py @@ -0,0 +1,23 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is a test configuration file. It is not a part of the sample. + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + # > ℹī¸ We're opting out of all Python versions except 3.11. + # > The Python version used is defined by the Dockerfile, so it's redundant + # > to run multiple tests since they would all be running the same Dockerfile. + "ignored_versions": ["2.7", "3.6", "3.7", "3.8", "3.9", "3.10", "3.12"], +} diff --git a/dataflow/flex-templates/pipeline_with_dependencies/requirements-test.txt b/dataflow/flex-templates/pipeline_with_dependencies/requirements-test.txt new file mode 100644 index 000000000000..16c7c4dffed2 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/requirements-test.txt @@ -0,0 +1,21 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is a test configuration file. It is not a part of the example. 
+ +google-api-python-client==2.87.0 +google-cloud-storage==2.9.0 +pytest-xdist==3.3.0 +pytest==7.0.1 +pyyaml==6.0 diff --git a/dataflow/flex-templates/pipeline_with_dependencies/requirements.txt b/dataflow/flex-templates/pipeline_with_dependencies/requirements.txt new file mode 100644 index 000000000000..463e3c9aec48 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/requirements.txt @@ -0,0 +1,320 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# This requirements.txt file lists all Python dependencies that are installed +# in the Docker image in this example. Listing all dependencies allows +# creating reproducible Python environments.
For more information, +# see the "Update the dependencies in the requirements file" section in README.md + +# +# This file is autogenerated by pip-compile with Python 3.11 +# by the following command: +# +# pip-compile ./setup.py +# +apache-beam[gcp]==2.54.0 + # via my_package (setup.py) +attrs==23.2.0 + # via + # jsonschema + # referencing +cachetools==5.3.3 + # via + # apache-beam + # google-auth +certifi==2024.2.2 + # via requests +charset-normalizer==3.3.2 + # via requests +cloudpickle==2.2.1 + # via apache-beam +crcmod==1.7 + # via apache-beam +deprecated==1.2.14 + # via google-cloud-spanner +dill==0.3.1.1 + # via apache-beam +dnspython==2.6.1 + # via pymongo +docopt==0.6.2 + # via hdfs +fastavro==1.9.4 + # via apache-beam +fasteners==0.19 + # via + # apache-beam + # google-apitools +google-api-core[grpc]==2.17.1 + # via + # apache-beam + # google-cloud-aiplatform + # google-cloud-bigquery + # google-cloud-bigquery-storage + # google-cloud-bigtable + # google-cloud-core + # google-cloud-datastore + # google-cloud-dlp + # google-cloud-language + # google-cloud-pubsub + # google-cloud-pubsublite + # google-cloud-recommendations-ai + # google-cloud-resource-manager + # google-cloud-spanner + # google-cloud-storage + # google-cloud-videointelligence + # google-cloud-vision +google-apitools==0.5.31 + # via apache-beam +google-auth==2.28.1 + # via + # apache-beam + # google-api-core + # google-auth-httplib2 + # google-cloud-aiplatform + # google-cloud-core + # google-cloud-dlp + # google-cloud-language + # google-cloud-pubsub + # google-cloud-recommendations-ai + # google-cloud-resource-manager + # google-cloud-storage + # google-cloud-videointelligence + # google-cloud-vision +google-auth-httplib2==0.1.1 + # via apache-beam +google-cloud-aiplatform==1.42.1 + # via apache-beam +google-cloud-bigquery==3.17.2 + # via + # apache-beam + # google-cloud-aiplatform +google-cloud-bigquery-storage==2.24.0 + # via apache-beam +google-cloud-bigtable==2.23.0 + # via apache-beam 
+google-cloud-core==2.4.1 + # via + # apache-beam + # google-cloud-bigquery + # google-cloud-bigtable + # google-cloud-datastore + # google-cloud-spanner + # google-cloud-storage +google-cloud-datastore==2.19.0 + # via apache-beam +google-cloud-dlp==3.15.2 + # via apache-beam +google-cloud-language==2.13.2 + # via apache-beam +google-cloud-pubsub==2.19.7 + # via + # apache-beam + # google-cloud-pubsublite +google-cloud-pubsublite==1.9.0 + # via apache-beam +google-cloud-recommendations-ai==0.10.9 + # via apache-beam +google-cloud-resource-manager==1.12.2 + # via google-cloud-aiplatform +google-cloud-spanner==3.42.0 + # via apache-beam +google-cloud-storage==2.14.0 + # via + # apache-beam + # google-cloud-aiplatform +google-cloud-videointelligence==2.13.2 + # via apache-beam +google-cloud-vision==3.7.1 + # via apache-beam +google-crc32c==1.5.0 + # via + # google-cloud-storage + # google-resumable-media +google-resumable-media==2.7.0 + # via + # google-cloud-bigquery + # google-cloud-storage +googleapis-common-protos[grpc]==1.62.0 + # via + # google-api-core + # grpc-google-iam-v1 + # grpcio-status +grpc-google-iam-v1==0.13.0 + # via + # google-cloud-bigtable + # google-cloud-pubsub + # google-cloud-resource-manager + # google-cloud-spanner +grpc-interceptor==0.15.4 + # via google-cloud-spanner +grpcio==1.62.0 + # via + # apache-beam + # google-api-core + # google-cloud-pubsub + # google-cloud-pubsublite + # googleapis-common-protos + # grpc-google-iam-v1 + # grpc-interceptor + # grpcio-status +grpcio-status==1.62.0 + # via + # google-api-core + # google-cloud-pubsub + # google-cloud-pubsublite +hdfs==2.7.3 + # via apache-beam +httplib2==0.22.0 + # via + # apache-beam + # google-apitools + # google-auth-httplib2 + # oauth2client +idna==3.6 + # via requests +js2py==0.74 + # via apache-beam +jsonpickle==3.0.3 + # via apache-beam +jsonschema==4.21.1 + # via apache-beam +jsonschema-specifications==2023.12.1 + # via jsonschema +numpy==1.24.4 + # via + # apache-beam + # 
pyarrow + # shapely +oauth2client==4.1.3 + # via google-apitools +objsize==0.7.0 + # via apache-beam +orjson==3.9.15 + # via apache-beam +overrides==7.7.0 + # via google-cloud-pubsublite +packaging==23.2 + # via + # apache-beam + # google-cloud-aiplatform + # google-cloud-bigquery +proto-plus==1.23.0 + # via + # apache-beam + # google-cloud-aiplatform + # google-cloud-bigquery-storage + # google-cloud-bigtable + # google-cloud-datastore + # google-cloud-dlp + # google-cloud-language + # google-cloud-pubsub + # google-cloud-recommendations-ai + # google-cloud-resource-manager + # google-cloud-spanner + # google-cloud-videointelligence + # google-cloud-vision +protobuf==4.25.3 + # via + # apache-beam + # google-api-core + # google-cloud-aiplatform + # google-cloud-bigquery-storage + # google-cloud-bigtable + # google-cloud-datastore + # google-cloud-dlp + # google-cloud-language + # google-cloud-pubsub + # google-cloud-recommendations-ai + # google-cloud-resource-manager + # google-cloud-spanner + # google-cloud-videointelligence + # google-cloud-vision + # googleapis-common-protos + # grpc-google-iam-v1 + # grpcio-status + # proto-plus +pyarrow==14.0.2 + # via apache-beam +pyarrow-hotfix==0.6 + # via apache-beam +pyasn1==0.5.1 + # via + # oauth2client + # pyasn1-modules + # rsa +pyasn1-modules==0.3.0 + # via + # google-auth + # oauth2client +pydot==1.4.2 + # via apache-beam +pyfiglet==1.0.2 + # via my_package (setup.py) +pyjsparser==2.7.1 + # via js2py +pymongo==4.6.2 + # via apache-beam +pyparsing==3.1.1 + # via + # httplib2 + # pydot +python-dateutil==2.8.2 + # via + # apache-beam + # google-cloud-bigquery +pytz==2024.1 + # via apache-beam +referencing==0.33.0 + # via + # jsonschema + # jsonschema-specifications +regex==2023.12.25 + # via apache-beam +requests==2.31.0 + # via + # apache-beam + # google-api-core + # google-cloud-bigquery + # google-cloud-storage + # hdfs +rpds-py==0.18.0 + # via + # jsonschema + # referencing +rsa==4.9 + # via + # google-auth + # 
oauth2client +shapely==2.0.3 + # via google-cloud-aiplatform +six==1.16.0 + # via + # google-apitools + # hdfs + # js2py + # oauth2client + # python-dateutil +sqlparse==0.4.4 + # via google-cloud-spanner +typing-extensions==4.10.0 + # via apache-beam +tzlocal==5.2 + # via js2py +urllib3==2.2.1 + # via requests +wrapt==1.16.0 + # via deprecated +zstandard==0.22.0 + # via apache-beam diff --git a/dataflow/flex-templates/pipeline_with_dependencies/setup.py b/dataflow/flex-templates/pipeline_with_dependencies/setup.py new file mode 100644 index 000000000000..194da5c4b330 --- /dev/null +++ b/dataflow/flex-templates/pipeline_with_dependencies/setup.py @@ -0,0 +1,27 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines a Python package for an Apache Beam pipeline.""" + +import setuptools + +setuptools.setup( + name='my_package', + version='0.1.0', + install_requires=[ + 'apache-beam[gcp]==2.54.0', # Must match the version in `Dockerfile`. + 'pyfiglet', # This is the only non-Beam dependency of this pipeline. + ], + packages=setuptools.find_packages(), +) diff --git a/renovate.json b/renovate.json index 5f6065bb14e0..d63f29d05471 100644 --- a/renovate.json +++ b/renovate.json @@ -18,7 +18,8 @@ "composer/**/constraints.txt", "composer/blog/**/constraints.txt", "composer/airflow_1_samples/requirements.txt", - "appengine/standard" + "appengine/standard", + "dataflow/flex-templates/pipeline_with_dependencies/**" ], "packageRules": [ {