Skip to content

Commit

Permalink
Naming standardization (#3177)
Browse files Browse the repository at this point in the history
  • Loading branch information
caicancai authored Sep 24, 2023
1 parent 16d39f9 commit 436b9f3
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
}

override def doTriggerSavepoint(
request: TriggerSavepointRequest,
triggerSavepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse = {
flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
super.doTriggerSavepoint(request, flinkConf)
super.doTriggerSavepoint(triggerSavepointRequest, flinkConf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
}

override def doTriggerSavepoint(
request: TriggerSavepointRequest,
triggerSavepointRequest: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
flinkConfig.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
super.doTriggerSavepoint(request, flinkConfig)
super.doTriggerSavepoint(triggerSavepointRequest, flinkConfig)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,59 +99,60 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
}

@throws[Exception]
override def doCancel(request: CancelRequest, flinkConf: Configuration): CancelResponse = {
override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
val effect =
if (!request.withSavepoint) {
if (!cancelRequest.withSavepoint) {
// cancel job
FlinkK8sOperator
.cancelJob(request.id)
.cancelJob(cancelRequest.id)
.as(CancelResponse(null))
} else {
// stop job with savepoint
val savepointDef = JobSavepointDef(
drain = Option(request.withDrain).getOrElse(false),
savepointPath = Option(request.savepointPath),
formatType = Option(request.nativeFormat)
drain = Option(cancelRequest.withDrain).getOrElse(false),
savepointPath = Option(cancelRequest.savepointPath),
formatType = Option(cancelRequest.nativeFormat)
.map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT)
)
FlinkK8sOperator
.stopJob(request.id, savepointDef)
.stopJob(cancelRequest.id, savepointDef)
.flatMap {
result =>
if (result.isFailed) ZIO.fail(StopJobFail(result.failureCause.get))
else ZIO.succeed(CancelResponse(result.location.orNull))
}
}

def richMsg: String => String = s"[flink-cancel][appId=${request.id}] " + _
def richMsg: String => String = s"[flink-cancel][appId=${cancelRequest.id}] " + _

effect.runIOAsTry match {
case Success(rsp) =>
logInfo(richMsg("Cancel flink job successfully."))
rsp
case Failure(err) =>
logError(
richMsg(s"Cancel flink job fail in ${request.executionMode.getName}_V2 mode!"),
richMsg(s"Cancel flink job fail in ${cancelRequest.executionMode.getName}_V2 mode!"),
err)
throw err
}
}

@throws[Exception]
override def doTriggerSavepoint(
request: TriggerSavepointRequest,
triggerSavepointRequest: TriggerSavepointRequest,
flinkConf: Configuration): SavepointResponse = {

val savepointDef = JobSavepointDef(
savepointPath = Option(request.savepointPath),
formatType = Option(request.nativeFormat)
savepointPath = Option(triggerSavepointRequest.savepointPath),
formatType = Option(triggerSavepointRequest.nativeFormat)
.map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT)
)

def richMsg: String => String = s"[flink-trigger-savepoint][appId=${request.id}] " + _
def richMsg: String => String =
s"[flink-trigger-savepoint][appId=${triggerSavepointRequest.id}] " + _

FlinkK8sOperator
.triggerJobSavepoint(request.id, savepointDef)
.triggerJobSavepoint(triggerSavepointRequest.id, savepointDef)
.flatMap {
result =>
if (result.isFailed) ZIO.fail(TriggerJobSavepointFail(result.failureCause.get))
Expand All @@ -163,7 +164,8 @@ trait KubernetesClientV2Trait extends FlinkClientTrait {
rsp
case Failure(err) =>
logError(
richMsg(s"Cancel flink job fail in ${request.executionMode.getName}_V2 mode!"),
richMsg(
s"Cancel flink job fail in ${triggerSavepointRequest.executionMode.getName}_V2 mode!"),
err)
throw err
}
Expand Down

0 comments on commit 436b9f3

Please sign in to comment.