From ee655e8869e18d05701a3428f2d1b481b98243da Mon Sep 17 00:00:00 2001 From: chaogefeng <673120261@163.com> Date: Tue, 24 Sep 2019 13:14:05 +0800 Subject: [PATCH] Added JDBC engine support and updated JDBC installation scripts Closes #49 --- bin/install.sh | 19 +- bin/start-all.sh | 20 ++ bin/stop-all.sh | 17 + conf/config.sh | 6 + .../jdbc/entrance/bin/start-jdbcentrance.sh | 31 ++ .../jdbc/entrance/bin/stop-jdbcentrance.sh | 47 +++ ujes/definedEngines/jdbc/entrance/pom.xml | 136 ++++++++ .../src/main/assembly/distribution.xml | 320 ++++++++++++++++++ .../conf/executer/ConnectionManager.java | 198 +++++++++++ .../src/main/resources/application.yml | 23 ++ .../src/main/resources/linkis.properties | 32 ++ .../src/main/resources/log4j.properties | 36 ++ .../entrance/src/main/resources/log4j2.xml | 38 +++ .../entrance/conf/JDBCConfiguration.scala | 24 ++ .../conf/JDBCSpringConfiguration.scala | 55 +++ .../JDBCParamsIllegalException.scala | 24 ++ .../entrance/execute/JDBCEntranceJob.scala | 61 ++++ .../execute/JDBCJobExecuteRequest.scala | 22 ++ .../executer/JDBCEngineExecutor.scala | 212 ++++++++++++ .../JDBCEngineExecutorManagerImpl.scala | 107 ++++++ .../linkis/entrance/executer/JDBCHelper.java | 166 +++++++++ .../entrance/executer/JDBCSQLCodeParser.scala | 68 ++++ .../entrance/parser/JDBCEntranceParser.scala | 49 +++ 23 files changed, 1710 insertions(+), 1 deletion(-) create mode 100644 ujes/definedEngines/jdbc/entrance/bin/start-jdbcentrance.sh create mode 100644 ujes/definedEngines/jdbc/entrance/bin/stop-jdbcentrance.sh create mode 100644 ujes/definedEngines/jdbc/entrance/pom.xml create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/assembly/distribution.xml create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/java/com/webank/wedatasphere/linkis/entrance/conf/executer/ConnectionManager.java create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/resources/application.yml create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/resources/linkis.properties create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/resources/log4j.properties create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/resources/log4j2.xml create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/conf/JDBCConfiguration.scala create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/conf/JDBCSpringConfiguration.scala create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/exception/JDBCParamsIllegalException.scala create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/execute/JDBCEntranceJob.scala create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/execute/JDBCJobExecuteRequest.scala create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCEngineExecutor.scala create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCEngineExecutorManagerImpl.scala create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCHelper.java create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCSQLCodeParser.scala create mode 100644 ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/parser/JDBCEntranceParser.scala diff --git a/bin/install.sh b/bin/install.sh index a799e6acee..d1431b8c16 100644 --- a/bin/install.sh +++ b/bin/install.sh @@ -423,4 +423,21 @@ ssh $SERVER_IP "sed -i \"s#wds.linkis.entrance.config.logPath.*#wds.linkis.entr ssh $SERVER_IP "sed -i \"s#wds.linkis.resultSet.store.path.*#wds.linkis.resultSet.store.path=$HDFS_USER_ROOT_PATH#g\" $SERVER_CONF_PATH" isSuccess "subsitution linkis.properties of $SERVERNAME" echo "<----------------$SERVERNAME:end------------------->" -##SparkEntrance install end \ No newline at end of file +##SparkEntrance install end + + +##JDBCEntrance install +PACKAGE_DIR=linkis/ujes/jdbc +SERVERNAME=linkis-ujes-jdbc-entrance +SERVER_PORT=$JDBC_ENTRANCE_PORT +###install dir +installPackage +###update linkis.properties +echo "$SERVERNAME-step4:update linkis conf" +SERVER_CONF_PATH=$SERVER_HOME/$SERVERNAME/conf/linkis.properties +ssh $SERVER_IP "sed -i \"s#wds.linkis.entrance.config.logPath.*#wds.linkis.entrance.config.logPath=$WORKSPACE_USER_ROOT_PATH#g\" $SERVER_CONF_PATH" +ssh $SERVER_IP "sed -i \"s#wds.linkis.resultSet.store.path.*#wds.linkis.resultSet.store.path=$HDFS_USER_ROOT_PATH#g\" $SERVER_CONF_PATH" +ssh $SERVER_IP "cd $SERVER_HOME/$SERVERNAME/lib;" $SERVER_CONF_PATH" +isSuccess "subsitution linkis.properties of $SERVERNAME" +echo "<----------------$SERVERNAME:end------------------->" +##SparkEntrance install end diff --git a/bin/start-all.sh b/bin/start-all.sh index 717e0f26b1..a76a857d40 100644 --- a/bin/start-all.sh +++ b/bin/start-all.sh @@ -229,6 +229,26 @@ echo "<-------------------------------->" sleep 3 + + +#JDBCEntrance +echo "<-------------------------------->" +echo "Begin to start JDBC Entrance" +JDBC_ENTRANCE_NAME="ujes-jdbc-entrance" +JDBC_ENTRANCE_BIN=${LINKIS_INSTALL_HOME}/${APP_PREFIX}${JDBC_ENTRANCE_NAME}/bin +JDBC_ENTRANCE_START_CMD="if [ -d ${JDBC_ENTRANCE_BIN} ];then cd ${JDBC_ENTRANCE_BIN}; dos2unix ./* > /dev/null 2>&1; dos2unix ../conf/* > /dev/null 2>&1; sh start-jdbcentrance.sh > /dev/null;else echo 'WARNING:JDBC Entrance will not start';fi" +if [ -n "${JDBC_INSTALL_IP}" ];then + ssh ${JDBC_INSTALL_IP} "${JDBC_ENTRANCE_START_CMD}" +else + ssh ${local_host} "${JDBC_ENTRANCE_START_CMD}" +fi +echo "End to start JDBC Entrance" +echo "<-------------------------------->" + +sleep 3 + + + ##PipelineEntrance #echo "Pipeline Entrance is Starting" #PIPELINE_ENTRANCE_NAME="ujes-pipeline-entrance" diff --git a/bin/stop-all.sh b/bin/stop-all.sh index 9794403c1b..51c73c54ae 100644 --- a/bin/stop-all.sh +++ b/bin/stop-all.sh @@ -221,6 +221,23 @@ echo "<-------------------------------->" +#JDBCEntrance +echo "<-------------------------------->" +echo "Begin to stop JDBC Entrance" +JDBC_ENTRANCE_NAME="ujes-jdbc-entrance" +JDBC_ENTRANCE_BIN=${LINKIS_INSTALL_HOME}/${APP_PREFIX}${JDBC_ENTRANCE_NAME}/bin +JDBC_ENTRANCE_STOP_CMD="if [ -d ${JDBC_ENTRANCE_BIN} ];then cd ${JDBC_ENTRANCE_BIN}; dos2unix ./* > /dev/null 2>&1; dos2unix ../conf/* > /dev/null 2>&1; sh stop-jdbcentrance.sh > /dev/null;else echo 'WARNING:JDBC Entrance will not start';fi" +if [ -n "${JDBC_INSTALL_IP}" ];then + ssh ${JDBC_INSTALL_IP} "${JDBC_ENTRANCE_STOP_CMD}" +else + ssh ${local_host} "${JDBC_ENTRANCE_STOP_CMD}" +fi +echo "End to stop JDBC Entrance" +echo "<-------------------------------->" + +sleep 3 + + ##PipelineEntrance #echo "Pipeline Entrance is Stoping" #PIPELINE_ENTRANCE_NAME="ujes-pipeline-entrance" diff --git a/conf/config.sh b/conf/config.sh index 2dfe6118b5..d846685b21 100644 --- a/conf/config.sh +++ b/conf/config.sh @@ -76,6 +76,12 @@ PYTHON_EM_PORT=12001 PYTHON_ENTRANCE_PORT=12002 +### JDBC +### This service is used to provide jdbc capability. +JDBC_INSTALL_IP=127.0.0.1 +JDBC_ENTRANCE_PORT=9888 + + ######################################################################################## diff --git a/ujes/definedEngines/jdbc/entrance/bin/start-jdbcentrance.sh b/ujes/definedEngines/jdbc/entrance/bin/start-jdbcentrance.sh new file mode 100644 index 0000000000..da572e24ff --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/bin/start-jdbcentrance.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +cd `dirname $0` +cd .. +HOME=`pwd` +export DWS_ENGINE_MANAGER_HOME=$HOME + +export DWS_ENGINE_MANAGER_PID=$HOME/bin/linkis.pid + +if [[ -f "${DWS_ENGINE_MANAGER_PID}" ]]; then + pid=$(cat ${DWS_ENGINE_MANAGER_PID}) + if kill -0 ${pid} >/dev/null 2>&1; then + echo "JDBC Entrance is already running." + return 0; + fi +fi + +export DWS_ENGINE_MANAGER_LOG_PATH=$HOME/logs +export DWS_ENGINE_MANAGER_HEAP_SIZE="1G" +export DWS_ENGINE_MANAGER_JAVA_OPTS="-Xms$DWS_ENGINE_MANAGER_HEAP_SIZE -Xmx$DWS_ENGINE_MANAGER_HEAP_SIZE -XX:+UseG1GC -XX:MaxPermSize=500m -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=11716" + +nohup java $DWS_ENGINE_MANAGER_JAVA_OPTS -cp $HOME/conf:$HOME/lib/* com.webank.wedatasphere.linkis.DataWorkCloudApplication 2>&1 > $DWS_ENGINE_MANAGER_LOG_PATH/linkis.out & +pid=$! +if [[ -z "${pid}" ]]; then + echo "JDBC Entrance start failed!" + exit 1 +else + echo "JDBC Entrance start succeeded!" + echo $pid > $DWS_ENGINE_MANAGER_PID + sleep 1 +fi diff --git a/ujes/definedEngines/jdbc/entrance/bin/stop-jdbcentrance.sh b/ujes/definedEngines/jdbc/entrance/bin/stop-jdbcentrance.sh new file mode 100644 index 0000000000..5182a964d6 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/bin/stop-jdbcentrance.sh @@ -0,0 +1,47 @@ +#!/bin/bash + +cd `dirname $0` +cd .. +HOME=`pwd` + +export DWS_ENGINE_MANAGER_PID=$HOME/bin/linkis.pid + +function wait_for_DWS_ENGINE_MANAGER_to_die() { + local pid + local count + pid=$1 + timeout=$2 + count=0 + timeoutTime=$(date "+%s") + let "timeoutTime+=$timeout" + currentTime=$(date "+%s") + forceKill=1 + + while [[ $currentTime -lt $timeoutTime ]]; do + $(kill ${pid} > /dev/null 2> /dev/null) + if kill -0 ${pid} > /dev/null 2>&1; then + sleep 3 + else + forceKill=0 + break + fi + currentTime=$(date "+%s") + done + + if [[ forceKill -ne 0 ]]; then + $(kill -9 ${pid} > /dev/null 2> /dev/null) + fi +} + +if [[ ! -f "${DWS_ENGINE_MANAGER_PID}" ]]; then + echo "JDBC Entrance is not running" +else + pid=$(cat ${DWS_ENGINE_MANAGER_PID}) + if [[ -z "${pid}" ]]; then + echo "JDBC Entrance is not running" + else + wait_for_DWS_ENGINE_MANAGER_to_die $pid 40 + $(rm -f ${DWS_ENGINE_MANAGER_PID}) + echo "JDBC Entrance is stopped." + fi +fi diff --git a/ujes/definedEngines/jdbc/entrance/pom.xml b/ujes/definedEngines/jdbc/entrance/pom.xml new file mode 100644 index 0000000000..00ca563937 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/pom.xml @@ -0,0 +1,136 @@ + + + + + + + linkis + com.webank.wedatasphere.linkis + 0.9.0 + + 4.0.0 + + linkis-jdbc-entrance + + + + + com.webank.wedatasphere.linkis + linkis-ujes-entrance + ${linkis.version} + + + com.typesafe + config + 1.2.1 + + + org.postgresql + postgresql + 42.2.6 + + + + org.apache.hive + hive-jdbc + 1.2.1 + + + org.apache.hive + hive-shims + + + org.eclipse.jetty.aggregate + jetty-all + + + + + + + org.apache.thrift + libthrift + 0.9.2 + pom + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.maven.plugins + maven-assembly-plugin + 2.3 + false + + + make-assembly + package + + single + + + + src/main/assembly/distribution.xml + + + + + + false + linkis-ujes-jdbc-entrance + false + false + + src/main/assembly/distribution.xml + + + + + + + ${basedir}/src/main/resources + + **/*.properties + **/*.xml + **/*.yml + + + + ${project.artifactId}-${project.version} + + + \ No newline at end of file diff --git a/ujes/definedEngines/jdbc/entrance/src/main/assembly/distribution.xml b/ujes/definedEngines/jdbc/entrance/src/main/assembly/distribution.xml new file mode 100644 index 0000000000..8a45752f23 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/assembly/distribution.xml @@ -0,0 +1,320 @@ + + + + linkis-ujes-jdbc-entrance + + zip + + true + linkis-ujes-jdbc-entrance + + + + + + lib + true + true + false + true + true + + + antlr:antlr:jar + aopalliance:aopalliance:jar + asm:asm:jar + cglib:cglib:jar + com.amazonaws:aws-java-sdk-autoscaling:jar + com.amazonaws:aws-java-sdk-core:jar + com.amazonaws:aws-java-sdk-ec2:jar + com.amazonaws:aws-java-sdk-route53:jar + com.amazonaws:aws-java-sdk-sts:jar + com.amazonaws:jmespath-java:jar + com.fasterxml.jackson.core:jackson-annotations:jar + com.fasterxml.jackson.core:jackson-core:jar + com.fasterxml.jackson.core:jackson-databind:jar + com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar + com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar + com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar + com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar + com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar + com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar + com.fasterxml.jackson.module:jackson-module-parameter-names:jar + com.fasterxml.jackson.module:jackson-module-paranamer:jar + com.fasterxml.jackson.module:jackson-module-scala_2.11:jar + com.github.andrewoma.dexx:dexx-collections:jar + com.github.vlsi.compactmap:compactmap:jar + com.google.code.findbugs:annotations:jar + com.google.code.findbugs:jsr305:jar + com.google.code.gson:gson:jar + com.google.guava:guava:jar + com.google.inject:guice:jar + com.google.protobuf:protobuf-java:jar + com.netflix.archaius:archaius-core:jar + com.netflix.eureka:eureka-client:jar + com.netflix.eureka:eureka-core:jar + com.netflix.hystrix:hystrix-core:jar + com.netflix.netflix-commons:netflix-commons-util:jar + com.netflix.netflix-commons:netflix-eventbus:jar + com.netflix.netflix-commons:netflix-infix:jar + com.netflix.netflix-commons:netflix-statistics:jar + com.netflix.ribbon:ribbon:jar + com.netflix.ribbon:ribbon-core:jar + com.netflix.ribbon:ribbon-eureka:jar + com.netflix.ribbon:ribbon-httpclient:jar + com.netflix.ribbon:ribbon-loadbalancer:jar + com.netflix.ribbon:ribbon-transport:jar + com.netflix.servo:servo-core:jar + com.ning:async-http-client:jar + com.sun.jersey.contribs:jersey-apache-client4:jar + com.sun.jersey:jersey-client:jar + com.sun.jersey:jersey-core:jar + com.sun.jersey:jersey-json:jar + com.sun.jersey:jersey-server:jar + com.sun.jersey:jersey-servlet:jar + com.sun.xml.bind:jaxb-impl:jar + com.thoughtworks.paranamer:paranamer:jar + com.thoughtworks.xstream:xstream:jar + com.webank.wedatasphere.linkis:linkis-common:jar + com.webank.wedatasphere.linkis:linkis-module:jar + commons-beanutils:commons-beanutils:jar + commons-beanutils:commons-beanutils-core:jar + commons-cli:commons-cli:jar + commons-codec:commons-codec:jar + commons-collections:commons-collections:jar + commons-configuration:commons-configuration:jar + commons-daemon:commons-daemon:jar + commons-dbcp:commons-dbcp:jar + commons-digester:commons-digester:jar + commons-httpclient:commons-httpclient:jar + commons-io:commons-io:jar + commons-jxpath:commons-jxpath:jar + commons-lang:commons-lang:jar + commons-logging:commons-logging:jar + commons-net:commons-net:jar + commons-pool:commons-pool:jar + io.micrometer:micrometer-core:jar + io.netty:netty:jar + io.netty:netty-all:jar + io.netty:netty-buffer:jar + io.netty:netty-codec:jar + io.netty:netty-codec-http:jar + io.netty:netty-common:jar + io.netty:netty-handler:jar + io.netty:netty-transport:jar + io.netty:netty-transport-native-epoll:jar + io.reactivex:rxjava:jar + io.reactivex:rxnetty:jar + io.reactivex:rxnetty-contexts:jar + io.reactivex:rxnetty-servo:jar + javax.activation:activation:jar + javax.annotation:javax.annotation-api:jar + javax.inject:javax.inject:jar + javax.servlet:javax.servlet-api:jar + javax.servlet.jsp:jsp-api:jar + javax.validation:validation-api:jar + javax.websocket:javax.websocket-api:jar + javax.ws.rs:javax.ws.rs-api:jar + javax.xml.bind:jaxb-api:jar + javax.xml.stream:stax-api:jar + joda-time:joda-time:jar + log4j:log4j:jar + mysql:mysql-connector-java:jar + net.databinder.dispatch:dispatch-core_2.11:jar + net.databinder.dispatch:dispatch-json4s-jackson_2.11:jar + org.antlr:antlr-runtime:jar + org.antlr:stringtemplate:jar + org.apache.commons:commons-compress:jar + org.apache.commons:commons-math:jar + org.apache.commons:commons-math3:jar + org.apache.curator:curator-client:jar + org.apache.curator:curator-framework:jar + org.apache.curator:curator-recipes:jar + org.apache.directory.api:api-asn1-api:jar + org.apache.directory.api:api-util:jar + org.apache.directory.server:apacheds-i18n:jar + org.apache.directory.server:apacheds-kerberos-codec:jar + org.apache.hadoop:hadoop-annotations:jar + org.apache.hadoop:hadoop-auth:jar + org.apache.hadoop:hadoop-common:jar + org.apache.hadoop:hadoop-hdfs:jar + org.apache.htrace:htrace-core:jar + org.apache.httpcomponents:httpclient:jar + org.apache.httpcomponents:httpcore:jar + org.apache.logging.log4j:log4j-api:jar + org.apache.logging.log4j:log4j-core:jar + org.apache.logging.log4j:log4j-jul:jar + org.apache.logging.log4j:log4j-slf4j-impl:jar + org.apache.zookeeper:zookeeper:jar + org.aspectj:aspectjweaver:jar + org.bouncycastle:bcpkix-jdk15on:jar + org.bouncycastle:bcprov-jdk15on:jar + org.codehaus.jackson:jackson-jaxrs:jar + org.codehaus.jackson:jackson-xc:jar + org.codehaus.jettison:jettison:jar + org.codehaus.woodstox:stax2-api:jar + org.codehaus.woodstox:woodstox-core-asl:jar + org.eclipse.jetty:jetty-annotations:jar + org.eclipse.jetty:jetty-client:jar + org.eclipse.jetty:jetty-continuation:jar + org.eclipse.jetty:jetty-http:jar + org.eclipse.jetty:jetty-io:jar + org.eclipse.jetty:jetty-jndi:jar + org.eclipse.jetty:jetty-plus:jar + org.eclipse.jetty:jetty-security:jar + org.eclipse.jetty:jetty-server:jar + org.eclipse.jetty:jetty-servlet:jar + org.eclipse.jetty:jetty-servlets:jar + org.eclipse.jetty:jetty-util:jar + org.eclipse.jetty:jetty-webapp:jar + org.eclipse.jetty:jetty-xml:jar + org.eclipse.jetty.websocket:javax-websocket-client-impl:jar + org.eclipse.jetty.websocket:javax-websocket-server-impl:jar + org.eclipse.jetty.websocket:websocket-api:jar + org.eclipse.jetty.websocket:websocket-client:jar + org.eclipse.jetty.websocket:websocket-common:jar + org.eclipse.jetty.websocket:websocket-server:jar + org.eclipse.jetty.websocket:websocket-servlet:jar + org.fusesource.leveldbjni:leveldbjni-all:jar + org.glassfish.hk2:class-model:jar + org.glassfish.hk2:config-types:jar + org.glassfish.hk2.external:aopalliance-repackaged:jar + org.glassfish.hk2.external:asm-all-repackaged:jar + org.glassfish.hk2.external:bean-validator:jar + org.glassfish.hk2.external:javax.inject:jar + org.glassfish.hk2:hk2:jar + org.glassfish.hk2:hk2-api:jar + org.glassfish.hk2:hk2-config:jar + org.glassfish.hk2:hk2-core:jar + org.glassfish.hk2:hk2-locator:jar + org.glassfish.hk2:hk2-runlevel:jar + org.glassfish.hk2:hk2-utils:jar + org.glassfish.hk2:osgi-resource-locator:jar + org.glassfish.hk2:spring-bridge:jar + org.glassfish.jersey.bundles:jaxrs-ri:jar + org.glassfish.jersey.bundles.repackaged:jersey-guava:jar + org.glassfish.jersey.containers:jersey-container-servlet:jar + org.glassfish.jersey.containers:jersey-container-servlet-core:jar + org.glassfish.jersey.core:jersey-client:jar + org.glassfish.jersey.core:jersey-common:jar + org.glassfish.jersey.core:jersey-server:jar + org.glassfish.jersey.ext:jersey-entity-filtering:jar + org.glassfish.jersey.ext:jersey-spring3:jar + org.glassfish.jersey.media:jersey-media-jaxb:jar + org.glassfish.jersey.media:jersey-media-json-jackson:jar + org.glassfish.jersey.media:jersey-media-multipart:jar + org.hdrhistogram:HdrHistogram:jar + org.javassist:javassist:jar + org.json4s:json4s-ast_2.11:jar + org.json4s:json4s-core_2.11:jar + org.json4s:json4s-jackson_2.11:jar + org.jsoup:jsoup:jar + org.jvnet.mimepull:mimepull:jar + org.jvnet:tiger-types:jar + org.latencyutils:LatencyUtils:jar + org.mortbay.jasper:apache-el:jar + org.mortbay.jetty:jetty:jar + org.mortbay.jetty:jetty-util:jar + org.ow2.asm:asm-analysis:jar + org.ow2.asm:asm-commons:jar + org.ow2.asm:asm-tree:jar + org.reflections:reflections:jar + org.scala-lang.modules:scala-parser-combinators_2.11:jar + org.scala-lang.modules:scala-xml_2.11:jar + org.scala-lang:scala-compiler:jar + org.scala-lang:scala-library:jar + org.scala-lang:scala-reflect:jar + org.scala-lang:scalap:jar + org.slf4j:jul-to-slf4j:jar + org.slf4j:slf4j-api:jar + org.springframework.boot:spring-boot:jar + org.springframework.boot:spring-boot-actuator:jar + org.springframework.boot:spring-boot-actuator-autoconfigure:jar + org.springframework.boot:spring-boot-autoconfigure:jar + org.springframework.boot:spring-boot-starter:jar + org.springframework.boot:spring-boot-starter-actuator:jar + org.springframework.boot:spring-boot-starter-aop:jar + org.springframework.boot:spring-boot-starter-jetty:jar + org.springframework.boot:spring-boot-starter-json:jar + org.springframework.boot:spring-boot-starter-log4j2:jar + org.springframework.boot:spring-boot-starter-web:jar + org.springframework.cloud:spring-cloud-commons:jar + org.springframework.cloud:spring-cloud-config-client:jar + org.springframework.cloud:spring-cloud-context:jar + org.springframework.cloud:spring-cloud-netflix-archaius:jar + org.springframework.cloud:spring-cloud-netflix-core:jar + org.springframework.cloud:spring-cloud-netflix-eureka-client:jar + org.springframework.cloud:spring-cloud-netflix-ribbon:jar + org.springframework.cloud:spring-cloud-starter:jar + org.springframework.cloud:spring-cloud-starter-config:jar + org.springframework.cloud:spring-cloud-starter-eureka:jar + org.springframework.cloud:spring-cloud-starter-netflix-archaius:jar + org.springframework.cloud:spring-cloud-starter-netflix-eureka-client:jar + org.springframework.cloud:spring-cloud-starter-netflix-ribbon:jar + org.springframework.security:spring-security-crypto:jar + org.springframework.security:spring-security-rsa:jar + org.springframework:spring-aop:jar + org.springframework:spring-beans:jar + org.springframework:spring-context:jar + org.springframework:spring-core:jar + org.springframework:spring-expression:jar + org.springframework:spring-jcl:jar + org.springframework:spring-web:jar + org.springframework:spring-webmvc:jar + org.tukaani:xz:jar + org.yaml:snakeyaml:jar + software.amazon.ion:ion-java:jar + xerces:xercesImpl:jar + xmlenc:xmlenc:jar + xmlpull:xmlpull:jar + xpp3:xpp3_min:jar + + + + + + + ${basedir}/../entrance/src/main/resources + + * + + 0777 + conf + unix + + + ${basedir}/bin + + * + + 0777 + bin + unix + + + . + + */** + + logs + + + + \ No newline at end of file diff --git a/ujes/definedEngines/jdbc/entrance/src/main/java/com/webank/wedatasphere/linkis/entrance/conf/executer/ConnectionManager.java b/ujes/definedEngines/jdbc/entrance/src/main/java/com/webank/wedatasphere/linkis/entrance/conf/executer/ConnectionManager.java new file mode 100644 index 0000000000..793f4c54d9 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/java/com/webank/wedatasphere/linkis/entrance/conf/executer/ConnectionManager.java @@ -0,0 +1,198 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.conf.executer; + +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.commons.dbcp.BasicDataSourceFactory; +import org.apache.commons.lang.StringUtils; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionManager { + + Logger logger = LoggerFactory.getLogger(ConnectionManager.class); + + private final Map databaseToDataSources = new HashMap(); + + private final Map supportedDBs = new HashMap(); + private final List supportedDBNames = new ArrayList(); + + private volatile static ConnectionManager connectionManager; + private ConnectionManager(){ + } + public static ConnectionManager getInstance(){ + if (connectionManager== null) { + synchronized (ConnectionManager.class) { + if (connectionManager== null) { + connectionManager= new ConnectionManager(); + } + } + } + return connectionManager; + } + + { + String supportedDBString ="mysql=>com.mysql.jdbc.Driver,postgresql=>org.postgresql.Driver," + + "oracle=>oracle.jdbc.driver.OracleDriver,hive2=>org.apache.hive.jdbc.HiveDriver"; + String[] supportedDBs = supportedDBString.split(","); + for (String supportedDB : supportedDBs) { + String[] supportedDBInfo = supportedDB.split("=>"); + if(supportedDBInfo.length != 2) { + throw new IllegalArgumentException("Illegal driver info " + supportedDB); + } + try { + Class.forName(supportedDBInfo[1]); + } catch (ClassNotFoundException e) { + logger.info("Load " + supportedDBInfo[0] + " driver failed",e); + } + supportedDBNames.add(supportedDBInfo[0]); + this.supportedDBs.put(supportedDBInfo[0], supportedDBInfo[1]); + } + } + + private void validateURL(String url) { + if(StringUtils.isEmpty(url)) { + throw new NullPointerException("jdbc.url cannot be null."); + } + if(!url.matches("jdbc:\\w+://\\S+:[0-9]{2,6}(/\\S*)?")) { + throw new IllegalArgumentException("Unknown jdbc.url " + url); + } + for (String supportedDBName: supportedDBNames) { + if(url.indexOf(supportedDBName) > 0) { + return; + } + } + throw new IllegalArgumentException("Illegal url or not supported url type (url: " + url + ")."); + } + + private final Pattern pattern = Pattern.compile("^(jdbc:\\w+://\\S+:[0-9]+)\\s*"); + private String getRealURL(String url) { + Matcher matcher = pattern.matcher(url.trim()); + matcher.find(); + return matcher.group(1); + } + + + protected DataSource createDataSources(HashMap properties) throws SQLException { + String url = properties.get("jdbc.url"); + String username = properties.get("jdbc.username").trim(); + String password = StringUtils.trim(properties.get("jdbc.password")); + validateURL(url); + int index = url.indexOf(":") + 1; + String dbType = url.substring(index, url.indexOf(":", index)); + System.out.println(String.format("Try to Create a new %s JDBC DBCP with url(%s), username(%s), password(%s).", dbType, url, username, password)); + Properties props = new Properties(); + props.put("driverClassName", supportedDBs.get(dbType)); + props.put("url", url.trim()); + props.put("username", username); + props.put("password", password); + props.put("maxIdle", 5); + props.put("minIdle", 0); + props.put("maxActive", 20); + props.put("initialSize", 1); + props.put("testOnBorrow", false); + props.put("testWhileIdle", true); + props.put("validationQuery", "select 1"); + props.put("initialSize", 1); + BasicDataSource dataSource; + try { + dataSource = (BasicDataSource) BasicDataSourceFactory.createDataSource(props); + } catch (Exception e) { + throw new SQLException(e); + } +// ComboPooledDataSource dataSource = new ComboPooledDataSource(); +// dataSource.setUser(username); +// dataSource.setPassword(password); +// dataSource.setJdbcUrl(url.trim()); +// try { +// dataSource.setDriverClass(supportedDBs.get(dbType)); +// } catch (PropertyVetoException e) { +// throw new SQLException(e); +// } +// dataSource.setInitialPoolSize(1); +// dataSource.setMinPoolSize(0); +// dataSource.setMaxPoolSize(20); +// dataSource.setMaxStatements(40); +// dataSource.setMaxIdleTime(60); + return dataSource; + } + + public Connection getConnection(HashMap properties) throws SQLException { + String url = properties.get("jdbc.url"); + if(StringUtils.isEmpty(properties.get("jdbc.username"))) { + throw new NullPointerException("jdbc.username cannot be null."); + } + url = getRealURL(url); + //这里通过URL+username识别 + String key = url + "/" + properties.get("jdbc.username").trim(); + DataSource dataSource = databaseToDataSources.get(key); + if(dataSource == null) { + synchronized (databaseToDataSources) { + if(dataSource == null) { + dataSource = createDataSources(properties); + databaseToDataSources.put(key, dataSource); + } + } + } + return dataSource.getConnection(); + } + + public void close() { + for (DataSource dataSource: this.databaseToDataSources.values()) { + try { +// DataSources.destroy(dataSource); + ((BasicDataSource)dataSource).close(); + } catch (SQLException e) {} + } + } + + public static void main(String[] args) throws Exception { +// Pattern pattern = Pattern.compile("^(jdbc:\\w+://\\S+:[0-9]+)\\s*"); + String url = "jdbc:mysql://xxx.xxx.xxx.xxx:8504/xx?useUnicode=true&characterEncoding=UTF-8&createDatabaseIfNotExist=true"; + Properties properties = new Properties(); + properties.put("driverClassName", "org.apache.hive.jdbc.HiveDriver"); + properties.put("url", "jdbc:hive2://xxx.xxx.xxx.xxx:10000/"); + properties.put("username", "username"); + properties.put("password", "*****"); + properties.put("maxIdle", 20); + properties.put("minIdle", 0); + properties.put("initialSize", 1); + properties.put("testOnBorrow", false); + properties.put("testWhileIdle", true); + properties.put("validationQuery", "select 1"); + properties.put("initialSize", 1); + BasicDataSource dataSource = (BasicDataSource) BasicDataSourceFactory.createDataSource(properties); + Connection conn = dataSource.getConnection(); + Statement statement = conn.createStatement(); + ResultSet rs = statement.executeQuery("show tables"); + while(rs.next()) { + System.out.println(rs.getObject(1)); + } + rs.close(); + statement.close(); + conn.close(); + dataSource.close(); + } +} diff --git a/ujes/definedEngines/jdbc/entrance/src/main/resources/application.yml b/ujes/definedEngines/jdbc/entrance/src/main/resources/application.yml new file mode 100644 index 0000000000..19637260cb --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/resources/application.yml @@ -0,0 +1,23 @@ +server: + port: 18975 +spring: + application: + name: jdbcEntrance + + +eureka: + client: + serviceUrl: + defaultZone: http://xxx.xxx.xxx.xxx:20303/eureka/ + instance: + metadata-map: + test: chaogefeng + +management: + endpoints: + web: + exposure: + include: refresh,info +logging: + config: classpath:log4j2.xml + diff --git a/ujes/definedEngines/jdbc/entrance/src/main/resources/linkis.properties b/ujes/definedEngines/jdbc/entrance/src/main/resources/linkis.properties new file mode 100644 index 0000000000..290d56dcb7 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/resources/linkis.properties @@ -0,0 +1,32 @@ +# +# Copyright 2019 WeBank +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +wds.linkis.server.restful.scan.packages=com.webank.wedatasphere.linkis.entrance.restful + +wds.linkis.engine.application.name=jdbcEngine +wds.linkis.enginemanager.application.name=jdbc + +wds.linkis.query.application.name=cloud-publicservice + +wds.linkis.console.config.application.name=cloud-publicservice +wds.linkis.engine.creation.wait.time.max=20m +wds.linkis.server.version=v1 +#hadoop.config.dir=/appcom/config/hadoop-config +wds.linkis.entrance.config.logPath=file:///tmp/linkis/ + +wds.linkis.resultSet.store.path=file:///tmp/linkis + +wds.linkis.server.socket.mode=true diff --git a/ujes/definedEngines/jdbc/entrance/src/main/resources/log4j.properties b/ujes/definedEngines/jdbc/entrance/src/main/resources/log4j.properties new file mode 100644 index 0000000000..178f8dfa26 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/resources/log4j.properties @@ -0,0 +1,36 @@ +# +# Copyright 2019 WeBank +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +### set log levels ### + +log4j.rootCategory=INFO,console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=INFO +log4j.appender.console.layout=org.apache.log4j.PatternLayout +#log4j.appender.console.layout.ConversionPattern= %d{ISO8601} %-5p (%t) [%F:%M(%L)] - %m%n +log4j.appender.console.layout.ConversionPattern= %d{ISO8601} %-5p (%t) %p %c{1} - %m%n + + +log4j.appender.com.webank.bdp.ide.core=org.apache.log4j.DailyRollingFileAppender +log4j.appender.com.webank.bdp.ide.core.Threshold=INFO +log4j.additivity.com.webank.bdp.ide.core=false +log4j.appender.com.webank.bdp.ide.core.layout=org.apache.log4j.PatternLayout +log4j.appender.com.webank.bdp.ide.core.Append=true +log4j.appender.com.webank.bdp.ide.core.File=logs/linkis.log +log4j.appender.com.webank.bdp.ide.core.layout.ConversionPattern= %d{ISO8601} %-5p (%t) [%F:%M(%L)] - %m%n + +log4j.logger.org.springframework=INFO \ No newline at end of file diff --git a/ujes/definedEngines/jdbc/entrance/src/main/resources/log4j2.xml b/ujes/definedEngines/jdbc/entrance/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..c098c58e49 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/resources/log4j2.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + + + + + + diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/conf/JDBCConfiguration.scala b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/conf/JDBCConfiguration.scala new file mode 100644 index 0000000000..3cb26e2eb0 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/conf/JDBCConfiguration.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.conf + +import com.webank.wedatasphere.linkis.common.conf.{ByteType, CommonVars} + +object JDBCConfiguration { + val ENGINE_RESULT_SET_MAX_CACHE = CommonVars("wds.linkis.resultSet.cache.max", new ByteType("512k")) + val ENGINE_DEFAULT_LIMIT = CommonVars("wds.linkis.jdbc.default.limit", 5000) + val JDBC_QUERY_TIMEOUT = CommonVars("wds.linkis.jdbc.query.timeout", 1800) +} diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/conf/JDBCSpringConfiguration.scala b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/conf/JDBCSpringConfiguration.scala new file mode 100644 index 0000000000..6922157fd5 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/conf/JDBCSpringConfiguration.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.conf + +import com.webank.wedatasphere.linkis.entrance.EntranceParser +import com.webank.wedatasphere.linkis.entrance.annotation._ +import com.webank.wedatasphere.linkis.entrance.execute._ +import com.webank.wedatasphere.linkis.entrance.executer.JDBCEngineExecutorManagerImpl +import com.webank.wedatasphere.linkis.entrance.parser.JDBCEntranceParser +import com.webank.wedatasphere.linkis.scheduler.queue.GroupFactory +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.context.annotation.Configuration + +@Configuration +class JDBCSpringConfiguration { + + + private val logger = LoggerFactory.getLogger(classOf[JDBCSpringConfiguration]) + + @EntranceExecutorManagerBeanAnnotation + @ConditionalOnMissingBean(value = Array(classOf[EntranceExecutorManager])) + def generateEntranceExecutorManager(@GroupFactoryBeanAnnotation.GroupFactoryAutowiredAnnotation groupFactory: GroupFactory, + @EngineBuilderBeanAnnotation.EngineBuilderAutowiredAnnotation engineBuilder: EngineBuilder, + @EngineRequesterBeanAnnotation.EngineRequesterAutowiredAnnotation engineRequester: EngineRequester, + @EngineSelectorBeanAnnotation.EngineSelectorAutowiredAnnotation engineSelector: EngineSelector, + @EngineManagerBeanAnnotation.EngineManagerAutowiredAnnotation engineManager: EngineManager, + @Autowired entranceExecutorRulers: Array[EntranceExecutorRuler]): EntranceExecutorManager = + new JDBCEngineExecutorManagerImpl(groupFactory, engineBuilder, engineRequester, engineSelector, engineManager, entranceExecutorRulers) + + + + + @EntranceParserBeanAnnotation + @ConditionalOnMissingBean(name = Array(EntranceParserBeanAnnotation.BEAN_NAME)) + def generateEntranceParser():EntranceParser = { + logger.info("begin to get JDBC Entrance parser") + new JDBCEntranceParser() + } + +} diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/exception/JDBCParamsIllegalException.scala b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/exception/JDBCParamsIllegalException.scala new file mode 100644 index 0000000000..d26434535f --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/exception/JDBCParamsIllegalException.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.exception + +import com.webank.wedatasphere.linkis.common.exception.ErrorException + +case class JDBCParamsIllegalException(errorMsg: String) extends ErrorException(70012, errorMsg) + +case class JDBCSQLFeatureNotSupportedException(errorMsg: String) extends ErrorException(70013, errorMsg) + +case class JDBCStateMentNotInitialException(errorMsg: String) extends ErrorException(70014, errorMsg) \ No newline at end of file diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/execute/JDBCEntranceJob.scala b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/execute/JDBCEntranceJob.scala new file mode 100644 index 0000000000..2d606fff25 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/execute/JDBCEntranceJob.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.execute + +import com.webank.wedatasphere.linkis.common.utils.Utils +import com.webank.wedatasphere.linkis.entrance.job.EntranceExecutionJob +import com.webank.wedatasphere.linkis.protocol.query.RequestPersistTask +import com.webank.wedatasphere.linkis.scheduler.executer.{CompletedExecuteResponse, ErrorExecuteResponse, ExecuteRequest} +import com.webank.wedatasphere.linkis.scheduler.queue.Job +import com.webank.wedatasphere.linkis.scheduler.queue.SchedulerEventState.Running + +class JDBCEntranceJob extends EntranceExecutionJob{ + + override def jobToExecuteRequest(): ExecuteRequest = { + new ExecuteRequest with StorePathExecuteRequest with JDBCJobExecuteRequest { + override val code: String = JDBCEntranceJob.this.getTask match{ + case requestPersistTask:RequestPersistTask => requestPersistTask.getExecutionCode + case _ => null + } + override val storePath: String = JDBCEntranceJob.this.getTask match{ + case requestPersistTask:RequestPersistTask => requestPersistTask.getResultLocation + case _ => "" + } + override val job: Job = JDBCEntranceJob.this + } + } + + //use executor execute jdbc code (使用executor执行jdbc脚本代码) + override def run(): Unit = { + if(!isScheduled) return + startTime = System.currentTimeMillis + Utils.tryAndWarn(transition(Running)) + + val executeResponse = Utils.tryCatch(getExecutor.execute(jobToExecuteRequest())){ + case t: InterruptedException => + warn(s"job $toString is interrupted by user!", t) + ErrorExecuteResponse("job is interrupted by user!", t) + case t:ErrorExecuteResponse => + warn(s"execute job $toString failed!", t) + ErrorExecuteResponse("execute job failed!", t) + } + executeResponse match { + case r: CompletedExecuteResponse => + transitionCompleted(r) + case _ => logger.error("not completed") + } + } +} diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/execute/JDBCJobExecuteRequest.scala b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/execute/JDBCJobExecuteRequest.scala new file mode 100644 index 0000000000..7ae449635f --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/execute/JDBCJobExecuteRequest.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.execute + +import com.webank.wedatasphere.linkis.scheduler.queue.Job + +trait JDBCJobExecuteRequest { + val job:Job +} diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCEngineExecutor.scala b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCEngineExecutor.scala new file mode 100644 index 0000000000..c53b2691a7 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCEngineExecutor.scala @@ -0,0 +1,212 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.executer + +import java.sql.{SQLFeatureNotSupportedException, Statement} +import java.util + +import com.webank.wedatasphere.linkis.common.io.FsPath +import com.webank.wedatasphere.linkis.common.utils.Utils +import com.webank.wedatasphere.linkis.entrance.conf.JDBCConfiguration.ENGINE_RESULT_SET_MAX_CACHE +import com.webank.wedatasphere.linkis.entrance.conf.executer.ConnectionManager +import com.webank.wedatasphere.linkis.entrance.exception.{JDBCSQLFeatureNotSupportedException, JDBCStateMentNotInitialException} +import com.webank.wedatasphere.linkis.entrance.execute.{EngineExecuteAsynReturn, EntranceEngine, JDBCJobExecuteRequest, StorePathExecuteRequest} +import com.webank.wedatasphere.linkis.entrance.persistence.EntranceResultSetEngine +import com.webank.wedatasphere.linkis.protocol.engine.{JobProgressInfo, RequestTask} +import com.webank.wedatasphere.linkis.rpc.Sender +import com.webank.wedatasphere.linkis.scheduler.executer._ +import com.webank.wedatasphere.linkis.storage.domain.{Column, DataType} +import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, TableRecord} +import com.webank.wedatasphere.linkis.storage.resultset.{ResultSetFactory, ResultSetWriter} +import org.apache.commons.lang.StringUtils +import org.slf4j.LoggerFactory + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +class JDBCEngineExecutor(outputPrintLimit: Int, properties: util.HashMap[String, String]) + extends EntranceEngine(id = 0) with SingleTaskOperateSupport with SingleTaskInfoSupport { + + private val LOG = LoggerFactory.getLogger(getClass) + private val connectionManager = ConnectionManager.getInstance() + private var statement: Statement = null + private val name: String = Sender.getThisServiceInstance.getInstance + private val persistEngine = new EntranceResultSetEngine() + //execute line number,as alias and progress line + private var codeLine = 0 + //total line number + private var totalCodeLineNumber = 0 + + protected def executeLine(code: String, storePath: String, alias: String): ExecuteResponse = { + val realCode = code.trim() + LOG.info(s"jdbc client begins to run jdbc code:\n ${realCode.trim}") + val connection = connectionManager.getConnection(properties) + statement = connection.createStatement() + LOG.info(s"create statement is: $statement") + val isResultSetAvailable = statement.execute(code) + LOG.info(s"Is ResultSet available ? : $isResultSetAvailable") + if(isResultSetAvailable){ + LOG.info("ResultSet is available") + val JDBCResultSet = statement.getResultSet + if(isDDLCommand(statement.getUpdateCount(),JDBCResultSet.getMetaData().getColumnCount)){ + LOG.info(s"current result is a ResultSet Object , but there are no more results :${code} ") + Utils.tryQuietly { + JDBCResultSet.close() + statement.close() + connection.close() + } + return SuccessExecuteResponse() + }else{ + val md = JDBCResultSet.getMetaData + val metaArrayBuffer = new ArrayBuffer[Tuple2[String, String]]() + for (i <- 1 to md.getColumnCount) { + metaArrayBuffer.add(Tuple2(md.getColumnName(i), JDBCHelper.getTypeStr(md.getColumnType(i)))) + } + val columns = metaArrayBuffer.map { c => Column(c._1, DataType.toDataType(c._2), "") }.toArray[Column] + val metaData = new TableMetaData(columns) + val resultSet = ResultSetFactory.getInstance.getResultSetByType(ResultSetFactory.TABLE_TYPE) + val resultSetPath = resultSet.getResultSetPath(new FsPath(storePath), alias) + val resultSetWriter = ResultSetWriter.getResultSetWriter(resultSet, ENGINE_RESULT_SET_MAX_CACHE.getValue.toLong, resultSetPath) + resultSetWriter.addMetaData(metaData) + var count = 0 + Utils.tryCatch({ + while (count < outputPrintLimit && JDBCResultSet.next()) { + val r: Array[Any] = columns.indices.map { i => + val data = JDBCResultSet.getObject(i + 1) match { + case value: Any => value.toString + case _ => null + } + data + }.toArray + resultSetWriter.addRecord(new TableRecord(r)) + count += 1 + } + }) { + case e: Exception => return ErrorExecuteResponse("query jdbc failed", e) + } + val output = if (resultSetWriter != null) resultSetWriter.toString else null + Utils.tryQuietly { + JDBCResultSet.close() + statement.close() + connection.close() + } + LOG.info("sql execute completed") + AliasOutputExecuteResponse(alias, output) + } + }else{ + LOG.info(s"only return affect rows : ${statement.getUpdateCount}") + Utils.tryQuietly{ + statement.close() + connection.close() + } + return SuccessExecuteResponse() + } + } + + + override protected def callExecute(request: RequestTask): EngineExecuteAsynReturn = null + + override def progress(): Float = { + if (totalCodeLineNumber != 0){ + return (codeLine/totalCodeLineNumber.asInstanceOf[Float]) + }else{ + return 0.0f + } + } + + override def getProgressInfo: Array[JobProgressInfo] = Array.empty[JobProgressInfo] + + def getName: String = name + + + override def kill(): Boolean = { + if (statement != null) { + try { + statement.cancel() + } catch { + case e: SQLFeatureNotSupportedException => + throw new JDBCSQLFeatureNotSupportedException("unsupport sql feature ") + case _ => + throw new JDBCStateMentNotInitialException("jdbc statement not initial") + } + } + true + } + + override def pause(): Boolean = ??? + + override def resume(): Boolean = ??? + + override def log(): String = "JDBC Engine is running" + + override protected def callback(): Unit = {} + + override def close(): Unit = { + } + + protected def isDDLCommand(updatedCount: Int, columnCount: Int): Boolean = { + if (updatedCount<0 && columnCount<=0){ + return true + }else{ + return false + } + } + override def execute(executeRequest: ExecuteRequest): ExecuteResponse = { + if (StringUtils.isEmpty(executeRequest.code)) { + return IncompleteExecuteResponse("execute codes can not be empty)") + } + + val storePath = executeRequest match { + case storePathExecuteRequest: StorePathExecuteRequest => storePathExecuteRequest.storePath + case _ => "" + } + + val codes = JDBCSQLCodeParser.parse(executeRequest.code) + + if (!codes.isEmpty) { + totalCodeLineNumber = codes.length + codeLine = 0 + codes.foreach { code => + try { + val executeRes = executeLine(code, storePath, codeLine.toString) + executeRes match { + case aliasOutputExecuteResponse: AliasOutputExecuteResponse => + persistEngine.persistResultSet(executeRequest.asInstanceOf[JDBCJobExecuteRequest].job, aliasOutputExecuteResponse) + case SuccessExecuteResponse() => + LOG.info(s"sql execute successfully : ${code}") + case IncompleteExecuteResponse(_) => + LOG.error(s"sql execute failed : ${code}") + case _ => + LOG.warn("no matching exception") + } + codeLine = codeLine + 1 + } catch { + case e: Exception => + totalCodeLineNumber=0 + LOG.error("JDBC exception", e) + return ErrorExecuteResponse("JDBC exception", e) + case t: Throwable => + totalCodeLineNumber=0 + logger.error("JDBC query failed", t) + return ErrorExecuteResponse("JDBC query failed", t) + } + } + } + totalCodeLineNumber=0 + SuccessExecuteResponse() + } +} + diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCEngineExecutorManagerImpl.scala b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCEngineExecutorManagerImpl.scala new file mode 100644 index 0000000000..64e6248f71 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCEngineExecutorManagerImpl.scala @@ -0,0 +1,107 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.executer + +import java.util + +import com.webank.wedatasphere.linkis.common.utils.Logging +import com.webank.wedatasphere.linkis.entrance.exception.JDBCParamsIllegalException +import com.webank.wedatasphere.linkis.entrance.execute._ +import com.webank.wedatasphere.linkis.entrance.execute.impl.EntranceExecutorManagerImpl +import com.webank.wedatasphere.linkis.scheduler.executer.Executor +import com.webank.wedatasphere.linkis.scheduler.listener.ExecutorListener +import com.webank.wedatasphere.linkis.scheduler.queue.{GroupFactory, Job, SchedulerEvent} +import org.apache.commons.lang.StringUtils + +import scala.concurrent.duration.Duration + + +class JDBCEngineExecutorManagerImpl(groupFactory: GroupFactory, + engineBuilder: EngineBuilder, + engineRequester: EngineRequester, + engineSelector: EngineSelector, + engineManager: EngineManager, + entranceExecutorRulers: Array[EntranceExecutorRuler]) + extends EntranceExecutorManagerImpl(groupFactory,engineBuilder, engineRequester, + engineSelector, engineManager, entranceExecutorRulers) with Logging{ + private val JDBCEngineExecutor = new util.HashMap[String, JDBCEngineExecutor]() + logger.info("JDBC EngineManager Registered") + override protected def createExecutor(event: SchedulerEvent): EntranceEngine = event match { + case job: JDBCEntranceJob => + val JDBCParams = new util.HashMap[String, String]() + val params = job.getParams + val url = if (params.get("jdbc.url") != null) params.get("jdbc.url").toString + else throw JDBCParamsIllegalException("jdbc url is null") + val username = if (params.get("jdbc.username") != null) params.get("jdbc.username").toString + else throw JDBCParamsIllegalException("jdbc username is null") + val password = if (params.get("jdbc.password") != null) params.get("jdbc.password").toString + else throw JDBCParamsIllegalException("jdbc password is null") + JDBCParams.put("jdbc.url",url) + JDBCParams.put("jdbc.username",username) + JDBCParams.put("jdbc.password",password) + if (!StringUtils.isEmpty(url) && !StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) { + JDBCEngineExecutor.put(url + ":" + username + ":" + password, new JDBCEngineExecutor(5000, JDBCParams)) + new JDBCEngineExecutor(5000, JDBCParams) + }else { + logger.error(s"jdbc url is $url, jdbc username is $username") + throw JDBCParamsIllegalException("jdbc url or username or password may be null at least") + } + } + + override def setExecutorListener(executorListener: ExecutorListener): Unit = ??? + + override def askExecutor(event: SchedulerEvent): Option[Executor] = event match{ + case job:JDBCEntranceJob => + findUsefulExecutor(job).orElse(Some(createExecutor(event))) + case _ => None + } + + + override def askExecutor(event: SchedulerEvent, wait: Duration): Option[Executor] = event match { + case job:JDBCEntranceJob => + findUsefulExecutor(job).orElse(Some(createExecutor(event))) + case _ => None + } + + + private def findUsefulExecutor(job: Job): Option[Executor] = job match{ + case job:JDBCEntranceJob => + val params = job.getParams + val url = if (params.get("jdbc.url") != null) params.get("jdbc.url").toString + else throw JDBCParamsIllegalException("jdbc url is null") + val username = if (params.get("jdbc.username") != null) params.get("jdbc.username").toString + else throw JDBCParamsIllegalException("jdbc username is null") + val password = if (params.get("jdbc.password") != null) params.get("jdbc.password").toString + else throw JDBCParamsIllegalException("jdbc password is null") + val key = url + ":" + username + ":" + password + if (JDBCEngineExecutor.containsKey(key)){ + Some(JDBCEngineExecutor.get(key)) + }else{ + None + } + } + + + + override def getById(id: Long): Option[Executor] = ??? + + override def getByGroup(groupName: String): Array[Executor] = ??? + + override protected def delete(executor: Executor): Unit = ??? + + override def shutdown(): Unit = ??? + +} diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCHelper.java b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCHelper.java new file mode 100644 index 0000000000..ab44d63c28 --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCHelper.java @@ -0,0 +1,166 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.executer; + +import com.webank.wedatasphere.linkis.storage.domain.*; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; + +public class JDBCHelper { + protected String helper(ResultSet rs, int dataType, int col) + throws SQLException { + String retVal = null; + Integer intObj; + switch (dataType) { + case Types.DATE: + java.sql.Date date = rs.getDate(col); + retVal = date.toString(); + break; + case Types.TIME: + java.sql.Time time = rs.getTime(col); + retVal = time.toString(); + break; + case Types.TIMESTAMP: + java.sql.Timestamp timestamp = rs.getTimestamp(col); + retVal = timestamp.toString(); + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + retVal = rs.getString(col); + break; + case Types.NUMERIC: + case Types.DECIMAL: + java.math.BigDecimal numeric = rs.getBigDecimal(col, 10); + retVal = numeric.toString(); + break; + case Types.BIT: + boolean bit = rs.getBoolean(col); + Boolean boolObj = new Boolean(bit); + retVal = boolObj.toString(); + break; + case Types.TINYINT: + byte tinyint = rs.getByte(col); + intObj = new Integer(tinyint); + retVal = intObj.toString(); + break; + case Types.SMALLINT: + short smallint = rs.getShort(col); + intObj = new Integer(smallint); + retVal = intObj.toString(); + break; + case Types.INTEGER: + int integer = rs.getInt(col); + intObj = new Integer(integer); + retVal = intObj.toString(); + break; + case Types.BIGINT: + long bigint = rs.getLong(col); + Long longObj = new Long(bigint); + retVal = longObj.toString(); + break; + case Types.REAL: + float real = rs.getFloat(col); + Float floatObj = new Float(real); + retVal = floatObj.toString(); + break; + case Types.FLOAT: + case Types.DOUBLE: + double longreal = rs.getDouble(col); + Double doubleObj = new Double(longreal); + retVal = doubleObj.toString(); + break; + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + byte[] binary = rs.getBytes(col); + retVal = new String(binary); + break; + default: + break; + } + return retVal; + } + + public static String getTypeStr(int type) { + String retVal = null; + Integer intObj; + switch (type) { + case Types.NULL: + retVal = NullType.typeName(); + break; + case Types.VARCHAR: + retVal = StringType.typeName(); + break; + case Types.BOOLEAN: + retVal = BooleanType.typeName(); + break; + case Types.TINYINT: + retVal = TinyIntType.typeName(); + break; + case Types.SMALLINT: + retVal = ShortIntType.typeName(); + break; + case Types.INTEGER: + retVal = IntType.typeName(); + break; + case Types.LONGNVARCHAR: + retVal = LongType.typeName(); + break; + case Types.LONGVARCHAR: + retVal = StringType.typeName(); + break; + case Types.FLOAT: + retVal = FloatType.typeName(); + break; + case Types.DOUBLE: + retVal = DoubleType.typeName(); + break; + case Types.CHAR: + retVal = CharType.typeName(); + break; + case Types.DATE: + retVal = DateType.typeName(); + break; + case Types.TIMESTAMP: + retVal = TimestampType.typeName(); + break; + case Types.BINARY: + retVal = BinaryType.typeName(); + break; + case Types.DECIMAL: + retVal = DecimalType.typeName(); + break; + case Types.ARRAY: + retVal = ArrayType.typeName(); + break; + case Types.STRUCT: + retVal = StructType.typeName(); + break; + case Types.BIGINT: + retVal = LongType.typeName(); + break; + case Types.REAL: + retVal = DoubleType.typeName(); + break; + default: + break; + } + return retVal; + } +} diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCSQLCodeParser.scala b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCSQLCodeParser.scala new file mode 100644 index 0000000000..08ef65213c --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/executer/JDBCSQLCodeParser.scala @@ -0,0 +1,68 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.executer + +import com.webank.wedatasphere.linkis.entrance.conf.JDBCConfiguration +import org.apache.commons.lang.StringUtils + +import scala.collection.mutable.ArrayBuffer + +object JDBCSQLCodeParser { + + val separator = ";" + val defaultLimit: Int = JDBCConfiguration.ENGINE_DEFAULT_LIMIT.getValue + + def parse(code: String): Array[String] = { + val codeBuffer = new ArrayBuffer[String]() + + def appendStatement(sqlStatement: String): Unit = { + codeBuffer.append(sqlStatement) + } + + if (StringUtils.contains(code, separator)) { + StringUtils.split(code, ";").foreach { + case s if StringUtils.isBlank(s) => + case s if isSelectCmdNoLimit(s) => appendStatement(s + " limit " + defaultLimit); + case s => appendStatement(s); + } + } else { + code match { + case s if StringUtils.isBlank(s) => + case s if isSelectCmdNoLimit(s) => appendStatement(s + " limit " + defaultLimit); + case s => appendStatement(s); + } + } + codeBuffer.toArray + } + + def isSelectCmdNoLimit(cmd: String): Boolean = { + var code = cmd.trim + if (!cmd.split("\\s+")(0).equalsIgnoreCase("select")) return false + if (code.contains("limit")) code = code.substring(code.lastIndexOf("limit")).trim + else if (code.contains("LIMIT")) code = code.substring(code.lastIndexOf("LIMIT")).trim.toLowerCase + else return true + val hasLimit = code.matches("limit\\s+\\d+\\s*;?") + if (hasLimit) { + if (code.indexOf(";") > 0) code = code.substring(5, code.length - 1).trim + else code = code.substring(5).trim + val limitNum = code.toInt + if (limitNum > defaultLimit) throw new IllegalArgumentException("We at most allowed to limit " + defaultLimit + ", but your SQL has been over the max rows.") + } + !hasLimit + } + + +} diff --git a/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/parser/JDBCEntranceParser.scala b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/parser/JDBCEntranceParser.scala new file mode 100644 index 0000000000..300df71acd --- /dev/null +++ b/ujes/definedEngines/jdbc/entrance/src/main/scala/com/webank/wedatasphere/linkis/entrance/parser/JDBCEntranceParser.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2019 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.webank.wedatasphere.linkis.entrance.parser + +import java.util + +import com.webank.wedatasphere.linkis.common.utils.Logging +import com.webank.wedatasphere.linkis.entrance.execute.JDBCEntranceJob +import com.webank.wedatasphere.linkis.protocol.query.RequestPersistTask +import com.webank.wedatasphere.linkis.protocol.task.Task +import com.webank.wedatasphere.linkis.scheduler.queue.Job + +class JDBCEntranceParser extends CommonEntranceParser with Logging { + logger.info("JDBC EntranceParser Registered") + + /** + * Parse a task into an executable job(将一个task解析成一个可执行的job) + * + * @param task + * @return + */ + override def parseToJob(task: Task): Job = { + val job = new JDBCEntranceJob + task match { + case requestPersistTask: RequestPersistTask => + job.setTask(task) + job.setUser(requestPersistTask.getUmUser) + job.setCreator(requestPersistTask.getRequestApplicationName) + job.setParams(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]]) + job.setEntranceListenerBus(getEntranceContext.getOrCreateEventListenerBus) + job.setListenerEventBus(null) + job.setProgress(0f) + } + job + } +}