Commit
Merge branch 'dev' into swagger
wolfboys authored Jul 21, 2024
2 parents c6a63ad + 9998ca6 commit c4d9379
Showing 168 changed files with 2,355 additions and 1,347 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/e2e.yml
@@ -110,6 +110,8 @@ jobs:
    strategy:
      matrix:
        case:
+         - name: ExternalLinkPageTest
+           class: org.apache.streampark.e2e.cases.ExternalLinkPageTest
          - name: YarnQueueTest
            class: org.apache.streampark.e2e.cases.YarnQueueTest
          - name: TokenManagementTest
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.enums;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** The spark development mode enum. */
public enum SparkDevelopmentMode {

    /** Unknown type, used in place of null. */
    UNKNOWN("Unknown", -1),

    /** Custom code. */
    CUSTOM_CODE("Custom Code", 1),

    /** Spark SQL. */
    SPARK_SQL("Spark SQL", 2);

    private final String name;

    private final Integer mode;

    SparkDevelopmentMode(@Nonnull String name, @Nonnull Integer mode) {
        this.name = name;
        this.mode = mode;
    }

    /**
     * Try to resolve the mode value into {@link SparkDevelopmentMode}.
     *
     * @param value The mode value of a potential spark development mode.
     * @return The parsed spark development mode, or {@link #UNKNOWN} if no constant matches.
     */
    @Nonnull
    public static SparkDevelopmentMode valueOf(@Nullable Integer value) {
        for (SparkDevelopmentMode sparkDevelopmentMode : values()) {
            if (sparkDevelopmentMode.mode.equals(value)) {
                return sparkDevelopmentMode;
            }
        }
        return SparkDevelopmentMode.UNKNOWN;
    }
}
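For illustration, a minimal sketch of how the resolver above behaves; the caller object is hypothetical and not part of this commit:

object SparkDevelopmentModeSketch extends App {
  import org.apache.streampark.common.enums.SparkDevelopmentMode

  // A stored mode value of 2 resolves to SPARK_SQL.
  assert(SparkDevelopmentMode.valueOf(Integer.valueOf(2)) == SparkDevelopmentMode.SPARK_SQL)

  // An unrecognized value falls back to UNKNOWN instead of throwing,
  // which is the role of the UNKNOWN("Unknown", -1) constant.
  assert(SparkDevelopmentMode.valueOf(Integer.valueOf(99)) == SparkDevelopmentMode.UNKNOWN)
}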
@@ -37,6 +37,7 @@ public enum SparkExecutionMode {

/** yarn client */
YARN_CLIENT(3, "yarn-client");
+
private final Integer mode;

private final String name;
@@ -17,7 +17,7 @@

package org.apache.streampark.common.conf

-import org.apache.streampark.common.util.ImplicitsUtils._
+import org.apache.streampark.common.util.Implicits._

import java.util.Properties

@@ -18,17 +18,17 @@
package org.apache.streampark.common.conf

import org.apache.streampark.common.util.{CommandUtils, Logger}
+import org.apache.streampark.common.util.Implicits._

import java.io.File
-import java.net.{URL => NetURL}
+import java.net.URL
import java.util.function.Consumer
import java.util.regex.Pattern

-import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable

/** @param flinkHome actual flink home that must be a readable local path */
-class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logger {
+class FlinkVersion(val flinkHome: String) extends Serializable with Logger {

private[this] lazy val FLINK_VER_PATTERN = Pattern.compile("^(\\d+\\.\\d+)(\\.)?.*$")

@@ -63,7 +63,7 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logger {
lib
}

-lazy val flinkLibs: List[NetURL] = flinkLib.listFiles().map(_.toURI.toURL).toList
+lazy val flinkLibs: List[URL] = flinkLib.listFiles().map(_.toURI.toURL).toList

lazy val version: String = {
val cmd = List(
@@ -19,15 +19,13 @@ package org.apache.streampark.common.conf

import org.apache.streampark.common.Constant
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
-import org.apache.streampark.common.util.ImplicitsUtils._
+import org.apache.streampark.common.util.Implicits._

import javax.annotation.{Nonnull, Nullable}

import java.util
import java.util.concurrent.ConcurrentHashMap

-import scala.collection.convert.ImplicitConversions._
-
/**
* Thread-safe configuration storage containers. All configurations will be automatically
* initialized from the spring configuration items of the same name.
@@ -123,7 +121,7 @@ object InternalConfigHolder extends Logger {

/** Get keys of all registered ConfigOption. */
@Nonnull
-def keys(): util.Set[String] = {
+def keys(): JavaSet[String] = {
val map = new util.HashMap[String, InternalOption](confOptions.size())
map.putAll(confOptions)
map.keySet()
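The JavaSet[String] signature relies on the Implicits object that replaces ImplicitsUtils throughout this commit. Implicits itself is not shown in the diff; a minimal sketch of the kind of alias it presumably provides (the alias and the demo key are assumptions):

object ImplicitsSketch {
  // Assumed alias mirroring what Implicits likely defines.
  type JavaSet[A] = java.util.Set[A]

  // Usage mirrors keys() above: a Java-backed set returned under the alias,
  // so signatures stay free of the java.util prefix.
  def demoKeys(): JavaSet[String] = {
    val set = new java.util.HashSet[String]()
    set.add("streampark.example.key") // hypothetical config key
    set
  }
}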
@@ -18,43 +18,45 @@
package org.apache.streampark.common.conf

import org.apache.streampark.common.util.{CommandUtils, Logger}
-
-import org.apache.commons.lang3.StringUtils
+import org.apache.streampark.common.util.Implicits._

import java.io.File
-import java.net.{URL => NetURL}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import java.net.URL
import java.util.function.Consumer
import java.util.regex.Pattern

-import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable

/** @param sparkHome actual spark home that must be a readable local path */
-class SparkVersion(val sparkHome: String) extends java.io.Serializable with Logger {
+class SparkVersion(val sparkHome: String) extends Serializable with Logger {

private[this] lazy val SPARK_VER_PATTERN = Pattern.compile("^(\\d+\\.\\d+)(\\.)?.*$")

private[this] lazy val SPARK_VERSION_PATTERN = Pattern.compile("(version) (\\d+\\.\\d+\\.\\d+)")

-private[this] lazy val SPARK_SCALA_VERSION_PATTERN =
-Pattern.compile("^spark-core_(.*)-[0-9].*.jar$")
+private[this] lazy val SPARK_SCALA_VERSION_PATTERN = Pattern.compile("^spark-core_(.*)-[0-9].*.jar$")

-lazy val scalaVersion: String = {
-val matcher = SPARK_SCALA_VERSION_PATTERN.matcher(sparkCoreJar.getName)
-if (matcher.matches()) {
-matcher.group(1);
-} else {
-"2.12"
+lazy val scalaVersion: String = SPARK_SCALA_VERSION_PATTERN.matcher(sparkCoreJar.getName).group(1)
+
+lazy val sparkCoreJar: File = {
+val distJar = sparkLib.listFiles().filter(_.getName.matches("spark-core.*\\.jar"))
+distJar match {
+case x if x.isEmpty =>
+throw new IllegalArgumentException(s"[StreamPark] can no found spark-core jar in $sparkLib")
+case x if x.length > 1 =>
+throw new IllegalArgumentException(
+s"[StreamPark] found multiple spark-core jar in $sparkLib")
+case _ =>
}
+distJar.head
}

def checkVersion(throwException: Boolean = true): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
case Array(3, v, _) if v >= 1 && v <= 3 => true
case _ =>
if (throwException) {
throw new UnsupportedOperationException(s"Unsupported flink version: $version")
throw new UnsupportedOperationException(s"Unsupported spark version: $version")
} else {
false
}
@@ -73,7 +75,7 @@ class SparkVersion(val sparkHome: String) extends java.io.Serializable with Logger {
lib
}

-lazy val sparkLibs: List[NetURL] = sparkLib.listFiles().map(_.toURI.toURL).toList
+lazy val sparkLibs: List[URL] = sparkLib.listFiles().map(_.toURI.toURL).toList

lazy val majorVersion: String = {
if (version == null) {
@@ -86,9 +88,8 @@ class SparkVersion(val sparkHome: String) extends java.io.Serializable with Logger {
}

lazy val version: String = {
-val sparkVersion = new AtomicReference[String]
+var sparkVersion: String = null
val cmd = List(s"$sparkHome/bin/spark-submit --version")
-val success = new AtomicBoolean(false)
val buffer = new mutable.StringBuilder
CommandUtils.execute(
sparkHome,
@@ -97,31 +98,18 @@ class SparkVersion(val sparkHome: String) extends java.io.Serializable with Logger {
override def accept(out: String): Unit = {
buffer.append(out).append("\n")
val matcher = SPARK_VERSION_PATTERN.matcher(out)
-if (matcher.find && StringUtils.isBlank(sparkVersion.get())) {
-success.set(true)
-sparkVersion.set(matcher.group(2))
+if (matcher.find) {
+sparkVersion = matcher.group(2)
}
}
})

logInfo(buffer.toString())
-if (!success.get()) {
+if (sparkVersion == null) {
throw new IllegalStateException(s"[StreamPark] parse spark version failed. $buffer")
}
buffer.clear()
-sparkVersion.get
-}
-
-lazy val sparkCoreJar: File = {
-val distJar = sparkLib.listFiles().filter(_.getName.matches("spark-core.*\\.jar"))
-distJar match {
-case x if x.isEmpty =>
-throw new IllegalArgumentException(s"[StreamPark] can no found spark-core jar in $sparkLib")
-case x if x.length > 1 =>
-throw new IllegalArgumentException(
-s"[StreamPark] found multiple spark-core jar in $sparkLib")
-case _ =>
-}
-distJar.head
+sparkVersion
}

override def toString: String =
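The rewritten version block replaces the AtomicReference/AtomicBoolean pair with a plain var plus a null check. A minimal sketch of the regex extraction it relies on, run against a sample line (the banner text is illustrative, not captured from a real spark-submit run):

object SparkVersionParseSketch extends App {
  import java.util.regex.Pattern

  // Same pattern as SPARK_VERSION_PATTERN above.
  val pattern = Pattern.compile("(version) (\\d+\\.\\d+\\.\\d+)")

  var sparkVersion: String = null
  val sampleLine = "Welcome to Spark version 3.3.2" // illustrative output line
  val matcher = pattern.matcher(sampleLine)
  if (matcher.find) {
    sparkVersion = matcher.group(2) // group(2) holds the bare version number
  }
  if (sparkVersion == null) {
    throw new IllegalStateException("parse spark version failed.")
  }
  println(sparkVersion) // prints 3.3.2
}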
@@ -19,7 +19,7 @@ package org.apache.streampark.common.conf

import org.apache.streampark.common.enums.StorageType
import org.apache.streampark.common.util.{HdfsUtils, SystemPropertyUtils}
-import org.apache.streampark.common.util.ImplicitsUtils._
+import org.apache.streampark.common.util.Implicits._

import java.net.URI

@@ -17,14 +17,13 @@

package org.apache.streampark.common.util

+import org.apache.streampark.common.util.Implicits._
import org.apache.streampark.common.util.Utils.isEmpty

import javax.annotation.Nullable

import java.util

-import scala.collection.convert.ImplicitConversions._
-
/** @since 2.2.0 */
object AssertUtils {

@@ -17,9 +17,9 @@

package org.apache.streampark.common.util

-import java.util
+import org.apache.streampark.common.util.Implicits._

-import scala.collection.convert.ImplicitConversions._
+import java.util

class CURLBuilder(val url: String) {
