+
+
\ No newline at end of file
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index c3befc8cf0..ea053a2e5a 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -38,6 +38,7 @@
streampark-flink-packerstreampark-flink-kubernetesstreampark-flink-sql-gateway
+ streampark-flink-catalog-store
@@ -62,7 +63,10 @@
${jupiter.version}test
-
+
+ org.assertj
+ assertj-core
+
diff --git a/streampark-flink/streampark-flink-catalog-store/pom.xml b/streampark-flink/streampark-flink-catalog-store/pom.xml
new file mode 100644
index 0000000000..78223ed550
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/pom.xml
@@ -0,0 +1,138 @@
+
+
+
+ 4.0.0
+
+ org.apache.streampark
+ streampark-flink
+ 2.2.0-SNAPSHOT
+
+
+ streampark-flink-catalog-store
+ StreamPark : Flink Catalog Store
+
+
+ 19.0
+ 1.18.1
+ 2.15.3
+
+
+
+
+ org.apache.flink
+ flink-annotations
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-shaded-jackson
+
+
+
+
+ org.apache.flink
+ flink-table-api-java
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-shaded-jackson
+
+
+
+
+ org.apache.flink
+ flink-table-api-java-uber
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-shaded-jackson
+
+
+
+
+ org.apache.flink
+ flink-shaded-jackson
+ ${flink.shaded.jackson.version}-${flink.shaded.version19}
+ provided
+
+
+ mysql
+ mysql-connector-java
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.testcontainers
+ mysql
+ 1.19.0
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+ shade
+
+ package
+
+ true
+ ${project.basedir}/target/dependency-reduced-pom.xml
+
+
+ org.apache.streampark:*
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
new file mode 100644
index 0000000000..11cb6906ef
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+/** Serialization utils */
+public final class JacksonUtils {
+
+ private JacksonUtils() {
+ }
+
+ private static final ObjectMapper MAPPER;
+
+ static {
+ MAPPER = new ObjectMapper();
+ MAPPER.registerModule(new SimpleModule());
+ MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+ }
+
+ public static T read(String json, Class clazz) throws JsonProcessingException {
+ return MAPPER.readValue(json, clazz);
+ }
+
+ public static T read(String json, TypeReference typeReference) throws JsonProcessingException {
+ return MAPPER.readValue(json, typeReference);
+ }
+
+ public static String write(Object object) throws JsonProcessingException {
+ return MAPPER.writeValueAsString(object);
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
new file mode 100644
index 0000000000..feeb76841a
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Catalog Store for Jdbc.
+ */
+public class JdbcCatalogStore extends AbstractCatalogStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalogStore.class);
+ private final JdbcConnectionProvider jdbcConnectionProvider;
+ private transient Connection connection;
+ private transient Statement statement;
+ private transient ResultSet resultSet;
+
+ private final String catalogTableName;
+
+ public JdbcCatalogStore(JdbcConnectionProvider jdbcConnectionProvider, String catalogTableName) {
+ this.jdbcConnectionProvider = jdbcConnectionProvider;
+ this.catalogTableName = catalogTableName;
+ }
+
+ @Override
+ public void open() {
+ try {
+ this.connection = jdbcConnectionProvider.getOrEstablishConnection();
+ this.statement = this.connection.createStatement();
+ super.open();
+ } catch (SQLException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ closeResultSetAndStatement();
+ try {
+ jdbcConnectionProvider.close();
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ }
+ super.close();
+ }
+
+ @Override
+ public void storeCatalog(String catalogName, CatalogDescriptor catalogDescriptor) throws CatalogException {
+ checkOpenState();
+ try {
+ if (!contains(catalogName)) {
+ statement.executeUpdate(String.format(
+ "insert into %s (catalog_name,configuration,create_time,update_time) values ('%s','%s',now(),now())",
+ this.catalogTableName, catalogName,
+ JacksonUtils.write(catalogDescriptor.getConfiguration().toMap())));
+ } else {
+ LOG.error("catalog {} is exist.", catalogName);
+ }
+ } catch (SQLException | JsonProcessingException e) {
+ throw new CatalogException(String.format("Store catalog %s failed!", catalogName), e);
+ }
+ }
+
+ @Override
+ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException {
+ checkOpenState();
+ try {
+ int effectRow = statement.executeUpdate(
+ String.format("delete from %s where catalog_name='%s'", this.catalogTableName, catalogName));
+
+ if (effectRow == 0 && !ignoreIfNotExists) {
+ throw new CatalogException(String.format("Remove catalog %s failed!", catalogName));
+ }
+ } catch (SQLException e) {
+ LOG.error("Remove catalog {} failed!", catalogName, e);
+ throw new CatalogException(String.format("Remove catalog %s failed!", catalogName));
+ }
+ }
+
+ @Override
+ public Optional getCatalog(String catalogName) throws CatalogException {
+ checkOpenState();
+ try {
+ resultSet = statement
+ .executeQuery(
+ String.format("select * from %s where catalog_name='%s'", this.catalogTableName, catalogName));
+ while (resultSet.next()) {
+ return Optional.of(CatalogDescriptor.of(catalogName,
+ Configuration.fromMap(JacksonUtils.read(resultSet.getString("configuration"), Map.class))));
+ }
+ } catch (SQLException | JsonProcessingException e) {
+ throw new CatalogException(String.format("Get catalog %s failed!", catalogName), e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public Set listCatalogs() throws CatalogException {
+ checkOpenState();
+ Set catalogs = new HashSet<>();
+ try {
+ resultSet = statement.executeQuery(String.format("select * from %s;", this.catalogTableName));
+ while (resultSet.next()) {
+ catalogs.add(resultSet.getString("catalog_name"));
+ }
+ return catalogs;
+ } catch (SQLException e) {
+ throw new CatalogException("List catalogs failed!", e);
+ }
+ }
+
+ @Override
+ public boolean contains(String catalogName) throws CatalogException {
+ checkOpenState();
+ try {
+ resultSet = statement.executeQuery(
+ String.format("select * from %s where catalog_name='%s';", this.catalogTableName, catalogName));
+ while (resultSet.next()) {
+ resultSet.getString("catalog_name");
+ return true;
+ }
+ } catch (SQLException e) {
+ throw new CatalogException(String.format("Catalog %s is contains failed!", catalogName), e);
+ }
+ return false;
+ }
+
+ private void closeResultSetAndStatement() {
+ try {
+ if (resultSet != null && !resultSet.isClosed()) {
+ resultSet.close();
+ }
+ if (statement != null && !statement.isClosed()) {
+ statement.close();
+ }
+ resultSet = null;
+ statement = null;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
new file mode 100644
index 0000000000..38aa6e5c72
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.connections.JdbcConnectionOptions;
+import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
+import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.factories.CatalogStoreFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
+import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.DRIVER;
+import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.IDENTIFIER;
+import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.MAX_RETRY_TIMEOUT;
+import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.PASSWORD;
+import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.TABLE_NAME;
+import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.URL;
+import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.USERNAME;
+
+/** Catalog Store Factory for Jdbc. */
+public class JdbcCatalogStoreFactory implements CatalogStoreFactory {
+
+ private JdbcConnectionProvider jdbcConnectionProvider;
+ private transient String catalogTableName;
+
+ @Override
+ public CatalogStore createCatalogStore() {
+ return new JdbcCatalogStore(jdbcConnectionProvider, this.catalogTableName);
+ }
+
+ @Override
+ public void open(Context context) {
+ FactoryUtil.FactoryHelper factoryHelper =
+ createCatalogStoreFactoryHelper(this, context);
+ factoryHelper.validate();
+
+ ReadableConfig options = factoryHelper.getOptions();
+ JdbcConnectionOptions jdbcConnectionOptions =
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl(options.get(URL))
+ .withDriverName(options.get(DRIVER))
+ .withUsername(options.get(USERNAME))
+ .withPassword(options.get(PASSWORD))
+ .withConnectionCheckTimeoutSeconds(options.get(MAX_RETRY_TIMEOUT))
+ .build();
+
+ this.catalogTableName = options.get(TABLE_NAME);
+ this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectionOptions);
+ }
+
+ @Override
+ public void close() {
+ this.jdbcConnectionProvider.closeConnection();
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ Set> options = new HashSet<>();
+ options.add(URL);
+ options.add(DRIVER);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+ options.add(TABLE_NAME);
+ return Collections.unmodifiableSet(options);
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ Set> options = new HashSet<>();
+ options.add(MAX_RETRY_TIMEOUT);
+ return Collections.unmodifiableSet(options);
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
new file mode 100644
index 0000000000..85a324bb51
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Catalog Store Options for Jdbc.
+ */
+public class JdbcCatalogStoreFactoryOptions {
+
+ public static final String IDENTIFIER = "jdbc";
+
+ public static final ConfigOption URL =
+ ConfigOptions.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The JDBC database url.");
+ public static final ConfigOption TABLE_NAME =
+ ConfigOptions.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The name of JDBC table to connect.");
+ public static final ConfigOption DRIVER =
+ ConfigOptions.key("driver")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.");
+ public static final ConfigOption USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The JDBC user name. 'username' and 'password' must both be specified if any of them is specified.");
+
+ public static final ConfigOption PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The JDBC password.");
+
+ public static final ConfigOption MAX_RETRY_TIMEOUT =
+ ConfigOptions.key("max-retry-timeout")
+ .intType()
+ .defaultValue(60)
+ .withDescription(
+ "Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second.");
+
+ private JdbcCatalogStoreFactoryOptions() {
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
new file mode 100644
index 0000000000..62f460d4e1
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.connections;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+public class JdbcConnectionOptions implements Serializable {
+
+ public static final String USER_KEY = "user";
+ public static final String PASSWORD_KEY = "password";
+
+ private static final long serialVersionUID = 1L;
+
+ protected final String url;
+ @Nullable
+ protected final String driverName;
+ protected final int connectionCheckTimeoutSeconds;
+ @Nonnull
+ protected final Properties properties;
+
+ protected JdbcConnectionOptions(
+ String url,
+ @Nullable String driverName,
+ int connectionCheckTimeoutSeconds,
+ @Nonnull Properties properties) {
+ Preconditions.checkArgument(
+ connectionCheckTimeoutSeconds > 0,
+ "Connection check timeout seconds shouldn't be smaller than 1");
+ this.url = Preconditions.checkNotNull(url, "jdbc url is empty");
+ this.driverName = driverName;
+ this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+ this.properties =
+ Preconditions.checkNotNull(properties, "Connection properties must be non-null");
+ }
+
+ public String getDbURL() {
+ return url;
+ }
+
+ @Nullable
+ public String getDriverName() {
+ return driverName;
+ }
+
+ public Optional getUsername() {
+ return Optional.ofNullable(properties.getProperty(USER_KEY));
+ }
+
+ public Optional getPassword() {
+ return Optional.ofNullable(properties.getProperty(PASSWORD_KEY));
+ }
+
+ public int getConnectionCheckTimeoutSeconds() {
+ return connectionCheckTimeoutSeconds;
+ }
+
+ @Nonnull
+ public Properties getProperties() {
+ return properties;
+ }
+
+ @Nonnull
+ public static Properties getBriefAuthProperties(String user, String password) {
+ final Properties result = new Properties();
+ if (Objects.nonNull(user)) {
+ result.put(USER_KEY, user);
+ }
+ if (Objects.nonNull(password)) {
+ result.put(PASSWORD_KEY, password);
+ }
+ return result;
+ }
+
+ /** Builder for {@link JdbcConnectionOptions}. */
+ public static class JdbcConnectionOptionsBuilder {
+
+ private String url;
+ private String driverName;
+ private int connectionCheckTimeoutSeconds = 60;
+ private final Properties properties = new Properties();
+
+ public JdbcConnectionOptionsBuilder withUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withDriverName(String driverName) {
+ this.driverName = driverName;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withProperty(String propKey, String propVal) {
+ Preconditions.checkNotNull(propKey, "Connection property key mustn't be null");
+ Preconditions.checkNotNull(propVal, "Connection property value mustn't be null");
+ this.properties.put(propKey, propVal);
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withUsername(String username) {
+ if (Objects.nonNull(username)) {
+ this.properties.put(USER_KEY, username);
+ }
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withPassword(String password) {
+ if (Objects.nonNull(password)) {
+ this.properties.put(PASSWORD_KEY, password);
+ }
+ return this;
+ }
+
+ /**
+ * Set the maximum timeout between retries, default is 60 seconds.
+ *
+ * @param connectionCheckTimeoutSeconds the timeout seconds, shouldn't smaller than 1
+ * second.
+ */
+ public JdbcConnectionOptionsBuilder withConnectionCheckTimeoutSeconds(
+ int connectionCheckTimeoutSeconds) {
+ this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+ return this;
+ }
+
+ public JdbcConnectionOptions build() {
+ return new JdbcConnectionOptions(
+ url, driverName, connectionCheckTimeoutSeconds, properties);
+ }
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java
new file mode 100644
index 0000000000..c49cb1fc7d
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.connections;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/** JDBC connection provider. */
+@PublicEvolving
+public interface JdbcConnectionProvider extends Serializable, AutoCloseable {
+
+ /**
+ * Get existing connection.
+ *
+ * @return existing connection
+ */
+ @Nullable
+ Connection getConnection();
+
+ /**
+ * Get existing connection properties.
+ *
+ * @return existing connection properties
+ */
+ @Nonnull
+ default Properties getProperties() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Check whether possible existing connection is valid or not through {@link
+ * Connection#isValid(int)}.
+ *
+ * @return true if existing connection is valid
+ * @throws SQLException sql exception throw from {@link Connection#isValid(int)}
+ */
+ boolean isConnectionValid() throws SQLException;
+
+ /**
+ * Get existing connection or establish an new one if there is none.
+ *
+ * @return existing connection or newly established connection
+ * @throws SQLException sql exception
+ * @throws ClassNotFoundException driver class not found
+ */
+ Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;
+
+ /** Close possible existing connection. */
+ void closeConnection();
+
+ /**
+ * Close possible existing connection and establish an new one.
+ *
+ * @return newly established connection
+ * @throws SQLException sql exception
+ * @throws ClassNotFoundException driver class not found
+ */
+ Connection reestablishConnection() throws SQLException, ClassNotFoundException;
+
+ default void close() throws Exception {
+ closeConnection();
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java
new file mode 100644
index 0000000000..e78c0bfbe8
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.connections;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Enumeration;
+import java.util.Properties;
+
+/** Simple JDBC connection provider. */
+@NotThreadSafe
+@PublicEvolving
+public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private final JdbcConnectionOptions jdbcOptions;
+
+ private transient Driver loadedDriver;
+ private transient Connection connection;
+
+ static {
+ // Load DriverManager first to avoid deadlock between DriverManager's
+ // static initialization block and specific driver class's static
+ // initialization block when two different driver classes are loading
+ // concurrently using Class.forName while DriverManager is uninitialized
+ // before.
+ //
+ // This could happen in JDK 8 but not above as driver loading has been
+ // moved out of DriverManager's static initialization block since JDK 9.
+ DriverManager.getDrivers();
+ }
+
+ public SimpleJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) {
+ this.jdbcOptions = jdbcOptions;
+ }
+
+ @Override
+ public Connection getConnection() {
+ return connection;
+ }
+
+ @Nonnull
+ @Override
+ public Properties getProperties() {
+ return jdbcOptions.getProperties();
+ }
+
+ @Override
+ public boolean isConnectionValid() throws SQLException {
+ return connection != null
+ && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
+ }
+
+ private Driver loadDriver(String driverName) throws SQLException, ClassNotFoundException {
+ Preconditions.checkNotNull(driverName);
+ Enumeration drivers = DriverManager.getDrivers();
+ while (drivers.hasMoreElements()) {
+ Driver driver = drivers.nextElement();
+ if (driver.getClass().getName().equals(driverName)) {
+ return driver;
+ }
+ }
+ // We could reach here for reasons:
+ // * Class loader hell of DriverManager(see JDK-8146872).
+ // * driver is not installed as a service provider.
+ Class> clazz =
+ Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
+ try {
+ return (Driver) clazz.newInstance();
+ } catch (Exception ex) {
+ throw new SQLException("Fail to create driver of class " + driverName, ex);
+ }
+ }
+
+ private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
+ if (loadedDriver == null) {
+ loadedDriver = loadDriver(jdbcOptions.getDriverName());
+ }
+ return loadedDriver;
+ }
+
+ @Override
+ public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
+ if (isConnectionValid()) {
+ return connection;
+ }
+ if (jdbcOptions.getDriverName() == null) {
+ connection = DriverManager.getConnection(jdbcOptions.getDbURL(), getProperties());
+ } else {
+ Driver driver = getLoadedDriver();
+ connection = driver.connect(jdbcOptions.getDbURL(), getProperties());
+ if (connection == null) {
+ // Throw same exception as DriverManager.getConnection when no driver found to match
+ // caller expectation.
+ throw new SQLException(
+ "No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
+ }
+ }
+ return connection;
+ }
+
+ @Override
+ public void closeConnection() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.warn("JDBC connection close failed.", e);
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ @Override
+ public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
+ closeConnection();
+ return getOrEstablishConnection();
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..9bcdf4448d
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.streampark.catalog.JdbcCatalogStoreFactory
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java
new file mode 100644
index 0000000000..59842e1b4e
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.mysql.MysqlBaseITCASE;
+
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.factories.CatalogStoreFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+public class JdbcCatalogStoreFactoryTest extends MysqlBaseITCASE {
+
+ @org.junit.Test
+ public void testCatalogStoreFactoryDiscovery() {
+
+ String factoryIdentifier = JdbcCatalogStoreFactoryOptions.IDENTIFIER;
+ Map options = new HashMap<>();
+ options.put("url", MYSQL_CONTAINER.getJdbcUrl());
+ options.put("table-name", "t_mysql_catalog");
+ options.put("driver", MYSQL_CONTAINER.getDriverClassName());
+ options.put("username", MYSQL_CONTAINER.getUsername());
+ options.put("password", MYSQL_CONTAINER.getPassword());
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ final FactoryUtil.DefaultCatalogStoreContext discoveryContext =
+ new FactoryUtil.DefaultCatalogStoreContext(options, null, classLoader);
+ final CatalogStoreFactory factory =
+ FactoryUtil.discoverFactory(
+ classLoader, CatalogStoreFactory.class, factoryIdentifier);
+ factory.open(discoveryContext);
+
+ CatalogStore catalogStore = factory.createCatalogStore();
+ assertThat(catalogStore instanceof JdbcCatalogStore).isTrue();
+
+ factory.close();
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java
new file mode 100644
index 0000000000..6267688632
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.connections.JdbcConnectionOptions;
+import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
+import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider;
+import org.apache.streampark.catalog.mysql.MysqlBaseITCASE;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.assertj.core.api.ThrowableAssert;
+
+import java.util.Set;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+public class JdbcCatalogStoreTest extends MysqlBaseITCASE {
+
+ private static final String DUMMY = "dummy";
+ private static final CatalogDescriptor DUMMY_CATALOG;
+
+ static {
+ Configuration conf = new Configuration();
+ conf.set(CommonCatalogOptions.CATALOG_TYPE, DUMMY);
+ conf.set(GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE, "dummy_db");
+
+ DUMMY_CATALOG = CatalogDescriptor.of(DUMMY, conf);
+ }
+ @org.junit.Test
+ public void testNotOpened() {
+ CatalogStore catalogStore = initCatalogStore();
+
+ assertCatalogStoreNotOpened(catalogStore::listCatalogs);
+ assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
+ assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
+ assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG));
+ assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true));
+ }
+
+ @org.junit.Test
+ public void testStore() {
+ CatalogStore catalogStore = initCatalogStore();
+ catalogStore.open();
+
+ catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+
+ Set storedCatalogs = catalogStore.listCatalogs();
+ assertThat(storedCatalogs.size()).isEqualTo(1);
+ assertThat(storedCatalogs.contains(DUMMY)).isTrue();
+ }
+
+ @org.junit.Test
+ public void testRemoveExisting() {
+ CatalogStore catalogStore = initCatalogStore();
+ catalogStore.open();
+
+ catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+ assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
+
+ catalogStore.removeCatalog(DUMMY, false);
+ assertThat(catalogStore.listCatalogs().size()).isEqualTo(0);
+ assertThat(catalogStore.contains(DUMMY)).isFalse();
+ }
+
+ @org.junit.Test
+ public void testRemoveNonExisting() {
+ CatalogStore catalogStore = initCatalogStore();
+ catalogStore.open();
+
+ catalogStore.removeCatalog(DUMMY, true);
+
+ assertThatThrownBy(() -> catalogStore.removeCatalog(DUMMY, false))
+ .isInstanceOf(CatalogException.class)
+ .hasMessageContaining(
+ "Remove catalog " + DUMMY + " failed!");
+ }
+
+ @org.junit.Test
+ public void testClosed() {
+ CatalogStore catalogStore = initCatalogStore();
+
+ catalogStore.open();
+
+ catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+ assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
+
+ catalogStore.close();
+
+ assertCatalogStoreNotOpened(catalogStore::listCatalogs);
+ assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
+ assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
+ assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG));
+ assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true));
+ }
+
+ private void assertCatalogStoreNotOpened(
+ ThrowableAssert.ThrowingCallable shouldRaiseThrowable) {
+ assertThatThrownBy(shouldRaiseThrowable)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("CatalogStore is not opened yet.");
+ }
+
+ private JdbcCatalogStore initCatalogStore() {
+ JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl(MYSQL_CONTAINER.getJdbcUrl())
+ .withDriverName(MYSQL_CONTAINER.getDriverClassName())
+ .withUsername(MYSQL_CONTAINER.getUsername())
+ .withPassword(MYSQL_CONTAINER.getPassword())
+ .build();
+
+ JdbcConnectionProvider jdbcConnectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectionOptions);
+ return new JdbcCatalogStore(jdbcConnectionProvider, "t_mysql_catalog");
+
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java
new file mode 100644
index 0000000000..94a5c2f7e7
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.mysql;
+
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class MySqlContainer extends JdbcDatabaseContainer {
+
+ public static final String IMAGE = "mysql";
+
+ public static final String MYSQL_VERSION = "8.0";
+ public static final Integer MYSQL_PORT = 3306;
+
+ private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
+ private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
+ private static final String MYSQL_ROOT_USER = "root";
+
+ private String databaseName = "test";
+ private String username = "test";
+ private String password = "test";
+
+ public MySqlContainer() {
+ this(MYSQL_VERSION);
+ }
+
+ public MySqlContainer(String version) {
+ super(DockerImageName.parse(IMAGE + ":" + version));
+ addExposedPort(MYSQL_PORT);
+ }
+
+ @Override
+ protected Set getLivenessCheckPorts() {
+ return new HashSet<>(getMappedPort(MYSQL_PORT));
+ }
+
+ @Override
+ protected void configure() {
+ optionallyMapResourceParameterAsVolume(
+ MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");
+
+ if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
+ optionallyMapResourceParameterAsVolume(
+ SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
+ }
+
+ addEnv("MYSQL_DATABASE", databaseName);
+ addEnv("MYSQL_USER", username);
+ if (password != null && !password.isEmpty()) {
+ addEnv("MYSQL_PASSWORD", password);
+ addEnv("MYSQL_ROOT_PASSWORD", password);
+ } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
+ addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
+ } else {
+ throw new ContainerLaunchException(
+ "Empty password can be used only with the root user");
+ }
+ setStartupAttempts(3);
+ }
+
+ @Override
+ public String getDriverClassName() {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ return "com.mysql.cj.jdbc.Driver";
+ } catch (ClassNotFoundException e) {
+ return "com.mysql.jdbc.Driver";
+ }
+ }
+
+ public String getJdbcUrl(String databaseName) {
+ String additionalUrlParams = constructUrlParameters("?", "&");
+ return "jdbc:mysql://"
+ + getHost()
+ + ":"
+ + getDatabasePort()
+ + "/"
+ + databaseName
+ + additionalUrlParams;
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return getJdbcUrl(databaseName);
+ }
+
+ public int getDatabasePort() {
+ return getMappedPort(MYSQL_PORT);
+ }
+
+ @Override
+ protected String constructUrlForConnection(String queryString) {
+ String url = super.constructUrlForConnection(queryString);
+
+ if (!url.contains("useSSL=")) {
+ String separator = url.contains("?") ? "&" : "?";
+ url = url + separator + "useSSL=false";
+ }
+
+ if (!url.contains("allowPublicKeyRetrieval=")) {
+ url = url + "&allowPublicKeyRetrieval=true";
+ }
+
+ return url;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ @Override
+ public String getUsername() {
+ return username;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+
+ @Override
+ protected String getTestQueryString() {
+ return "SELECT 1";
+ }
+
+ @SuppressWarnings("unchecked")
+ public MySqlContainer withConfigurationOverride(String s) {
+ parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public MySqlContainer withSetupSQL(String sqlPath) {
+ parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
+ return this;
+ }
+
+ @Override
+ public MySqlContainer withDatabaseName(final String databaseName) {
+ this.databaseName = databaseName;
+ return this;
+ }
+
+ @Override
+ public MySqlContainer withUsername(final String username) {
+ this.username = username;
+ return this;
+ }
+
+ @Override
+ public MySqlContainer withPassword(final String password) {
+ this.password = password;
+ return this;
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java
new file mode 100644
index 0000000000..799335b96d
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.mysql;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.stream.Stream;
+
+public class MysqlBaseITCASE {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlBaseITCASE.class);
+ protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer("docker/server/my.cnf");
+
+ @org.junit.BeforeClass
+ public static void startContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @org.junit.AfterClass
+ public static void stopContainers() {
+ LOG.info("Stopping containers...");
+ if (MYSQL_CONTAINER != null) {
+ MYSQL_CONTAINER.stop();
+ }
+ LOG.info("Containers are stopped.");
+ }
+
+ protected static MySqlContainer createMySqlContainer(String configPath) {
+ return (MySqlContainer) new MySqlContainer(MySqlContainer.MYSQL_VERSION)
+ .withConfigurationOverride(configPath)
+ .withSetupSQL("docker/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf
new file mode 100644
index 0000000000..953ffe8f91
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems.
+server-id = 223344
+log_bin = mysql-bin
+binlog_format = row
+# Make binlog_expire_logs_seconds = 1 and max_binlog_size = 4096 to test the exception
+# message when the binlog expires in the server.
+binlog_expire_logs_seconds = 1
+max_binlog_size = 4096
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql
new file mode 100644
index 0000000000..1b22107deb
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql
@@ -0,0 +1,41 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- In production you would almost certainly limit the replication user must be on the follower (slave) machine,
+-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: emptydb
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE emptydb;
+USE flink-test;
+create table t_mysql_catalog (
+ `id` bigint NOT NULL AUTO_INCREMENT,
+ `catalog_name` varchar(255) NOT NULL,
+ `configuration` text,
+ `create_time` datetime DEFAULT NULL,
+ `update_time` datetime DEFAULT NULL,
+ PRIMARY KEY (`id`) USING BTREE,
+ UNIQUE INDEX `uniq_catalog_name` (`catalog_name`) USING BTREE
+)
+
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
index db8ff46cf4..d13f26c1b3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
@@ -29,7 +29,7 @@
StreamPark : Flink Shims 1.18
- 1.18.0
+ 1.18.1