diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index 8e98a7b92e..d91dc1d781 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -16,19 +16,20 @@ */ package org.apache.streampark.common.util -import org.apache.streampark.common.util.ImplicitsUtils._ - import org.apache.commons.lang3.StringUtils import java.io._ import java.net.URL +import java.time.Duration import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID} +import java.util.concurrent.locks.LockSupport import java.util.jar.{JarFile, JarInputStream} +import scala.annotation.tailrec import scala.collection.convert.ImplicitConversions._ import scala.util.{Failure, Success, Try} -object Utils { +object Utils extends Logger { private[this] lazy val OS = System.getProperty("os.name").toLowerCase @@ -128,8 +129,9 @@ object Utils { c => { try { if (c != null) { - if (c.isInstanceOf[Flushable]) { - c.asInstanceOf[Flushable].flush() + c match { + case flushable: Flushable => flushable.flush() + case _ => } c.close() } @@ -139,4 +141,19 @@ object Utils { }) } + @tailrec + def retry[R](retryCount: Int, interval: Duration = Duration.ofSeconds(5))(f: => R): Try[R] = { + require(retryCount >= 0) + Try(f) match { + case Success(result) => Success(result) + case Failure(e) if retryCount > 0 => + logWarn(s"retry failed, execution caused by: ", e) + logWarn( + s"$retryCount times retry remaining, the next attempt will be in ${interval.toMillis} ms") + LockSupport.parkNanos(interval.toNanos) + retry(retryCount - 1, interval)(f) + case Failure(e) => Failure(e) + } + } + } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala index 6030cebb2b..fd844eaeec 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala @@ -243,46 +243,57 @@ object YarnUtils extends Logger { * @return */ def restRequest(url: String): String = { + if (url == null) return null - def request(reqUrl: String): String = { - logDebug("request url is " + reqUrl) - val config = RequestConfig.custom.setConnectTimeout(5000, TimeUnit.MILLISECONDS).build - if (hasYarnHttpKerberosAuth) { - HadoopUtils - .getUgi() - .doAs(new PrivilegedExceptionAction[String] { - override def run(): String = { - Try(HttpClientUtils.httpAuthGetRequest(reqUrl, config)) match { - case Success(v) => v - case Failure(e) => - logError("yarnUtils authRestRequest error, detail: ", e) - null - } - } - }) - } else { - val url = if (hasYarnHttpSampleAuth) { - s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}" - } else reqUrl - Try(HttpClientUtils.httpGetRequest(url, config)) match { + url match { + case u if u.matches("^http(|s)://.*") => + Try(request(url)) match { case Success(v) => v case Failure(e) => - logError("yarnUtils restRequest error, detail: ", e) + if (hasYarnHttpKerberosAuth) { + logError(s"yarnUtils authRestRequest error, url: $u, detail: $e") + } else { + logError(s"yarnUtils restRequest error, url: $u, detail: $e") + } null } - } + case _ => + Try(request(s"${getRMWebAppURL()}/$url")) match { + case Success(v) => v + case Failure(_) => + Utils.retry[String](5) { + request(s"${getRMWebAppURL(true)}/$url") + } match { + case Success(v) => v + case Failure(e) => + logError(s"yarnUtils restRequest retry 5 times all failed. detail: $e") + null + } + } } + } - url match { - case u if u.matches("^http(|s)://.*") => request(url) - case _ => - val resp = request(s"${getRMWebAppURL()}/$url") - if (resp != null) resp; + private[this] def request(reqUrl: String): String = { + val config = RequestConfig + .custom() + .setConnectTimeout(5000, TimeUnit.MILLISECONDS) + .build() + if (hasYarnHttpKerberosAuth) { + HadoopUtils + .getUgi() + .doAs(new PrivilegedExceptionAction[String] { + override def run(): String = { + HttpClientUtils.httpAuthGetRequest(reqUrl, config) + } + }) + } else { + val url = + if (!hasYarnHttpSampleAuth) reqUrl else { - request(s"${getRMWebAppURL(true)}/$url") + s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}" } + HttpClientUtils.httpGetRequest(url, config) } - } }