Skip to content

Commit

Permalink
[Improve] Yarn request retry improvement (#3217)
Browse files Browse the repository at this point in the history
Co-authored-by: benjobs <[email protected]>
  • Loading branch information
wolfboys and benjobs authored Oct 7, 2023
1 parent 5755289 commit fe98be8
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@
*/
package org.apache.streampark.common.util

import org.apache.streampark.common.util.ImplicitsUtils._

import org.apache.commons.lang3.StringUtils

import java.io._
import java.net.URL
import java.time.Duration
import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID}
import java.util.concurrent.locks.LockSupport
import java.util.jar.{JarFile, JarInputStream}

import scala.annotation.tailrec
import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}

object Utils {
object Utils extends Logger {

private[this] lazy val OS = System.getProperty("os.name").toLowerCase

Expand Down Expand Up @@ -128,8 +129,9 @@ object Utils {
c => {
try {
if (c != null) {
if (c.isInstanceOf[Flushable]) {
c.asInstanceOf[Flushable].flush()
c match {
case flushable: Flushable => flushable.flush()
case _ =>
}
c.close()
}
Expand All @@ -139,4 +141,19 @@ object Utils {
})
}

@tailrec
def retry[R](retryCount: Int, interval: Duration = Duration.ofSeconds(5))(f: => R): Try[R] = {
require(retryCount >= 0)
Try(f) match {
case Success(result) => Success(result)
case Failure(e) if retryCount > 0 =>
logWarn(s"retry failed, execution caused by: ", e)
logWarn(
s"$retryCount times retry remaining, the next attempt will be in ${interval.toMillis} ms")
LockSupport.parkNanos(interval.toNanos)
retry(retryCount - 1, interval)(f)
case Failure(e) => Failure(e)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -243,46 +243,57 @@ object YarnUtils extends Logger {
* @return
*/
def restRequest(url: String): String = {
if (url == null) return null

def request(reqUrl: String): String = {
logDebug("request url is " + reqUrl)
val config = RequestConfig.custom.setConnectTimeout(5000, TimeUnit.MILLISECONDS).build
if (hasYarnHttpKerberosAuth) {
HadoopUtils
.getUgi()
.doAs(new PrivilegedExceptionAction[String] {
override def run(): String = {
Try(HttpClientUtils.httpAuthGetRequest(reqUrl, config)) match {
case Success(v) => v
case Failure(e) =>
logError("yarnUtils authRestRequest error, detail: ", e)
null
}
}
})
} else {
val url = if (hasYarnHttpSampleAuth) {
s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
} else reqUrl
Try(HttpClientUtils.httpGetRequest(url, config)) match {
url match {
case u if u.matches("^http(|s)://.*") =>
Try(request(url)) match {
case Success(v) => v
case Failure(e) =>
logError("yarnUtils restRequest error, detail: ", e)
if (hasYarnHttpKerberosAuth) {
logError(s"yarnUtils authRestRequest error, url: $u, detail: $e")
} else {
logError(s"yarnUtils restRequest error, url: $u, detail: $e")
}
null
}
}
case _ =>
Try(request(s"${getRMWebAppURL()}/$url")) match {
case Success(v) => v
case Failure(_) =>
Utils.retry[String](5) {
request(s"${getRMWebAppURL(true)}/$url")
} match {
case Success(v) => v
case Failure(e) =>
logError(s"yarnUtils restRequest retry 5 times all failed. detail: $e")
null
}
}
}
}

url match {
case u if u.matches("^http(|s)://.*") => request(url)
case _ =>
val resp = request(s"${getRMWebAppURL()}/$url")
if (resp != null) resp;
private[this] def request(reqUrl: String): String = {
val config = RequestConfig
.custom()
.setConnectTimeout(5000, TimeUnit.MILLISECONDS)
.build()
if (hasYarnHttpKerberosAuth) {
HadoopUtils
.getUgi()
.doAs(new PrivilegedExceptionAction[String] {
override def run(): String = {
HttpClientUtils.httpAuthGetRequest(reqUrl, config)
}
})
} else {
val url =
if (!hasYarnHttpSampleAuth) reqUrl
else {
request(s"${getRMWebAppURL(true)}/$url")
s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
}
HttpClientUtils.httpGetRequest(url, config)
}

}

}

0 comments on commit fe98be8

Please sign in to comment.