From fbcab702080089be9d1d1a2d6ec665dd060b2f7a Mon Sep 17 00:00:00 2001 From: caicancai <2356672992@qq.com> Date: Sun, 5 Nov 2023 15:27:58 +0800 Subject: [PATCH 1/3] [Improve][Flink] Improve deployreponse parameters --- .../streampark/flink/client/bean/DeployResponse.scala | 9 ++++++++- .../client/impl/KubernetesNativeSessionClient.scala | 2 +- .../flink/client/impl/KubernetesSessionClientV2.scala | 3 ++- .../streampark/flink/client/impl/YarnSessionClient.scala | 5 ++++- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala index 91b6ecad09..d4e88b8498 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala @@ -17,4 +17,11 @@ package org.apache.streampark.flink.client.bean -case class DeployResponse(address: String, clusterId: String) +import javax.annotation.Nullable + +import java.util.{Map => JavaMap} + +case class DeployResponse( + @Nullable address: String = "", + clusterId: String, + flinkConfig: JavaMap[String, String]) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index e2684014e1..44c2295bf9 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -189,7 +189,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient } if (client.getWebInterfaceURL != null) { - DeployResponse(client.getWebInterfaceURL, client.getClusterId) + DeployResponse(client.getWebInterfaceURL, client.getClusterId, flinkConfig.toMap) } else { null } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala index 68a89cac03..1fb12e871a 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala @@ -158,13 +158,14 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger { FlinkK8sOperator.deployCluster(deployRequest.id, flinkDeployDef).runIOAsTry match { case Success(_) => logInfo(richMsg("Flink Cluster has been submitted successfully.")) - DeployResponse(null, deployRequest.clusterId) case Failure(err) => logError( richMsg(s"Submit Flink Cluster fail in${deployRequest.executionMode.getName}_V2 mode!"), err) throw err } + + DeployResponse(null, deployRequest.clusterId, flinkConfig.toMap) } /** Shutdown Flink cluster. */ diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index c91fc45f02..d1cd12d98d 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -219,7 +219,10 @@ object YarnSessionClient extends YarnClientTrait { .retrieve(ApplicationId.fromString(deployRequest.clusterId)) .getClusterClient if (yarnClient.getWebInterfaceURL != null) { - return DeployResponse(yarnClient.getWebInterfaceURL, yarnClient.getClusterId.toString) + return DeployResponse( + yarnClient.getWebInterfaceURL, + yarnClient.getClusterId.toString, + flinkConfig.toMap) } } } catch { From a876666fcca393cefbb21900a944a458eba48716 Mon Sep 17 00:00:00 2001 From: caicancai <2356672992@qq.com> Date: Sun, 5 Nov 2023 16:01:27 +0800 Subject: [PATCH 2/3] [Improve][Flink] Improve deployreponse parameters --- .../apache/streampark/flink/client/bean/DeployResponse.scala | 5 +---- .../flink/client/impl/KubernetesNativeSessionClient.scala | 2 +- .../flink/client/impl/KubernetesSessionClientV2.scala | 2 +- .../streampark/flink/client/impl/YarnSessionClient.scala | 5 +---- 4 files changed, 4 insertions(+), 10 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala index d4e88b8498..703b4944b0 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala @@ -21,7 +21,4 @@ import javax.annotation.Nullable import java.util.{Map => JavaMap} -case class DeployResponse( - @Nullable address: String = "", - clusterId: String, - flinkConfig: JavaMap[String, String]) +case class DeployResponse(@Nullable address: String = "", clusterId: String) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index 44c2295bf9..e2684014e1 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -189,7 +189,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient } if (client.getWebInterfaceURL != null) { - DeployResponse(client.getWebInterfaceURL, client.getClusterId, flinkConfig.toMap) + DeployResponse(client.getWebInterfaceURL, client.getClusterId) } else { null } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala index 1fb12e871a..d5e25ce5a4 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala @@ -165,7 +165,7 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger { throw err } - DeployResponse(null, deployRequest.clusterId, flinkConfig.toMap) + DeployResponse(null, deployRequest.clusterId) } /** Shutdown Flink cluster. */ diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index d1cd12d98d..c91fc45f02 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -219,10 +219,7 @@ object YarnSessionClient extends YarnClientTrait { .retrieve(ApplicationId.fromString(deployRequest.clusterId)) .getClusterClient if (yarnClient.getWebInterfaceURL != null) { - return DeployResponse( - yarnClient.getWebInterfaceURL, - yarnClient.getClusterId.toString, - flinkConfig.toMap) + return DeployResponse(yarnClient.getWebInterfaceURL, yarnClient.getClusterId.toString) } } } catch { From 7b7d5b449b652650d4e12f06d71efd044ad32962 Mon Sep 17 00:00:00 2001 From: caicancai <2356672992@qq.com> Date: Sun, 5 Nov 2023 16:02:27 +0800 Subject: [PATCH 3/3] [Improve][Flink] Improve deployreponse parameters --- .../apache/streampark/flink/client/bean/DeployResponse.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala index 703b4944b0..abab13b86d 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala @@ -19,6 +19,4 @@ package org.apache.streampark.flink.client.bean import javax.annotation.Nullable -import java.util.{Map => JavaMap} - case class DeployResponse(@Nullable address: String = "", clusterId: String)