diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala index ea4905d9c2..209a210451 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala @@ -99,9 +99,9 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait { } override def doTriggerSavepoint( - request: TriggerSavepointRequest, + triggerSavepointRequest: TriggerSavepointRequest, flinkConf: Configuration): SavepointResponse = { flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName) - super.doTriggerSavepoint(request, flinkConf) + super.doTriggerSavepoint(triggerSavepointRequest, flinkConf) } } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index c4c9c6e41c..a63e912987 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -250,9 +250,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo } override def doTriggerSavepoint( - request: TriggerSavepointRequest, + triggerSavepointRequest: TriggerSavepointRequest, flinkConfig: Configuration): SavepointResponse = { flinkConfig.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName) - super.doTriggerSavepoint(request, flinkConfig) + super.doTriggerSavepoint(triggerSavepointRequest, flinkConfig) } } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala index 75b9792469..eebcaec2eb 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala @@ -99,23 +99,23 @@ trait KubernetesClientV2Trait extends FlinkClientTrait { } @throws[Exception] - override def doCancel(request: CancelRequest, flinkConf: Configuration): CancelResponse = { + override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = { val effect = - if (!request.withSavepoint) { + if (!cancelRequest.withSavepoint) { // cancel job FlinkK8sOperator - .cancelJob(request.id) + .cancelJob(cancelRequest.id) .as(CancelResponse(null)) } else { // stop job with savepoint val savepointDef = JobSavepointDef( - drain = Option(request.withDrain).getOrElse(false), - savepointPath = Option(request.savepointPath), - formatType = Option(request.nativeFormat) + drain = Option(cancelRequest.withDrain).getOrElse(false), + savepointPath = Option(cancelRequest.savepointPath), + formatType = Option(cancelRequest.nativeFormat) .map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT) ) FlinkK8sOperator - .stopJob(request.id, savepointDef) + .stopJob(cancelRequest.id, savepointDef) .flatMap { result => if (result.isFailed) ZIO.fail(StopJobFail(result.failureCause.get)) @@ -123,7 +123,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait { } } - def richMsg: String => String = s"[flink-cancel][appId=${request.id}] " + _ + def richMsg: String => String = s"[flink-cancel][appId=${cancelRequest.id}] " + _ effect.runIOAsTry match { case Success(rsp) => @@ -131,7 +131,7 @@ trait KubernetesClientV2Trait extends FlinkClientTrait { rsp case Failure(err) => logError( - richMsg(s"Cancel flink job fail in ${request.executionMode.getName}_V2 mode!"), + richMsg(s"Cancel flink job fail in ${cancelRequest.executionMode.getName}_V2 mode!"), err) throw err } @@ -139,19 +139,20 @@ trait KubernetesClientV2Trait extends FlinkClientTrait { @throws[Exception] override def doTriggerSavepoint( - request: TriggerSavepointRequest, + triggerSavepointRequest: TriggerSavepointRequest, flinkConf: Configuration): SavepointResponse = { val savepointDef = JobSavepointDef( - savepointPath = Option(request.savepointPath), - formatType = Option(request.nativeFormat) + savepointPath = Option(triggerSavepointRequest.savepointPath), + formatType = Option(triggerSavepointRequest.nativeFormat) .map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT) ) - def richMsg: String => String = s"[flink-trigger-savepoint][appId=${request.id}] " + _ + def richMsg: String => String = + s"[flink-trigger-savepoint][appId=${triggerSavepointRequest.id}] " + _ FlinkK8sOperator - .triggerJobSavepoint(request.id, savepointDef) + .triggerJobSavepoint(triggerSavepointRequest.id, savepointDef) .flatMap { result => if (result.isFailed) ZIO.fail(TriggerJobSavepointFail(result.failureCause.get)) @@ -163,7 +164,8 @@ trait KubernetesClientV2Trait extends FlinkClientTrait { rsp case Failure(err) => logError( - richMsg(s"Cancel flink job fail in ${request.executionMode.getName}_V2 mode!"), + richMsg( + s"Cancel flink job fail in ${triggerSavepointRequest.executionMode.getName}_V2 mode!"), err) throw err }