From 5b727cc26dd296ff16072359b8fb52e3f3f48b36 Mon Sep 17 00:00:00 2001 From: benjobs Date: Sun, 5 Nov 2023 22:07:57 +0800 Subject: [PATCH] [Improve] submit flink job userclassPaths improvement (#3313) * [Improve] submit flink job userclassPaths improvement * userClasspath minor improvement --- .../flink/client/bean/SubmitRequest.scala | 14 +- .../org/apache/commons/cli/CommandLine.java | 197 ------------------ .../client/impl/YarnApplicationClient.scala | 4 - .../flink/client/trait/FlinkClientTrait.scala | 24 +-- .../flink/core/FlinkSqlValidator.scala | 2 +- 5 files changed, 15 insertions(+), 226 deletions(-) delete mode 100644 streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index 25e17120d4..2c1a72e518 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -18,7 +18,7 @@ package org.apache.streampark.flink.client.bean import org.apache.streampark.common.Constant -import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion, Workspace} +import org.apache.streampark.common.conf.{FlinkVersion, Workspace} import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums._ import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, PropertiesUtils} @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointResto import javax.annotation.Nullable import java.io.File +import java.net.URL import java.util.{Map => JavaMap} import scala.collection.convert.ImplicitConversions._ @@ -55,7 +56,8 @@ case class SubmitRequest( @Nullable k8sSubmitParam: KubernetesSubmitParam, @Nullable extraParameter: JavaMap[String, Any]) { - lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_PROPERTY_PREFIX) + private[this] lazy val appProperties: Map[String, String] = getParameterMap( + KEY_FLINK_PROPERTY_PREFIX) lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX) @@ -68,9 +70,15 @@ case class SubmitRequest( lazy val effectiveAppName: String = if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else this.appName + lazy val classPaths: List[URL] = { + val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib" + val lib = Try(new File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL]) + flinkVersion.flinkLibs ++ lib + } + lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString - lazy val allowNonRestoredState = Try( + lazy val allowNonRestoredState: Boolean = Try( properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean) .getOrElse(false) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java deleted file mode 100644 index f1380fd9aa..0000000000 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.commons.cli; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; - -public class CommandLine implements Serializable { - - /** The serial version UID. */ - private static final long serialVersionUID = 1L; - - /** The unrecognized options/arguments */ - private final List args = new LinkedList<>(); - - /** The processed options */ - private final List