From cd9db377e9157774a38f051a7b49735f3d645ba2 Mon Sep 17 00:00:00 2001 From: benjobs Date: Sat, 20 Jul 2024 14:20:39 +0800 Subject: [PATCH] [Improve] spark-submit improvements (#3900) --- .../common/util}/ChildFirstClassLoader.scala | 60 ++----- .../util}/ClassLoaderObjectInputStream.scala | 2 +- .../flink/client/trait/FlinkClientTrait.scala | 2 +- .../flink/proxy/FlinkShimsProxy.scala | 28 +++- .../spark/client/bean/SubmitRequest.scala | 36 ++--- .../client/conf/SparkConfiguration.scala | 9 +- .../client/proxy/ChildFirstClassLoader.scala | 132 ---------------- .../proxy/ClassLoaderObjectInputStream.scala | 82 ---------- .../spark/client/proxy/SparkShimsProxy.scala | 39 +++-- .../client/impl/YarnApplicationClient.scala | 146 ++++++++---------- .../spark/client/trait/SparkClientTrait.scala | 38 +++-- 11 files changed, 175 insertions(+), 399 deletions(-) rename {streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy => streampark-common/src/main/scala/org/apache/streampark/common/util}/ChildFirstClassLoader.scala (64%) rename {streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy => streampark-common/src/main/scala/org/apache/streampark/common/util}/ClassLoaderObjectInputStream.scala (98%) delete mode 100644 streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala delete mode 100644 streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ChildFirstClassLoader.scala similarity index 64% rename from streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala rename to streampark-common/src/main/scala/org/apache/streampark/common/util/ChildFirstClassLoader.scala index ec8f360b59..fabe9e73b2 100644 --- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ChildFirstClassLoader.scala @@ -15,13 +15,11 @@ * limitations under the License. 
*/ -package org.apache.streampark.flink.proxy +package org.apache.streampark.common.util import java.io.{File, IOException} import java.net.{URL, URLClassLoader} import java.util -import java.util.function.Consumer -import java.util.regex.Pattern import scala.util.Try @@ -36,40 +34,12 @@ import scala.util.Try class ChildFirstClassLoader( urls: Array[URL], parent: ClassLoader, - flinkResourcePattern: Pattern, - classLoadingExceptionHandler: Consumer[Throwable]) + parentFirstClasses: List[String], + loadJarFilter: String => Boolean) extends URLClassLoader(urls, parent) { ClassLoader.registerAsParallelCapable() - def this(urls: Array[URL], parent: ClassLoader, flinkResourcePattern: Pattern) { - this( - urls, - parent, - flinkResourcePattern, - (t: Throwable) => throw t) - } - - ClassLoader.registerAsParallelCapable() - - private val FLINK_PATTERN = - Pattern.compile("flink-(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL) - - private val JAR_PROTOCOL = "jar" - - private val PARENT_FIRST_PATTERNS = List( - "java.", - "javax.xml", - "org.slf4j", - "org.apache.log4j", - "org.apache.logging", - "org.apache.commons.logging", - "org.apache.commons.cli", - "ch.qos.logback", - "org.xml", - "org.w3c", - "org.apache.hadoop") - @throws[ClassNotFoundException] override def loadClass(name: String, resolve: Boolean): Class[_] = { try { @@ -78,7 +48,7 @@ class ChildFirstClassLoader( super.findLoadedClass(name) match { case null => // check whether the class should go parent-first - PARENT_FIRST_PATTERNS.find(name.startsWith) match { + parentFirstClasses.find(name.startsWith) match { case Some(_) => super.loadClass(name, resolve) case _ => Try(findClass(name)).getOrElse(super.loadClass(name, resolve)) } @@ -90,9 +60,7 @@ class ChildFirstClassLoader( } } } catch { - case e: Throwable => - classLoadingExceptionHandler.accept(e) - null + case e: Throwable => throw e } } @@ -105,20 +73,14 @@ class ChildFirstClassLoader( } /** - * e.g. flinkResourcePattern: flink-1.12

flink-1.12.jar/resource flink-1.14.jar/resource - * other.jar/resource \=> after filterFlinkShimsResource \=> flink-1.12.jar/resource - * other.jar/resource - * * @param urlClassLoaderResource * @return */ - private def filterFlinkShimsResource(urlClassLoaderResource: URL): URL = { - if (urlClassLoaderResource != null && JAR_PROTOCOL == urlClassLoaderResource.getProtocol) { + private def filterResource(urlClassLoaderResource: URL): URL = { + if (urlClassLoaderResource != null && "jar" == urlClassLoaderResource.getProtocol) { val spec = urlClassLoaderResource.getFile - val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName - val matchState = - FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern.matcher(filename).matches - if (matchState) { + val jarName = new File(spec.substring(0, spec.indexOf("!/"))).getName + if (loadJarFilter(jarName)) { return null } } @@ -127,7 +89,7 @@ class ChildFirstClassLoader( private def addResources(result: util.List[URL], resources: util.Enumeration[URL]) = { while (resources.hasMoreElements) { - val urlClassLoaderResource = filterFlinkShimsResource(resources.nextElement) + val urlClassLoaderResource = filterResource(resources.nextElement) if (urlClassLoaderResource != null) { result.add(urlClassLoaderResource) } @@ -145,7 +107,7 @@ class ChildFirstClassLoader( addResources(result, parent.getResources(name)) } new util.Enumeration[URL]() { - final private[proxy] val iter = result.iterator + private[this] val iter = result.iterator override def hasMoreElements: Boolean = iter.hasNext diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ClassLoaderObjectInputStream.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderObjectInputStream.scala similarity index 98% rename from streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ClassLoaderObjectInputStream.scala rename to streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderObjectInputStream.scala index ae08662927..574c7d51ab 100644 --- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ClassLoaderObjectInputStream.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderObjectInputStream.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.streampark.flink.proxy +package org.apache.streampark.common.util import java.io.{InputStream, IOException, ObjectInputStream, ObjectStreamClass} import java.lang.reflect.Proxy diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 1d328207a7..da91874261 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -64,7 +64,7 @@ trait FlinkClientTrait extends Logger { |--------------------------------------- flink job start --------------------------------------- | userFlinkHome : ${submitRequest.flinkVersion.flinkHome} | flinkVersion : ${submitRequest.flinkVersion.version} - | appName : ${submitRequest.appName} + | appName : ${submitRequest.effectiveAppName} | devMode : ${submitRequest.developmentMode.name()} | execMode : ${submitRequest.executionMode.name()} | k8sNamespace : ${submitRequest.kubernetesNamespace} diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala index 1d95303acb..5dd25ec5ff 100644 --- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala +++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala @@ -19,7 +19,7 @@ package org.apache.streampark.flink.proxy import org.apache.streampark.common.Constant import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion} -import org.apache.streampark.common.util.{ClassLoaderUtils, Logger} +import org.apache.streampark.common.util.{ChildFirstClassLoader, ClassLoaderObjectInputStream, ClassLoaderUtils, Logger} import org.apache.streampark.common.util.ImplicitsUtils._ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream} @@ -35,6 +35,8 @@ object FlinkShimsProxy extends Logger { private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]() + private[this] val FLINK_JAR_PATTERN = Pattern.compile("flink-(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL) + private[this] val INCLUDE_PATTERN: Pattern = Pattern.compile("(streampark-shaded-jackson-)(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL) private[this] def getFlinkShimsResourcePattern(majorVersion: String) = @@ -42,6 +44,19 @@ object FlinkShimsProxy extends Logger { private[this] lazy val FLINK_SHIMS_PREFIX = "streampark-flink-shims_flink" + private[this] lazy val PARENT_FIRST_PATTERNS = List( + "java.", + "javax.xml", + "org.slf4j", + "org.apache.log4j", + "org.apache.logging", + "org.apache.commons.logging", + "org.apache.commons.cli", + "ch.qos.logback", + "org.xml", + "org.w3c", + "org.apache.hadoop") + /** * Get shimsClassLoader to execute for scala API * @@ -97,10 +112,16 @@ object FlinkShimsProxy extends Logger { new ChildFirstClassLoader( shimsUrls.toArray, Thread.currentThread().getContextClassLoader, - getFlinkShimsResourcePattern(flinkVersion.majorVersion)) + 
PARENT_FIRST_PATTERNS, + jarName => loadJarFilter(jarName, flinkVersion)) }) } + private def loadJarFilter(jarName: String, flinkVersion: FlinkVersion): Boolean = { + val childFirstPattern = getFlinkShimsResourcePattern(flinkVersion.majorVersion) + FLINK_JAR_PATTERN.matcher(jarName).matches && !childFirstPattern.matcher(jarName).matches + } + private def addShimsUrls(flinkVersion: FlinkVersion, addShimUrl: File => Unit): Unit = { val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME) require( @@ -177,7 +198,8 @@ object FlinkShimsProxy extends Logger { new ChildFirstClassLoader( shimsUrls.toArray, Thread.currentThread().getContextClassLoader, - getFlinkShimsResourcePattern(flinkVersion.majorVersion)) + PARENT_FIRST_PATTERNS, + jarName => loadJarFilter(jarName, flinkVersion)) }) } diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala index 146ccb5d39..2feca4b0f5 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala @@ -52,18 +52,25 @@ case class SubmitRequest( @Nullable buildResult: BuildResult, @Nullable extraParameter: JavaMap[String, Any]) { + val DEFAULT_SUBMIT_PARAM = Map[String, Any]( + "spark.driver.cores" -> "1", + "spark.driver.memory" -> "1g", + "spark.executor.cores" -> "1", + "spark.executor.memory" -> "1g") + private[this] lazy val appProperties: Map[String, String] = getParameterMap( KEY_SPARK_PROPERTY_PREFIX) lazy val appMain: String = this.developmentMode match { - case FlinkDevelopmentMode.FLINK_SQL => - Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS + case FlinkDevelopmentMode.FLINK_SQL => Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS) } - lazy val effectiveAppName: String = - if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) - else this.appName + lazy val effectiveAppName: String = if (this.appName == null) { + appProperties(KEY_FLINK_APP_NAME) + } else { + this.appName + } lazy val libs: List[URL] = { val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib" @@ -71,22 +78,11 @@ case class SubmitRequest( .getOrElse(List.empty[URL]) } - lazy val classPaths: List[URL] = sparkVersion.sparkLibs ++ libs - - lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString - - lazy val userJarFile: File = { + lazy val userJarPath: String = { executionMode match { case _ => checkBuildResult() - new File(buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath) - } - } - - lazy val safePackageProgram: Boolean = { - sparkVersion.version.split("\\.").map(_.trim.toInt) match { - case Array(a, b, c) if a >= 3 => b > 1 - case _ => false + buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath } } @@ -133,7 +129,7 @@ case class SubmitRequest( } @throws[IOException] - def isSymlink(file: File): Boolean = { + private def isSymlink(file: File): Boolean = { if (file == null) throw new NullPointerException("File must not be null") Files.isSymbolicLink(file.toPath) } @@ -163,7 +159,7 @@ case class SubmitRequest( } @throws[Exception] - def checkBuildResult(): Unit = { + private def checkBuildResult(): Unit = { executionMode match 
{ case _ => if (this.buildResult == null) { diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala index 99e97d3b62..adf0679742 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/conf/SparkConfiguration.scala @@ -17,11 +17,4 @@ package org.apache.streampark.spark.client.conf -object SparkConfiguration { - val defaultParameters = Map[String, Any]( - "spark.driver.cores" -> "1", - "spark.driver.memory" -> "1g", - "spark.executor.cores" -> "1", - "spark.executor.memory" -> "1g") - -} +object SparkConfiguration {} diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala deleted file mode 100644 index 7ca8886cba..0000000000 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ChildFirstClassLoader.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.spark.client.proxy - -import java.io.{File, IOException} -import java.net.{URL, URLClassLoader} -import java.util -import java.util.regex.Pattern - -import scala.language.existentials -import scala.util.Try - -/** - * A variant of the URLClassLoader that first loads from the URLs and only after that from the - * parent. - * - *

{@link # getResourceAsStream ( String )} uses {@link # getResource ( String )} internally so - * we don't override that. - */ - -class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, resourcePattern: Pattern) - extends URLClassLoader(urls, parent) { - - ClassLoader.registerAsParallelCapable() - - private val SPARK_PATTERN = - Pattern.compile("spark-(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL) - - private val JAR_PROTOCOL = "jar" - - private val PARENT_FIRST_PATTERNS = List( - "java.", - "javax.xml", - "org.slf4j", - "org.apache.log4j", - "org.apache.logging", - "org.apache.commons.logging", - "ch.qos.logback", - "org.xml", - "org.w3c", - "org.apache.hadoop") - - @throws[ClassNotFoundException] - override def loadClass(name: String, resolve: Boolean): Class[_] = { - this.synchronized { - // First, check if the class has already been loaded - val clazz = super.findLoadedClass(name) match { - case null => - // check whether the class should go parent-first - for (parentFirstPattern <- PARENT_FIRST_PATTERNS) { - if (name.startsWith(parentFirstPattern)) { - super.loadClass(name, resolve) - } - } - Try(findClass(name)).getOrElse(super.loadClass(name, resolve)) - case c: Class[_] => - if (resolve) { - resolveClass(c) - } - c - } - clazz - } - } - - override def getResource(name: String): URL = { - // first, try and find it via the URLClassloader - val urlClassLoaderResource = findResource(name) - if (urlClassLoaderResource != null) return urlClassLoaderResource - // delegate to super - super.getResource(name) - } - - private def filterShimsResource(urlClassLoaderResource: URL): URL = { - if (urlClassLoaderResource != null && JAR_PROTOCOL == urlClassLoaderResource.getProtocol) { - val spec = urlClassLoaderResource.getFile - val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName - val matchState = - SPARK_PATTERN.matcher(filename).matches && !resourcePattern - .matcher(filename) - .matches - if (matchState) { - return null - } - } - urlClassLoaderResource - } - - private def addResources(result: util.List[URL], resources: util.Enumeration[URL]) = { - while (resources.hasMoreElements) { - val urlClassLoaderResource = filterShimsResource(resources.nextElement) - if (urlClassLoaderResource != null) { - result.add(urlClassLoaderResource) - } - } - result - } - - @throws[IOException] - override def getResources(name: String): util.Enumeration[URL] = { - // first get resources from URLClassloader - val result = addResources(new util.ArrayList[URL], findResources(name)) - val parent = getParent - if (parent != null) { - // get parent urls - addResources(result, parent.getResources(name)) - } - new util.Enumeration[URL]() { - final private[proxy] val iter = result.iterator - - override def hasMoreElements: Boolean = iter.hasNext - - override def nextElement: URL = iter.next - } - } - -} diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala deleted file mode 100644 index a03c97018c..0000000000 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/ClassLoaderObjectInputStream.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.spark.client.proxy - -import java.io.{InputStream, IOException, ObjectInputStream, ObjectStreamClass} -import java.lang.reflect.Proxy - -import scala.util.Try - -/** - * A special ObjectInputStream that loads a class based on a specified ClassLoader - * rather than the system default.

This is useful in dynamic container environments. - * - * @since 1.1 - */ -class ClassLoaderObjectInputStream(classLoader: ClassLoader, inputStream: InputStream) - extends ObjectInputStream(inputStream) { - - /** - * Resolve a class specified by the descriptor using the specified ClassLoader or the super - * ClassLoader. - * - * @param objectStreamClass - * descriptor of the class - * @return - * the Class object described by the ObjectStreamClass - * @throws IOException - * in case of an I/O error - * @throws ClassNotFoundException - * if the Class cannot be found - */ - @throws[IOException] - @throws[ClassNotFoundException] - override protected def resolveClass(objectStreamClass: ObjectStreamClass): Class[_] = { - // delegate to super class loader which can resolve primitives - Try(Class.forName(objectStreamClass.getName, false, classLoader)) - .getOrElse(super.resolveClass(objectStreamClass)) - } - - /** - * Create a proxy class that implements the specified interfaces using the specified ClassLoader - * or the super ClassLoader. - * - * @param interfaces - * the interfaces to implement - * @return - * a proxy class implementing the interfaces - * @throws IOException - * in case of an I/O error - * @throws ClassNotFoundException - * if the Class cannot be found - * @see - * ObjectInputStream#resolveProxyClass(String[]) - * @since 2.1 - */ - @throws[IOException] - @throws[ClassNotFoundException] - override protected def resolveProxyClass(interfaces: Array[String]): Class[_] = { - val interfaceClasses = new Array[Class[_]](interfaces.length) - for (i <- interfaces.indices) { - interfaceClasses(i) = Class.forName(interfaces(i), false, classLoader) - } - Try(Proxy.getProxyClass(classLoader, interfaceClasses: _*)) - .getOrElse(super.resolveProxyClass(interfaces)) - } - -} diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala index 5cdfb90635..97e749e2b0 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala @@ -19,7 +19,7 @@ package org.apache.streampark.spark.client.proxy import org.apache.streampark.common.Constant import org.apache.streampark.common.conf.{ConfigKeys, SparkVersion} -import org.apache.streampark.common.util.{ClassLoaderUtils, Logger} +import org.apache.streampark.common.util.{ChildFirstClassLoader, ClassLoaderObjectInputStream, ClassLoaderUtils, Logger} import org.apache.streampark.common.util.ImplicitsUtils._ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream} @@ -33,21 +33,31 @@ object SparkShimsProxy extends Logger { private[this] val SHIMS_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]() - private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = - MutableMap[String, ClassLoader]() + private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String, ClassLoader]() - private[this] val INCLUDE_PATTERN: Pattern = - Pattern.compile( - "(streampark-shaded-jackson-)(.*).jar", - Pattern.CASE_INSENSITIVE | Pattern.DOTALL) + private[this] val INCLUDE_PATTERN: Pattern = Pattern.compile("(streampark-shaded-jackson-)(.*).jar", 
Pattern.CASE_INSENSITIVE | Pattern.DOTALL) private[this] def getSparkShimsResourcePattern(sparkLargeVersion: String) = Pattern.compile( s"spark-(.*)-$sparkLargeVersion(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL) + private[this] lazy val SPARK_JAR_PATTERN = Pattern.compile("spark-(.*).jar", Pattern.CASE_INSENSITIVE | Pattern.DOTALL) + private[this] lazy val SPARK_SHIMS_PREFIX = "streampark-spark-shims_spark" + private[this] lazy val PARENT_FIRST_PATTERNS = List( + "java.", + "javax.xml", + "org.slf4j", + "org.apache.log4j", + "org.apache.logging", + "org.apache.commons.logging", + "ch.qos.logback", + "org.xml", + "org.w3c", + "org.apache.hadoop") + def proxy[T](sparkVersion: SparkVersion, func: ClassLoader => T): T = { val shimsClassLoader = getSparkShimsClassLoader(sparkVersion) ClassLoaderUtils @@ -61,7 +71,7 @@ object SparkShimsProxy extends Logger { } // need to load all spark-table dependencies compatible with different versions - def getVerifySqlLibClassLoader(sparkVersion: SparkVersion): ClassLoader = { + private def getVerifySqlLibClassLoader(sparkVersion: SparkVersion): ClassLoader = { logInfo(s"Add verify sql lib,spark version: $sparkVersion") VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate( s"${sparkVersion.fullVersion}", { @@ -87,11 +97,17 @@ object SparkShimsProxy extends Logger { new ChildFirstClassLoader( shimsUrls.toArray, Thread.currentThread().getContextClassLoader, - getSparkShimsResourcePattern(sparkVersion.majorVersion)) + PARENT_FIRST_PATTERNS, + jarName => loadJarFilter(jarName, sparkVersion)) }) } - def addShimsUrls(sparkVersion: SparkVersion, addShimUrl: File => Unit): Unit = { + private def loadJarFilter(jarName: String, sparkVersion: SparkVersion): Boolean = { + val childFirstPattern = getSparkShimsResourcePattern(sparkVersion.majorVersion) + SPARK_JAR_PATTERN.matcher(jarName).matches && !childFirstPattern.matcher(jarName).matches + } + + private def addShimsUrls(sparkVersion: SparkVersion, addShimUrl: File => Unit): Unit = { val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME) require( appHome != null, @@ -155,7 +171,8 @@ object SparkShimsProxy extends Logger { new ChildFirstClassLoader( shimsUrls.toArray, Thread.currentThread().getContextClassLoader, - getSparkShimsResourcePattern(sparkVersion.majorVersion)) + PARENT_FIRST_PATTERNS, + jarName => loadJarFilter(jarName, sparkVersion)) }) } diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala index 9870799c0d..af5776f18e 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala @@ -17,27 +17,23 @@ package org.apache.streampark.spark.client.impl -import org.apache.streampark.common.conf.Workspace import org.apache.streampark.common.enums.SparkExecutionMode import org.apache.streampark.common.util.HadoopUtils -import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse import org.apache.streampark.spark.client.`trait`.SparkClientTrait import org.apache.streampark.spark.client.bean._ -import org.apache.streampark.spark.client.conf.SparkConfiguration import 
org.apache.commons.collections.MapUtils import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} -import java.util.concurrent.{CountDownLatch, Executors, ExecutorService} +import java.util.concurrent.CountDownLatch + +import scala.collection.convert.ImplicitConversions._ +import scala.util.{Failure, Success, Try} /** yarn application mode submit */ object YarnApplicationClient extends SparkClientTrait { - private val threadPool: ExecutorService = Executors.newFixedThreadPool(1) - - private[this] lazy val workspace = Workspace.remote - override def doStop(stopRequest: StopRequest): StopResponse = { HadoopUtils.yarnClient.killApplication(ApplicationId.fromString(stopRequest.jobId)) null @@ -46,96 +42,80 @@ object YarnApplicationClient extends SparkClientTrait { override def setConfig(submitRequest: SubmitRequest): Unit = {} override def doSubmit(submitRequest: SubmitRequest): SubmitResponse = { - launch(submitRequest) + // 1) prepare sparkLauncher + val launcher: SparkLauncher = prepareSparkLauncher(submitRequest) + + // 2) set spark config + setSparkConfig(submitRequest, launcher) + + // 3) launch + Try(launch(launcher)) match { + case Success(handle: SparkAppHandle) => + logger.info(s"[StreamPark][YarnApplicationClient] spark job: ${submitRequest.effectiveAppName} is submit successful, " + + s"appid: ${handle.getAppId}, " + + s"state: ${handle.getState}") + SubmitResponse(null, null, handle.getAppId) + case Failure(e) => throw e + } + } + + private def launch(sparkLauncher: SparkLauncher): SparkAppHandle = { + logger.info("[StreamPark][YarnApplicationClient] The spark task start") + val submitFinished: CountDownLatch = new CountDownLatch(1) + val sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() { + override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {} + override def stateChanged(handle: SparkAppHandle): Unit = { + if (handle.getAppId != null) { + logger.info("{} stateChanged :{}", Array(handle.getAppId, handle.getState.toString)) + } else { + logger.info("stateChanged :{}", handle.getState.toString) + } + if (SparkAppHandle.State.FAILED == handle.getState) { + logger.error("Task run failure stateChanged :{}", handle.getState.toString) + } + if (handle.getState.isFinal) { + submitFinished.countDown() + } + } + }) + submitFinished.await() + sparkAppHandle } - private def launch(submitRequest: SubmitRequest): SubmitResponse = { - val launcher: SparkLauncher = new SparkLauncher() + private def prepareSparkLauncher(submitRequest: SubmitRequest) = { + new SparkLauncher() .setSparkHome(submitRequest.sparkVersion.sparkHome) - .setAppResource(submitRequest.buildResult - .asInstanceOf[ShadedBuildResponse] - .shadedJarPath) + .setAppResource(submitRequest.userJarPath) .setMainClass(submitRequest.appMain) + .setAppName(submitRequest.effectiveAppName) + .setConf( + "spark.yarn.jars", + submitRequest.hdfsWorkspace.sparkLib + "/*.jar") + .setVerbose(true) .setMaster("yarn") .setDeployMode(submitRequest.executionMode match { case SparkExecutionMode.YARN_CLIENT => "client" case SparkExecutionMode.YARN_CLUSTER => "cluster" case _ => - throw new IllegalArgumentException( + throw new UnsupportedOperationException( "[StreamPark][YarnApplicationClient] Yarn mode only support \"client\" and \"cluster\".") - }) - .setAppName(submitRequest.appName) - .setConf( - "spark.yarn.jars", - submitRequest - .hdfsWorkspace - .sparkLib + "/*.jar") - .setVerbose(true) - - import scala.collection.JavaConverters._ - 
setDynamicProperties(launcher, submitRequest.properties.asScala.toMap) + } - // TODO: Adds command line arguments for the application. - // launcher.addAppArgs() + private def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher: SparkLauncher): Unit = { + logger.info("[StreamPark][SparkClient][YarnApplicationClient] set spark configuration.") + // 1) set spark conf + submitRequest.properties.foreach(prop => { + val k = prop._1 + val v = prop._2.toString + logInfo(s"| $k : $v") + sparkLauncher.setConf(k, v) + }) + // 2) appArgs... if (MapUtils.isNotEmpty(submitRequest.extraParameter) && submitRequest.extraParameter .containsKey("sql")) { - launcher.addAppArgs("--sql", submitRequest.extraParameter.get("sql").toString) - } - - logger.info("[StreamPark][YarnApplicationClient] The spark task start") - val cdlForApplicationId: CountDownLatch = new CountDownLatch(1) - - var sparkAppHandle: SparkAppHandle = null - threadPool.execute(new Runnable { - override def run(): Unit = { - try { - val countDownLatch: CountDownLatch = new CountDownLatch(1) - sparkAppHandle = launcher.startApplication(new SparkAppHandle.Listener() { - override def stateChanged(handle: SparkAppHandle): Unit = { - if (handle.getAppId != null) { - if (cdlForApplicationId.getCount != 0) { - cdlForApplicationId.countDown() - } - logger.info("{} stateChanged :{}", Array(handle.getAppId, handle.getState.toString)) - } else logger.info("stateChanged :{}", handle.getState.toString) - - if (SparkAppHandle.State.FAILED.toString == handle.getState.toString) { - logger.error("Task run failure stateChanged :{}", handle.getState.toString) - } - - if (handle.getState.isFinal) { - countDownLatch.countDown() - } - } - - override def infoChanged(handle: SparkAppHandle): Unit = {} - }) - countDownLatch.await() - } catch { - case e: Exception => - logger.error(e.getMessage, e) - } - } - }) - - cdlForApplicationId.await() - logger.info( - "[StreamPark][YarnApplicationClient] The task is executing, handle current state is {}, appid is {}", - Array(sparkAppHandle.getState.toString, sparkAppHandle.getAppId)) - SubmitResponse(null, null, sparkAppHandle.getAppId) - } - - private def setDynamicProperties(sparkLauncher: SparkLauncher, properties: Map[String, Any]): Unit = { - logger.info("[StreamPark][YarnApplicationClient] Spark launcher start configuration.") - val finalProperties: Map[String, Any] = SparkConfiguration.defaultParameters ++ properties - for ((k, v) <- finalProperties) { - if (k.startsWith("spark.")) { - sparkLauncher.setConf(k, v.toString) - } else { - logger.info("[StreamPark][YarnApplicationClient] \"{}\" doesn't start with \"spark.\". 
Skip it.", k) - } + sparkLauncher.addAppArgs("--sql", submitRequest.extraParameter.get("sql").toString) } } diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala index 93f32aad01..af7778ebc8 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala @@ -21,6 +21,7 @@ import org.apache.streampark.common.util._ import org.apache.streampark.spark.client.bean._ import scala.collection.convert.ImplicitConversions._ +import scala.util.{Failure, Success, Try} trait SparkClientTrait extends Logger { @@ -31,7 +32,7 @@ trait SparkClientTrait extends Logger { |--------------------------------------- spark job start ----------------------------------- | userSparkHome : ${submitRequest.sparkVersion.sparkHome} | sparkVersion : ${submitRequest.sparkVersion.version} - | appName : ${submitRequest.appName} + | appName : ${submitRequest.effectiveAppName} | devMode : ${submitRequest.developmentMode.name()} | execMode : ${submitRequest.executionMode.name()} | applicationType : ${submitRequest.applicationType.getName} @@ -41,17 +42,19 @@ trait SparkClientTrait extends Logger { |------------------------------------------------------------------------------------------- |""".stripMargin) - submitRequest.developmentMode match { - case _ => - if (submitRequest.userJarFile != null) { - val uri = submitRequest.userJarFile.getAbsolutePath - } - } + prepareConfig(submitRequest) setConfig(submitRequest) - doSubmit(submitRequest) - + Try(doSubmit(submitRequest)) match { + case Success(resp) => resp + case Failure(e) => + logError( + s"spark job ${submitRequest.appName} start failed, " + + s"executionMode: ${submitRequest.executionMode.getName}, " + + s"detail: ${ExceptionUtils.stringifyException(e)}") + throw e + } } def setConfig(submitRequest: SubmitRequest): Unit @@ -78,4 +81,21 @@ trait SparkClientTrait extends Logger { @throws[Exception] def doStop(stopRequest: StopRequest): StopResponse + private def prepareConfig(submitRequest: SubmitRequest): Unit = { + // 1) set default config + val userConfig = submitRequest.properties.filter(c => { + val k = c._1 + if (k.startsWith("spark.")) { + true + } else { + logger.warn("[StreamPark] config {} doesn't start with \"spark.\" Skip it.", k) + false + } + }) + val defaultConfig = submitRequest.DEFAULT_SUBMIT_PARAM.filter(c => !userConfig.containsKey(c._1)) + submitRequest.properties.clear() + submitRequest.properties.putAll(userConfig) + submitRequest.properties.putAll(defaultConfig) + } + }