Skip to content

Commit

Permalink
[Improve] deploy flink job on k8s timeout improvement (#3425)
Browse files Browse the repository at this point in the history
* [Improve] k8s timeout improvement

* [Improve] maven build args check improvement

---------

Co-authored-by: benjobs <[email protected]>
  • Loading branch information
wolfboys and benjobs authored Dec 23, 2023
1 parent 2781c94 commit 0e64672
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@Slf4j
Expand Down Expand Up @@ -210,39 +212,49 @@ public String getMavenArgs() {
StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");

if (StringUtils.isNotBlank(this.buildArgs)) {
List<String> dangerArgs = getDangerArgs(this.buildArgs);
if (dangerArgs.isEmpty()) {
String dangerArgs = getDangerArgs(this.buildArgs);
if (dangerArgs == null) {
cmdBuffer.append(this.buildArgs.trim());
} else {
throw new IllegalArgumentException(
String.format(
"Invalid build args, dangerous operation symbol detected: %s, in your buildArgs: %s",
dangerArgs.stream().collect(Collectors.joining(",")), this.buildArgs));
"Invalid maven argument, dangerous args: %s, in your buildArgs: %s",
dangerArgs, this.buildArgs));
}
}

String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH());
if (StringUtils.isNotBlank(setting)) {
List<String> dangerArgs = getDangerArgs(setting);
if (dangerArgs.isEmpty()) {
String dangerArgs = getDangerArgs(setting);
if (dangerArgs == null) {
File file = new File(setting);
if (file.exists() && file.isFile()) {
cmdBuffer.append(" --settings ").append(setting);
} else {
throw new IllegalArgumentException(
String.format("Invalid maven setting path, %s no exists or not file", setting));
String.format("Invalid maven-setting file path, %s no exists or not file", setting));
}
} else {
throw new IllegalArgumentException(
String.format(
"Invalid maven setting path, dangerous operation symbol detected: %s, in your maven setting path: %s",
dangerArgs.stream().collect(Collectors.joining(",")), setting));
"Invalid maven-setting file path, dangerous args: %s, in your maven setting path: %s",
dangerArgs, setting));
}
}
return cmdBuffer.toString();
}

private List<String> getDangerArgs(String param) {
private String getDangerArgs(String param) {
Pattern pattern = Pattern.compile("(`.*?`)|(\\$\\((.*?)\\))");
Matcher matcher = pattern.matcher(param);
if (matcher.find()) {
String dangerArgs = matcher.group(1);
if (dangerArgs == null) {
dangerArgs = matcher.group(2);
}
return dangerArgs;
}

String[] args = param.split("\\s+");
List<String> dangerArgs = new ArrayList<>();
for (String arg : args) {
Expand All @@ -263,7 +275,10 @@ private List<String> getDangerArgs(String param) {
}
}
}
return dangerArgs;
if (!dangerArgs.isEmpty()) {
return dangerArgs.stream().collect(Collectors.joining(","));
}
return null;
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ object FlinkSessionSubmitHelper extends Logger {
// upload flink-job jar
val uploadResult = Request
.post(s"$jmRestUrl/jars/upload")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(60))
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.body(
MultipartEntityBuilder
.create()
Expand Down Expand Up @@ -90,8 +90,8 @@ object FlinkSessionSubmitHelper extends Logger {
// refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
val resp = Request
.post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(60))
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.body(new StringEntity(Serialization.write(new JarRunRequest(flinkConfig))))
.execute
.returnContent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@ import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{Configuration, DeploymentOptions, RestOptions}
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import org.apache.hc.core5.util.Timeout

import javax.annotation.Nullable

import java.time.Duration

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

object KubernetesRetriever extends Logger {

// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT
val FLINK_CLIENT_TIMEOUT_SEC = 60L
val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)

// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
val FLINK_REST_AWAIT_TIMEOUT_SEC = 30L
// see org.apache.flink.configuration.RestOptions.RETRY_MAX_ATTEMPTS
val FLINK_REST_RETRY_MAX_ATTEMPTS = 30
val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())

/** get new KubernetesClient */
@throws(classOf[KubernetesClientException])
Expand All @@ -70,9 +70,11 @@ object KubernetesRetriever extends Logger {
val flinkConfig = new Configuration()
flinkConfig.setString(DeploymentOptions.TARGET, executeMode.toString)
flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId)
flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(FLINK_CLIENT_TIMEOUT_SEC))
flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, FLINK_REST_AWAIT_TIMEOUT_SEC * 1000)
flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, FLINK_REST_RETRY_MAX_ATTEMPTS)
flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, ClientOptions.CLIENT_TIMEOUT.defaultValue())
flinkConfig.set(
RestOptions.AWAIT_LEADER_TIMEOUT,
RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
flinkConfig.set(RestOptions.RETRY_MAX_ATTEMPTS, RestOptions.RETRY_MAX_ATTEMPTS.defaultValue())
if (Try(namespace.isEmpty).getOrElse(true)) {
flinkConfig.setString(
KubernetesConfigOptions.NAMESPACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.def
Checkpoint.as(
Request
.get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8)) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
val jobDetails = JobDetails.as(
Request
.get(s"$restUrl/jobs/overview")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent()
.asString(StandardCharsets.UTF_8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
.as(
Request
.get(s"$flinkJmRestUrl/overview")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8))
Expand All @@ -140,8 +140,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
.as(
Request
.get(s"$flinkJmRestUrl/jobmanager/config")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent
.asString(StandardCharsets.UTF_8))
Expand Down

0 comments on commit 0e64672

Please sign in to comment.