Skip to content

Commit

Permalink
add bml engineHook adapted to engine
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzhiyue committed Nov 29, 2019
1 parent 3b1136a commit c23d63c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.webank.wedatasphere.linkis.engine

/**
* created by cooperyang on 2019/11/29
* Description:
*/
trait ResourceExecuteRequest {
val resources:java.util.List[Object]
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ object EngineConfiguration {

val ENGINE_PUSH_PROGRESS_TO_ENTRANCE = CommonVars("wds.linkis.engine.push.progress.enable", true)


val ENGINE_PRE_EXECUTE_HOOK_CLASSES = CommonVars("wds.linkis.engine.pre.hook.class", "com.webank.wedatasphere.linkis.bml.hook.BmlEnginePreExecuteHook")
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ package com.webank.wedatasphere.linkis.engine.execute

import com.webank.wedatasphere.linkis.common.log.LogUtils
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engine.conf.EngineConfiguration
import com.webank.wedatasphere.linkis.engine.exception.EngineErrorException
import com.webank.wedatasphere.linkis.engine.extension.EnginePreExecuteHook
import com.webank.wedatasphere.linkis.resourcemanager.Resource
import com.webank.wedatasphere.linkis.scheduler.executer._
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang.exception.ExceptionUtils

import scala.collection.mutable.ArrayBuffer

/**
* Created by enjoyyin on 2018/9/17.
*/
Expand All @@ -41,6 +45,24 @@ abstract class EngineExecutor(outputPrintLimit: Int, isSupportParallelism: Boole
private var succeedNum = 0


private val enginePreExecuteHooks:Array[EnginePreExecuteHook] = {
val hooks = new ArrayBuffer[EnginePreExecuteHook]()
EngineConfiguration.ENGINE_PRE_EXECUTE_HOOK_CLASSES.getValue.split(",") foreach {
hookStr => Utils.tryCatch{
val clazz = Class.forName(hookStr)
val obj = clazz.newInstance()
obj match {
case hook:EnginePreExecuteHook => hooks += hook
case _ => logger.warn(s"obj is not a engineHook obj is ${obj.getClass}")
}
}{
case e:Exception => logger.error("failed to load class", e)
}
}
hooks.toArray
}


def setCodeParser(codeParser: CodeParser) = this.codeParser = Some(codeParser)
def setResultSetListener(resultSetListener: ResultSetListener) = this.resultSetListener = Some(resultSetListener)
def getResultSetListener = resultSetListener
Expand Down Expand Up @@ -95,6 +117,15 @@ abstract class EngineExecutor(outputPrintLimit: Int, isSupportParallelism: Boole
else if(isSupportParallelism) whenAvailable(f) else ensureIdle(f)
ensureOp {
val engineExecutorContext = createEngineExecutorContext(executeRequest)
Utils.tryCatch{
enginePreExecuteHooks foreach {
hook => logger.info(s"${hook.hookName} begins to do a hook")
hook.callPreExecuteHook(engineExecutorContext, executeRequest)
logger.info(s"${hook.hookName} ends to do a hook")
}
}{
case e:Exception => logger.info("failed to do with hook")
}
var response: ExecuteResponse = null
val incomplete = new StringBuilder
val codes = Utils.tryCatch(codeParser.map(_.parse(executeRequest.code, engineExecutorContext)).getOrElse(Array(executeRequest.code))){
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.webank.wedatasphere.linkis.engine.extension

import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext
import com.webank.wedatasphere.linkis.scheduler.executer.ExecuteRequest

/**
* created by cooperyang on 2019/11/29
* Description:
*/
trait EnginePreExecuteHook {
val hookName:String
def callPreExecuteHook(engineExecutorContext:EngineExecutorContext, executeRequest: ExecuteRequest)
}

0 comments on commit c23d63c

Please sign in to comment.