From ed8d3ead4c6bee4b42c176416b11da72aad79f53 Mon Sep 17 00:00:00 2001
From: Yuepeng Pan
Date: Sun, 22 Oct 2023 22:09:02 +0800
Subject: [PATCH] [ISSUE-3062][Improve] Improve streampark-common module base on [5 Log] (#3265)

---
 .../streampark/common/conf/InternalConfigHolder.scala | 2 +-
 .../org/apache/streampark/common/util/HadoopUtils.scala | 6 +++---
 .../org/apache/streampark/common/util/HdfsUtils.scala | 2 +-
 .../scala/org/apache/streampark/common/util/Utils.scala | 2 +-
 .../org/apache/streampark/common/util/YarnUtils.scala | 8 ++++----
 5 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
index 5f00c20880..b19e494803 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
@@ -159,7 +159,7 @@ object InternalConfigHolder extends Logger {
   /** log the current configuration info. */
   def log(): Unit = {
     val configKeys = keys()
-    logInfo(s"""registered configs:
+    logInfo(s"""Registered configs:
                |ConfigHub collected configs: ${configKeys.size}
                | ${configKeys
         .map(
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 715afa00da..0ca6690945 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -121,7 +121,7 @@ object HadoopUtils extends Logger {
           val end = value.getEndTime.getTime
           ((end - start) * 0.90f).toLong
         case _ =>
-          logWarn("get kerberos tgtRefreshTime failed, try get kerberos.ttl. ")
+          logWarn("Get kerberos tgtRefreshTime failed, try get kerberos.ttl. ")
") val timeUnit = DateUtils.getTimeUnit(InternalConfigHolder.get(CommonConfig.KERBEROS_TTL)) timeUnit._2 match { case TimeUnit.SECONDS => timeUnit._1 * 1000 @@ -199,7 +199,7 @@ object HadoopUtils extends Logger { } private[this] def getKerberosUGI(): UserGroupInformation = { - logInfo("kerberos login starting....") + logInfo("Kerberos login starting....") require( kerberosPrincipal.nonEmpty && kerberosKeytab.nonEmpty, @@ -221,7 +221,7 @@ object HadoopUtils extends Logger { val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(kerberosPrincipal, kerberosKeytab) UserGroupInformation.setLoginUser(ugi) - logInfo("kerberos authentication successful") + logInfo("Kerberos authentication successful") ugi } match { case Success(ugi) => ugi diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala index 2565841ba1..a8de42d0c5 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala @@ -139,7 +139,7 @@ object HdfsUtils extends Logger { if (HadoopUtils.hdfs.exists(path)) { HadoopUtils.hdfs.delete(path, true) } else { - logWarn(s"hdfs delete $src,but file $src is not exists!") + logWarn(s"HDFS delete $src, but file $src is not exists!") } } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index d91dc1d781..2cd0080723 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -147,7 +147,7 @@ object Utils extends Logger { Try(f) match { case Success(result) => Success(result) case Failure(e) if retryCount > 0 => - logWarn(s"retry failed, execution caused by: ", e) + logWarn(s"Retry failed, execution caused by: ", e) logWarn( s"$retryCount times retry remaining, the next attempt will be in ${interval.toMillis} ms") LockSupport.parkNanos(interval.toNanos) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala index fd844eaeec..8496283628 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala @@ -141,12 +141,12 @@ object YarnUtils extends Logger { val activeRMId = { Option(RMHAUtils.findActiveRMHAId(yarnConf)) match { case Some(x) => - logInfo("findActiveRMHAId successful") + logInfo("'findActiveRMHAId' successful") x case None => // if you don't know why, don't modify it logWarn( - s"findActiveRMHAId is null,config yarn.acl.enable:${yarnConf.get("yarn.acl.enable")},now http try it.") + s"'findActiveRMHAId' is null,config yarn.acl.enable:${yarnConf.get("yarn.acl.enable")},now http try it.") // url ==> rmId val idUrlMap = new JavaHashMap[String, String] val rmIds = HAUtil.getRMHAIds(conf) @@ -181,7 +181,7 @@ object YarnUtils extends Logger { require( activeRMId != null, "[StreamPark] YarnUtils.getRMWebAppURL: can not found yarn active node") - logInfo(s"current activeRMHAId: $activeRMId") + logInfo(s"Current activeRMHAId: $activeRMId") val appActiveRMKey = HAUtil.addSuffix(addressPrefix, activeRMId) val hostnameActiveRMKey = 
       HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, activeRMId)
@@ -220,7 +220,7 @@ object YarnUtils extends Logger {
             .append(address.getPort)
             .toString()
         }
-        logInfo(s"yarn resourceManager webapp url:$rmHttpURL")
+        logInfo(s"Yarn resourceManager webapp url:$rmHttpURL")
       }
     }
     rmHttpURL
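
Note on the Utils.scala hunk: the visible context lines show the retry-with-parking pattern that the changed logWarn call sits in (Try the block, warn through the Logger helpers, park for the configured interval, then try again). The following is a minimal, self-contained sketch of that pattern only; the retry signature, the RetrySketch object, and the stand-in logWarn helpers are assumptions made for illustration, not the actual StreamPark implementation.

import java.util.concurrent.locks.LockSupport

import scala.concurrent.duration.{Duration, DurationInt}
import scala.util.{Failure, Success, Try}

object RetrySketch {

  // Stand-ins for the Logger trait's logWarn overloads used in the hunk (assumed shapes).
  private def logWarn(msg: String): Unit = println(s"WARN $msg")
  private def logWarn(msg: String, e: Throwable): Unit = println(s"WARN $msg${e.getMessage}")

  /** Run `f`; on failure, warn, park for `interval`, and retry up to `retryCount` more times. */
  @annotation.tailrec
  def retry[T](retryCount: Int, interval: Duration)(f: => T): Try[T] =
    Try(f) match {
      case Success(result) => Success(result)
      case Failure(e) if retryCount > 0 =>
        logWarn("Retry failed, execution caused by: ", e)
        logWarn(
          s"$retryCount times retry remaining, the next attempt will be in ${interval.toMillis} ms")
        LockSupport.parkNanos(interval.toNanos)
        retry(retryCount - 1, interval)(f)
      case failure => failure
    }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    // Fails twice, succeeds on the third attempt: prints Success(3) after two warnings.
    val result = retry(3, 100.millis) {
      attempts += 1
      if (attempts < 3) throw new RuntimeException(s"boom #$attempts") else attempts
    }
    println(result)
  }
}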