Skip to content

Commit

Permalink
[feature] Introduce flink jdbc catalog store plugin. (#3914)
Browse files Browse the repository at this point in the history
* [catalog] Introduce flink jdbc catalog store plugin.

* fixed delete where unmatched

* improve jdbc close method.

* [catalog] refactor project module to stream-flink.

* [catalog] refactor package name

* [catalog] delete unused logs
  • Loading branch information
Mrart authored Jul 31, 2024
1 parent 1c3b890 commit 180bfb2
Show file tree
Hide file tree
Showing 18 changed files with 1,493 additions and 16 deletions.
28 changes: 14 additions & 14 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion streampark-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<module>streampark-flink-packer</module>
<module>streampark-flink-kubernetes</module>
<module>streampark-flink-sql-gateway</module>
<module>streampark-flink-catalog-store</module>
</modules>

<dependencies>
Expand All @@ -62,7 +63,10 @@
<version>${jupiter.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
138 changes: 138 additions & 0 deletions streampark-flink/streampark-flink-catalog-store/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink</artifactId>
<version>2.2.0-SNAPSHOT</version>
</parent>

<artifactId>streampark-flink-catalog-store</artifactId>
<name>StreamPark : Flink Catalog Store</name>

<properties>
<flink.shaded.version19>19.0</flink.shaded.version19>
<flink.version>1.18.1</flink.version>
<flink.shaded.jackson.version>2.15.3</flink.shaded.jackson.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-annotations</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<version>${flink.shaded.jackson.version}-${flink.shaded.version19}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>1.19.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
<artifactSet>
<includes>
<include>org.apache.streampark:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.catalog;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;

/** Serialization utils */
public final class JacksonUtils {

private JacksonUtils() {
}

private static final ObjectMapper MAPPER;

static {
MAPPER = new ObjectMapper();
MAPPER.registerModule(new SimpleModule());
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
}

public static <T> T read(String json, Class<T> clazz) throws JsonProcessingException {
return MAPPER.readValue(json, clazz);
}

public static <T> T read(String json, TypeReference<T> typeReference) throws JsonProcessingException {
return MAPPER.readValue(json, typeReference);
}

public static String write(Object object) throws JsonProcessingException {
return MAPPER.writeValueAsString(object);
}
}
Loading

0 comments on commit 180bfb2

Please sign in to comment.