Skip to content

Commit

Permalink
Merge pull request #163 from yangzhiyue/master
Browse files Browse the repository at this point in the history
close #162, I contributed the bml-engine-hook
  • Loading branch information
peacewong authored Nov 29, 2019
2 parents cc4f8ff + c23d63c commit da85420
Show file tree
Hide file tree
Showing 13 changed files with 271 additions and 1 deletion.
58 changes: 58 additions & 0 deletions bml/bml-engine-hook/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>linkis</artifactId>
<groupId>com.webank.wedatasphere.linkis</groupId>
<version>0.9.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>linkis-bml-hook</artifactId>
<version>${linkis.version}</version>

<dependencies>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-bmlclient</artifactId>
<version>${linkis.version}</version>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-bmlcommon</artifactId>
<version>${linkis.version}</version>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-ujes-engine</artifactId>
<version>${linkis.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
</build>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.webank.wedatasphere.linkis.bml.conf

import com.webank.wedatasphere.linkis.common.conf.CommonVars

/**
* created by cooperyang on 2019/9/23
* Description:
*/
object BmlHookConf {
val WORK_DIR_STR = CommonVars("wds.linkis.bml.work.dir", "user.dir")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.webank.wedatasphere.linkis.bml.exception

import com.webank.wedatasphere.linkis.common.exception.ErrorException

/**
* created by cooperyang on 2019/9/25
* Description:
*/
case class BmlHookDownloadException(errMsg:String) extends ErrorException(50046, errMsg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.webank.wedatasphere.linkis.bml.hook

import java.io.File
import java.util

import com.webank.wedatasphere.linkis.bml.client.{BmlClient, BmlClientFactory}
import com.webank.wedatasphere.linkis.bml.exception.BmlHookDownloadException
import com.webank.wedatasphere.linkis.bml.utils.BmlHookUtils
import com.webank.wedatasphere.linkis.common.exception.ErrorException
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engine.ResourceExecuteRequest
import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext
import com.webank.wedatasphere.linkis.engine.extension.EnginePreExecuteHook
import com.webank.wedatasphere.linkis.scheduler.executer.ExecuteRequest
import org.apache.commons.lang.StringUtils

import scala.collection.JavaConversions._
/**
* created by cooperyang on 2019/9/23
* Description:
*/
class BmlEnginePreExecuteHook extends EnginePreExecuteHook with Logging{
override val hookName: String = "BmlEnginePreExecuteHook"

val RESOURCES_STR = "resources"

val RESOURCE_ID_STR = "resourceId"

val VERSION_STR = "version"

val FILE_NAME_STR = "fileName"

val processUser:String = System.getProperty("user.name")

val defaultUser:String = "hadoop"

val bmlClient:BmlClient = if (StringUtils.isNotEmpty(processUser))
BmlClientFactory.createBmlClient(processUser) else BmlClientFactory.createBmlClient(defaultUser)

val seperator:String = File.separator

val pathType:String = "file://"

override def callPreExecuteHook(engineExecutorContext: EngineExecutorContext, executeRequest: ExecuteRequest): Unit = {
//1.删除工作目录以前的资源文件
//2.下载资源到当前进程的工作目录

val workDir = BmlHookUtils.getCurrentWorkDir
val jobId = engineExecutorContext.getJobId
executeRequest match {
case resourceExecuteRequest:ResourceExecuteRequest => val resources = resourceExecuteRequest.resources
resources foreach {
case resource:util.Map[String, Object] => val fileName = resource.get(FILE_NAME_STR).toString
val resourceId = resource.get(RESOURCE_ID_STR).toString
val version = resource.get(VERSION_STR).toString
val fullPath = if (workDir.endsWith(seperator)) pathType + workDir + fileName else
pathType + workDir + seperator + fileName
val response = Utils.tryCatch{
bmlClient.downloadResource(processUser, resourceId, version, fullPath, true)
}{
case error:ErrorException => logger.error("download resource for {} failed", error)
throw error
case t:Throwable => logger.error(s"download resource for $jobId failed", t)
val e1 = BmlHookDownloadException(t.getMessage)
e1.initCause(t)
throw t
}
if (response.isSuccess){
logger.info(s"for job $jobId resourceId $resourceId version $version download to path $fullPath ok")
}else{
logger.warn(s"for job $jobId resourceId $resourceId version $version download to path $fullPath Failed")
}
case _ => logger.warn("job resource cannot download")
}
case _ =>
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.webank.wedatasphere.linkis.bml.hook

/**
* created by cooperyang on 2019/9/23
* Description:
*/

case class ResourceVersion(resourceId:String, version:String)


trait BmlResourceParser {
/**
* 通过传入的code
* @param code
* @return
*/
def getResource(code:String):Array[ResourceVersion]
}


object DefaultBmlResourceParser extends BmlResourceParser{
/**
* 通过传入的code
*
* @param code
* @return
*/
override def getResource(code: String): Array[ResourceVersion] = Array.empty
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.webank.wedatasphere.linkis.bml.utils

import com.webank.wedatasphere.linkis.common.utils.Utils

/**
* created by cooperyang on 2019/9/24
* Description:
*/
object BmlHookUtils {
val WORK_DIR_STR = "user.dir"
def getCurrentWorkDir:String = System.getProperty(WORK_DIR_STR)


def deleteAllFiles(workDir:String):Unit = {

}



}
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@
<module>publicService/configuration</module>
<module>publicService/variable</module>
<module>publicService/workspace</module>
<module>publicService/workspace/client/workspace-httpclient</module>
<module>metadata</module>
<module>ujes/engine</module>
<module>bml/bml-engine-hook</module>
<module>ujes/enginemanager</module>
<module>ujes/entrance</module>
<module>ujes/definedEngines/spark/engine</module>
Expand Down
5 changes: 5 additions & 0 deletions ujes/definedEngines/python/engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
<artifactId>scalatest_2.11</artifactId>
<version>2.2.6</version>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-bml-hook</artifactId>
<version>${linkis.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
5 changes: 5 additions & 0 deletions ujes/definedEngines/spark/engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
<artifactId>guava</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-bml-hook</artifactId>
<version>${linkis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.webank.wedatasphere.linkis.engine

/**
* created by cooperyang on 2019/11/29
* Description:
*/
trait ResourceExecuteRequest {
val resources:java.util.List[Object]
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ object EngineConfiguration {

val ENGINE_PUSH_PROGRESS_TO_ENTRANCE = CommonVars("wds.linkis.engine.push.progress.enable", true)


val ENGINE_PRE_EXECUTE_HOOK_CLASSES = CommonVars("wds.linkis.engine.pre.hook.class", "com.webank.wedatasphere.linkis.bml.hook.BmlEnginePreExecuteHook")
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ package com.webank.wedatasphere.linkis.engine.execute

import com.webank.wedatasphere.linkis.common.log.LogUtils
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engine.conf.EngineConfiguration
import com.webank.wedatasphere.linkis.engine.exception.EngineErrorException
import com.webank.wedatasphere.linkis.engine.extension.EnginePreExecuteHook
import com.webank.wedatasphere.linkis.resourcemanager.Resource
import com.webank.wedatasphere.linkis.scheduler.executer._
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang.exception.ExceptionUtils

import scala.collection.mutable.ArrayBuffer

/**
* Created by enjoyyin on 2018/9/17.
*/
Expand All @@ -41,6 +45,24 @@ abstract class EngineExecutor(outputPrintLimit: Int, isSupportParallelism: Boole
private var succeedNum = 0


private val enginePreExecuteHooks:Array[EnginePreExecuteHook] = {
val hooks = new ArrayBuffer[EnginePreExecuteHook]()
EngineConfiguration.ENGINE_PRE_EXECUTE_HOOK_CLASSES.getValue.split(",") foreach {
hookStr => Utils.tryCatch{
val clazz = Class.forName(hookStr)
val obj = clazz.newInstance()
obj match {
case hook:EnginePreExecuteHook => hooks += hook
case _ => logger.warn(s"obj is not a engineHook obj is ${obj.getClass}")
}
}{
case e:Exception => logger.error("failed to load class", e)
}
}
hooks.toArray
}


def setCodeParser(codeParser: CodeParser) = this.codeParser = Some(codeParser)
def setResultSetListener(resultSetListener: ResultSetListener) = this.resultSetListener = Some(resultSetListener)
def getResultSetListener = resultSetListener
Expand Down Expand Up @@ -95,6 +117,15 @@ abstract class EngineExecutor(outputPrintLimit: Int, isSupportParallelism: Boole
else if(isSupportParallelism) whenAvailable(f) else ensureIdle(f)
ensureOp {
val engineExecutorContext = createEngineExecutorContext(executeRequest)
Utils.tryCatch{
enginePreExecuteHooks foreach {
hook => logger.info(s"${hook.hookName} begins to do a hook")
hook.callPreExecuteHook(engineExecutorContext, executeRequest)
logger.info(s"${hook.hookName} ends to do a hook")
}
}{
case e:Exception => logger.info("failed to do with hook")
}
var response: ExecuteResponse = null
val incomplete = new StringBuilder
val codes = Utils.tryCatch(codeParser.map(_.parse(executeRequest.code, engineExecutorContext)).getOrElse(Array(executeRequest.code))){
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.webank.wedatasphere.linkis.engine.extension

import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext
import com.webank.wedatasphere.linkis.scheduler.executer.ExecuteRequest

/**
* created by cooperyang on 2019/11/29
* Description:
*/
trait EnginePreExecuteHook {
val hookName:String
def callPreExecuteHook(engineExecutorContext:EngineExecutorContext, executeRequest: ExecuteRequest)
}

0 comments on commit da85420

Please sign in to comment.