Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat] sync quick-install support from dev-2.1.4 #3763

Merged
merged 1 commit into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.nio.channels.Channels
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util
import java.util.Scanner
import java.util.stream.Collectors

import scala.collection.convert.ImplicitConversions._
Expand Down Expand Up @@ -282,4 +283,21 @@ object FileUtils {
null
}

@throws[IOException]
def readString(file: File): String = {
require(file != null && file.isFile)
val reader = new FileReader(file)
val scanner = new Scanner(reader)
val buffer = new mutable.StringBuilder()
if (scanner.hasNextLine) {
buffer.append(scanner.nextLine())
}
while (scanner.hasNextLine) {
buffer.append("\r\n")
buffer.append(scanner.nextLine())
}
Utils.close(scanner, reader)
buffer.toString()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,12 @@ object PropertiesUtils extends Logger {
})
.toMap
case text =>
val value = text match {
case null => ""
case other => other.toString
}
prefix match {
case "" => proper += k -> value
case other => proper += s"$other.$k" -> value
if (text != null) {
val value = text.toString.trim
prefix match {
case "" => proper += k -> value
case other => proper += s"$other.$k" -> value
}
}
proper.toMap
}
Expand Down Expand Up @@ -276,7 +275,7 @@ object PropertiesUtils extends Logger {

/** extract flink configuration from application.properties */
@Nonnull def extractDynamicProperties(properties: String): Map[String, String] = {
if (StringUtils.isBlank(properties)) Map.empty[String, String]
if (StringUtils.isEmpty(properties)) Map.empty[String, String]
else {
val map = mutable.Map[String, String]()
val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
Expand Down Expand Up @@ -308,28 +307,76 @@ object PropertiesUtils extends Logger {
@Nonnull def extractArguments(args: String): List[String] = {
val programArgs = new ArrayBuffer[String]()
if (StringUtils.isNotEmpty(args)) {
val array = args.split("\\s+")
val iter = array.iterator
while (iter.hasNext) {
val v = iter.next()
val p = v.take(1)
p match {
case "'" | "\"" =>
var value = v
if (!v.endsWith(p)) {
while (!value.endsWith(p) && iter.hasNext) {
value += s" ${iter.next()}"
}
return extractArguments(args.split("\\s+"))
}
programArgs.toList
}

def extractArguments(array: Array[String]): List[String] = {
val programArgs = new ArrayBuffer[String]()
val iter = array.iterator
while (iter.hasNext) {
val v = iter.next()
val p = v.take(1)
p match {
case "'" | "\"" =>
var value = v
if (!v.endsWith(p)) {
while (!value.endsWith(p) && iter.hasNext) {
value += s" ${iter.next()}"
}
programArgs += value.substring(1, value.length - 1)
case _ => programArgs += v
}
}
programArgs += value.substring(1, value.length - 1)
case _ =>
val regexp = "(.*)='(.*)'$"
if (v.matches(regexp)) {
programArgs += v.replaceAll(regexp, "$1=$2")
} else {
val regexp = "(.*)=\"(.*)\"$"
if (v.matches(regexp)) {
programArgs += v.replaceAll(regexp, "$1=$2")
} else {
programArgs += v
}
}
}
}
programArgs.toList
}

def extractMultipleArguments(array: Array[String]): Map[String, Map[String, String]] = {
val iter = array.iterator
val map = mutable.Map[String, mutable.Map[String, String]]()
while (iter.hasNext) {
val v = iter.next()
v.take(2) match {
case "--" =>
val kv = iter.next()
val regexp = "(.*)=(.*)"
if (kv.matches(regexp)) {
val values = kv.split("=")
val k1 = values(0).trim
val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
val k = v.drop(2)
map.get(k) match {
case Some(m) => m += k1 -> v1
case _ => map += k -> mutable.Map(k1 -> v1)
}
}
case _ =>
}
}
map.map(x => x._1 -> x._2.toMap).toMap
}

@Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] =
new JavaMap[String, String](extractDynamicProperties(properties).asJava)

@Nonnull def extractMultipleArgumentsAsJava(
args: Array[String]): JavaMap[String, JavaMap[String, String]] = {
val map =
extractMultipleArguments(args).map(c => c._1 -> new JavaMap[String, String](c._2.asJava))
new JavaMap[String, JavaMap[String, String]](map.asJava)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ APP_BASE="$APP_HOME"
APP_CONF="$APP_BASE"/conf
APP_LIB="$APP_BASE"/lib
APP_LOG="$APP_BASE"/logs
APP_PID="$APP_BASE"/streampark.pid
APP_PID="$APP_BASE"/.pid
APP_OUT="$APP_LOG"/streampark.out
# shellcheck disable=SC2034
APP_TMPDIR="$APP_BASE"/temp
Expand Down Expand Up @@ -241,10 +241,16 @@ if [[ "$USE_NOHUP" = "true" ]]; then
NOHUP="nohup"
fi

BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
CONFIG="${APP_CONF}/config.yaml"
# shellcheck disable=SC2006
if [[ ! -f "$CONFIG" ]] ; then
echo_r "can not found config.yaml in \"conf\" directory, please check."
exit 1;
fi

BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap"

SERVER_PORT=$($_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG")
JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh

JVM_ARGS=""
Expand Down Expand Up @@ -276,21 +282,8 @@ print_logo() {
printf ' %s WebSite: https://streampark.apache.org%s\n' $BLUE $RESET
printf ' %s GitHub : http://github.com/apache/streampark%s\n\n' $BLUE $RESET
printf ' %s ──────── Apache StreamPark, Make stream processing easier ô~ô!%s\n\n' $PRIMARY $RESET
}

init_env() {
# shellcheck disable=SC2006
CONFIG="${APP_CONF}/application.yml"
if [[ -f "$CONFIG" ]] ; then
echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" file. The \"application.yml\" file is deprecated.
For compatibility, this application.yml will be used preferentially. The latest configuration file is \"config.yaml\". It is recommended to use \"config.yaml\".
Note: \"application.yml\" will be completely deprecated in version 2.2.0. """
else
CONFIG="${APP_CONF}/config.yaml"
if [[ ! -f "$CONFIG" ]] ; then
echo_r "can not found config.yaml in \"conf\" directory, please check."
exit 1;
fi
if [[ "$1"x == "start"x ]]; then
printf ' %s http://localhost:%s %s\n\n' $PRIMARY $SERVER_PORT $RESET
fi
}

Expand All @@ -313,19 +306,19 @@ get_pid() {
fi

# shellcheck disable=SC2006
local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG"`
if [[ x"${serverPort}" == x"" ]]; then
if [[ "${SERVER_PORT}"x == ""x ]]; then
echo_r "server.port is required, please check $CONFIG"
exit 1;
else
# shellcheck disable=SC2006
# shellcheck disable=SC2155
local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$serverPort"`
if [[ x"${used}" == x"used" ]]; then
local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$SERVER_PORT"`
if [[ "${used}"x == "used"x ]]; then
# shellcheck disable=SC2006
local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'`
# shellcheck disable=SC2236
if [[ ! -z $PID ]]; then
echo $PID
echo "$PID"
else
echo 0
fi
Expand Down Expand Up @@ -411,7 +404,7 @@ start() {
-Dapp.home="${APP_HOME}" \
-Dlogging.config="${APP_CONF}/logback-spring.xml" \
-Djava.io.tmpdir="$APP_TMPDIR" \
$APP_MAIN >> "$APP_OUT" 2>&1 "&"
$APP_MAIN "$@" >> "$APP_OUT" 2>&1 "&"

local PID=$!
local IS_NUMBER="^[0-9]+$"
Expand Down Expand Up @@ -565,27 +558,31 @@ restart() {
}

main() {
print_logo
init_env
case "$1" in
"debug")
DEBUG_PORT=$2
debug
;;
"start")
start
shift
start "$@"
[[ $? -eq 0 ]] && print_logo "start"
;;
"start_docker")
print_logo
start_docker
;;
"stop")
print_logo
stop
;;
"status")
print_logo
status
;;
"restart")
restart
[[ $? -eq 0 ]] && print_logo "start"
;;
*)
echo_r "Unknown command: $1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,30 @@

package org.apache.streampark.console.base.util;

import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.PropertiesUtils;

import org.apache.commons.io.output.NullOutputStream;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.ServerSocket;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Map;

public class BashJavaUtils {

private static String localhost = "localhost";

public static void main(String[] args) throws IOException {
String action = args[0].toLowerCase();
String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
Expand All @@ -39,12 +54,77 @@ public static void main(String[] args) throws IOException {
System.out.println(value);
break;
case "--check_port":
Integer port = Integer.parseInt(actionArgs[0]);
try {
new ServerSocket(port);
int port = Integer.parseInt(actionArgs[0]);
try (Socket ignored = new Socket(localhost, port)) {
System.out.println("used");
} catch (Exception e) {
System.out.println("free");
}
break;
case "--free_port":
int start = Integer.parseInt(actionArgs[0]);
for (port = start; port < 65535; port++) {
try (Socket ignored = new Socket(localhost, port)) {
} catch (Exception e) {
System.out.println(port);
break;
}
}
break;
case "--read_flink":
String input = actionArgs[0];
String[] inputs = input.split(":");
String flinkDist =
Arrays.stream(inputs).filter(c -> c.contains("flink-dist-")).findFirst().get();
File flinkHome = new File(flinkDist.replaceAll("/lib/.*", ""));
FlinkVersion flinkVersion = new FlinkVersion(flinkHome.getAbsolutePath());

PrintStream originalOut = System.out;
System.setOut(new PrintStream(new NullOutputStream()));

String version = flinkVersion.majorVersion();
float ver = Float.parseFloat(version);
File yaml =
new File(flinkHome, ver < 1.19f ? "/conf/flink-conf.yaml" : "/conf/config.yaml");

Map<String, String> config = PropertiesUtils.fromYamlFileAsJava(yaml.getAbsolutePath());
String flinkPort = config.getOrDefault("rest.port", "8081");
System.setOut(originalOut);
System.out.println(
flinkHome
.getAbsolutePath()
.concat(",")
.concat(flinkHome.getName())
.concat(",")
.concat(flinkPort));
break;
case "--replace":
String filePath = actionArgs[0];
String[] text = actionArgs[1].split("\\|\\|");
String searchText = text[0];
String replaceText = text[1];
try {
File file = new File(filePath);
String content = FileUtils.readString(file);
content = content.replace(searchText, replaceText);
FileWriter writer = new FileWriter(filePath);
writer.write(content);
writer.flush();
writer.close();
System.exit(0);
} catch (IOException e) {
System.exit(1);
}
break;
case "--download":
try {
URL url = new URL(actionArgs[0]);
Path path = Paths.get(actionArgs[1]).toAbsolutePath().normalize();
try (InputStream inStream = url.openStream()) {
Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING);
}
} catch (Exception e) {
System.out.println("used");
System.exit(1);
}
break;
default:
Expand Down
Loading
Loading