diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala index 6baf9d3751..5fb6636855 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala @@ -67,9 +67,9 @@ object LocalClient extends FlinkClientTrait { } override def doTriggerSavepoint( - request: TriggerSavepointRequest, + savepointRequest: TriggerSavepointRequest, flinkConfig: Configuration): SavepointResponse = { - RemoteClient.doTriggerSavepoint(request, flinkConfig) + RemoteClient.doTriggerSavepoint(savepointRequest, flinkConfig) } override def doCancel( diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala index 8e5c6a5c09..fe4fe454a9 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala @@ -48,12 +48,14 @@ object RemoteClient extends FlinkClientTrait { } - override def doCancel(request: CancelRequest, flinkConfig: Configuration): CancelResponse = { + override def doCancel( + cancelRequest: CancelRequest, + flinkConfig: Configuration): CancelResponse = { executeClientAction( - request, + cancelRequest, flinkConfig, (jobID, clusterClient) => { - CancelResponse(super.cancelJob(request, jobID, clusterClient)) + CancelResponse(super.cancelJob(cancelRequest, jobID, clusterClient)) }) } @@ -90,13 +92,13 @@ object RemoteClient extends FlinkClientTrait { } override def doTriggerSavepoint( - request: TriggerSavepointRequest, + savepointRequest: TriggerSavepointRequest, flinkConfig: Configuration): SavepointResponse = { executeClientAction( - request, + savepointRequest, flinkConfig, (jobID, clusterClient) => { - SavepointResponse(super.triggerSavepoint(request, jobID, clusterClient)) + SavepointResponse(super.triggerSavepoint(savepointRequest, jobID, clusterClient)) }) } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index 89260632cc..c91fc45f02 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -134,11 +134,11 @@ object YarnSessionClient extends YarnClientTrait { } private[this] def executeClientAction[O, R <: SavepointRequestTrait]( - request: R, + savepointRequestTrait: R, flinkConfig: Configuration, actFunc: (JobID, ClusterClient[_]) => O): O = { flinkConfig - .safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId) + .safeSet(YarnConfigOptions.APPLICATION_ID, savepointRequestTrait.clusterId) .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName) logInfo(s""" |------------------------------------------------------------------ @@ -152,10 +152,10 @@ object YarnSessionClient extends YarnClientTrait { val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig) clusterDescriptor = yarnClusterDescriptor._2 client = clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient - actFunc(JobID.fromHexString(request.jobId), client) + actFunc(JobID.fromHexString(savepointRequestTrait.jobId), client) } catch { case e: Exception => - logError(s"${request.getClass.getSimpleName} for flink yarn session job fail") + logError(s"${savepointRequestTrait.getClass.getSimpleName} for flink yarn session job fail") e.printStackTrace() throw e } finally { @@ -176,15 +176,16 @@ object YarnSessionClient extends YarnClientTrait { } override def doTriggerSavepoint( - request: TriggerSavepointRequest, + savepointRequest: TriggerSavepointRequest, flinkConfig: Configuration): SavepointResponse = { executeClientAction( - request, + savepointRequest, flinkConfig, (jobID, clusterClient) => { - val actionResult = super.triggerSavepoint(request, jobID, clusterClient) + val actionResult = super.triggerSavepoint(savepointRequest, jobID, clusterClient) SavepointResponse(actionResult) - }) + } + ) } def deploy(deployRequest: DeployRequest): DeployResponse = { @@ -195,8 +196,8 @@ object YarnSessionClient extends YarnClientTrait { | flinkVersion : ${deployRequest.flinkVersion.version} | execMode : ${deployRequest.executionMode.name()} | clusterId : ${deployRequest.clusterId} - | properties : ${deployRequest.properties.mkString(" ")} - |------------------------------------------------------------------------------------------- + | properties : ${deployRequest.properties.mkString(",")} + |------------------------------------------------------------------------------------------------------- |""".stripMargin) var clusterDescriptor: YarnClusterDescriptor = null var client: ClusterClient[ApplicationId] = null diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala index 456b260c29..4e58a50f90 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala @@ -40,11 +40,11 @@ import scala.util.Try trait YarnClientTrait extends FlinkClientTrait { private[this] def executeClientAction[R <: SavepointRequestTrait, O]( - request: R, + savepointRequestTrait: R, flinkConf: Configuration, actionFunc: (JobID, ClusterClient[_]) => O): O = { - flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId) + flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, savepointRequestTrait.clusterId) val clusterClientFactory = new YarnClusterClientFactory val applicationId = clusterClientFactory.getClusterId(flinkConf) if (applicationId == null) { @@ -57,22 +57,22 @@ trait YarnClientTrait extends FlinkClientTrait { .getClusterClient .autoClose( client => - Try(actionFunc(getJobID(request.jobId), client)).recover { + Try(actionFunc(getJobID(savepointRequestTrait.jobId), client)).recover { case e => throw new FlinkException( - s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " + + s"[StreamPark] Do ${savepointRequestTrait.getClass.getSimpleName} for the job ${savepointRequestTrait.jobId} failed. " + s"detail: ${ExceptionUtils.stringifyException(e)}"); }.get) } override def doTriggerSavepoint( - request: TriggerSavepointRequest, + savepointRequest: TriggerSavepointRequest, flinkConf: Configuration): SavepointResponse = { executeClientAction( - request, + savepointRequest, flinkConf, (jid, client) => { - SavepointResponse(super.triggerSavepoint(request, jid, client)) + SavepointResponse(super.triggerSavepoint(savepointRequest, jid, client)) }) }