Skip to content

Commit

Permalink
[WIP][Feature] Support Flink 1.18 application submit (#3169)
Browse files Browse the repository at this point in the history
* [Feature] Support Flink 1.18 application submit

* Adaptive Flink1.18

* Optimized code

* Modifying checkVersion

* Formatted code
  • Loading branch information
ChengJie1053 authored Oct 25, 2023
1 parent 47f50ee commit 6ea6439
Show file tree
Hide file tree
Showing 11 changed files with 569 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg

def checkVersion(throwException: Boolean = true): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
case Array(1, v, _) if v >= 12 && v <= 17 => true
case Array(1, v, _) if v >= 12 && v <= 18 => true
case _ =>
if (throwException) {
throw new UnsupportedOperationException(s"Unsupported flink version: $version")
Expand Down
7 changes: 7 additions & 0 deletions streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,13 @@
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
<!-- flink 1.18 support-->
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-shims_flink-1.18_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
<!-- flink-submit-core -->
<dependency>
<groupId>org.apache.streampark</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class EnvInitializer implements ApplicationRunner {

private static final Pattern PATTERN_FLINK_SHIMS_JAR =
Pattern.compile(
"^streampark-flink-shims_flink-(1.1[2-7])_(2.12)-(.*).jar$",
"^streampark-flink-shims_flink-(1.1[2-8])_(2.12)-(.*).jar$",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

@SneakyThrows
Expand Down
1 change: 1 addition & 0 deletions streampark-flink/streampark-flink-shims/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<module>streampark-flink-shims_flink-1.15</module>
<module>streampark-flink-shims_flink-1.16</module>
<module>streampark-flink-shims_flink-1.17</module>
<module>streampark-flink-shims_flink-1.18</module>
</modules>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@ object FlinkSqlValidator extends Logger {
private[this] lazy val sqlParserConfigMap: Map[String, SqlParser.Config] = {
def getConfig(sqlDialect: SqlDialect): Config = {
val conformance = sqlDialect match {
case HIVE => FlinkSqlConformance.HIVE
case HIVE =>
try {
FlinkSqlConformance.HIVE
} catch {
// for flink 1.18+
case _: NoSuchFieldError => FlinkSqlConformance.DEFAULT
case e => throw new IllegalArgumentException("Init Flink sql Dialect error: ", e)
}
case DEFAULT => FlinkSqlConformance.DEFAULT
case _ => throw new UnsupportedOperationException(s"Unsupported sqlDialect: $sqlDialect")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-shims</artifactId>
<version>2.2.0-SNAPSHOT</version>
</parent>

<artifactId>streampark-flink-shims_flink-1.18_${scala.binary.version}</artifactId>
<name>StreamPark : Flink Shims 1.18</name>

<properties>
<flink.version>1.18.0</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!--flink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
<artifactSet>
<includes>
<include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.core

import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.core.execution.SavepointFormatType

import java.util.concurrent.CompletableFuture

class FlinkClusterClient[T](clusterClient: ClusterClient[T])
extends FlinkClientTrait[T](clusterClient) {

override def triggerSavepoint(
jobID: JobID,
savepointDir: String,
nativeFormat: Boolean): CompletableFuture[String] = {
clusterClient.triggerSavepoint(
jobID,
savepointDir,
if (nativeFormat) SavepointFormatType.NATIVE else SavepointFormatType.CANONICAL)
}

override def cancelWithSavepoint(
jobID: JobID,
savepointDirectory: String,
nativeFormat: Boolean): CompletableFuture[String] = {
clusterClient.cancelWithSavepoint(
jobID,
savepointDirectory,
if (nativeFormat) SavepointFormatType.NATIVE else SavepointFormatType.CANONICAL)
}

override def stopWithSavepoint(
jobID: JobID,
advanceToEndOfEventTime: Boolean,
savepointDirectory: String,
nativeFormat: Boolean): CompletableFuture[String] = {
clusterClient.stopWithSavepoint(
jobID,
advanceToEndOfEventTime,
savepointDirectory,
if (nativeFormat) SavepointFormatType.NATIVE else SavepointFormatType.CANONICAL)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.core

import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService

import java.util.Optional

class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
extends FlinkKubernetesClientTrait(kubeClient) {

override def getService(serviceName: String): Optional[KubernetesService] = {
kubeClient.getService(serviceName)
}

}
Loading

0 comments on commit 6ea6439

Please sign in to comment.