From 05fc150116a47927770204a675f30af260d45123 Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Mon, 22 Jan 2024 12:31:01 +0100 Subject: [PATCH] [FLINK-33557] Externalize Cassandra Python connector code --- .gitignore | 15 + flink-connector-cassandra/pom.xml | 1 - flink-python/MANIFEST.in | 20 + flink-python/README.txt | 14 + flink-python/dev/integration_test.sh | 50 +++ flink-python/pom.xml | 248 ++++++++++++ .../datastream/connectors/cassandra.py | 369 ++++++++++++++++++ .../connectors/tests/test_cassandra.py | 49 +++ .../pyflink/pyflink_gateway_server.py | 288 ++++++++++++++ flink-python/setup.py | 162 ++++++++ flink-python/tox.ini | 51 +++ pom.xml | 2 + 12 files changed, 1268 insertions(+), 1 deletion(-) create mode 100644 flink-python/MANIFEST.in create mode 100644 flink-python/README.txt create mode 100755 flink-python/dev/integration_test.sh create mode 100644 flink-python/pom.xml create mode 100644 flink-python/pyflink/datastream/connectors/cassandra.py create mode 100644 flink-python/pyflink/datastream/connectors/tests/test_cassandra.py create mode 100644 flink-python/pyflink/pyflink_gateway_server.py create mode 100644 flink-python/setup.py create mode 100644 flink-python/tox.ini diff --git a/.gitignore b/.gitignore index acbe2176..9055a90b 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,18 @@ tools/flink tools/flink-* tools/releasing/release tools/japicmp-output + +# Generated files, do not store in git +flink-python/apache_flink_connectors_cassandra.egg-info/ +flink-python/.tox/ +flink-python/build +flink-python/dist +flink-python/dev/download +flink-python/dev/.conda/ +flink-python/dev/log/ +flink-python/dev/.stage.txt +flink-python/dev/install_command.sh +flink-python/dev/lint-python.sh +flink-python/dev/build-wheels.sh +flink-python/dev/glibc_version_fix.h +flink-python/dev/dev-requirements.txt diff --git a/flink-connector-cassandra/pom.xml b/flink-connector-cassandra/pom.xml index 08508fcd..d30208d4 100644 --- a/flink-connector-cassandra/pom.xml +++ b/flink-connector-cassandra/pom.xml @@ -37,7 +37,6 @@ under the License. - 2.12 2.12.7 1.7.36 2.17.1 diff --git a/flink-python/MANIFEST.in b/flink-python/MANIFEST.in new file mode 100644 index 00000000..720decf9 --- /dev/null +++ b/flink-python/MANIFEST.in @@ -0,0 +1,20 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +graft pyflink +global-exclude *.py[cod] __pycache__ .DS_Store diff --git a/flink-python/README.txt b/flink-python/README.txt new file mode 100644 index 00000000..823aafba --- /dev/null +++ b/flink-python/README.txt @@ -0,0 +1,14 @@ +This is official Apache Flink Cassandra Python connector. 
+
+For the latest information about the Flink connector, please visit our website at:
+
+   https://flink.apache.org
+
+and our GitHub repository for the Cassandra connector:
+
+   https://github.com/apache/flink-connector-cassandra
+
+If you have any questions, ask on our mailing lists:
+
+   user@flink.apache.org
+   dev@flink.apache.org
diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
new file mode 100755
index 00000000..e792674e
--- /dev/null
+++ b/flink-python/dev/integration_test.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+function test_module() {
+    module="$FLINK_PYTHON_DIR/pyflink/$1"
+    echo "test module $module"
+    pytest --durations=20 ${module} $2
+    if [[ $? -ne 0 ]]; then
+        echo "test module $module failed"
+        exit 1
+    fi
+}
+
+function test_all_modules() {
+    # test datastream module
+    test_module "datastream"
+}
+
+# CURRENT_DIR is "/flink-python/dev/"
+CURRENT_DIR="$(cd "$( dirname "$0" )" && pwd)"
+
+# FLINK_PYTHON_DIR is "/flink-python/"
+FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
+
+# set FLINK_TEST_LIBS to "/flink-connector-cassandra/flink-python-connector-cassandra/target/dep..."
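+# pyflink_gateway_server.py adds every jar matching FLINK_TEST_LIBS to the gateway
+# classpath when FLINK_TESTING is set (see construct_test_classpath in that file).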
+export FLINK_TEST_LIBS="${FLINK_PYTHON_DIR}/target/test-dependencies/*" + +# Temporarily update the installed 'pyflink_gateway_server.py' files with the new one +# Needed only until Flink 1.19 release +echo "Checking ${FLINK_PYTHON_DIR} for 'pyflink_gateway_server.py'" +find "${FLINK_PYTHON_DIR}/.tox" -name pyflink_gateway_server.py -exec cp "${FLINK_PYTHON_DIR}/pyflink/pyflink_gateway_server.py" {} \; + +# python test +test_all_modules diff --git a/flink-python/pom.xml b/flink-python/pom.xml new file mode 100644 index 00000000..e164595a --- /dev/null +++ b/flink-python/pom.xml @@ -0,0 +1,248 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-connector-cassandra-parent + 4.0-SNAPSHOT + + + flink-connector-cassandra-python + Flink : Connectors : Cassandra : Python + + pom + + + + org.apache.flink + flink-connector-cassandra_${scala.binary.version} + ${project.version} + provided + + + org.slf4j + slf4j-api + + + + + + org.apache.flink + flink-runtime + ${flink.version} + test + test-jar + + + org.slf4j + slf4j-api + + + com.esotericsoftware.kryo + kryo + + + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + org.slf4j + slf4j-api + + + + + + org.apache.flink + flink-connector-test-utils + ${flink.version} + test + + + org.slf4j + slf4j-api + + + org.apache.commons + commons-compress + + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + clean + clean + + run + + + + + + + + + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + package + + copy + + + + + org.apache.flink + flink-connector-cassandra_${scala.binary.version} + + + org.apache.flink + flink-runtime + + tests + + + org.apache.flink + flink-test-utils + + + org.apache.flink + flink-connector-test-utils + + + ${project.build.directory}/test-dependencies + + + + copy-dependencies + package + + copy-dependencies + + + junit + ${project.build.directory}/test-dependencies + + + + + + + org.codehaus.mojo + wagon-maven-plugin + 2.0.2 + + + download-install + validate + + download-single + + + + https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/install_command.sh + + ${project.basedir}/dev + ${python.infra.download.skip} + + + + download-lint + validate + + download-single + + + + https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/lint-python.sh + + ${project.basedir}/dev + ${python.infra.download.skip} + + + + download-build-wheels + validate + + download-single + + + + https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/build-wheels.sh + + ${project.basedir}/dev + ${python.infra.download.skip} + + + + download-build-version-header + validate + + download-single + + + + https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/glibc_version_fix.h + + ${project.basedir}/dev + ${python.infra.download.skip} + + + + + + + diff --git a/flink-python/pyflink/datastream/connectors/cassandra.py b/flink-python/pyflink/datastream/connectors/cassandra.py new file mode 100644 index 00000000..b532475e --- /dev/null +++ b/flink-python/pyflink/datastream/connectors/cassandra.py @@ -0,0 +1,369 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from enum import Enum + +from pyflink.common import Duration +from pyflink.java_gateway import get_gateway + + +__all__ = [ + 'CassandraSink', + 'ConsistencyLevel', + 'MapperOptions', + 'ClusterBuilder', + 'CassandraCommitter', + 'CassandraFailureHandler' +] + +# ---- Classes introduced to construct the MapperOptions ---- + + +class ConsistencyLevel(Enum): + """ + The consistency level + """ + ANY = 0 + ONE = 1 + TWO = 2 + THREE = 3 + QUORUM = 4 + ALL = 5 + LOCAL_QUORUM = 6 + EACH_QUORUM = 7 + SERIAL = 8 + LOCAL_SERIAL = 9 + LOCAL_ONE = 10 + + def _to_j_consistency_level(self): + JConsistencyLevel = get_gateway().jvm.com.datastax.driver.core.ConsistencyLevel + return getattr(JConsistencyLevel, self.name) + + +class MapperOptions(object): + """ + This class is used to configure a Mapper after deployment. + """ + + def __init__(self): + """ + A simple method to construct MapperOptions. + + Example: + :: + + >>> mapper_option = MapperOptions() \\ + ... .ttl(1800) \\ + ... .timestamp(3600) \\ + ... .consistency_level(ConsistencyLevel.ANY) \\ + ... .tracing(True) \\ + ... .save_null_fields(True) + """ + JSimpleMapperOptions = get_gateway().jvm.org.apache.flink.streaming.connectors. \ + cassandra.SimpleMapperOptions + self._j_mapper_options = JSimpleMapperOptions() + + def ttl(self, ttl: int) -> 'MapperOptions': + """ + Creates a new Option object to add time-to-live to a mapper operation. This is only + valid for save operations. + """ + self._j_mapper_options.ttl(ttl) + return self + + def timestamp(self, timestamp: int) -> 'MapperOptions': + """ + Creates a new Option object to add a timestamp to a mapper operation. This is only + valid for save and delete operations. + """ + self._j_mapper_options.timestamp(timestamp) + return self + + def consistency_level(self, cl: ConsistencyLevel) -> 'MapperOptions': + """ + Creates a new Option object to add a consistency level value to a mapper operation. + This is valid for save, delete and get operations. + """ + self._j_mapper_options.consistencyLevel(cl._to_j_consistency_level()) + return self + + def tracing(self, enabled: bool) -> 'MapperOptions': + """ + Creates a new Option object to enable query tracing for a mapper operation. This is + valid for save, delete and get operations. + """ + self._j_mapper_options.tracing(enabled) + return self + + def save_null_fields(self, enabled: bool) -> 'MapperOptions': + """ + Creates a new Option object to specify whether null entity fields should be included in + insert queries. This option is valid only for save operations. + """ + self._j_mapper_options.saveNullFields(enabled) + return self + + def if_not_exists(self, enabled: bool) -> 'MapperOptions': + """ + Creates a new Option object to specify whether an IF NOT EXISTS clause should be included in + insert queries. This option is valid only for save operations. 
+ + If this option is not specified, it defaults to false (IF NOT EXISTS statements are not + used). + """ + self._j_mapper_options.ifNotExists(enabled) + return self + + +class ClusterBuilder(object): + """ + This class is used to configure a Cluster after deployment. The cluster represents the + connection that will be established to Cassandra. + """ + + def __init__(self, j_cluster_builder): + self._j_cluster_builder = j_cluster_builder + + +class CassandraCommitter(object): + """ + CheckpointCommitter that saves information about completed checkpoints within a separate table + in a cassandra database. + """ + + def __init__(self, j_checkpoint_committer): + self._j_checkpoint_committer = j_checkpoint_committer + + @staticmethod + def default_checkpoint_committer(builder: ClusterBuilder, key_space: str = None) \ + -> 'CassandraCommitter': + """ + CheckpointCommitter that saves information about completed checkpoints within a separate + table in a cassandra database. + + Entries are in the form: | operator_id | subtask_id | last_completed_checkpoint | + """ + JCassandraCommitter = get_gateway().jvm.org.apache.flink.streaming.connectors. \ + cassandra.CassandraCommitter + if key_space is None: + j_checkpoint_committer = JCassandraCommitter(builder._j_cluster_builder) + else: + j_checkpoint_committer = JCassandraCommitter(builder._j_cluster_builder, key_space) + return CassandraCommitter(j_checkpoint_committer) + + +class CassandraFailureHandler(object): + """ + Handle a failed Throwable. + """ + + def __init__(self, j_cassandra_failure_handler): + self._j_cassandra_failure_handler = j_cassandra_failure_handler + + @staticmethod + def no_op() -> 'CassandraFailureHandler': + """ + A CassandraFailureHandler that simply fails the sink on any failures. + This is also the default failure handler if not specified. + """ + return CassandraFailureHandler(get_gateway().jvm.org.apache.flink.streaming.connectors. + cassandra.NoOpCassandraFailureHandler()) + + +# ---- CassandraSink ---- + + +class CassandraSink(object): + """ + Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to + cassandra. + """ + + def __init__(self, j_cassandra_sink): + self._j_cassandra_sink = j_cassandra_sink + + def name(self, name: str) -> 'CassandraSink': + """ + Set the name of this sink. This name is used by the visualization and logging during + runtime. + """ + self._j_cassandra_sink.name(name) + return self + + def uid(self, uid: str) -> 'CassandraSink': + """ + Sets an ID for this operator. The specified ID is used to assign the same operator ID + across job submissions (for example when starting a job from a savepoint). + Note that this ID needs to be unique per transformation and job. Otherwise, job submission + will fail. + """ + self._j_cassandra_sink.uid(uid) + return self + + def set_uid_hash(self, uid_hash: str) -> 'CassandraSink': + """ + Sets an user provided hash for this operator. This will be used AS IS the create the + JobVertexID. + + The user provided hash is an alternative to the generated hashes, that is considered when + identifying an operator through the default hash mechanics fails (e.g. because of changes + between Flink versions). + + Note that this should be used as a workaround or for trouble shooting. The provided hash + needs to be unique per transformation and job. Otherwise, job submission will fail. + Furthermore, you cannot assign user-specified hash to intermediate nodes in an operator + chain and trying so will let your job fail. 
+ + A use case for this is in migration between Flink versions or changing the jobs in a way + that changes the automatically generated hashes. In this case, providing the previous hashes + directly through this method (e.g. obtained from old logs) can help to reestablish a lost + mapping from states to their target operator. + """ + self._j_cassandra_sink.setUidHash(uid_hash) + return self + + def set_parallelism(self, parallelism: int) -> 'CassandraSink': + """ + Sets the parallelism for this sink. The degree must be higher than zero. + """ + self._j_cassandra_sink.setParallelism(parallelism) + return self + + def disable_chaining(self) -> 'CassandraSink': + """ + Turns off chaining for this operator so thread co-location will not be used as an + optimization. + """ + self._j_cassandra_sink.disableChaining() + return self + + def slot_sharing_group(self, slot_sharing_group: str) -> 'CassandraSink': + """ + Sets the slot sharing group of this operation. Parallel instances of operations that are in + the same slot sharing group will be co-located in the same TaskManager slot, if possible. + + Operations inherit the slot sharing group of input operations if all input operations are in + the same slot sharing group and no slot sharing group was explicitly specified. + + Initially an operation is in the default slot sharing group. An operation can be put into + the default group explicitly by setting the slot sharing group to {@code "default"}. + """ + self._j_cassandra_sink.slotSharingGroup(slot_sharing_group) + return self + + @staticmethod + def add_sink(input) -> 'CassandraSinkBuilder': + """ + Writes a DataStream into a Cassandra database. + """ + JCassandraSink = get_gateway().jvm \ + .org.apache.flink.streaming.connectors.cassandra.CassandraSink + j_cassandra_sink_builder = JCassandraSink.addSink(input._j_data_stream) + return CassandraSink.CassandraSinkBuilder(j_cassandra_sink_builder) + + class CassandraSinkBuilder(object): + """ + Builder for a CassandraSink. + """ + + def __init__(self, j_cassandra_sink_builder): + self._j_cassandra_sink_builder = j_cassandra_sink_builder + + def set_query(self, query: str) -> 'CassandraSink.CassandraSinkBuilder': + """ + Sets the query that is to be executed for every record. + """ + self._j_cassandra_sink_builder.setQuery(query) + return self + + def set_host(self, host: str, port: int = 9042) -> 'CassandraSink.CassandraSinkBuilder': + """ + Sets the cassandra host/port to connect to. + """ + self._j_cassandra_sink_builder.setHost(host, port) + return self + + def set_cluster_builder(self, builder: ClusterBuilder) \ + -> 'CassandraSink.CassandraSinkBuilder': + """ + Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the + connection to cassandra. + """ + self._j_cassandra_sink_builder.setClusterBuilder(builder._j_cluster_builder) + return self + + def enable_write_ahead_log(self, committer: CassandraCommitter = None) \ + -> 'CassandraSink.CassandraSinkBuilder': + """ + Enables the write-ahead log, which allows exactly-once processing for non-deterministic + algorithms that use idempotent updates. + """ + if committer is None: + self._j_cassandra_sink_builder.enableWriteAheadLog() + else: + self._j_cassandra_sink_builder.enableWriteAheadLog( + committer._j_checkpoint_committer) + return self + + def set_mapper_options(self, options: MapperOptions) \ + -> 'CassandraSink.CassandraSinkBuilder': + """ + Sets the mapper options for this sink. 
The mapper options are used to configure the + DataStax com.datastax.driver.mapping.Mapper when writing POJOs. + This call has no effect if the input DataStream for this sink does not contain POJOs. + """ + self._j_cassandra_sink_builder.setMapperOptions(options._j_mapper_options) + return self + + def set_failure_handler(self, failure_handler: CassandraFailureHandler) \ + -> 'CassandraSink.CassandraSinkBuilder': + """ + Sets the failure handler for this sink. The failure handler is used to provide custom + error handling. + """ + self._j_cassandra_sink_builder.setFailureHandler( + failure_handler._j_cassandra_failure_handler) + return self + + def set_max_concurrent_requests(self, + max_concurrent_requests: int, + duration: Duration = None) \ + -> 'CassandraSink.CassandraSinkBuilder': + """ + Sets the maximum allowed number of concurrent requests for this sink. + """ + if duration is None: + self._j_cassandra_sink_builder.setMaxConcurrentRequests(max_concurrent_requests) + else: + self._j_cassandra_sink_builder.setMaxConcurrentRequests( + max_concurrent_requests, duration._j_duration) + return self + + def enable_ignore_null_fields(self) -> 'CassandraSink.CassandraSinkBuilder': + """ + Enables ignoring null values, treats null values as unset and avoids writing null fields + and creating tombstones. + This call has no effect if CassandraSinkBuilder.enableWriteAheadLog() is called. + """ + self._j_cassandra_sink_builder.enableIgnoreNullFields() + return self + + def build(self) -> 'CassandraSink': + """ + Finalizes the configuration of this sink. + """ + return CassandraSink(self._j_cassandra_sink_builder.build()) diff --git a/flink-python/pyflink/datastream/connectors/tests/test_cassandra.py b/flink-python/pyflink/datastream/connectors/tests/test_cassandra.py new file mode 100644 index 00000000..3b38e482 --- /dev/null +++ b/flink-python/pyflink/datastream/connectors/tests/test_cassandra.py @@ -0,0 +1,49 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################
+from pyflink.common import Types
+from pyflink.datastream.connectors.cassandra import CassandraSink, MapperOptions, ConsistencyLevel
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+
+
+class CassandraSinkTest(PyFlinkStreamingTestCase):
+
+    def test_cassandra_sink(self):
+        type_info = Types.ROW([Types.STRING(), Types.INT()])
+        ds = self.env.from_collection(
+            [('ab', 1), ('bdc', 2), ('cfgs', 3), ('deeefg', 4)],
+            type_info=type_info)
+        cassandra_sink_builder = CassandraSink.add_sink(ds)
+
+        cassandra_sink = cassandra_sink_builder \
+            .set_host('localhost', 9876) \
+            .set_query('query') \
+            .enable_ignore_null_fields() \
+            .set_mapper_options(MapperOptions()
+                                .ttl(1)
+                                .timestamp(100)
+                                .tracing(True)
+                                .if_not_exists(False)
+                                .consistency_level(ConsistencyLevel.ANY)
+                                .save_null_fields(True)) \
+            .set_max_concurrent_requests(1000) \
+            .build()
+
+        cassandra_sink.name('cassandra_sink').set_parallelism(3)
+
+        plan = eval(self.env.get_execution_plan())
+        self.assertEqual("Sink: cassandra_sink", plan['nodes'][1]['type'])
+        self.assertEqual(3, plan['nodes'][1]['parallelism'])
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
new file mode 100644
index 00000000..935b94c6
--- /dev/null
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -0,0 +1,288 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This is a copy of the pyflink_gateway_server.py file from Flink.
+# The original file is accessible here:
+# https://github.com/apache/flink/blob/master/flink-python/pyflink/pyflink_gateway_server.py
+# The additional change is the handling of the FLINK_TEST_LIBS environment variable.
+# It can be used to add extra testing jars to the gateway classpath.
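+# When FLINK_TEST_LIBS is set and FLINK_TESTING is present in the environment,
+# construct_test_classpath() below puts the jars matching that glob on the gateway classpath.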
+# The plan is to remove this once Pyflink 1.19 is released + +import argparse +import getpass +import glob +import os +import platform +import signal +import socket +import sys +from collections import namedtuple +from string import Template +from subprocess import Popen, PIPE + +from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root + +KEY_ENV_LOG_DIR = "env.log.dir" +KEY_ENV_YARN_CONF_DIR = "env.yarn.conf.dir" +KEY_ENV_HADOOP_CONF_DIR = "env.hadoop.conf.dir" +KEY_ENV_HBASE_CONF_DIR = "env.hbase.conf.dir" +KEY_ENV_JAVA_HOME = "env.java.home" +KEY_ENV_JAVA_OPTS = "env.java.opts.all" +KEY_ENV_JAVA_OPTS_DEPRECATED = "env.java.opts" + + +def on_windows(): + return platform.system() == "Windows" + + +def read_from_config(key, default_value, flink_conf_file): + value = default_value + # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI + # using the tainted value and might allow an attacker to access, modify, or test the existence + # of critical or sensitive files. + with open(os.path.realpath(flink_conf_file), "r") as f: + while True: + line = f.readline() + if not line: + break + if line.startswith("#") or len(line.strip()) == 0: + continue + k, v = line.split(":", 1) + if k.strip() == key: + value = v.strip() + return value + + +def find_java_executable(): + java_executable = "java.exe" if on_windows() else "java" + flink_home = _find_flink_home() + flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml") + java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file) + + if java_home is None and "JAVA_HOME" in os.environ: + java_home = os.environ["JAVA_HOME"] + + if java_home is not None: + java_executable = os.path.join(java_home, "bin", java_executable) + + return java_executable + + +def prepare_environment_variables(env): + flink_home = _find_flink_home() + # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI + # using the tainted value and might allow an attacker to access, modify, or test the existence + # of critical or sensitive files. 
+ real_flink_home = os.path.realpath(flink_home) + + if 'FLINK_CONF_DIR' in env: + flink_conf_directory = os.path.realpath(env['FLINK_CONF_DIR']) + else: + flink_conf_directory = os.path.join(real_flink_home, "conf") + env['FLINK_CONF_DIR'] = flink_conf_directory + + if 'FLINK_LIB_DIR' in env: + flink_lib_directory = os.path.realpath(env['FLINK_LIB_DIR']) + else: + flink_lib_directory = os.path.join(real_flink_home, "lib") + env['FLINK_LIB_DIR'] = flink_lib_directory + + if 'FLINK_OPT_DIR' in env: + flink_opt_directory = os.path.realpath(env['FLINK_OPT_DIR']) + else: + flink_opt_directory = os.path.join(real_flink_home, "opt") + env['FLINK_OPT_DIR'] = flink_opt_directory + + if 'FLINK_PLUGINS_DIR' in env: + flink_plugins_directory = os.path.realpath(env['FLINK_PLUGINS_DIR']) + else: + flink_plugins_directory = os.path.join(real_flink_home, "plugins") + env['FLINK_PLUGINS_DIR'] = flink_plugins_directory + + env["FLINK_BIN_DIR"] = os.path.join(real_flink_home, "bin") + + +def construct_log_settings(env): + templates = [ + "-Dlog.file=${flink_log_dir}/flink-${flink_ident_string}-python-${hostname}.log", + "-Dlog4j.configuration=${log4j_properties}", + "-Dlog4j.configurationFile=${log4j_properties}", + "-Dlogback.configurationFile=${logback_xml}" + ] + + flink_home = os.path.realpath(_find_flink_home()) + flink_conf_dir = env['FLINK_CONF_DIR'] + flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml") + + if "FLINK_LOG_DIR" in env: + flink_log_dir = env["FLINK_LOG_DIR"] + else: + flink_log_dir = read_from_config( + KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file) + + if "LOG4J_PROPERTIES" in env: + log4j_properties = env["LOG4J_PROPERTIES"] + else: + log4j_properties = "%s/log4j-cli.properties" % flink_conf_dir + + if "LOGBACK_XML" in env: + logback_xml = env["LOGBACK_XML"] + else: + logback_xml = "%s/logback.xml" % flink_conf_dir + + if "FLINK_IDENT_STRING" in env: + flink_ident_string = env["FLINK_IDENT_STRING"] + else: + flink_ident_string = getpass.getuser() + + hostname = socket.gethostname() + log_settings = [] + for template in templates: + log_settings.append(Template(template).substitute( + log4j_properties=log4j_properties, + logback_xml=logback_xml, + flink_log_dir=flink_log_dir, + flink_ident_string=flink_ident_string, + hostname=hostname)) + return log_settings + + +def get_jvm_opts(env): + flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml") + jvm_opts = env.get( + 'FLINK_ENV_JAVA_OPTS', + read_from_config( + KEY_ENV_JAVA_OPTS, + read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", flink_conf_file), + flink_conf_file)) + + # Remove leading and ending double quotes (if present) of value + jvm_opts = jvm_opts.strip("\"") + return jvm_opts.split(" ") + + +def construct_flink_classpath(env): + flink_home = _find_flink_home() + flink_lib_directory = env['FLINK_LIB_DIR'] + flink_opt_directory = env['FLINK_OPT_DIR'] + + if on_windows(): + # The command length is limited on Windows. To avoid the problem we should shorten the + # command length as much as possible. 
+ lib_jars = os.path.join(flink_lib_directory, "*") + else: + lib_jars = os.pathsep.join(glob.glob(os.path.join(flink_lib_directory, "*.jar"))) + + flink_python_jars = glob.glob(os.path.join(flink_opt_directory, "flink-python*.jar")) + if len(flink_python_jars) < 1: + print("The flink-python jar is not found in the opt folder of the FLINK_HOME: %s" % + flink_home) + return lib_jars + flink_python_jar = flink_python_jars[0] + + return os.pathsep.join([lib_jars, flink_python_jar]) + + +def construct_hadoop_classpath(env): + flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml") + + hadoop_conf_dir = "" + if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env: + if os.path.isdir("/etc/hadoop/conf"): + print("Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or" + "HADOOP_CLASSPATH was set.") + hadoop_conf_dir = "/etc/hadoop/conf" + + hbase_conf_dir = "" + if 'HBASE_CONF_DIR' not in env: + if os.path.isdir("/etc/hbase/conf"): + print("Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.") + hbase_conf_dir = "/etc/hbase/conf" + + return os.pathsep.join( + [env.get("HADOOP_CLASSPATH", ""), + env.get("YARN_CONF_DIR", + read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)), + env.get("HADOOP_CONF_DIR", + read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, flink_conf_file)), + env.get("HBASE_CONF_DIR", + read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, flink_conf_file))]) + + +def construct_test_classpath(env): + test_jar_patterns = [ + "flink-python/target/test-dependencies/*", + "flink-python/target/artifacts/testDataStream.jar", + "flink-python/target/flink-python*-tests.jar", + ] + test_jars = [] + + # Connector tests need to add specific jars to the gateway classpath + if 'FLINK_TEST_LIBS' in env: + test_jars += glob.glob(env['FLINK_TEST_LIBS']) + else: + flink_source_root = _find_flink_source_root() + for pattern in test_jar_patterns: + pattern = pattern.replace("/", os.path.sep) + test_jars += glob.glob(os.path.join(flink_source_root, pattern)) + return os.path.pathsep.join(test_jars) + + +def construct_program_args(args): + parser = argparse.ArgumentParser() + parser.add_argument("-c", "--class", required=True) + parser.add_argument("cluster_type", choices=["local", "remote", "yarn"]) + parse_result, other_args = parser.parse_known_args(args) + main_class = getattr(parse_result, "class") + cluster_type = parse_result.cluster_type + return namedtuple( + "ProgramArgs", ["main_class", "cluster_type", "other_args"])( + main_class, cluster_type, other_args) + + +def launch_gateway_server_process(env, args): + prepare_environment_variables(env) + program_args = construct_program_args(args) + if program_args.cluster_type == "local": + java_executable = find_java_executable() + log_settings = construct_log_settings(env) + jvm_args = env.get('JVM_ARGS', '') + jvm_opts = get_jvm_opts(env) + classpath = os.pathsep.join( + [construct_flink_classpath(env), construct_hadoop_classpath(env)]) + if "FLINK_TESTING" in env: + classpath = os.pathsep.join([classpath, construct_test_classpath(env)]) + command = [java_executable, jvm_args, "-XX:+IgnoreUnrecognizedVMOptions", + "--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED"] \ + + jvm_opts + log_settings \ + + ["-cp", classpath, program_args.main_class] + program_args.other_args + else: + command = [os.path.join(env["FLINK_BIN_DIR"], "flink"), "run"] + program_args.other_args\ + + ["-c", program_args.main_class] + preexec_fn = None + if not on_windows(): + def 
preexec_func(): + # ignore ctrl-c / SIGINT + signal.signal(signal.SIGINT, signal.SIG_IGN) + preexec_fn = preexec_func + return Popen(list(filter(lambda c: len(c) != 0, command)), + stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env) + + +if __name__ == "__main__": + launch_gateway_server_process(os.environ, sys.argv[1:]) diff --git a/flink-python/setup.py b/flink-python/setup.py new file mode 100644 index 00000000..1b1e9d00 --- /dev/null +++ b/flink-python/setup.py @@ -0,0 +1,162 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from __future__ import print_function + +import glob +import io +import os +import sys + +from setuptools import setup +from shutil import copy, rmtree +from xml.etree import ElementTree as ET + +PACKAGE_NAME = 'apache-flink-connector-aws' +# Source files, directories +CURRENT_DIR = os.path.abspath(os.path.dirname(__file__)) +POM_FILE = os.path.join(CURRENT_DIR, '../pom.xml') +README_FILE = os.path.join(CURRENT_DIR, 'README.txt') + +# Generated files and directories +VERSION_FILE = os.path.join( + CURRENT_DIR, + 'pyflink/datastream/connectors/cassandra_connector_version.py') +LIB_PATH = os.path.join(CURRENT_DIR, 'pyflink/lib') +DEPENDENCY_FILE = os.path.join(CURRENT_DIR, 'dev/dev-requirements.txt') + + +# Removes a file or directory if exists. +def remove_if_exists(file_path): + if os.path.exists(file_path): + if os.path.isfile(file_path): + os.remove(file_path) + if os.path.isdir(file_path): + rmtree(file_path) + + +# Reads the content of the README.txt file. +def readme_content(): + with io.open(README_FILE, 'r', encoding='utf-8') as f: + return f.read() + + +# Reads the parameters used by the setup command. +# The source is the cassandra_connector_version.py and the README.txt. +def setup_parameters(): + try: + exec(open(VERSION_FILE).read()) + return locals()['__connector_version__'], locals()['__flink_dependency__'], readme_content() + except IOError: + print("Failed to load PyFlink version file for packaging. " + + "'%s' not found!" % VERSION_FILE, + file=sys.stderr) + sys.exit(-1) + + +# Reads and parses the flink-connector-aws main pom.xml. 
+# Based on the version data in the pom.xml prepares the pyflink dir: +# - Generates cassandra_connector_version.py +# - Generates dev-requirements.txt +def prepare_pyflink_dir(): + # source files + pom_root = ET.parse(POM_FILE).getroot() + flink_version = pom_root.findall( + "./{http://maven.apache.org/POM/4.0.0}properties/" + + "{http://maven.apache.org/POM/4.0.0}flink.version" + )[0].text + connector_version = pom_root.findall( + "./{http://maven.apache.org/POM/4.0.0}version")[0].text.replace("-SNAPSHOT", ".dev0") + + flink_dependency = "apache-flink>=" + flink_version + + os.makedirs(LIB_PATH) + connector_jar = \ + glob.glob(CURRENT_DIR + '/target/test-dependencies/flink-connector-cassandra*.jar')[0] + copy(connector_jar, LIB_PATH) + + with io.open(VERSION_FILE, 'w', encoding='utf-8') as f: + f.write('# Generated file, do not edit\n') + f.write('__connector_version__ = "' + connector_version + '"\n') + f.write('__flink_dependency__ = "' + flink_dependency + '"\n') + + with io.open(DEPENDENCY_FILE, 'w', encoding='utf-8') as f: + f.write('# Generated file, do not edit\n') + f.write(flink_dependency + '\n') + +# Main +print("Python version used to package: " + sys.version) + +# Python version check +if sys.version_info < (3, 7): + print("Python versions prior to 3.7 are not supported for PyFlink.", + file=sys.stderr) + sys.exit(-1) + +# Checks the running environment: +# - In the connector source root directory - package preparation +# - Otherwise - package deployment +in_flink_source = os.path.isfile( + "../flink-connector-cassandra/src/main/java/org/apache/flink" + "/connector/cassandra/source/CassandraSource.java") + +# Cleans up the generated files and directories and regenerate them. +if in_flink_source: + remove_if_exists(VERSION_FILE) + remove_if_exists(DEPENDENCY_FILE) + remove_if_exists(LIB_PATH) + prepare_pyflink_dir() + print("\nPreparing Flink Cassandra connector package") + +# Reads the current setup data from the cassandra_connector_version.py file and the README.txt +(connector_version, flink_dependency, long_description) = setup_parameters() + +print("\nConnector version: " + connector_version) +print("Flink dependency: " + flink_dependency + "\n") + +if in_flink_source: + # Removes temporary directory used by the setup tool + remove_if_exists(PACKAGE_NAME.replace('-', '_') + '.egg-info') + +# Runs the python setup +setup( + name=PACKAGE_NAME, + version=connector_version, + include_package_data=True, + url='https://flink.apache.org', + license='https://www.apache.org/licenses/LICENSE-2.0', + author='Apache Software Foundation', + author_email='dev@flink.apache.org', + python_requires='>=3.7', + install_requires=[flink_dependency], + description='Apache Flink Python Cassandra Connector API', + long_description=long_description, + long_description_content_type='text/plain', + zip_safe=False, + py_modules=[ + "pyflink" + ], + classifiers=[ + 'Development Status :: 5 - Production/Stable', + 'License :: OSI Approved :: Apache Software License', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10'] +) + +print("\nFlink Cassandra connector package is ready\n") diff --git a/flink-python/tox.ini b/flink-python/tox.ini new file mode 100644 index 00000000..acbc519b --- /dev/null +++ b/flink-python/tox.ini @@ -0,0 +1,51 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# 
or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +[tox] +# tox (https://tox.readthedocs.io/) is a tool for running tests +# in multiple virtualenvs. This configuration file will run the +# test suite on all supported python versions. +# new environments will be excluded by default unless explicitly added to envlist. +envlist = {py37, py38, py39, py310}-cython + +[testenv] +whitelist_externals = /bin/bash +deps = apache-flink +passenv = * +commands = + python --version + pip install pytest + bash ./dev/integration_test.sh +# Replace the default installation command with a custom retry installation script, because on high-speed +# networks, downloading a package may raise a ConnectionResetError: [Errno 104] Peer reset connection. +install_command = {toxinidir}/dev/install_command.sh {opts} {packages} + +[flake8] +# We follow PEP 8 (https://www.python.org/dev/peps/pep-0008/) with one exception: lines can be +# up to 100 characters in length, not 79. +ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504 +max-line-length=100 +exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/* + +[mypy] +files=pyflink/datastream/connectors/*.py +ignore_missing_imports = True +strict_optional=False + +[mypy-pyflink.fn_execution.*] +ignore_errors = True diff --git a/pom.xml b/pom.xml index 4f52479d..f42ef191 100644 --- a/pom.xml +++ b/pom.xml @@ -43,11 +43,13 @@ under the License. 1.18.0 + 2.12 3.0.0-1.16 flink-connector-cassandra + flink-python
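
A minimal usage sketch of the Python API added in flink-python/pyflink/datastream/connectors/cassandra.py, mirroring the call chain exercised in test_cassandra.py. The host, port, keyspace, table, and CQL query are placeholders rather than values taken from this patch:

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.cassandra import CassandraSink

    env = StreamExecutionEnvironment.get_execution_environment()

    # Row-typed stream; the INSERT below binds its two fields positionally.
    ds = env.from_collection(
        [('ab', 1), ('bdc', 2)],
        type_info=Types.ROW([Types.STRING(), Types.INT()]))

    # Placeholder contact point and query.
    sink = CassandraSink.add_sink(ds) \
        .set_host('127.0.0.1', 9042) \
        .set_query('INSERT INTO demo.words (word, count) VALUES (?, ?);') \
        .build()

    sink.name('cassandra_sink').set_parallelism(1)

    env.execute('cassandra-sink-example')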