From 180bfb23dac5b48e3dbc007918f5a0028cd3332c Mon Sep 17 00:00:00 2001 From: ouyangwulin Date: Wed, 31 Jul 2024 20:07:00 +0800 Subject: [PATCH] [feature] Introduce flink jdbc catalog store plugin. (#3914) * [catalog] Introduce flink jdbc catalog store plugin. * fixed delete where unmatched * improve jdbc close method. * [catalog] refactor project module to stream-flink. * [catalog] refactor package name * [catalog] delete unused logs --- .idea/vcs.xml | 28 +-- streampark-flink/pom.xml | 6 +- .../streampark-flink-catalog-store/pom.xml | 138 ++++++++++++++ .../streampark/catalog/JacksonUtils.java | 54 ++++++ .../streampark/catalog/JdbcCatalogStore.java | 175 ++++++++++++++++++ .../catalog/JdbcCatalogStoreFactory.java | 101 ++++++++++ .../JdbcCatalogStoreFactoryOptions.java | 71 +++++++ .../connections/JdbcConnectionOptions.java | 153 +++++++++++++++ .../connections/JdbcConnectionProvider.java | 85 +++++++++ .../SimpleJdbcConnectionProvider.java | 150 +++++++++++++++ .../org.apache.flink.table.factories.Factory | 16 ++ .../catalog/JdbcCatalogStoreFactoryTest.java | 56 ++++++ .../catalog/JdbcCatalogStoreTest.java | 137 ++++++++++++++ .../catalog/mysql/MySqlContainer.java | 175 ++++++++++++++++++ .../catalog/mysql/MysqlBaseITCASE.java | 57 ++++++ .../src/test/resources/docker/server/my.cnf | 64 +++++++ .../src/test/resources/docker/setup.sql | 41 ++++ .../streampark-flink-shims_flink-1.18/pom.xml | 2 +- 18 files changed, 1493 insertions(+), 16 deletions(-) create mode 100644 streampark-flink/streampark-flink-catalog-store/pom.xml create mode 100644 streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java create mode 100644 streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf create mode 100644 streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql diff --git a/.idea/vcs.xml b/.idea/vcs.xml index eef8c2d277..48b6450335 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -16,17 +16,17 @@ ~ limitations under the License. --> - - - - - - - + + + + + + + \ No newline at end of file diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml index c3befc8cf0..ea053a2e5a 100644 --- a/streampark-flink/pom.xml +++ b/streampark-flink/pom.xml @@ -38,6 +38,7 @@ streampark-flink-packer streampark-flink-kubernetes streampark-flink-sql-gateway + streampark-flink-catalog-store @@ -62,7 +63,10 @@ ${jupiter.version} test - + + org.assertj + assertj-core + diff --git a/streampark-flink/streampark-flink-catalog-store/pom.xml b/streampark-flink/streampark-flink-catalog-store/pom.xml new file mode 100644 index 0000000000..78223ed550 --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/pom.xml @@ -0,0 +1,138 @@ + + + + 4.0.0 + + org.apache.streampark + streampark-flink + 2.2.0-SNAPSHOT + + + streampark-flink-catalog-store + StreamPark : Flink Catalog Store + + + 19.0 + 1.18.1 + 2.15.3 + + + + + org.apache.flink + flink-annotations + ${flink.version} + provided + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + org.apache.flink + flink-shaded-jackson + + + + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + org.apache.flink + flink-shaded-jackson + + + + + org.apache.flink + flink-table-api-java-uber + ${flink.version} + provided + + + org.apache.flink + flink-shaded-jackson + + + + + org.apache.flink + flink-shaded-jackson + ${flink.shaded.jackson.version}-${flink.shaded.version19} + provided + + + mysql + mysql-connector-java + test + + + org.assertj + assertj-core + test + + + org.testcontainers + mysql + 1.19.0 + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + shade + + package + + true + ${project.basedir}/target/dependency-reduced-pom.xml + + + org.apache.streampark:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java new file mode 100644 index 0000000000..11cb6906ef --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; + +/** Serialization utils */ +public final class JacksonUtils { + + private JacksonUtils() { + } + + private static final ObjectMapper MAPPER; + + static { + MAPPER = new ObjectMapper(); + MAPPER.registerModule(new SimpleModule()); + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + } + + public static T read(String json, Class clazz) throws JsonProcessingException { + return MAPPER.readValue(json, clazz); + } + + public static T read(String json, TypeReference typeReference) throws JsonProcessingException { + return MAPPER.readValue(json, typeReference); + } + + public static String write(Object object) throws JsonProcessingException { + return MAPPER.writeValueAsString(object); + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java new file mode 100644 index 0000000000..feeb76841a --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog; + +import org.apache.streampark.catalog.connections.JdbcConnectionProvider; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.table.catalog.AbstractCatalogStore; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Catalog Store for Jdbc. + */ +public class JdbcCatalogStore extends AbstractCatalogStore { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalogStore.class); + private final JdbcConnectionProvider jdbcConnectionProvider; + private transient Connection connection; + private transient Statement statement; + private transient ResultSet resultSet; + + private final String catalogTableName; + + public JdbcCatalogStore(JdbcConnectionProvider jdbcConnectionProvider, String catalogTableName) { + this.jdbcConnectionProvider = jdbcConnectionProvider; + this.catalogTableName = catalogTableName; + } + + @Override + public void open() { + try { + this.connection = jdbcConnectionProvider.getOrEstablishConnection(); + this.statement = this.connection.createStatement(); + super.open(); + } catch (SQLException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + closeResultSetAndStatement(); + try { + jdbcConnectionProvider.close(); + } catch (Exception e) { + throw new CatalogException(e); + } + super.close(); + } + + @Override + public void storeCatalog(String catalogName, CatalogDescriptor catalogDescriptor) throws CatalogException { + checkOpenState(); + try { + if (!contains(catalogName)) { + statement.executeUpdate(String.format( + "insert into %s (catalog_name,configuration,create_time,update_time) values ('%s','%s',now(),now())", + this.catalogTableName, catalogName, + JacksonUtils.write(catalogDescriptor.getConfiguration().toMap()))); + } else { + LOG.error("catalog {} is exist.", catalogName); + } + } catch (SQLException | JsonProcessingException e) { + throw new CatalogException(String.format("Store catalog %s failed!", catalogName), e); + } + } + + @Override + public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException { + checkOpenState(); + try { + int effectRow = statement.executeUpdate( + String.format("delete from %s where catalog_name='%s'", this.catalogTableName, catalogName)); + + if (effectRow == 0 && !ignoreIfNotExists) { + throw new CatalogException(String.format("Remove catalog %s failed!", catalogName)); + } + } catch (SQLException e) { + LOG.error("Remove catalog {} failed!", catalogName, e); + throw new CatalogException(String.format("Remove catalog %s failed!", catalogName)); + } + } + + @Override + public Optional getCatalog(String catalogName) throws CatalogException { + checkOpenState(); + try { + resultSet = statement + .executeQuery( + String.format("select * from %s where catalog_name='%s'", this.catalogTableName, catalogName)); + while (resultSet.next()) { + return Optional.of(CatalogDescriptor.of(catalogName, + Configuration.fromMap(JacksonUtils.read(resultSet.getString("configuration"), Map.class)))); + } + } catch (SQLException | JsonProcessingException e) { + throw new CatalogException(String.format("Get catalog %s failed!", catalogName), e); + } + return Optional.empty(); + } + + @Override + public Set listCatalogs() throws CatalogException { + checkOpenState(); + Set catalogs = new HashSet<>(); + try { + resultSet = statement.executeQuery(String.format("select * from %s;", this.catalogTableName)); + while (resultSet.next()) { + catalogs.add(resultSet.getString("catalog_name")); + } + return catalogs; + } catch (SQLException e) { + throw new CatalogException("List catalogs failed!", e); + } + } + + @Override + public boolean contains(String catalogName) throws CatalogException { + checkOpenState(); + try { + resultSet = statement.executeQuery( + String.format("select * from %s where catalog_name='%s';", this.catalogTableName, catalogName)); + while (resultSet.next()) { + resultSet.getString("catalog_name"); + return true; + } + } catch (SQLException e) { + throw new CatalogException(String.format("Catalog %s is contains failed!", catalogName), e); + } + return false; + } + + private void closeResultSetAndStatement() { + try { + if (resultSet != null && !resultSet.isClosed()) { + resultSet.close(); + } + if (statement != null && !statement.isClosed()) { + statement.close(); + } + resultSet = null; + statement = null; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java new file mode 100644 index 0000000000..38aa6e5c72 --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog; + +import org.apache.streampark.catalog.connections.JdbcConnectionOptions; +import org.apache.streampark.catalog.connections.JdbcConnectionProvider; +import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.factories.CatalogStoreFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper; +import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.DRIVER; +import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.IDENTIFIER; +import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.MAX_RETRY_TIMEOUT; +import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.PASSWORD; +import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.TABLE_NAME; +import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.URL; +import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.USERNAME; + +/** Catalog Store Factory for Jdbc. */ +public class JdbcCatalogStoreFactory implements CatalogStoreFactory { + + private JdbcConnectionProvider jdbcConnectionProvider; + private transient String catalogTableName; + + @Override + public CatalogStore createCatalogStore() { + return new JdbcCatalogStore(jdbcConnectionProvider, this.catalogTableName); + } + + @Override + public void open(Context context) { + FactoryUtil.FactoryHelper factoryHelper = + createCatalogStoreFactoryHelper(this, context); + factoryHelper.validate(); + + ReadableConfig options = factoryHelper.getOptions(); + JdbcConnectionOptions jdbcConnectionOptions = + new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(options.get(URL)) + .withDriverName(options.get(DRIVER)) + .withUsername(options.get(USERNAME)) + .withPassword(options.get(PASSWORD)) + .withConnectionCheckTimeoutSeconds(options.get(MAX_RETRY_TIMEOUT)) + .build(); + + this.catalogTableName = options.get(TABLE_NAME); + this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectionOptions); + } + + @Override + public void close() { + this.jdbcConnectionProvider.closeConnection(); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(URL); + options.add(DRIVER); + options.add(USERNAME); + options.add(PASSWORD); + options.add(TABLE_NAME); + return Collections.unmodifiableSet(options); + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(MAX_RETRY_TIMEOUT); + return Collections.unmodifiableSet(options); + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java new file mode 100644 index 0000000000..85a324bb51 --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * Catalog Store Options for Jdbc. + */ +public class JdbcCatalogStoreFactoryOptions { + + public static final String IDENTIFIER = "jdbc"; + + public static final ConfigOption URL = + ConfigOptions.key("url") + .stringType() + .noDefaultValue() + .withDescription( + "The JDBC database url."); + public static final ConfigOption TABLE_NAME = + ConfigOptions.key("table-name") + .stringType() + .noDefaultValue() + .withDescription( + "The name of JDBC table to connect."); + public static final ConfigOption DRIVER = + ConfigOptions.key("driver") + .stringType() + .noDefaultValue() + .withDescription( + "The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL."); + public static final ConfigOption USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription( + "The JDBC user name. 'username' and 'password' must both be specified if any of them is specified."); + + public static final ConfigOption PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription( + "The JDBC password."); + + public static final ConfigOption MAX_RETRY_TIMEOUT = + ConfigOptions.key("max-retry-timeout") + .intType() + .defaultValue(60) + .withDescription( + "Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second."); + + private JdbcCatalogStoreFactoryOptions() { + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java new file mode 100644 index 0000000000..62f460d4e1 --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog.connections; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +public class JdbcConnectionOptions implements Serializable { + + public static final String USER_KEY = "user"; + public static final String PASSWORD_KEY = "password"; + + private static final long serialVersionUID = 1L; + + protected final String url; + @Nullable + protected final String driverName; + protected final int connectionCheckTimeoutSeconds; + @Nonnull + protected final Properties properties; + + protected JdbcConnectionOptions( + String url, + @Nullable String driverName, + int connectionCheckTimeoutSeconds, + @Nonnull Properties properties) { + Preconditions.checkArgument( + connectionCheckTimeoutSeconds > 0, + "Connection check timeout seconds shouldn't be smaller than 1"); + this.url = Preconditions.checkNotNull(url, "jdbc url is empty"); + this.driverName = driverName; + this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds; + this.properties = + Preconditions.checkNotNull(properties, "Connection properties must be non-null"); + } + + public String getDbURL() { + return url; + } + + @Nullable + public String getDriverName() { + return driverName; + } + + public Optional getUsername() { + return Optional.ofNullable(properties.getProperty(USER_KEY)); + } + + public Optional getPassword() { + return Optional.ofNullable(properties.getProperty(PASSWORD_KEY)); + } + + public int getConnectionCheckTimeoutSeconds() { + return connectionCheckTimeoutSeconds; + } + + @Nonnull + public Properties getProperties() { + return properties; + } + + @Nonnull + public static Properties getBriefAuthProperties(String user, String password) { + final Properties result = new Properties(); + if (Objects.nonNull(user)) { + result.put(USER_KEY, user); + } + if (Objects.nonNull(password)) { + result.put(PASSWORD_KEY, password); + } + return result; + } + + /** Builder for {@link JdbcConnectionOptions}. */ + public static class JdbcConnectionOptionsBuilder { + + private String url; + private String driverName; + private int connectionCheckTimeoutSeconds = 60; + private final Properties properties = new Properties(); + + public JdbcConnectionOptionsBuilder withUrl(String url) { + this.url = url; + return this; + } + + public JdbcConnectionOptionsBuilder withDriverName(String driverName) { + this.driverName = driverName; + return this; + } + + public JdbcConnectionOptionsBuilder withProperty(String propKey, String propVal) { + Preconditions.checkNotNull(propKey, "Connection property key mustn't be null"); + Preconditions.checkNotNull(propVal, "Connection property value mustn't be null"); + this.properties.put(propKey, propVal); + return this; + } + + public JdbcConnectionOptionsBuilder withUsername(String username) { + if (Objects.nonNull(username)) { + this.properties.put(USER_KEY, username); + } + return this; + } + + public JdbcConnectionOptionsBuilder withPassword(String password) { + if (Objects.nonNull(password)) { + this.properties.put(PASSWORD_KEY, password); + } + return this; + } + + /** + * Set the maximum timeout between retries, default is 60 seconds. + * + * @param connectionCheckTimeoutSeconds the timeout seconds, shouldn't smaller than 1 + * second. + */ + public JdbcConnectionOptionsBuilder withConnectionCheckTimeoutSeconds( + int connectionCheckTimeoutSeconds) { + this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds; + return this; + } + + public JdbcConnectionOptions build() { + return new JdbcConnectionOptions( + url, driverName, connectionCheckTimeoutSeconds, properties); + } + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java new file mode 100644 index 0000000000..c49cb1fc7d --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog.connections; + +import org.apache.flink.annotation.PublicEvolving; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; + +/** JDBC connection provider. */ +@PublicEvolving +public interface JdbcConnectionProvider extends Serializable, AutoCloseable { + + /** + * Get existing connection. + * + * @return existing connection + */ + @Nullable + Connection getConnection(); + + /** + * Get existing connection properties. + * + * @return existing connection properties + */ + @Nonnull + default Properties getProperties() { + throw new UnsupportedOperationException(); + } + + /** + * Check whether possible existing connection is valid or not through {@link + * Connection#isValid(int)}. + * + * @return true if existing connection is valid + * @throws SQLException sql exception throw from {@link Connection#isValid(int)} + */ + boolean isConnectionValid() throws SQLException; + + /** + * Get existing connection or establish an new one if there is none. + * + * @return existing connection or newly established connection + * @throws SQLException sql exception + * @throws ClassNotFoundException driver class not found + */ + Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException; + + /** Close possible existing connection. */ + void closeConnection(); + + /** + * Close possible existing connection and establish an new one. + * + * @return newly established connection + * @throws SQLException sql exception + * @throws ClassNotFoundException driver class not found + */ + Connection reestablishConnection() throws SQLException, ClassNotFoundException; + + default void close() throws Exception { + closeConnection(); + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java new file mode 100644 index 0000000000..e78c0bfbe8 --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog.connections; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Enumeration; +import java.util.Properties; + +/** Simple JDBC connection provider. */ +@NotThreadSafe +@PublicEvolving +public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class); + + private static final long serialVersionUID = 1L; + + private final JdbcConnectionOptions jdbcOptions; + + private transient Driver loadedDriver; + private transient Connection connection; + + static { + // Load DriverManager first to avoid deadlock between DriverManager's + // static initialization block and specific driver class's static + // initialization block when two different driver classes are loading + // concurrently using Class.forName while DriverManager is uninitialized + // before. + // + // This could happen in JDK 8 but not above as driver loading has been + // moved out of DriverManager's static initialization block since JDK 9. + DriverManager.getDrivers(); + } + + public SimpleJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) { + this.jdbcOptions = jdbcOptions; + } + + @Override + public Connection getConnection() { + return connection; + } + + @Nonnull + @Override + public Properties getProperties() { + return jdbcOptions.getProperties(); + } + + @Override + public boolean isConnectionValid() throws SQLException { + return connection != null + && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds()); + } + + private Driver loadDriver(String driverName) throws SQLException, ClassNotFoundException { + Preconditions.checkNotNull(driverName); + Enumeration drivers = DriverManager.getDrivers(); + while (drivers.hasMoreElements()) { + Driver driver = drivers.nextElement(); + if (driver.getClass().getName().equals(driverName)) { + return driver; + } + } + // We could reach here for reasons: + // * Class loader hell of DriverManager(see JDK-8146872). + // * driver is not installed as a service provider. + Class clazz = + Class.forName(driverName, true, Thread.currentThread().getContextClassLoader()); + try { + return (Driver) clazz.newInstance(); + } catch (Exception ex) { + throw new SQLException("Fail to create driver of class " + driverName, ex); + } + } + + private Driver getLoadedDriver() throws SQLException, ClassNotFoundException { + if (loadedDriver == null) { + loadedDriver = loadDriver(jdbcOptions.getDriverName()); + } + return loadedDriver; + } + + @Override + public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException { + if (isConnectionValid()) { + return connection; + } + if (jdbcOptions.getDriverName() == null) { + connection = DriverManager.getConnection(jdbcOptions.getDbURL(), getProperties()); + } else { + Driver driver = getLoadedDriver(); + connection = driver.connect(jdbcOptions.getDbURL(), getProperties()); + if (connection == null) { + // Throw same exception as DriverManager.getConnection when no driver found to match + // caller expectation. + throw new SQLException( + "No suitable driver found for " + jdbcOptions.getDbURL(), "08001"); + } + } + return connection; + } + + @Override + public void closeConnection() { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.warn("JDBC connection close failed.", e); + } finally { + connection = null; + } + } + } + + @Override + public Connection reestablishConnection() throws SQLException, ClassNotFoundException { + closeConnection(); + return getOrEstablishConnection(); + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..9bcdf4448d --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.streampark.catalog.JdbcCatalogStoreFactory diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java new file mode 100644 index 0000000000..59842e1b4e --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog; + +import org.apache.streampark.catalog.mysql.MysqlBaseITCASE; + +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.factories.CatalogStoreFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +public class JdbcCatalogStoreFactoryTest extends MysqlBaseITCASE { + + @org.junit.Test + public void testCatalogStoreFactoryDiscovery() { + + String factoryIdentifier = JdbcCatalogStoreFactoryOptions.IDENTIFIER; + Map options = new HashMap<>(); + options.put("url", MYSQL_CONTAINER.getJdbcUrl()); + options.put("table-name", "t_mysql_catalog"); + options.put("driver", MYSQL_CONTAINER.getDriverClassName()); + options.put("username", MYSQL_CONTAINER.getUsername()); + options.put("password", MYSQL_CONTAINER.getPassword()); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + final FactoryUtil.DefaultCatalogStoreContext discoveryContext = + new FactoryUtil.DefaultCatalogStoreContext(options, null, classLoader); + final CatalogStoreFactory factory = + FactoryUtil.discoverFactory( + classLoader, CatalogStoreFactory.class, factoryIdentifier); + factory.open(discoveryContext); + + CatalogStore catalogStore = factory.createCatalogStore(); + assertThat(catalogStore instanceof JdbcCatalogStore).isTrue(); + + factory.close(); + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java new file mode 100644 index 0000000000..6267688632 --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog; + +import org.apache.streampark.catalog.connections.JdbcConnectionOptions; +import org.apache.streampark.catalog.connections.JdbcConnectionProvider; +import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider; +import org.apache.streampark.catalog.mysql.MysqlBaseITCASE; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import org.assertj.core.api.ThrowableAssert; + +import java.util.Set; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +public class JdbcCatalogStoreTest extends MysqlBaseITCASE { + + private static final String DUMMY = "dummy"; + private static final CatalogDescriptor DUMMY_CATALOG; + + static { + Configuration conf = new Configuration(); + conf.set(CommonCatalogOptions.CATALOG_TYPE, DUMMY); + conf.set(GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE, "dummy_db"); + + DUMMY_CATALOG = CatalogDescriptor.of(DUMMY, conf); + } + @org.junit.Test + public void testNotOpened() { + CatalogStore catalogStore = initCatalogStore(); + + assertCatalogStoreNotOpened(catalogStore::listCatalogs); + assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY)); + assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY)); + assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG)); + assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true)); + } + + @org.junit.Test + public void testStore() { + CatalogStore catalogStore = initCatalogStore(); + catalogStore.open(); + + catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG); + + Set storedCatalogs = catalogStore.listCatalogs(); + assertThat(storedCatalogs.size()).isEqualTo(1); + assertThat(storedCatalogs.contains(DUMMY)).isTrue(); + } + + @org.junit.Test + public void testRemoveExisting() { + CatalogStore catalogStore = initCatalogStore(); + catalogStore.open(); + + catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG); + assertThat(catalogStore.listCatalogs().size()).isEqualTo(1); + + catalogStore.removeCatalog(DUMMY, false); + assertThat(catalogStore.listCatalogs().size()).isEqualTo(0); + assertThat(catalogStore.contains(DUMMY)).isFalse(); + } + + @org.junit.Test + public void testRemoveNonExisting() { + CatalogStore catalogStore = initCatalogStore(); + catalogStore.open(); + + catalogStore.removeCatalog(DUMMY, true); + + assertThatThrownBy(() -> catalogStore.removeCatalog(DUMMY, false)) + .isInstanceOf(CatalogException.class) + .hasMessageContaining( + "Remove catalog " + DUMMY + " failed!"); + } + + @org.junit.Test + public void testClosed() { + CatalogStore catalogStore = initCatalogStore(); + + catalogStore.open(); + + catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG); + assertThat(catalogStore.listCatalogs().size()).isEqualTo(1); + + catalogStore.close(); + + assertCatalogStoreNotOpened(catalogStore::listCatalogs); + assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY)); + assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY)); + assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG)); + assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true)); + } + + private void assertCatalogStoreNotOpened( + ThrowableAssert.ThrowingCallable shouldRaiseThrowable) { + assertThatThrownBy(shouldRaiseThrowable) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("CatalogStore is not opened yet."); + } + + private JdbcCatalogStore initCatalogStore() { + JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(MYSQL_CONTAINER.getJdbcUrl()) + .withDriverName(MYSQL_CONTAINER.getDriverClassName()) + .withUsername(MYSQL_CONTAINER.getUsername()) + .withPassword(MYSQL_CONTAINER.getPassword()) + .build(); + + JdbcConnectionProvider jdbcConnectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectionOptions); + return new JdbcCatalogStore(jdbcConnectionProvider, "t_mysql_catalog"); + + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java new file mode 100644 index 0000000000..94a5c2f7e7 --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog.mysql; + +import org.testcontainers.containers.ContainerLaunchException; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.utility.DockerImageName; + +import java.util.HashSet; +import java.util.Set; + +public class MySqlContainer extends JdbcDatabaseContainer { + + public static final String IMAGE = "mysql"; + + public static final String MYSQL_VERSION = "8.0"; + public static final Integer MYSQL_PORT = 3306; + + private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF"; + private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL"; + private static final String MYSQL_ROOT_USER = "root"; + + private String databaseName = "test"; + private String username = "test"; + private String password = "test"; + + public MySqlContainer() { + this(MYSQL_VERSION); + } + + public MySqlContainer(String version) { + super(DockerImageName.parse(IMAGE + ":" + version)); + addExposedPort(MYSQL_PORT); + } + + @Override + protected Set getLivenessCheckPorts() { + return new HashSet<>(getMappedPort(MYSQL_PORT)); + } + + @Override + protected void configure() { + optionallyMapResourceParameterAsVolume( + MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf"); + + if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) { + optionallyMapResourceParameterAsVolume( + SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A"); + } + + addEnv("MYSQL_DATABASE", databaseName); + addEnv("MYSQL_USER", username); + if (password != null && !password.isEmpty()) { + addEnv("MYSQL_PASSWORD", password); + addEnv("MYSQL_ROOT_PASSWORD", password); + } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) { + addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes"); + } else { + throw new ContainerLaunchException( + "Empty password can be used only with the root user"); + } + setStartupAttempts(3); + } + + @Override + public String getDriverClassName() { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + return "com.mysql.cj.jdbc.Driver"; + } catch (ClassNotFoundException e) { + return "com.mysql.jdbc.Driver"; + } + } + + public String getJdbcUrl(String databaseName) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:mysql://" + + getHost() + + ":" + + getDatabasePort() + + "/" + + databaseName + + additionalUrlParams; + } + + @Override + public String getJdbcUrl() { + return getJdbcUrl(databaseName); + } + + public int getDatabasePort() { + return getMappedPort(MYSQL_PORT); + } + + @Override + protected String constructUrlForConnection(String queryString) { + String url = super.constructUrlForConnection(queryString); + + if (!url.contains("useSSL=")) { + String separator = url.contains("?") ? "&" : "?"; + url = url + separator + "useSSL=false"; + } + + if (!url.contains("allowPublicKeyRetrieval=")) { + url = url + "&allowPublicKeyRetrieval=true"; + } + + return url; + } + + @Override + public String getDatabaseName() { + return databaseName; + } + + @Override + public String getUsername() { + return username; + } + + @Override + public String getPassword() { + return password; + } + + @Override + protected String getTestQueryString() { + return "SELECT 1"; + } + + @SuppressWarnings("unchecked") + public MySqlContainer withConfigurationOverride(String s) { + parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s); + return this; + } + + @SuppressWarnings("unchecked") + public MySqlContainer withSetupSQL(String sqlPath) { + parameters.put(SETUP_SQL_PARAM_NAME, sqlPath); + return this; + } + + @Override + public MySqlContainer withDatabaseName(final String databaseName) { + this.databaseName = databaseName; + return this; + } + + @Override + public MySqlContainer withUsername(final String username) { + this.username = username; + return this; + } + + @Override + public MySqlContainer withPassword(final String password) { + this.password = password; + return this; + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java new file mode 100644 index 0000000000..799335b96d --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.catalog.mysql; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.util.stream.Stream; + +public class MysqlBaseITCASE { + + private static final Logger LOG = LoggerFactory.getLogger(MysqlBaseITCASE.class); + protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer("docker/server/my.cnf"); + + @org.junit.BeforeClass + public static void startContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Containers are started."); + } + + @org.junit.AfterClass + public static void stopContainers() { + LOG.info("Stopping containers..."); + if (MYSQL_CONTAINER != null) { + MYSQL_CONTAINER.stop(); + } + LOG.info("Containers are stopped."); + } + + protected static MySqlContainer createMySqlContainer(String configPath) { + return (MySqlContainer) new MySqlContainer(MySqlContainer.MYSQL_VERSION) + .withConfigurationOverride(configPath) + .withSetupSQL("docker/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } +} diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf new file mode 100644 index 0000000000..953ffe8f91 --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems. +server-id = 223344 +log_bin = mysql-bin +binlog_format = row +# Make binlog_expire_logs_seconds = 1 and max_binlog_size = 4096 to test the exception +# message when the binlog expires in the server. +binlog_expire_logs_seconds = 1 +max_binlog_size = 4096 + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql new file mode 100644 index 0000000000..1b22107deb --- /dev/null +++ b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql @@ -0,0 +1,41 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- In production you would almost certainly limit the replication user must be on the follower (slave) machine, +-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'. +-- However, in this database we'll grant 2 users different privileges: +-- +-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing) +-- 2) 'mysqluser' - all privileges +-- +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%'; +CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw'; +GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%'; + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: emptydb +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE emptydb; +USE flink-test; +create table t_mysql_catalog ( + `id` bigint NOT NULL AUTO_INCREMENT, + `catalog_name` varchar(255) NOT NULL, + `configuration` text, + `create_time` datetime DEFAULT NULL, + `update_time` datetime DEFAULT NULL, + PRIMARY KEY (`id`) USING BTREE, + UNIQUE INDEX `uniq_catalog_name` (`catalog_name`) USING BTREE +) + diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml index db8ff46cf4..d13f26c1b3 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml @@ -29,7 +29,7 @@ StreamPark : Flink Shims 1.18 - 1.18.0 + 1.18.1