diff --git a/examples/beam-python/Dockerfile b/examples/beam-python/Dockerfile
new file mode 100644
index 00000000..ff4fcf4c
--- /dev/null
+++ b/examples/beam-python/Dockerfile
@@ -0,0 +1,39 @@
+FROM flink:1.8.3-scala_2.12 AS flink
+FROM apachebeam/python3.6_sdk:2.17.0
+
+# Install dependencies
+RUN set -ex \
+  && apt-get update \
+  && apt-get -y install \
+    gettext-base \
+    openjdk-8-jre-headless \
+    openjdk-8-jdk-headless \
+  && rm -rf /var/lib/apt/lists/*
+
+# Add Flink from the official Flink image
+ENV FLINK_HOME=/opt/flink
+ENV PATH=$PATH:$FLINK_HOME/bin
+COPY --from=flink $FLINK_HOME $FLINK_HOME
+
+# Install the Beam job server jar; it provides the Flink entry point
+RUN \
+  mkdir -p /opt/flink/flink-web-upload \
+  && ( \
+    cd /opt/flink/flink-web-upload \
+    && curl -f -O https://repository.apache.org/content/groups/public/org/apache/beam/beam-runners-flink-1.8-job-server/2.17.0/beam-runners-flink-1.8-job-server-2.17.0.jar \
+    && ln -s beam-runners-flink-1.8-job-server*.jar beam-runner.jar \
+  ) \
+  && echo 'jobmanager.web.upload.dir: /opt/flink' >> $FLINK_HOME/conf/flink-conf.yaml
+
+# Application code - this can be moved to an s2i assemble script
+COPY . /code
+WORKDIR /code/src
+RUN \
+  pip install -r /code/src/requirements.txt
+
+# Entry point that applies the FlinkK8sOperator Flink config
+COPY docker-entrypoint.sh /
+
+ENTRYPOINT ["/docker-entrypoint.sh"]
+EXPOSE 6123 8081
+CMD ["local"]
diff --git a/examples/beam-python/README.md b/examples/beam-python/README.md
new file mode 100644
index 00000000..cd35600d
--- /dev/null
+++ b/examples/beam-python/README.md
@@ -0,0 +1,11 @@
+# Beam Python Application example
+
+This example shows how to build a Docker image for a Beam Python application that is compatible with the Flink operator, starting from the official Flink and Beam Python SDK base images.
+
+The Python SDK workers run within the task manager container, and the pipeline is submitted through the native Flink entry point, so no separately deployed Beam job server is required. For more information about the Beam deployment, see this [document](https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.fh2f571kms4d).
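+
+To build the example image locally (the tag here is assumed to match the `image` field in `flink-operator-custom-resource.yaml`): `docker build -t flinkk8soperator-example-beam .`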
+
+To deploy the example locally: `kubectl create -f flink-operator-custom-resource.yaml`
+
+Flink UI (after running `kubectl proxy`): `http://localhost:8001/api/v1/namespaces/flink-operator/services/beam-python-flinkk8soperator-example:8081/proxy/#/overview`
diff --git a/examples/beam-python/docker-entrypoint.sh b/examples/beam-python/docker-entrypoint.sh
new file mode 100755
index 00000000..b3ef3e12
--- /dev/null
+++ b/examples/beam-python/docker-entrypoint.sh
@@ -0,0 +1,48 @@
+#!/bin/sh
+
+drop_privs_cmd() {
+    if [ "$(id -u)" != 0 ]; then
+        # Don't need to drop privs if EUID != 0
+        return
+    elif [ -x /sbin/su-exec ]; then
+        # Alpine
+        echo su-exec
+    else
+        # Others
+        #echo gosu flink
+        echo ""
+    fi
+}
+
+# Add in extra configs set by the operator
+if [ -n "$FLINK_PROPERTIES" ]; then
+    echo "$FLINK_PROPERTIES" >> "$FLINK_HOME/conf/flink-conf.yaml"
+fi
+
+envsubst < "$FLINK_HOME/conf/flink-conf.yaml" > "$FLINK_HOME/conf/flink-conf.yaml.tmp"
+mv "$FLINK_HOME/conf/flink-conf.yaml.tmp" "$FLINK_HOME/conf/flink-conf.yaml"
+
+COMMAND="$*"
+
+if [ $# -lt 1 ]; then
+    COMMAND="local"
+fi
+echo "COMMAND: $COMMAND"
+
+if [ "$COMMAND" = "help" ]; then
+    echo "Usage: $(basename "$0") (jobmanager|taskmanager|local|help)"
+    exit 0
+elif [ "$COMMAND" = "jobmanager" ]; then
+    echo "config file: " && grep '^[^#]' "$FLINK_HOME/conf/flink-conf.yaml"
+    echo "Starting Job Manager"
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground
+elif [ "$COMMAND" = "taskmanager" ]; then
+    echo "config file: " && grep '^[^#]' "$FLINK_HOME/conf/flink-conf.yaml"
+    echo "Starting Task Manager"
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
+elif [ "$COMMAND" = "local" ]; then
+    echo "Starting local cluster"
+    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
+fi
+
+exec "$@"
diff --git a/examples/beam-python/flink-operator-custom-resource.yaml b/examples/beam-python/flink-operator-custom-resource.yaml
new file mode 100644
index 00000000..e8b093b2
--- /dev/null
+++ b/examples/beam-python/flink-operator-custom-resource.yaml
@@ -0,0 +1,35 @@
+apiVersion: flink.k8s.io/v1beta1
+kind: FlinkApplication
+metadata:
+  name: beam-python-flinkk8soperator-example
+  namespace: flink-operator
+  annotations:
+  labels:
+    environment: development
+spec:
+  #image: docker.io/lyft/flinkk8soperator-example-beam:{sha}
+  image: flinkk8soperator-example-beam
+  flinkConfig:
+    taskmanager.network.memory.fraction: 0.1
+    taskmanager.network.memory.min: 10m
+    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
+    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
+    state.savepoints.dir: file:///checkpoints/flink/savepoints
+  jobManagerConfig:
+    resources:
+      requests:
+        memory: "200Mi"
+        cpu: "0.1"
+    replicas: 1
+  taskManagerConfig:
+    taskSlots: 2
+    resources:
+      requests:
+        memory: "200Mi"
+        cpu: "0.1"
+  flinkVersion: "1.8"
+  jarName: "beam-runner.jar"
+  parallelism: 1
+  entryClass: "org.apache.beam.runners.flink.FlinkPortableClientEntryPoint"
+  programArgs: "--driver-cmd \"cd /code/src; exec python -m beam_example.pipeline --job_name=beam-flinkk8soperator-example\""
+  deleteMode: None
diff --git a/examples/beam-python/src/beam_example/__init__.py b/examples/beam-python/src/beam_example/__init__.py
new file mode 100644
index 00000000..3be6bd29
--- /dev/null
+++ b/examples/beam-python/src/beam_example/__init__.py
@@ -0,0 +1 @@
+from __future__ import absolute_import
\ No newline at end of file
diff --git a/examples/beam-python/src/beam_example/pipeline.py b/examples/beam-python/src/beam_example/pipeline.py
new file mode 100644
index 00000000..79db2b59
--- /dev/null
+++ b/examples/beam-python/src/beam_example/pipeline.py
@@ -0,0 +1,28 @@
+from __future__ import print_function
+
+from __future__ import absolute_import
+
+import sys
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+
+if __name__ == "__main__":
+    # The --job_endpoint argument is supplied by the Flink entry point
+    args = [
+        "--runner=PortableRunner",
+        "--streaming",
+        "--sdk_worker_parallelism=2",
+        "--job_name=beam-on-flinkk8soperator",
+        "--environment_type=PROCESS",
+        "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
+    ]
+    # Command-line options override the defaults above
+    args.extend(sys.argv[1:])
+    print("args: " + str(args))
+    pipeline = beam.Pipeline(options=PipelineOptions(args))
+    pcoll = (pipeline
+             | beam.Create([0, 1, 2])
+             | beam.Map(lambda x: x))
+    result = pipeline.run()
+    # The streaming job does not finish, so do not block on completion
+    #result.wait_until_finish()
diff --git a/examples/beam-python/src/requirements.txt b/examples/beam-python/src/requirements.txt
new file mode 100644
index 00000000..6bbe939a
--- /dev/null
+++ b/examples/beam-python/src/requirements.txt
@@ -0,0 +1,4 @@
+# Add your dependencies here
+#
+numpy==1.16.4  # via pyarrow
+apache-beam==2.17.0
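For quick iteration outside the cluster, the same transforms can also be exercised with Beam's local DirectRunner. The sketch below is illustrative only and is not part of the files above; the module name is hypothetical.

    # local_smoke_test.py (hypothetical, not included in the image)
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # The DirectRunner executes the pipeline in-process and returns when it
    # finishes, unlike the streaming PortableRunner job defined in pipeline.py.
    with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
        (p
         | beam.Create([0, 1, 2])
         | beam.Map(lambda x: x)
         | beam.Map(print))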