[ISSUE-3299][Improve] Improve streampark-flink-client module base on [3.1 Naming Style] (#3300)

caicancai authored Oct 30, 2023
1 parent f83210f commit ba269cb
Showing 4 changed files with 28 additions and 25 deletions.
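
The change applies the project's [3.1 Naming Style] guideline across the streampark-flink-client module: parameters that were generically named request are renamed after the request they carry (savepointRequest, cancelRequest, and savepointRequestTrait for the type-bounded helper). A minimal sketch of the convention, using hypothetical stand-in types rather than the real StreamPark classes:

// Editorial sketch only: simplified stand-ins, not the real StreamPark classes.
final case class TriggerSavepointRequest(clusterId: String, jobId: String)
final case class CancelRequest(clusterId: String, jobId: String)
final case class SavepointResponse(savepointPath: String)
final case class CancelResponse(savepointPath: String)

trait NamingStyleSketch {
  // Before: doTriggerSavepoint(request: TriggerSavepointRequest, ...)
  // After:  the parameter is named after the request it carries.
  def doTriggerSavepoint(savepointRequest: TriggerSavepointRequest): SavepointResponse
  def doCancel(cancelRequest: CancelRequest): CancelResponse
}
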
@@ -67,9 +67,9 @@ object LocalClient extends FlinkClientTrait {
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
-    RemoteClient.doTriggerSavepoint(request, flinkConfig)
+    RemoteClient.doTriggerSavepoint(savepointRequest, flinkConfig)
   }
 
   override def doCancel(
@@ -48,12 +48,14 @@ object RemoteClient extends FlinkClientTrait {
 
   }
 
-  override def doCancel(request: CancelRequest, flinkConfig: Configuration): CancelResponse = {
+  override def doCancel(
+      cancelRequest: CancelRequest,
+      flinkConfig: Configuration): CancelResponse = {
     executeClientAction(
-      request,
+      cancelRequest,
       flinkConfig,
       (jobID, clusterClient) => {
-        CancelResponse(super.cancelJob(request, jobID, clusterClient))
+        CancelResponse(super.cancelJob(cancelRequest, jobID, clusterClient))
       })
   }
 
@@ -90,13 +92,13 @@ object RemoteClient extends FlinkClientTrait {
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
     executeClientAction(
-      request,
+      savepointRequest,
       flinkConfig,
       (jobID, clusterClient) => {
-        SavepointResponse(super.triggerSavepoint(request, jobID, clusterClient))
+        SavepointResponse(super.triggerSavepoint(savepointRequest, jobID, clusterClient))
       })
   }
 
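
Both RemoteClient overrides above funnel through a private executeClientAction helper that resolves a cluster client for the request's cluster and then applies the supplied action to the job; the helper itself is not part of this diff. A rough sketch of that shape, with placeholder types standing in for Flink's JobID and ClusterClient:

// Editorial sketch: FakeJobID and FakeClusterClient are placeholders, not Flink types.
final case class FakeJobID(hex: String)
final case class FakeClusterClient(clusterId: String)

object ExecuteClientActionSketch {
  def executeClientAction[O](
      clusterId: String,
      jobIdHex: String,
      actFunc: (FakeJobID, FakeClusterClient) => O): O = {
    // stands in for retrieving the client of the target cluster from the Flink configuration
    val clusterClient = FakeClusterClient(clusterId)
    // run the caller-supplied action (cancel, trigger savepoint, ...) against the job
    actFunc(FakeJobID(jobIdHex), clusterClient)
  }
}

// Usage shape, mirroring the doCancel / doTriggerSavepoint call sites above:
// ExecuteClientActionSketch.executeClientAction("app-1", "a1b2c3d4",
//   (jobID, client) => s"act on $jobID via $client")
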
@@ -134,11 +134,11 @@ object YarnSessionClient extends YarnClientTrait {
   }
 
   private[this] def executeClientAction[O, R <: SavepointRequestTrait](
-      request: R,
+      savepointRequestTrait: R,
       flinkConfig: Configuration,
       actFunc: (JobID, ClusterClient[_]) => O): O = {
     flinkConfig
-      .safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+      .safeSet(YarnConfigOptions.APPLICATION_ID, savepointRequestTrait.clusterId)
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
     logInfo(s"""
                |------------------------------------------------------------------
@@ -152,10 +152,10 @@ object YarnSessionClient extends YarnClientTrait {
       val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
       client = clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
-      actFunc(JobID.fromHexString(request.jobId), client)
+      actFunc(JobID.fromHexString(savepointRequestTrait.jobId), client)
     } catch {
       case e: Exception =>
-        logError(s"${request.getClass.getSimpleName} for flink yarn session job fail")
+        logError(s"${savepointRequestTrait.getClass.getSimpleName} for flink yarn session job fail")
         e.printStackTrace()
         throw e
     } finally {
@@ -176,15 +176,16 @@ object YarnSessionClient extends YarnClientTrait {
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
     executeClientAction(
-      request,
+      savepointRequest,
       flinkConfig,
       (jobID, clusterClient) => {
-        val actionResult = super.triggerSavepoint(request, jobID, clusterClient)
+        val actionResult = super.triggerSavepoint(savepointRequest, jobID, clusterClient)
         SavepointResponse(actionResult)
-      })
+      }
+    )
   }
 
   def deploy(deployRequest: DeployRequest): DeployResponse = {
@@ -195,8 +196,8 @@ object YarnSessionClient extends YarnClientTrait {
                | flinkVersion : ${deployRequest.flinkVersion.version}
                | execMode : ${deployRequest.executionMode.name()}
                | clusterId : ${deployRequest.clusterId}
-               | properties : ${deployRequest.properties.mkString(" ")}
-               |-------------------------------------------------------------------------------------------
+               | properties : ${deployRequest.properties.mkString(",")}
+               |-------------------------------------------------------------------------------------------------------
                |""".stripMargin)
     var clusterDescriptor: YarnClusterDescriptor = null
     var client: ClusterClient[ApplicationId] = null
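
The surrounding YarnSessionClient code (visible in the hunks above) uses null-initialized handles with a try/catch/finally block: the cluster descriptor and client are created inside the try and released afterwards. A hedged sketch of that shape with AutoCloseable placeholders, not the actual StreamPark implementation:

// Editorial sketch of the resource-handling shape only; logging and Flink types omitted.
object YarnSessionResourceSketch {
  def withResources[O](open: () => (AutoCloseable, AutoCloseable))(action: () => O): O = {
    var clusterDescriptor: AutoCloseable = null
    var client: AutoCloseable = null
    try {
      val opened = open() // retrieve the cluster descriptor and the cluster client
      clusterDescriptor = opened._1
      client = opened._2
      action() // run the deploy / savepoint / cancel action
    } catch {
      case e: Exception =>
        // the real code logs "<RequestClass> for flink yarn session job fail" before rethrowing
        throw e
    } finally {
      if (client != null) client.close()
      if (clusterDescriptor != null) clusterDescriptor.close()
    }
  }
}
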
@@ -40,11 +40,11 @@ import scala.util.Try
 trait YarnClientTrait extends FlinkClientTrait {
 
   private[this] def executeClientAction[R <: SavepointRequestTrait, O](
-      request: R,
+      savepointRequestTrait: R,
       flinkConf: Configuration,
       actionFunc: (JobID, ClusterClient[_]) => O): O = {
 
-    flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+    flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, savepointRequestTrait.clusterId)
     val clusterClientFactory = new YarnClusterClientFactory
     val applicationId = clusterClientFactory.getClusterId(flinkConf)
     if (applicationId == null) {
@@ -57,22 +57,22 @@ trait YarnClientTrait extends FlinkClientTrait {
       .getClusterClient
       .autoClose(
         client =>
-          Try(actionFunc(getJobID(request.jobId), client)).recover {
+          Try(actionFunc(getJobID(savepointRequestTrait.jobId), client)).recover {
             case e =>
               throw new FlinkException(
-                s"[StreamPark] Do ${request.getClass.getSimpleName} for the job ${request.jobId} failed. " +
+                s"[StreamPark] Do ${savepointRequestTrait.getClass.getSimpleName} for the job ${savepointRequestTrait.jobId} failed. " +
                   s"detail: ${ExceptionUtils.stringifyException(e)}");
           }.get)
   }
 
   override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
+      savepointRequest: TriggerSavepointRequest,
       flinkConf: Configuration): SavepointResponse = {
     executeClientAction(
-      request,
+      savepointRequest,
       flinkConf,
      (jid, client) => {
-        SavepointResponse(super.triggerSavepoint(request, jid, client))
+        SavepointResponse(super.triggerSavepoint(savepointRequest, jid, client))
       })
   }
 
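
In the YarnClientTrait hunk above, the action runs inside Try(...).recover { ... }.get, so a failure is rethrown with the request's simple class name and the job id in the message. A small, self-contained sketch of that idiom in plain Scala, where RuntimeException stands in for Flink's FlinkException:

import scala.util.Try

// Editorial sketch of the error-wrapping idiom; not the StreamPark method itself.
object FailureContextSketch {
  def runWithFailureContext[O](savepointRequestTrait: AnyRef, jobId: String)(
      actionFunc: String => O): O =
    Try(actionFunc(jobId)).recover {
      case e: Throwable =>
        throw new RuntimeException(
          s"[StreamPark] Do ${savepointRequestTrait.getClass.getSimpleName} for the job $jobId failed.",
          e)
    }.get
}
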
