diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala index 3ac3cb7c88..45f3f49bed 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala @@ -17,7 +17,9 @@ package org.apache.linkis.computation.client.once +import org.apache.linkis.common.utils.Utils import org.apache.linkis.computation.client.once.action.{ + AskEngineConnAction, CreateEngineConnAction, EngineConnOperateAction, GetEngineConnAction, @@ -25,6 +27,7 @@ import org.apache.linkis.computation.client.once.action.{ LinkisManagerAction } import org.apache.linkis.computation.client.once.result.{ + AskEngineConnResult, CreateEngineConnResult, EngineConnOperateResult, GetEngineConnResult, @@ -39,6 +42,8 @@ import java.io.Closeable trait LinkisManagerClient extends Closeable { + def askEngineConn(askEngineConnAction: AskEngineConnAction): AskEngineConnResult + def createEngineConn(createEngineConnAction: CreateEngineConnAction): CreateEngineConnResult def getEngineConn(getEngineConnAction: GetEngineConnAction): GetEngineConnResult @@ -82,7 +87,21 @@ class LinkisManagerClientImpl(ujesClient: UJESClient) extends LinkisManagerClien override def executeEngineConnOperation( engineConnOperateAction: EngineConnOperateAction - ): EngineConnOperateResult = execute(engineConnOperateAction) + ): EngineConnOperateResult = { + Utils.tryCatch { + val rs = execute[EngineConnOperateResult](engineConnOperateAction) + rs + } { case e: Exception => + val rs = new EngineConnOperateResult + rs.setIsError(true) + rs.setErrorMsg(e.getMessage) + rs + } + } override def close(): Unit = ujesClient.close() + + override def askEngineConn(askEngineConnAction: AskEngineConnAction): AskEngineConnResult = + execute(askEngineConnAction) + } diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala new file mode 100644 index 0000000000..4b89b53764 --- /dev/null +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.computation.client.once.action
+
+import org.apache.linkis.httpclient.dws.DWSHttpClient
+import org.apache.linkis.httpclient.request.POSTAction
+import org.apache.linkis.ujes.client.exception.UJESJobException
+
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+
+class AskEngineConnAction extends POSTAction with LinkisManagerAction {
+
+ override def getRequestPayload: String =
+ DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads)
+
+ override def suffixURLs: Array[String] = Array("linkisManager", "askEngineConn")
+
+}
+
+object AskEngineConnAction {
+
+ def newBuilder(): Builder = new Builder
+
+ class Builder private[AskEngineConnAction] () {
+ private var user: String = _
+ private var properties: util.Map[String, String] = _
+ private var labels: util.Map[String, String] = _
+ private var maxSubmitTime: Long = _
+ private var createService: String = _
+ private var description: String = _
+
+ def setUser(user: String): Builder = {
+ this.user = user
+ this
+ }
+
+ def setProperties(properties: util.Map[String, String]): Builder = {
+ this.properties = properties
+ this
+ }
+
+ def setLabels(labels: java.util.Map[String, String]): Builder = {
+ this.labels = labels
+ this
+ }
+
+ def setMaxSubmitTime(maxSubmitTime: Long): Builder = {
+ this.maxSubmitTime = maxSubmitTime
+ this
+ }
+
+ def setCreateService(createService: String): Builder = {
+ this.createService = createService
+ this
+ }
+
+ def setDescription(description: String): Builder = {
+ this.description = description
+ this
+ }
+
+ def build(): AskEngineConnAction = {
+ val action = new AskEngineConnAction()
+ if (user == null) throw new UJESJobException("user is needed!")
+ if (properties == null) properties = new java.util.HashMap[String, String]
+ if (labels == null) throw new UJESJobException("labels is needed!")
+ action.setUser(user)
+ action.addRequestPayload("properties", properties)
+ action.addRequestPayload("labels", labels)
+ if (StringUtils.isNotBlank(createService)) {
+ action.addRequestPayload("createService", createService)
+ }
+ // maxSubmitTime is a primitive Long, so a null check is always true; only
+ // send the payload when a positive timeout was actually set.
+ if (maxSubmitTime > 0) {
+ action.addRequestPayload("timeOut", maxSubmitTime)
+ }
+ if (StringUtils.isNotBlank(description)) {
+ action.addRequestPayload("description", description)
+ }
+ action
+ }
+
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala
new file mode 100644
index 0000000000..58c6085b45
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.computation.client.once.result + +import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult + +@DWSHttpMessageResult("/api/rest_j/v\\d+/linkisManager/askEngineConn") +class AskEngineConnResult extends GetEngineConnResult diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala index 1bf12e0418..50df73bd10 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala @@ -33,9 +33,11 @@ class EngineConnOperateResult extends LinkisManagerResult { this.result = result } + def getErrorMsg(): String = errorMsg + def setErrorMsg(errorMsg: String): Unit = this.errorMsg = errorMsg - def setError(isError: Boolean): Unit = this.isError = isError + def getIsError(): Boolean = isError def setIsError(isError: Boolean): Unit = this.isError = isError diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala index 492ae76b68..4992b17814 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala @@ -95,6 +95,10 @@ trait SimpleOnceJob extends OnceJob { case operator => operator } + def getEcServiceInstance: ServiceInstance = serviceInstance + + def getEcTicketId: String = ticketId + } class SubmittableSimpleOnceJob( diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala index 83399bf371..a1dba63404 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala @@ -19,6 +19,7 @@ package org.apache.linkis.computation.client.operator.impl import 
org.apache.linkis.computation.client.once.result.EngineConnOperateResult import org.apache.linkis.computation.client.operator.OnceJobOperator +import org.apache.linkis.governance.common.constant.ec.ECConstants import org.apache.linkis.ujes.client.exception.UJESJobException class EngineConnApplicationInfoOperator extends OnceJobOperator[ApplicationInfo] { @@ -28,7 +29,7 @@ class EngineConnApplicationInfoOperator extends OnceJobOperator[ApplicationInfo] override protected def resultToObject(result: EngineConnOperateResult): ApplicationInfo = { ApplicationInfo( result - .getAsOption("applicationId") + .getAsOption(ECConstants.YARN_APPID_NAME_KEY) .getOrElse( throw new UJESJobException( 20300, @@ -36,14 +37,14 @@ class EngineConnApplicationInfoOperator extends OnceJobOperator[ApplicationInfo] ) ), result - .getAsOption("applicationUrl") + .getAsOption(ECConstants.YARN_APP_URL_KEY) .getOrElse( throw new UJESJobException( 20300, s"Cannot get applicationUrl from EngineConn $getServiceInstance." ) ), - result.getAs("queue") + result.getAs(ECConstants.QUEUE) ) } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java new file mode 100644 index 0000000000..37c6fc8d92 --- /dev/null +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.linkis.governance.common.enums;
+
+public enum OnceJobOperationBoundary {
+ ECM("ecm"),
+ EC("ec");
+
+ private String name;
+
+ OnceJobOperationBoundary(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
index c0d295755a..89d3c9eba4 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
@@ -37,4 +37,6 @@ public class EngineConnExecutorErrorCode {
 public static final int SEND_TO_ENTRANCE_ERROR = 40105;
 public static final int INIT_EXECUTOR_FAILED = 40106;
+
+ public static final int INVALID_APPLICATION_ID = 40107;
 }
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
index fc7d1c8904..0029faa91a 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
@@ -86,6 +86,10 @@ object GovernanceCommonConf {
 CommonVars(envKey, "").getValue
 }
+
+ // default value: ECConstants.EC_CLIENT_TYPE_ATTACH
+ val EC_APP_MANAGE_MODE =
+ CommonVars("linkis.ec.app.manage.mode", "attach")
+
 val SCALA_PARSE_APPEND_CODE_ENABLED =
 CommonVars("linkis.scala.parse.append.code.enable", true).getValue
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
index fe48f6887d..a94eadf422 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
@@ -35,4 +35,43 @@ object ECConstants {
 val YARN_QUEUE_NAME_CONFIG_KEY = "wds.linkis.rm.yarnqueue"
+
+ val QUEUE = "queue"
+
+ val EC_CLIENT_TYPE_ATTACH = "attach"
+
+ val EC_CLIENT_TYPE_DETACH = "detach"
+
+ val YARN_APPID_NAME_KEY = "applicationId"
+
+ val YARN_APP_URL_KEY = "applicationUrl"
+
+ val YARN_APP_NAME_KEY = "applicationName"
+
+ val YARN_MODE_KEY = "yarnMode"
+
+ val EC_SERVICE_INSTANCE_KEY = "serviceInstance"
+
+ val ECM_SERVICE_INSTANCE_KEY = "ecmServiceInstance"
+
+ val MANAGER_SERVICE_INSTANCE_KEY = "managerServiceInstance"
+
+ val NODE_STATUS_KEY = "nodeStatus"
+
+ val EC_LAST_UNLOCK_TIMESTAMP = "lastUnlockTimestamp"
+
+ val 
YARN_APP_TYPE_LIST_KEY = "yarnAppTypeList" + + val YARN_APP_STATE_LIST_KEY = "yarnAppStateList" + + val YARN_APP_TYPE_KEY = "yarnAppType" + + val YARN_APP_TYPE_SPARK = "spark" + + val YARN_APP_TYPE_FLINK = "flink" + + val EC_OPERATE_LIST = "list" + + val EC_OPERATE_STATUS = "status" + + val YARN_APP_RESULT_LIST_KEY = "yarnAppResultList" } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java index de6bb440dd..a84f581153 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java @@ -55,7 +55,7 @@ public class TimingMonitorService implements InitializingBean, Runnable { @Override public void afterPropertiesSet() throws Exception { - if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM()) { + if ((Boolean) AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM().getValue()) { Utils.defaultScheduler() .scheduleAtFixedRate( this, 3 * 60 * 1000, MONITOR_INTERVAL.getValue().toLong(), TimeUnit.MILLISECONDS); diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala index eccf54bfad..010ced97fd 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala @@ -21,10 +21,13 @@ import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.engineconn.acessible.executor.info.NodeHeartbeatMsgManager import org.apache.linkis.engineconn.computation.executor.metrics.ComputationEngineConnMetrics import org.apache.linkis.engineconn.core.EngineConnObject -import org.apache.linkis.engineconn.executor.entity.{Executor, SensibleExecutor} +import org.apache.linkis.engineconn.executor.entity.{Executor, SensibleExecutor, YarnExecutor} import org.apache.linkis.governance.common.constant.ec.ECConstants +import org.apache.linkis.manager.common.entity.enumeration.NodeStatus import org.apache.linkis.server.BDPJettyServerHelper +import org.apache.commons.lang3.StringUtils + import org.springframework.stereotype.Component import java.util @@ -72,6 +75,22 @@ class DefaultNodeHeartbeatMsgManager extends NodeHeartbeatMsgManager with Loggin engineParams.get(ECConstants.YARN_QUEUE_NAME_CONFIG_KEY).asInstanceOf[Object] ) } + executor match { + case yarnExecutor: YarnExecutor => + if (StringUtils.isNotBlank(yarnExecutor.getQueue)) { + msgMap.put(ECConstants.YARN_QUEUE_NAME_KEY, yarnExecutor.getQueue) + } + if 
(StringUtils.isNotBlank(yarnExecutor.getApplicationId)) { + msgMap.put(ECConstants.YARN_APPID_NAME_KEY, yarnExecutor.getApplicationId) + } + if (StringUtils.isNotBlank(yarnExecutor.getApplicationURL)) { + msgMap.put(ECConstants.YARN_APP_URL_KEY, yarnExecutor.getApplicationURL) + } + if (StringUtils.isNotBlank(yarnExecutor.getYarnMode)) { + msgMap.put(ECConstants.YARN_MODE_KEY, yarnExecutor.getYarnMode) + } + case _ => + } Utils.tryCatch(BDPJettyServerHelper.gson.toJson(msgMap)) { case e: Exception => val msgs = msgMap.asScala .map { case (k, v) => if (null == v) s"${k}->null" else s"${k}->${v.toString}" } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala index 7c180731a4..61242beaae 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala @@ -137,7 +137,7 @@ class LabelExecutorManagerImpl extends LabelExecutorManager with Logging { } protected def getLabelKey(labels: Array[Label[_]]): String = - labels.map(_.getStringValue).mkString("&") + labels.filter(null != _).map(_.getStringValue).mkString("&") protected def createExecutor( engineCreationContext: EngineCreationContext, diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java index 28a75d3f93..66e1c575f0 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java @@ -22,6 +22,7 @@ import org.apache.linkis.engineconn.core.executor.LabelExecutorManager; import org.apache.linkis.engineconn.executor.entity.Executor; import org.apache.linkis.engineconn.executor.entity.YarnExecutor; +import org.apache.linkis.governance.common.constant.ec.ECConstants; import org.apache.linkis.manager.common.operator.Operator; import java.util.HashMap; @@ -43,10 +44,10 @@ public Map apply(Map parameters) { if (reportExecutor instanceof YarnExecutor) { YarnExecutor yarnExecutor = (YarnExecutor) reportExecutor; Map result = new HashMap<>(); - result.put("applicationId", yarnExecutor.getApplicationId()); - result.put("applicationUrl", yarnExecutor.getApplicationURL()); - result.put("queue", yarnExecutor.getQueue()); - result.put("yarnMode", yarnExecutor.getYarnMode()); + result.put(ECConstants.YARN_APPID_NAME_KEY(), yarnExecutor.getApplicationId()); + result.put(ECConstants.YARN_APP_URL_KEY(), yarnExecutor.getApplicationURL()); + result.put(ECConstants.QUEUE(), yarnExecutor.getQueue()); + 
result.put(ECConstants.YARN_MODE_KEY(), yarnExecutor.getYarnMode()); return result; } else { throw new EngineConnException( diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala index 26a25a1539..95a01202e8 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala @@ -42,8 +42,8 @@ object AccessibleExecutorConfiguration { val ENGINECONN_LOCK_CHECK_INTERVAL = CommonVars("wds.linkis.engineconn.lock.free.interval", new TimeType("3m")) - val ENGINECONN_SUPPORT_PARALLELISM: Boolean = - CommonVars("wds.linkis.engineconn.support.parallelism", false).getValue + val ENGINECONN_SUPPORT_PARALLELISM = + CommonVars("wds.linkis.engineconn.support.parallelism", false) val ENGINECONN_HEARTBEAT_TIME = CommonVars("wds.linkis.engineconn.heartbeat.time", new TimeType("2m")) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala index 53cdd44b05..93cb41f344 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala @@ -43,9 +43,13 @@ class AccessibleExecutorSpringConfiguration extends Logging { def createLockManager(): LockService = { val lockService = - if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) { + if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()) { + logger.info("Engine supports parallelism.") new EngineConnConcurrentLockService - } else new EngineConnTimedLockService + } else { + logger.info("Engine doesn't support parallelism.") + new EngineConnTimedLockService + } asyncListenerBusContext.addListener(lockService) lockService } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala new file mode 100644 index 0000000000..12e42c66a5 --- /dev/null +++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.acessible.executor.hook
+
+import org.apache.linkis.manager.common.protocol.engine.{
+ EngineOperateRequest,
+ EngineOperateResponse
+}
+
+import scala.collection.mutable.ArrayBuffer
+
+trait OperationHook {
+ def getName(): String
+
+ def doPreOperation(
+ engineOperateRequest: EngineOperateRequest,
+ engineOperateResponse: EngineOperateResponse
+ ): Unit
+
+ def doPostOperation(
+ engineOperateRequest: EngineOperateRequest,
+ engineOperateResponse: EngineOperateResponse
+ ): Unit
+
+}
+
+object OperationHook {
+ private var operationHooks: ArrayBuffer[OperationHook] = new ArrayBuffer[OperationHook]()
+
+ def registerOperationHook(operationHook: OperationHook): Unit = {
+ operationHooks.append(operationHook)
+ }
+
+ def getOperationHooks(): Array[OperationHook] = operationHooks.toArray
+}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
index 06fd13b0e9..8ef944fc9c 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
@@ -74,7 +74,7 @@ class DefaultAccessibleService extends AccessibleService with Logging {
 DataWorkCloudApplication.getServiceInstance.equals(engineSuicideRequest.getServiceInstance)
 ) {
 stopEngine()
- logger.info(s"engine will suiside now.")
+ logger.info(s"engine was asked to suicide by ${engineSuicideRequest.getUser} now.")
 ShutdownHook.getShutdownHook.notifyStop()
 } else {
 if (null != engineSuicideRequest.getServiceInstance) {
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala index 067e0d2cbb..ea3248ba6d 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala @@ -78,6 +78,7 @@ class DefaultExecutorHeartbeatService heartbeatTime, TimeUnit.MILLISECONDS ) + ExecutorHeartbeatServiceHolder.registerHeartBeatService(this) } /** diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala index 1040df40a6..f82c5e9441 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala @@ -18,6 +18,7 @@ package org.apache.linkis.engineconn.acessible.executor.service import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.engineconn.acessible.executor.hook.OperationHook import org.apache.linkis.manager.common.operator.OperatorFactory import org.apache.linkis.manager.common.protocol.engine.{ EngineOperateRequest, @@ -31,6 +32,8 @@ import org.springframework.stereotype.Service import java.util +import scala.collection.JavaConverters.mapAsScalaMapConverter + @Service class DefaultOperateService extends OperateService with Logging { @@ -38,27 +41,59 @@ class DefaultOperateService extends OperateService with Logging { override def executeOperation( engineOperateRequest: EngineOperateRequest ): EngineOperateResponse = { - val parameters = engineOperateRequest.getParameters() + var response: EngineOperateResponse = null + + val parameters = + engineOperateRequest.getParameters.asScala.toMap.asInstanceOf[util.Map[String, Object]] val operator = Utils.tryCatch(OperatorFactory.apply().getOperatorRequest(parameters)) { t => logger.error(s"Get operator failed, parameters is ${engineOperateRequest.getParameters}.", t) - return new EngineOperateResponse( - new util.HashMap, + response = new EngineOperateResponse( + new util.HashMap[String, Object](), true, ExceptionUtils.getRootCauseMessage(t) ) + doPostHook(engineOperateRequest, response) + return response } logger.info( s"Try to execute operator ${operator.getClass.getSimpleName} with parameters ${engineOperateRequest.getParameters}." 
) val result = Utils.tryCatch(operator(parameters)) { t => logger.error(s"Execute ${operator.getClass.getSimpleName} failed.", t) - return new EngineOperateResponse( - new util.HashMap, + response = new EngineOperateResponse( + new util.HashMap[String, Object](), true, ExceptionUtils.getRootCauseMessage(t) ) + doPostHook(engineOperateRequest, response) + return response + } + logger.info(s"End to execute operator ${operator.getClass.getSimpleName}.") + response = new EngineOperateResponse(result) + doPostHook(engineOperateRequest, response) + response + } + + private def doPreHook( + engineOperateRequest: EngineOperateRequest, + engineOperateResponse: EngineOperateResponse + ): Unit = { + Utils.tryAndWarn { + OperationHook + .getOperationHooks() + .foreach(hook => hook.doPreOperation(engineOperateRequest, engineOperateResponse)) + } + } + + private def doPostHook( + engineOperateRequest: EngineOperateRequest, + engineOperateResponse: EngineOperateResponse + ): Unit = { + Utils.tryAndWarn { + OperationHook + .getOperationHooks() + .foreach(hook => hook.doPostOperation(engineOperateRequest, engineOperateResponse)) } - new EngineOperateResponse(result) } } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala index 21325f42bc..b5bbc26f92 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala @@ -51,7 +51,7 @@ class EngineConnTimedLockService extends LockService with Logging { private var lockType: EngineLockType = EngineLockType.Timed private def isSupportParallelism: Boolean = - AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM + AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue() /** * @param lock diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala index bfecf73252..7abcbe8dcf 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala @@ -34,3 +34,14 @@ trait ExecutorHeartbeatService { def dealNodeHeartbeatRequest(nodeHeartbeatRequest: NodeHeartbeatRequest): NodeHeartbeatMsg } + +object ExecutorHeartbeatServiceHolder { + + private var executorHeartbeatService: ExecutorHeartbeatService = _ + + def registerHeartBeatService(executorHeartbeatService: 
ExecutorHeartbeatService): Unit = + this.executorHeartbeatService = executorHeartbeatService + + def getDefaultHeartbeatService(): ExecutorHeartbeatService = executorHeartbeatService + +} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java index 3734e3bdf6..c05768739c 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java @@ -28,7 +28,11 @@ public enum AMErrorCode implements LinkisErrorCode { NOT_EXISTS_ENGINE_CONN(210003, "Not exists EngineConn(不存在的引擎)"), - AM_CONF_ERROR(210004, "AM configuration error(AM配置错误)"); + AM_CONF_ERROR(210004, "AM configuration error(AM配置错误)"), + + ASK_ENGINE_ERROR_RETRY(210005, "Ask engine error, retry(请求引擎失败,重试)"), + + EC_OPERATE_ERROR(210006, "Failed to execute operation(引擎操作失败)"); private final int errorCode; diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java index 3d6e0bc395..14cad1380e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java @@ -23,28 +23,28 @@ import org.apache.linkis.common.utils.ByteTimeUtils; import org.apache.linkis.common.utils.JsonUtils; import org.apache.linkis.governance.common.conf.GovernanceCommonConf; +import org.apache.linkis.governance.common.constant.ec.ECConstants; +import org.apache.linkis.governance.common.utils.JobUtils; +import org.apache.linkis.governance.common.utils.LoggerUtils; import org.apache.linkis.manager.am.conf.AMConfiguration; import org.apache.linkis.manager.am.exception.AMErrorCode; import org.apache.linkis.manager.am.exception.AMErrorException; import org.apache.linkis.manager.am.manager.EngineNodeManager; import org.apache.linkis.manager.am.service.ECResourceInfoService; -import org.apache.linkis.manager.am.service.engine.EngineCreateService; -import org.apache.linkis.manager.am.service.engine.EngineInfoService; -import org.apache.linkis.manager.am.service.engine.EngineOperateService; -import org.apache.linkis.manager.am.service.engine.EngineStopService; +import org.apache.linkis.manager.am.service.engine.*; import org.apache.linkis.manager.am.util.ECResourceInfoUtils; import org.apache.linkis.manager.am.utils.AMUtils; import org.apache.linkis.manager.am.vo.AMEngineNodeVo; +import org.apache.linkis.manager.common.constant.AMConstant; import org.apache.linkis.manager.common.entity.enumeration.NodeStatus; import org.apache.linkis.manager.common.entity.node.AMEMNode; +import org.apache.linkis.manager.common.entity.node.EMNode; import org.apache.linkis.manager.common.entity.node.EngineNode; import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord; -import 
org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest; -import org.apache.linkis.manager.common.protocol.engine.EngineOperateRequest; -import org.apache.linkis.manager.common.protocol.engine.EngineOperateResponse; -import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest; +import org.apache.linkis.manager.common.protocol.engine.*; import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory; import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext; +import org.apache.linkis.manager.label.constant.LabelKeyConstant; import org.apache.linkis.manager.label.entity.Label; import org.apache.linkis.manager.label.entity.UserModifiable; import org.apache.linkis.manager.label.exception.LabelErrorException; @@ -65,6 +65,7 @@ import java.io.IOException; import java.text.MessageFormat; import java.util.*; +import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -103,6 +104,8 @@ public class EngineRestfulApi { @Autowired private ECResourceInfoService ecResourceInfoService; + @Autowired private EngineReuseService engineReuseService; + private final ObjectMapper objectMapper = new ObjectMapper(); private LabelBuilderFactory stdLabelBuilderFactory = @@ -110,6 +113,183 @@ public class EngineRestfulApi { private static final Logger logger = LoggerFactory.getLogger(EngineRestfulApi.class); + @ApiOperation(value = "askEngineConn", response = Message.class) + @ApiOperationSupport(ignoreParameters = {"jsonNode"}) + @RequestMapping(path = "/askEngineConn", method = RequestMethod.POST) + public Message askEngineConn( + HttpServletRequest req, @RequestBody EngineAskRequest engineAskRequest) + throws IOException, InterruptedException { + String userName = ModuleUserUtils.getOperationUser(req, "askEngineConn"); + engineAskRequest.setUser(userName); + long timeout = engineAskRequest.getTimeOut(); + if (timeout <= 0) { + timeout = AMConfiguration.ENGINE_CONN_START_REST_MAX_WAIT_TIME.getValue().toLong(); + engineAskRequest.setTimeOut(timeout); + } + Map retEngineNode = new HashMap<>(); + logger.info( + "User {} try to ask an engineConn with maxStartTime {}. 
EngineAskRequest is {}.",
+ userName,
+ ByteTimeUtils.msDurationToString(timeout),
+ engineAskRequest);
+ Sender sender = Sender.getSender(Sender.getThisServiceInstance());
+ EngineNode engineNode = null;
+
+ // try to reuse ec first
+ String taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties());
+ LoggerUtils.setJobIdMDC(taskId);
+ logger.info("received task : {}, engineAskRequest : {}", taskId, engineAskRequest);
+ if (!engineAskRequest.getLabels().containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) {
+ EngineReuseRequest engineReuseRequest = new EngineReuseRequest();
+ engineReuseRequest.setLabels(engineAskRequest.getLabels());
+ engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut());
+ engineReuseRequest.setUser(engineAskRequest.getUser());
+ engineReuseRequest.setProperties(engineAskRequest.getProperties());
+ boolean end = false;
+ EngineNode reuseNode = null;
+ int count = 0;
+ int MAX_RETRY = 2;
+ while (!end) {
+ try {
+ reuseNode = engineReuseService.reuseEngine(engineReuseRequest, sender);
+ end = true;
+ } catch (LinkisRetryException e) {
+ logger.error(
+ "task: {}, user: {} reuse engine failed", taskId, engineReuseRequest.getUser(), e);
+ Thread.sleep(1000);
+ end = false;
+ count += 1;
+ if (count > MAX_RETRY) {
+ end = true;
+ }
+ } catch (Exception e1) {
+ logger.info(
+ "task: {} user: {} reuse engine failed", taskId, engineReuseRequest.getUser(), e1);
+ end = true;
+ }
+ }
+ if (null != reuseNode) {
+ logger.info(
+ "Finished to ask engine for task: {}, user: {} by reuse node {}",
+ taskId,
+ engineReuseRequest.getUser(),
+ reuseNode);
+ LoggerUtils.removeJobIdMDC();
+ engineNode = reuseNode;
+ }
+ }
+
+ if (null != engineNode) {
+ fillResultEngineNode(retEngineNode, engineNode);
+ return Message.ok("reuse engineConn ended.").data("engine", retEngineNode);
+ }
+
+ String engineAskAsyncId = AMUtils.getAsyncId();
+ Callable createECTask =
+ new Callable() {
+ @Override
+ public Object call() {
+ LoggerUtils.setJobIdMDC(taskId);
+ logger.info(
+ "Task: {}, start to async({}) createEngine: {}",
+ taskId,
+ engineAskAsyncId,
+ engineAskRequest.getCreateService());
+ // if the incoming labels contain an engineInstance label, remove it first
+ engineAskRequest.getLabels().remove("engineInstance");
+ EngineCreateRequest engineCreateRequest = new EngineCreateRequest();
+ engineCreateRequest.setLabels(engineAskRequest.getLabels());
+ engineCreateRequest.setTimeout(engineAskRequest.getTimeOut());
+ engineCreateRequest.setUser(engineAskRequest.getUser());
+ engineCreateRequest.setProperties(engineAskRequest.getProperties());
+ engineCreateRequest.setCreateService(engineAskRequest.getCreateService());
+ try {
+ EngineNode createNode = engineCreateService.createEngine(engineCreateRequest, sender);
+ long timeout = 0L;
+ if (engineCreateRequest.getTimeout() <= 0) {
+ timeout = AMConfiguration.ENGINE_START_MAX_TIME.getValue().toLong();
+ } else {
+ timeout = engineCreateRequest.getTimeout();
+ }
+ // useEngine needs a timeout
+ EngineNode createEngineNode = engineNodeManager.useEngine(createNode, timeout);
+ if (null == createEngineNode) {
+ throw new LinkisRetryException(
+ AMConstant.EM_ERROR_CODE,
+ "create engine " + createNode.getServiceInstance() + " succeeded, but to use the engine failed");
+ }
+ logger.info(
+ "Task: {} finished to ask engine for user {} by create node {}",
+ taskId,
+ engineAskRequest.getUser(),
+ createEngineNode);
+ return createEngineNode;
+ } catch (Exception e) {
+ logger.error(
+ "Task: {} failed to ask engine for user {} by create node", taskId, userName, e);
+ return new 
LinkisRetryException(AMConstant.EM_ERROR_CODE, e.getMessage()); + } finally { + LoggerUtils.removeJobIdMDC(); + } + } + }; + + try { + Object rs = createECTask.call(); + if (rs instanceof LinkisRetryException) { + throw (LinkisRetryException) rs; + } else { + engineNode = (EngineNode) rs; + } + } catch (LinkisRetryException retryException) { + logger.error( + "User {} create engineConn failed get retry exception. can be Retry", + userName, + retryException); + return Message.error( + String.format( + "Create engineConn failed, caused by %s.", + ExceptionUtils.getRootCauseMessage(retryException))) + .data("canRetry", true); + } catch (Exception e) { + LoggerUtils.removeJobIdMDC(); + logger.error("User {} create engineConn failed get retry exception", userName, e); + return Message.error( + String.format( + "Create engineConn failed, caused by %s.", ExceptionUtils.getRootCauseMessage(e))); + } + + LoggerUtils.removeJobIdMDC(); + fillResultEngineNode(retEngineNode, engineNode); + logger.info( + "Finished to create a engineConn for user {}. NodeInfo is {}.", userName, engineNode); + // to transform to a map + return Message.ok("create engineConn ended.").data("engine", retEngineNode); + } + + private void fillNullNode( + Map retEngineNode, EngineAskAsyncResponse askAsyncResponse) { + retEngineNode.put(AMConstant.EC_ASYNC_START_RESULT_KEY, AMConstant.EC_ASYNC_START_RESULT_FAIL); + retEngineNode.put( + AMConstant.EC_ASYNC_START_FAIL_MSG_KEY, + "Got null response for asyId : " + askAsyncResponse.getId()); + retEngineNode.put(ECConstants.MANAGER_SERVICE_INSTANCE_KEY(), Sender.getThisServiceInstance()); + } + + private void fillResultEngineNode(Map retEngineNode, EngineNode engineNode) { + retEngineNode.put( + AMConstant.EC_ASYNC_START_RESULT_KEY, AMConstant.EC_ASYNC_START_RESULT_SUCCESS); + retEngineNode.put("serviceInstance", engineNode.getServiceInstance()); + if (null == engineNode.getNodeStatus()) { + engineNode.setNodeStatus(NodeStatus.Starting); + } + retEngineNode.put(ECConstants.NODE_STATUS_KEY(), engineNode.getNodeStatus().toString()); + retEngineNode.put(ECConstants.EC_TICKET_ID_KEY(), engineNode.getTicketId()); + EMNode emNode = engineNode.getEMNode(); + if (null != emNode) { + retEngineNode.put( + ECConstants.ECM_SERVICE_INSTANCE_KEY(), engineNode.getEMNode().getServiceInstance()); + } + retEngineNode.put(ECConstants.MANAGER_SERVICE_INSTANCE_KEY(), Sender.getThisServiceInstance()); + } + @ApiOperation(value = "createEngineConn", response = Message.class) @ApiOperationSupport(ignoreParameters = {"jsonNode"}) @RequestMapping(path = "/createEngineConn", method = RequestMethod.POST) @@ -149,13 +329,7 @@ public Message createEngineConn( "Finished to create a engineConn for user {}. 
NodeInfo is {}.", userName, engineNode); // to transform to a map Map retEngineNode = new HashMap<>(); - retEngineNode.put("serviceInstance", engineNode.getServiceInstance()); - if (null == engineNode.getNodeStatus()) { - engineNode.setNodeStatus(NodeStatus.Starting); - } - retEngineNode.put("nodeStatus", engineNode.getNodeStatus().toString()); - retEngineNode.put("ticketId", engineNode.getTicketId()); - retEngineNode.put("ecmServiceInstance", engineNode.getEMNode().getServiceInstance()); + fillResultEngineNode(retEngineNode, engineNode); return Message.ok("create engineConn succeed.").data("engine", retEngineNode); } @@ -173,6 +347,7 @@ public Message getEngineConn(HttpServletRequest req, @RequestBody JsonNode jsonN } catch (Exception e) { logger.info("Instances {} does not exist", serviceInstance.getInstance()); } + String ecMetrics = null; if (null == engineNode) { ECResourceInfoRecord ecInfo = null; if (null != ticketIdNode) { @@ -189,12 +364,19 @@ public Message getEngineConn(HttpServletRequest req, @RequestBody JsonNode jsonN if (null == ecInfo) { return Message.error("Instance does not exist " + serviceInstance); } + if (null == ecMetrics) { + ecMetrics = ecInfo.getMetrics(); + } engineNode = ECResourceInfoUtils.convertECInfoTOECNode(ecInfo); + } else { + ecMetrics = engineNode.getEcMetrics(); } if (!userName.equals(engineNode.getOwner()) && Configuration.isNotAdmin(userName)) { return Message.error("You have no permission to access EngineConn " + serviceInstance); } - return Message.ok().data("engine", engineNode); + Message result = Message.ok().data("engine", engineNode); + result.data(AMConstant.EC_METRICS_KEY, ecMetrics); + return result; } @ApiOperation(value = "kill egineconn", notes = "kill engineconn", response = Message.class) @@ -487,6 +669,11 @@ public Message executeEngineConnOperation(HttpServletRequest req, @RequestBody J ServiceInstance serviceInstance = getServiceInstance(jsonNode); logger.info("User {} try to execute Engine Operation {}.", userName, serviceInstance); EngineNode engineNode = engineNodeManager.getEngineNode(serviceInstance); + if (null == engineNode) { + return Message.ok() + .data("isError", true) + .data("errorMsg", "Ec : " + serviceInstance.toString() + " not found."); + } if (!userName.equals(engineNode.getOwner()) && Configuration.isNotAdmin(userName)) { return Message.error("You have no permission to execute Engine Operation " + serviceInstance); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java index 36675ff842..ab5799063e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java @@ -22,6 +22,7 @@ import org.apache.linkis.governance.common.utils.LoggerUtils; import org.apache.linkis.manager.am.conf.AMConfiguration; import org.apache.linkis.manager.am.util.LinkisUtils; +import org.apache.linkis.manager.am.utils.AMUtils; import org.apache.linkis.manager.common.constant.AMConstant; import org.apache.linkis.manager.common.entity.node.EngineNode; import 
org.apache.linkis.manager.common.protocol.engine.*; @@ -36,7 +37,6 @@ import java.net.SocketTimeoutException; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import feign.RetryableException; import org.slf4j.Logger; @@ -50,8 +50,6 @@ public class DefaultEngineAskEngineService extends AbstractEngineService private EngineCreateService engineCreateService; private EngineReuseService engineReuseService; - private AtomicInteger idCreator = new AtomicInteger(); - private String idPrefix = Sender.getThisServiceInstance().getInstance(); private static final ThreadPoolExecutor EXECUTOR = LinkisUtils.newCachedThreadPool( @@ -103,7 +101,7 @@ public Object askEngine(EngineAskRequest engineAskRequest, Sender sender) { } } - String engineAskAsyncId = getAsyncId(); + String engineAskAsyncId = AMUtils.getAsyncId(); CompletableFuture createNodeThread = CompletableFuture.supplyAsync( () -> { @@ -197,8 +195,4 @@ public Object askEngine(EngineAskRequest engineAskRequest, Sender sender) { LoggerUtils.removeJobIdMDC(); return new EngineAskAsyncResponse(engineAskAsyncId, Sender.getThisServiceInstance()); } - - private String getAsyncId() { - return idPrefix + "_" + idCreator.getAndIncrement(); - } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java index 5fc8529661..85c7470ce5 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java @@ -125,6 +125,7 @@ public static AMEngineNode convertECInfoTOECNode(ECResourceInfoRecord ecInfo) { engineNode.setTicketId(ecInfo.getTicketId()); engineNode.setStartTime(ecInfo.getCreateTime()); engineNode.setUpdateTime(ecInfo.getReleaseTime()); + engineNode.setEcMetrics(ecInfo.getMetrics()); return engineNode; } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java index 43144e53f0..660d393238 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java @@ -25,11 +25,13 @@ import org.apache.linkis.manager.common.entity.resource.*; import org.apache.linkis.manager.label.entity.Label; import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; +import org.apache.linkis.rpc.Sender; import java.lang.reflect.Type; import java.text.SimpleDateFormat; import java.util.*; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -41,6 +43,9 @@ public class AMUtils { private static final Logger logger = LoggerFactory.getLogger(AMUtils.class); + private static final AtomicInteger idCreator = new AtomicInteger(); + private static String idPrefix = 
Sender.getThisServiceInstance().getInstance(); + private static Gson GSON = new GsonBuilder() .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") @@ -335,4 +340,8 @@ public static boolean isJson(String str) { return false; } } + + public static String getAsyncId() { + return idPrefix + "_" + idCreator.getAndIncrement(); + } } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java index bcb14a7045..362932083c 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java @@ -63,4 +63,6 @@ public class LabelKeyConstant { public static final String TENANT_KEY = "tenant"; public static final String FIXED_EC_KEY = "fixedEngineConn"; + + public static final String MANAGER_KEY = "manager"; } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/ManagerLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/ManagerLabel.java new file mode 100644 index 0000000000..674cc605af --- /dev/null +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/ManagerLabel.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.manager.label.entity.engine; + +import org.apache.linkis.manager.label.constant.LabelKeyConstant; +import org.apache.linkis.manager.label.entity.EngineNodeLabel; +import org.apache.linkis.manager.label.entity.Feature; +import org.apache.linkis.manager.label.entity.GenericLabel; +import org.apache.linkis.manager.label.entity.annon.ValueSerialNum; + +import java.util.HashMap; + +public class ManagerLabel extends GenericLabel implements EngineNodeLabel { + + public ManagerLabel() { + setLabelKey(LabelKeyConstant.MANAGER_KEY); + } + + @Override + public Feature getFeature() { + return Feature.CORE; + } + + public String getManager() { + if (null == getValue()) { + return null; + } + return getValue().get(getLabelKey()); + } + + @ValueSerialNum(0) + public void setManager(String manager) { + if (null == getValue()) { + setValue(new HashMap<>()); + } + getValue().put(getLabelKey(), manager); + } +} diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala index a24445269a..21a067ed45 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala @@ -42,6 +42,7 @@ object RunType extends Enumeration { val ES_JSON = Value("esjson") val TRINO_SQL = Value("tsql") + val JSON = Value("json") val SEATUNNEL_ZETA = Value("szeta") diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java index 081e5e605e..09d802a951 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java @@ -34,4 +34,30 @@ public class AMConstant { public static final String START_REASON = "start_reason"; public static final String EC_CAN_RETRY = "ec_can_try"; + + public static final String EC_ASYNC_START_ID_KEY = "ecAsyncStartId"; + + public static final String EC_ASYNC_START_MANAGER_INSTANCE_KEY = "managerInstance"; + + /* + result : starting,success,failed + */ + public static final String EC_ASYNC_START_RESULT_KEY = "ecAsyncStartResult"; + + /* + default false + */ + public static final String EC_SYNC_START_KEY = "ecSyncStart"; + + public static final String EC_ASYNC_START_RESULT_SUCCESS = "success"; + + public static final String EC_ASYNC_START_RESULT_FAIL = "failed"; + + public static final String EC_ASYNC_START_RESULT_STARTING = "starting"; + + public static final String EC_ASYNC_START_FAIL_RETRY_KEY = "canRetry"; + + public static final String EC_ASYNC_START_FAIL_MSG_KEY = "failMsg"; + + public static final String EC_METRICS_KEY = "ecMetrics"; } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java index 7343aab2d2..a560eec339 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java @@ -62,6 +62,8 @@ public class AMEngineNode implements EngineNode, ScoreServiceInstance { private String ticketId; + private String ecMetrics; + public AMEngineNode() {} public AMEngineNode(double score, ServiceInstance serviceInstance) { @@ -210,6 +212,16 @@ public void setTicketId(String ticketId) { this.ticketId = ticketId; } + @Override + public String getEcMetrics() { + return ecMetrics; + } + + @Override + public void setEcMetrics(String metrics) { + this.ecMetrics = metrics; + } + @Override public Date getUpdateTime() { return updateTime; diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java index e3b8548bfc..627b41bc55 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java @@ -26,4 +26,12 @@ public interface EngineNode extends AMNode, RMNode, LabelNode { String getLock(); void setLock(String lock); + + String getTicketId(); + + void setTicketId(String ticketId); + + String getEcMetrics(); + + void setEcMetrics(String metrics); } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java index 8637887e57..a3e02f0212 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java @@ -58,7 +58,10 @@ public String toString() { + ", engineType='" + engineType + '\'' - + "} " + + ", heartbeatMsg='" + + heartbeatMsg + + '\'' + + '}' + super.toString(); } } diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.java deleted file mode 100644 index ddf4838cdb..0000000000 --- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.engineconnplugin.flink.operator; - -import org.apache.linkis.common.exception.WarnException; -import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager$; -import org.apache.linkis.engineconnplugin.flink.client.deployment.ClusterDescriptorAdapter; -import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary; -import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException; -import org.apache.linkis.engineconnplugin.flink.executor.FlinkOnceExecutor; -import org.apache.linkis.manager.common.operator.Operator; - -import java.text.MessageFormat; -import java.util.HashMap; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TriggerSavepointOperator implements Operator { - - private static final Logger logger = LoggerFactory.getLogger(TriggerSavepointOperator.class); - - @Override - public String[] getNames() { - return new String[] {"doSavepoint"}; - } - - @Override - public Map apply(Map parameters) { - String savepoint = getAsThrow(parameters, "savepointPath"); - String mode = getAsThrow(parameters, "mode"); - logger.info("try to " + mode + " savepoint with path " + savepoint + "."); - - if (OnceExecutorManager$.MODULE$.getInstance().getReportExecutor() - instanceof FlinkOnceExecutor) { - FlinkOnceExecutor flinkExecutor = - (FlinkOnceExecutor) OnceExecutorManager$.MODULE$.getInstance().getReportExecutor(); - ClusterDescriptorAdapter clusterDescriptorAdapter = - (ClusterDescriptorAdapter) flinkExecutor.getClusterDescriptorAdapter(); - String writtenSavepoint = ""; - try { - writtenSavepoint = clusterDescriptorAdapter.doSavepoint(savepoint, mode); - } catch (JobExecutionException e) { - logger.info("doSavepoint failed", e); - throw new RuntimeException(e); - } - - Map stringMap = new HashMap<>(); - stringMap.put("writtenSavepoint", writtenSavepoint); - return stringMap; - } else { - throw new WarnException( - FlinkErrorCodeSummary.NOT_SUPPORT_SAVEPOTION.getErrorCode(), - MessageFormat.format( - FlinkErrorCodeSummary.NOT_SUPPORT_SAVEPOTION.getErrorDesc(), - OnceExecutorManager$.MODULE$ - .getInstance() - .getReportExecutor() - .getClass() - .getSimpleName())); - } - } - - public T getAsThrow(Map parameters, String key) { - Object value = parameters.get(key); - if (value != null) { - try { - return (T) value; - } catch (Exception e) { - throw new IllegalArgumentException("parameter " + key + " is invalid.", e); - } - } else { - throw new IllegalArgumentException("parameter " + key + " is required."); - } - } -} diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index 5f93b690e6..8115a128cd 100644 --- 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -132,6 +132,9 @@ object FlinkEnvConfiguration { val FLINK_ONCE_APP_STATUS_FETCH_INTERVAL = CommonVars("flink.app.fetch.status.interval", new TimeType("5s")) + val FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL = + CommonVars("flink.app.report.appid.interval", new TimeType("60s")) + val FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX = CommonVars("flink.app.fetch.status.failed.num", 3) val FLINK_REPORTER_ENABLE = CommonVars("linkis.flink.reporter.enable", false) @@ -153,4 +156,18 @@ object FlinkEnvConfiguration { val FLINK_PARAMS_BLANK_PLACEHOLER = CommonVars("linkis.flink.params.placeholder.blank", "\u0001") + val FLINK_MANAGER_MODE_CONFIG_KEY = CommonVars("linkis.flink.manager.mode.on", false) + + val FLINK_MANAGER_LOAD_TASK_MAX = CommonVars("linkis.flink.manager.load.task.max", 50) + + val HADOOP_CONF_DIR = CommonVars("linkis.flink.hadoop.conf.dir", System.getenv("HADOOP_CONF_DIR")) + + val FLINK_MANAGER_CLIENT_MAX_NUM = CommonVars("linkis.flink.client.num.max", 200) + + val FLINK_MANAGER_CLIENT_EXPIRE_MILLS = + CommonVars("linkis.flink.client.expire.mills", 3600 * 1000) + + val FLINK_HANDSHAKE_WAIT_TIME_MILLS = + CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) + } diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkSrpingConfiguration.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkSrpingConfiguration.scala new file mode 100644 index 0000000000..e0519e5483 --- /dev/null +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkSrpingConfiguration.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineconnplugin.flink.config + +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.engineconn.acessible.executor.service.{ + EngineConnConcurrentLockService, + EngineConnTimedLockService, + LockService +} +import org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext +import org.apache.linkis.engineconnplugin.flink.util.ManagerUtil + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.context.annotation.{Bean, Configuration} + +@Configuration +class FlinkSrpingConfiguration extends Logging { + + private val asyncListenerBusContext = + ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnAsyncListenerBus + + @Bean(Array("lockService")) + @ConditionalOnMissingBean + def createLockManager(): LockService = { + + val lockService = + if (ManagerUtil.isManager) { + logger.info("Engine is manager, supports parallelism.") + new EngineConnConcurrentLockService + } else { + logger.info("Engine is not manager, doesn't support parallelism.") + new EngineConnTimedLockService + } + asyncListenerBusContext.addListener(lockService) + FlinkLockerServiceHolder.registerLockService(lockService) + lockService + } + +} + +object FlinkLockerServiceHolder extends Logging { + + private var lockService: LockService = _ + + def registerLockService(service: LockService): Unit = { + Utils.tryAndError { + if (null != service) { + if (null == lockService) { + logger.info(s"Will register lockService : ${service.getClass.getName}") + lockService = service + } else { + logger.warn( + s"Default lockService has been registered to ${lockService.getClass.getName}, will not register : ${service.getClass.getName}" + ) + } + } else { + logger.warn("Cannot register null lockService") + } + } + } + + def getDefaultLockService(): LockService = lockService + +} diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/constants/FlinkECConstant.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/constants/FlinkECConstant.scala new file mode 100644 index 0000000000..6b3a0d3562 --- /dev/null +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/constants/FlinkECConstant.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineconnplugin.flink.constants + +object FlinkECConstant { + + val FLINK_MANAGER_OPERATION_TYPE_KEY = "operationType" + + val FLINK_OPERATION_BOUNDARY_KEY = "operationBoundary" + + val EC_INSTANCE_KEY = "ecInstance" + + val MSG_KEY = "msg" + + val SNAPHOT_KEY = "snapshot" + + val SAVAPOINT_PATH_KEY = "savepointPath" + + val SAVEPOINT_MODE_KEY = "mode" + + val RESULT_SAVEPOINT_KEY = "writtenSavepoint" +} diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala index 40bc732e54..eac4368510 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala @@ -52,6 +52,8 @@ class EnvironmentContext( private var deploymentTarget: String = YarnDeploymentTarget.PER_JOB.getName + private var extraParams: util.Map[String, Any] = _ + def this( defaultEnv: Environment, systemConfiguration: Configuration, @@ -63,7 +65,8 @@ class EnvironmentContext( providedLibDirsArray: Array[String], shipDirsArray: Array[String], dependencies: util.List[URL], - flinkExecutionTarget: String + flinkExecutionTarget: String, + extraParams: util.Map[String, Any] ) { this( defaultEnv, @@ -89,6 +92,8 @@ class EnvironmentContext( this.flinkConfig.set(LinkisYarnClusterClientFactory.YARN_CONFIG_DIR, this.yarnConfDir) // set flink dist-jar(设置 flink dist jar) this.flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR, distJarPath) + // other params + this.extraParams = extraParams } } @@ -114,6 +119,13 @@ class EnvironmentContext( def getDependencies: util.List[URL] = dependencies + def setExtraParams(params: util.Map[String, Any]): EnvironmentContext = { + this.extraParams = params + this + } + + def getExtraParams(): util.Map[String, Any] = extraParams + override def equals(o: Any): Boolean = o match { case context: EnvironmentContext => if (this eq context) return true diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala index 8e2da4fbee..659cfcbc27 100644 --- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala @@ -18,13 +18,27 @@ package org.apache.linkis.engineconnplugin.flink.executor import org.apache.linkis.common.utils.Utils +import org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder +import org.apache.linkis.engineconn.executor.service.ManagerService import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext -import org.apache.linkis.engineconnplugin.flink.client.deployment.AbstractApplicationClusterDescriptorAdapter +import org.apache.linkis.engineconnplugin.flink.client.deployment.{ + AbstractApplicationClusterDescriptorAdapter, + YarnApplicationClusterDescriptorAdapter +} +import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._ import 
org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.operator.StatusOperator
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
 
 import org.apache.commons.lang3.StringUtils
 
+import java.util
+import java.util.concurrent.{Future, TimeUnit}
+
 import scala.concurrent.duration.Duration
 
 class FlinkJarOnceExecutor(
@@ -32,6 +46,10 @@ class FlinkJarOnceExecutor(
     override protected val flinkEngineConnContext: FlinkEngineConnContext
 ) extends FlinkOnceExecutor[AbstractApplicationClusterDescriptorAdapter] {
 
+  private var daemonThread: Future[_] = _
+
+  private var firstReportAppIdTimestampMills: Long = 0L
+
   override def doSubmit(
       onceExecutorExecutionContext: OnceExecutorExecutionContext,
       options: Map[String, String]
@@ -48,6 +66,73 @@ class FlinkJarOnceExecutor(
     Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf)
     setJobID(clusterDescriptor.getJobId.toHexString)
     super.waitToRunning()
+    if (YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams())) {
+      waitToExit()
+    }
+  }
+
+  override def close(): Unit = {
+    super.close()
+    if (null != daemonThread) {
+      daemonThread.cancel(true)
+    }
+  }
+
+  override protected def closeYarnApp(): Unit = {
+    if (YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams())) {
+      if (getStatus == NodeStatus.Failed) {
+        logger.info("Will kill yarn app on close with clientType : detach, because status failed.")
+        super.closeYarnApp()
+      } else {
+        logger.info("Skip to kill yarn app on close with clientType : detach.")
+      }
+    } else {
+      logger.info("Will kill yarn app on close with clientType : attach.")
+      super.closeYarnApp()
+    }
+  }
+
+  private def waitToExit(): Unit = {
+    // upload applicationId to manager and then exit
+    val thisExecutor = this
+    if (!isCompleted) {
+      daemonThread = Utils.defaultScheduler.scheduleWithFixedDelay(
+        new Runnable {
+          override def run(): Unit = {
+            if (!isCompleted) {
+              Utils.waitUntil(() => StringUtils.isNotBlank(getApplicationId), Duration.apply("10s"))
+              if (StringUtils.isNotBlank(getApplicationId)) {
+                Utils.tryAndWarn {
+                  val heartbeatService = ExecutorHeartbeatServiceHolder.getDefaultHeartbeatService()
+                  if (null == heartbeatService) {
+                    logger.error("HeartbeatService not initialized.")
+                    return null
+                  }
+                  val heartbeatMsg = heartbeatService.generateHeartBeatMsg(thisExecutor)
+                  ManagerService.getManagerService.heartbeatReport(heartbeatMsg)
+                  logger.info(
+                    s"Succeed to report heartbeatMsg : ${heartbeatMsg.getHeartBeatMsg}, will add handshake."
+                  )
+                  if (0L >= firstReportAppIdTimestampMills) {
+                    firstReportAppIdTimestampMills = System.currentTimeMillis()
+                  }
+                  if (!StatusOperator.isHandshaked) {
+                    StatusOperator.addHandshake()
+                  } else {
+                    logger.info("Will exit now that the handshake is done.")
+                    trySucceed()
+                  }
+                }
+              }
+            }
+          }
+        },
+        1000,
+        FlinkEnvConfiguration.FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL.getValue.toLong,
+        TimeUnit.MILLISECONDS
+      )
+      logger.info("waitToExit submitted.")
+    }
+  }
 }
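[Editor's note] The waitToExit loop above is the detach-mode exit path: the engine conn keeps pushing its YARN applicationId to the manager through the heartbeat channel, and only once the manager has handshaked back (via StatusOperator) does it mark itself succeeded and exit. A minimal, self-contained sketch of that report-until-acknowledged pattern follows; every name in it (ReportUntilAcknowledgedSketch, acknowledged, the 2-second interval) is illustrative and not part of the patch.

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ReportUntilAcknowledgedSketch {

  def main(args: Array[String]): Unit = {
    val acknowledged = new AtomicBoolean(false) // plays the role of StatusOperator.isHandshaked
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val reporter = new Runnable {
      override def run(): Unit = {
        if (!acknowledged.get()) {
          // in the real executor this is heartbeatReport(generateHeartBeatMsg(...))
          println("reporting applicationId to the manager ...")
          acknowledged.set(true) // in the real flow the manager's status call flips this
        } else {
          println("handshake seen, executor can exit") // trySucceed() in the real code
          scheduler.shutdown()
        }
      }
    }
    // first run after 1s, then one report per interval, as with scheduleWithFixedDelay above
    scheduler.scheduleWithFixedDelay(reporter, 1L, 2L, TimeUnit.SECONDS)
    scheduler.awaitTermination(1, TimeUnit.MINUTES)
  }

}

scheduleWithFixedDelay (rather than scheduleAtFixedRate) guarantees the next report is only scheduled after the previous one finished, so slow heartbeat calls cannot pile up.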
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkManagerConcurrentExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkManagerConcurrentExecutor.scala
new file mode 100644
index 0000000000..b204c2405a
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkManagerConcurrentExecutor.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.executor
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
+import org.apache.linkis.engineconn.acessible.executor.service.EngineConnConcurrentLockService
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.computation.executor.execute.{
+  ComputationExecutor,
+  ConcurrentComputationExecutor,
+  EngineExecutionContext
+}
+import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext
+import org.apache.linkis.engineconnplugin.flink.client.deployment.ClusterDescriptorAdapter
+import org.apache.linkis.engineconnplugin.flink.config.FlinkLockerServiceHolder
+import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.engineconnplugin.flink.util.ManagerUtil
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+import org.apache.linkis.protocol.engine.JobProgressInfo
+import org.apache.linkis.scheduler.executer.{
+  AsynReturnExecuteResponse,
+  ErrorExecuteResponse,
+  ExecuteResponse
+}
+
+class FlinkManagerConcurrentExecutor(
+    val id: Long,
+    maxRunningNumber: Int,
+    val flinkEngineConnContext: FlinkEngineConnContext
+) extends FlinkOnceExecutor[ClusterDescriptorAdapter]
+    with FlinkExecutor
+    with Logging {
+
+  override protected def submit(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext
+  ): Unit = {
+    logger.info("Succeed to init FlinkManagerExecutor.")
+  }
+
+  override def execute(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext
+  ): ExecuteResponse = {
+    val isManager = ManagerUtil.isManager
+    val lockService = FlinkLockerServiceHolder.getDefaultLockService()
+    if (
+      isManager && null != lockService && lockService
+        .isInstanceOf[EngineConnConcurrentLockService]
+    ) {
+      val msg = "Succeed to init FlinkManagerExecutor."
+      logger.info(msg)
+      new AsynReturnExecuteResponse {
+        override def notify(rs: ExecuteResponse => Unit): Unit = {
+          logger.info(s"FlinkManagerExecutor will skip listener : ${rs}")
+        }
+      }
+    } else {
+      ErrorExecuteResponse(
+        "FlinkManagerExecutor: the default lockService is not an instance of EngineConnConcurrentLockService, will shut down.",
+        null
+      )
+    }
+  }
+
+  override def getId: String = id.toString
+
+  override def close(): Unit = {
+    logger.info(s"FlinkManagerExecutor : ${getId} will close.")
+    super.close()
+  }
+
+  def getMaxRunningNumber: Int = maxRunningNumber
+
+  def getFlinkContext(): FlinkEngineConnContext = flinkEngineConnContext
+
+  override def doSubmit(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext,
+      options: Map[String, String]
+  ): Unit = submit(onceExecutorExecutionContext)
+
+  override protected def initOnceExecutorExecutionContext(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext
+  ): Unit = {}
+
+  override protected def createOnceExecutorExecutionContext(
+      engineCreationContext: EngineCreationContext
+  ): OnceExecutorExecutionContext = new OnceExecutorExecutionContext(null, null)
+
+  override def tryReady(): Boolean = {
+    // set default status to Unlock
+    transition(NodeStatus.Unlock)
+    true
+  }
+
+}
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
index f3d15a7e9c..c30e885847 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
@@ -120,9 +120,7 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter]
     if (daemonThread != null) daemonThread.cancel(true)
   }
 
-  override def close(): Unit = {
-    super.close()
-    closeDaemon()
+  protected def closeYarnApp(): Unit = {
     if (clusterDescriptor != null) {
       clusterDescriptor.cancelJob()
       clusterDescriptor.close()
@@ -130,6 +128,12 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter]
     flinkEngineConnContext.getExecutionContext.getClusterClientFactory.close()
   }
 
+  override def close(): Unit = {
+    super.close()
+    closeDaemon()
+    closeYarnApp()
+  }
+
   override protected def waitToRunning(): Unit = {
     if (!isCompleted) {
       daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index e6cdcc5da6..3d1b834472 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -17,8 +17,11 @@
 package org.apache.linkis.engineconnplugin.flink.factory
 
+import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.utils.{ClassUtils, Logging} +import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration import org.apache.linkis.engineconn.common.creation.EngineCreationContext +import org.apache.linkis.engineconn.launch.EngineConnServer import org.apache.linkis.engineconnplugin.flink.client.config.Environment import org.apache.linkis.engineconnplugin.flink.client.config.entries.ExecutionEntry import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext @@ -32,7 +35,8 @@ import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, Fli import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._ import org.apache.linkis.engineconnplugin.flink.exception.FlinkInitFailedException import org.apache.linkis.engineconnplugin.flink.setting.Settings -import org.apache.linkis.engineconnplugin.flink.util.ClassUtil +import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil} +import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration import org.apache.linkis.manager.engineplugin.common.creation.{ ExecutorFactory, @@ -41,6 +45,7 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine._ import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType +import org.apache.linkis.protocol.utils.TaskUtils import org.apache.commons.lang3.StringUtils import org.apache.flink.configuration._ @@ -108,6 +113,19 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val providedLibDirsArray = FLINK_LIB_LOCAL_PATH.getValue(options).split(",") // Ship directories val shipDirsArray = getShipDirectories(options) + // other params + val flinkClientType = GovernanceCommonConf.EC_APP_MANAGE_MODE.getValue(options) + val otherParams = new util.HashMap[String, Any]() + val isManager = ManagerUtil.isManager + if (isManager) { +// logger.info( +// s"flink manager mode on. Will set ${AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.key} to true." 
+// ) + logger.info( + s"support parallelism : ${AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()}" + ) + } + otherParams.put(GovernanceCommonConf.EC_APP_MANAGE_MODE.key, flinkClientType.toLowerCase()) val context = new EnvironmentContext( defaultEnv, new Configuration, @@ -119,7 +137,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging providedLibDirsArray, shipDirsArray, new util.ArrayList[URL], - flinkExecutionTarget + flinkExecutionTarget, + otherParams ) // Step1: environment-level configurations val jobName = options.getOrDefault("flink.app.name", "EngineConn-Flink") @@ -445,8 +464,14 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging ): FlinkEngineConnContext = new FlinkEngineConnContext(environmentContext) - override protected def getDefaultExecutorFactoryClass: Class[_ <: ExecutorFactory] = - classOf[FlinkCodeExecutorFactory] + override protected def getDefaultExecutorFactoryClass: Class[_ <: ExecutorFactory] = { + val options = EngineConnServer.getEngineCreationContext.getOptions + if (FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.getValue(options)) { + classOf[FlinkManagerExecutorFactory] + } else { + classOf[FlinkCodeExecutorFactory] + } + } override protected def getEngineConnType: EngineType = EngineType.FLINK @@ -454,7 +479,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging ClassUtil.getInstance(classOf[FlinkSQLExecutorFactory], new FlinkSQLExecutorFactory), ClassUtil .getInstance(classOf[FlinkApplicationExecutorFactory], new FlinkApplicationExecutorFactory), - ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new FlinkCodeExecutorFactory) + ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new FlinkCodeExecutorFactory), + ClassUtil.getInstance(classOf[FlinkManagerExecutorFactory], new FlinkManagerExecutorFactory) ) override def getExecutorFactories: Array[ExecutorFactory] = executorFactoryArray diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkManagerExecutorFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkManagerExecutorFactory.scala new file mode 100644 index 0000000000..6bdf432975 --- /dev/null +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkManagerExecutorFactory.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.linkis.engineconnplugin.flink.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import org.apache.linkis.engineconn.computation.executor.creation.ComputationExecutorFactory
+import org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor
+import org.apache.linkis.engineconn.once.executor.OnceExecutor
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.executor.{
+  FlinkCodeOnceExecutor,
+  FlinkManagerConcurrentExecutor
+}
+import org.apache.linkis.engineconnplugin.flink.factory.FlinkManagerExecutorFactory.setDefaultExecutor
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.RunType._
+
+class FlinkManagerExecutorFactory extends OnceExecutorFactory {
+
+  override protected def newExecutor(
+      id: Int,
+      engineCreationContext: EngineCreationContext,
+      engineConn: EngineConn,
+      labels: Array[Label[_]]
+  ): OnceExecutor = engineConn.getEngineConnSession match {
+    case flinkEngineConnContext: FlinkEngineConnContext =>
+      val executor = new FlinkManagerConcurrentExecutor(
+        id,
+        FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getValue,
+        flinkEngineConnContext
+      )
+      setDefaultExecutor(executor)
+      executor
+  }
+
+  // the manager executor only registers the json run type.
+  override protected def getSupportRunTypes: Array[String] =
+    Array(JSON.toString)
+
+  override protected def getRunType: RunType = JSON
+}
+
+object FlinkManagerExecutorFactory {
+
+  private var defaultExecutor: FlinkManagerConcurrentExecutor = _
+
+  def setDefaultExecutor(executor: FlinkManagerConcurrentExecutor): Unit = {
+    defaultExecutor = executor
+  }
+
+  def getDefaultExecutor(): FlinkManagerConcurrentExecutor = defaultExecutor
+
+}
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/EngineLoadOperationHook.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/EngineLoadOperationHook.scala
new file mode 100644
index 0000000000..78ae653077
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/EngineLoadOperationHook.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.hook
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
+import org.apache.linkis.engineconn.acessible.executor.hook.OperationHook
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import org.apache.linkis.engineconnplugin.flink.factory.FlinkManagerExecutorFactory
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+import org.apache.linkis.manager.common.protocol.engine.{
+  EngineOperateRequest,
+  EngineOperateResponse
+}
+
+import org.springframework.stereotype.Service
+
+import javax.annotation.PostConstruct
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+
+@Service
+class EngineLoadOperationHook extends OperationHook with Logging {
+
+  @PostConstruct
+  private def init(): Unit = {
+    OperationHook.registerOperationHook(this)
+    logger.info(s"${getName()} init success.")
+  }
+
+  private val taskNum = new AtomicInteger(0)
+  private val lock = new Object
+
+  override def getName(): String = getClass.getSimpleName
+
+  override def doPreOperation(
+      engineOperateRequest: EngineOperateRequest,
+      engineOperateResponse: EngineOperateResponse
+  ): Unit = {
+    ExecutorManager.getInstance.getReportExecutor match {
+      case accessibleExecutor: AccessibleExecutor =>
+        accessibleExecutor.updateLastActivityTime()
+      case _ =>
+    }
+    if (
+      taskNum.incrementAndGet() >= FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()
+    ) {
+      lock.synchronized {
+        // re-check under the lock without incrementing again, so one operation bumps the counter exactly once
+        if (
+          taskNum.get() >= FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()
+        ) {
+          FlinkManagerExecutorFactory.getDefaultExecutor() match {
+            case accessibleExecutor: AccessibleExecutor =>
+              if (NodeStatus.Busy != accessibleExecutor.getStatus) {
+                accessibleExecutor.transition(NodeStatus.Busy)
+                logger.warn("The number of tasks exceeds the maximum limit, change status to busy.")
+              }
+            case _ => logger.error("FlinkManagerExecutorFactory.getDefaultExecutor() is None.")
+          }
+        }
+      }
+    }
+  }
+
+  override def doPostOperation(
+      engineOperateRequest: EngineOperateRequest,
+      engineOperateResponse: EngineOperateResponse
+  ): Unit = {
+    // always release the slot; only take the lock when we may have dropped below the threshold
+    if (taskNum.decrementAndGet() < FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()) {
+      lock.synchronized {
+        // re-read under the lock without decrementing again
+        if (
+          taskNum.get() < FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()
+        ) {
+          FlinkManagerExecutorFactory.getDefaultExecutor() match {
+            case accessibleExecutor: AccessibleExecutor =>
+              if (NodeStatus.Busy == accessibleExecutor.getStatus) {
+                accessibleExecutor.transition(NodeStatus.Unlock)
+                logger.warn(
+                  "The number of tasks is less than the maximum limit, change status to unlock."
+                )
+              }
+            case _ => logger.error("FlinkManagerExecutorFactory.getDefaultExecutor() is None.")
+          }
+        }
+      }
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(s"taskNum: ${taskNum.get()}")
+    }
+  }
+
+}
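[Editor's note] The hook above throttles the manager EC by counting in-flight operations: the atomic counter moves exactly once per operation (increment in pre, decrement in post), and the lock is only used to re-read the counter before flipping the node between Busy and Unlock. A compact, self-contained sketch of the same pattern; the names (LoadGuardSketch, busy, max) are illustrative, not part of the patch.

import java.util.concurrent.atomic.AtomicInteger

object LoadGuardSketch {
  private val max = 50 // stand-in for FLINK_MANAGER_LOAD_TASK_MAX
  private val taskNum = new AtomicInteger(0)
  private val lock = new Object
  @volatile private var busy = false // stand-in for the NodeStatus transition

  // called before an operation: count it, then re-read (never re-increment) under the lock
  def pre(): Unit = if (taskNum.incrementAndGet() >= max) lock.synchronized {
    if (taskNum.get() >= max && !busy) { busy = true; println("-> Busy") }
  }

  // called after an operation: always release the slot, then re-read under the lock
  def post(): Unit = if (taskNum.decrementAndGet() < max) lock.synchronized {
    if (taskNum.get() < max && busy) { busy = false; println("-> Unlock") }
  }

  def main(args: Array[String]): Unit = {
    (1 to 60).foreach(_ => pre())  // crosses the threshold exactly once
    (1 to 60).foreach(_ => post()) // counter returns to zero, status unlocks
    assert(taskNum.get() == 0 && !busy)
  }
}

Because each operation pairs one increment with one decrement, the counter cannot drift under concurrency, and the Busy/Unlock transition fires at most once per threshold crossing.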
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/KillOperator.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/KillOperator.scala
new file mode 100644
index 0000000000..1a917526b1
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/KillOperator.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconnplugin.flink.constants.FlinkECConstant
+import org.apache.linkis.engineconnplugin.flink.operator.clientmanager.FlinkRestClientManager
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.governance.common.exception.GovernanceErrorException
+import org.apache.linkis.manager.common.operator.Operator
+import org.apache.linkis.server.toScalaMap
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus}
+
+import java.util
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.mutable
+
+class KillOperator extends Operator with Logging {
+
+  override def getNames: Array[String] = Array("kill")
+
+  @throws[GovernanceErrorException]
+  override def apply(params: util.Map[String, Object]): util.Map[String, Object] = {
+
+    val rsMap = new mutable.HashMap[String, String]
+    val appIdStr = params.getOrElse(ECConstants.YARN_APPID_NAME_KEY, "").asInstanceOf[String]
+    val snapShot = params.getOrElse(FlinkECConstant.SNAPHOT_KEY, "false").toString.toBoolean
+
+    val appId: ApplicationId = YarnUtil.retrieveApplicationId(appIdStr)
+
+    var isStopped = false
+    val restClient =
+      Utils.tryCatch {
+        FlinkRestClientManager.getFlinkRestClient(appIdStr)
+      } { case e: Exception =>
+        val yarnClient = YarnUtil.getYarnClient()
+        val appReport = yarnClient.getApplicationReport(appId)
+        if (appReport.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) {
+          // Flink cluster is not running anymore
+          val msg =
+            s"The application ${appIdStr} doesn't run anymore. It has previously completed with final status: ${appReport.getFinalApplicationStatus.toString}"
+          logAndException(msg)
+          isStopped = true
+          null
+        } else {
+          val msg = s"Get client for app ${appIdStr} failed, because : ${e.getMessage}"
+          throw logAndException(msg)
+        }
+      }
+    if (!isStopped) {
+      if (snapShot) {
+        val checkPointPath =
+          params.getOrElse(FlinkECConstant.SAVAPOINT_PATH_KEY, null).asInstanceOf[String]
+        val rs = YarnUtil.triggerSavepoint(appIdStr, checkPointPath, restClient)
+        rsMap.put(FlinkECConstant.MSG_KEY, rs)
+      }
+      val jobs = restClient.listJobs().get()
+      if (null == jobs || jobs.isEmpty) {
+        val msg = s"App : ${appIdStr} has no jobs, but is not ended."
+        throw logAndException(msg)
+      }
+      val msg = s"Try to kill ${jobs.size()} jobs of app : ${appIdStr}"
+      jobs.asScala.foreach(job => restClient.cancel(job.getJobId))
+      rsMap += (FlinkECConstant.MSG_KEY -> msg)
+    }
+
+    val map = new util.HashMap[String, Object]()
+    rsMap.foreach(entry => map.put(entry._1, entry._2))
+    map
+  }
+
+}
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/ListOperator.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/ListOperator.scala
new file mode 100644
index 0000000000..1497763dec
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/ListOperator.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.linkis.engineconnplugin.flink.operator + +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.engineconn.common.exception.EngineConnException +import org.apache.linkis.engineconnplugin.flink.util.YarnUtil +import org.apache.linkis.governance.common.constant.ec.ECConstants +import org.apache.linkis.governance.common.exception.GovernanceErrorException +import org.apache.linkis.governance.common.exception.engineconn.EngineConnExecutorErrorCode +import org.apache.linkis.manager.common.operator.Operator +import org.apache.linkis.server.{toScalaBuffer, toScalaMap, BDPJettyServerHelper} + +import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState} + +import java.util + +import scala.collection.mutable + +class ListOperator extends Operator with Logging { + + private val json = BDPJettyServerHelper.jacksonJson + + override def getNames: Array[String] = Array("list") + + @throws[GovernanceErrorException] + override def apply(params: util.Map[String, Object]): util.Map[String, Object] = { + + val applicationTypeSet = new util.HashSet[String]() + var appStateSet = util.EnumSet.of[YarnApplicationState](YarnApplicationState.RUNNING) + var appName = "" + + Utils.tryCatch { + val appTypeList = params + .getOrElse(ECConstants.YARN_APP_TYPE_LIST_KEY, new util.ArrayList[String]()) + .asInstanceOf[util.List[String]] + appTypeList.foreach(applicationTypeSet.add) + val appStateList = params + .getOrElse(ECConstants.YARN_APP_STATE_LIST_KEY, new util.ArrayList[String]()) + .asInstanceOf[util.List[String]] + val appStateArray = new util.HashSet[YarnApplicationState] + appStateList.foreach(e => appStateArray.add(YarnApplicationState.valueOf(e))) + if (!appStateArray.isEmpty) { + appStateSet = util.EnumSet.copyOf(appStateArray) + } + appName = params.getOrElse(ECConstants.YARN_APP_NAME_KEY, "").asInstanceOf[String] + } { e: Throwable => + val msg = "Invalid params. " + e.getMessage + logger.error(msg, e) + throw new EngineConnException(EngineConnExecutorErrorCode.INVALID_PARAMS, msg) + } + + val yarnClient = YarnUtil.getYarnClient() + val appList = yarnClient.getApplications(applicationTypeSet, appStateSet) + val rsMap = new mutable.HashMap[String, String] + Utils.tryCatch { + val appTypeStr = json.writeValueAsString(applicationTypeSet) + val appStateStr = json.writeValueAsString(appStateSet) + val rsAppList = new util.ArrayList[util.Map[String, String]]() + appList.foreach(report => { + if (report.getName.contains(appName)) { + val tmpMap = new util.HashMap[String, String]() + tmpMap.put(ECConstants.YARN_APP_NAME_KEY, report.getName) + tmpMap.put(ECConstants.YARN_APP_TYPE_KEY, report.getApplicationType) + tmpMap.put(ECConstants.YARN_APPID_NAME_KEY, report.getApplicationId.toString) + tmpMap.put(ECConstants.YARN_APP_URL_KEY, report.getTrackingUrl) + val appStatus = + if (report.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) { + report.getFinalApplicationStatus + } else { + report.getYarnApplicationState + } + tmpMap.put( + ECConstants.NODE_STATUS_KEY, + YarnUtil + .convertYarnStateToNodeStatus(report.getApplicationId.toString, appStatus.toString) + .toString + ) + rsAppList.add(tmpMap) + } + }) + val listStr = json.writeValueAsString(rsAppList) + + logger.info( + s"List yarn apps, params : appTypeSet : ${appTypeStr}, appStateSet : ${appStateStr}, list : ${listStr}" + ) + + rsMap += (ECConstants.YARN_APP_RESULT_LIST_KEY -> listStr) + } { case e: Exception => + val msg = "convert listStr failed. 
Because : " + e.getMessage + logger.error(msg) + throw e + } + + val map = new util.HashMap[String, Object]() + rsMap.foreach(e => map.put(e._1, e._2)) + map + } + +} diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/StatusOperator.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/StatusOperator.scala new file mode 100644 index 0000000000..05f1200bb7 --- /dev/null +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/StatusOperator.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.operator + +import org.apache.linkis.common.exception.{LinkisException, LinkisRuntimeException} +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.engineconn.common.exception.EngineConnException +import org.apache.linkis.engineconnplugin.flink.util.{ManagerUtil, YarnUtil} +import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException +import org.apache.linkis.governance.common.constant.ec.ECConstants +import org.apache.linkis.governance.common.exception.engineconn.EngineConnExecutorErrorCode +import org.apache.linkis.manager.common.entity.enumeration.NodeStatus +import org.apache.linkis.manager.common.operator.Operator +import org.apache.linkis.server.toScalaMap + +import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, FinalApplicationStatus} +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException + +import java.util + +import scala.collection.mutable + +class StatusOperator extends Operator with Logging { + + override def getNames: Array[String] = Array("status") + + override def apply(params: util.Map[String, Object]): util.Map[String, Object] = { + + val appIdStr = params.getOrElse(ECConstants.YARN_APPID_NAME_KEY, "").asInstanceOf[String] + + val parts = appIdStr.split("_") + val clusterTimestamp = parts(1).toLong + val sequenceNumber = parts(2).toInt + + // Create an ApplicationId object using newInstance method + val appId = ApplicationId.newInstance(clusterTimestamp, sequenceNumber) + val rsMap = new mutable.HashMap[String, String] + + val yarnClient = YarnUtil.getYarnClient() + var appReport: ApplicationReport = null + Utils.tryCatch { + appReport = yarnClient.getApplicationReport(appId) + if (null == appReport) { + throw logAndException(s"Got null appReport for appid : ${appIdStr}") + } + } { case notExist: ApplicationNotFoundException => + logger.error(s"Application : ${appIdStr} not exists, will set the status to failed.") + val map = new util.HashMap[String, Object]() + 
map.put(ECConstants.NODE_STATUS_KEY, NodeStatus.Failed.toString) + map.put(ECConstants.YARN_APPID_NAME_KEY, appIdStr) + return map + } + + // Get the application status (YarnApplicationState) + val appStatus = if (appReport.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) { + appReport.getFinalApplicationStatus + } else { + appReport.getYarnApplicationState + } + + val nodeStatus: NodeStatus = YarnUtil.convertYarnStateToNodeStatus(appIdStr, appStatus.toString) + + logger.info(s"try to get appid: ${appIdStr}, status ${nodeStatus.toString}.") + rsMap += (ECConstants.NODE_STATUS_KEY -> nodeStatus.toString) + rsMap += (ECConstants.YARN_APPID_NAME_KEY -> appIdStr) + val map = new util.HashMap[String, Object]() + rsMap.foreach(entry => map.put(entry._1, entry._2)) + map + } + +} + +object StatusOperator extends Logging { + + private var handshaked: Boolean = false + + def addHandshake(): Unit = { + handshaked = true + } + + def isHandshaked: Boolean = handshaked + +} diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala new file mode 100644 index 0000000000..ceddc367cb --- /dev/null +++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineconnplugin.flink.operator + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.engineconn.launch.EngineConnServer +import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager +import org.apache.linkis.engineconnplugin.flink.constants.FlinkECConstant +import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._ +import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException +import org.apache.linkis.engineconnplugin.flink.executor.FlinkOnceExecutor +import org.apache.linkis.engineconnplugin.flink.operator.clientmanager.FlinkRestClientManager +import org.apache.linkis.engineconnplugin.flink.util.YarnUtil +import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException +import org.apache.linkis.governance.common.constant.ec.ECConstants +import org.apache.linkis.governance.common.exception.GovernanceErrorException +import org.apache.linkis.manager.common.operator.Operator + +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus + +import java.text.MessageFormat +import java.util + +import scala.collection.mutable + +class TriggerSavepointOperator extends Operator with Logging { + + override def getNames: Array[String] = Array("doSavepoint") + + @throws[GovernanceErrorException] + override def apply(params: util.Map[String, Object]): util.Map[String, Object] = { + val rsMap = new mutable.HashMap[String, String] + + val savepointPath = getAsThrow[String](params, FlinkECConstant.SAVAPOINT_PATH_KEY) + val appIdStr = getAsThrow[String](params, ECConstants.YARN_APPID_NAME_KEY) + val mode = getAsThrow[String](params, FlinkECConstant.SAVEPOINT_MODE_KEY) + + val appId = YarnUtil.retrieveApplicationId(appIdStr) + val yarnClient = YarnUtil.getYarnClient() + val appReport = yarnClient.getApplicationReport(appId) + if (appReport.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) { + // Flink cluster is not running anymore + val msg = + s"The application ${appIdStr} doesn't run anymore. 
+
+    val appId = YarnUtil.retrieveApplicationId(appIdStr)
+    val yarnClient = YarnUtil.getYarnClient()
+    val appReport = yarnClient.getApplicationReport(appId)
+    if (appReport.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) {
+      // Flink cluster is not running anymore
+      val msg =
+        s"The application ${appIdStr} is no longer running. It completed with final status: ${appReport.getFinalApplicationStatus.toString}"
+      throw logAndException(msg)
+    }
+
+    logger.info(s"Try to $mode savepoint with path $savepointPath.")
+    if (
+      YarnUtil.isDetach(
+        EngineConnServer.getEngineCreationContext.getOptions.asInstanceOf[util.Map[String, Any]]
+      )
+    ) {
+      logger.info("The flink cluster is detached, use rest api to trigger savepoint.")
+      val restClient = FlinkRestClientManager.getFlinkRestClient(appIdStr)
+      val rs = YarnUtil.triggerSavepoint(appIdStr, savepointPath, restClient)
+      rsMap.put(FlinkECConstant.RESULT_SAVEPOINT_KEY, rs)
+    } else {
+      logger.info("The flink cluster is not detached, use flink client to trigger savepoint.")
+      OnceExecutorManager.getInstance.getReportExecutor match {
+        case flinkExecutor: FlinkOnceExecutor[_] =>
+          val writtenSavepoint =
+            flinkExecutor.getClusterDescriptorAdapter.doSavepoint(savepointPath, mode)
+          rsMap.put(FlinkECConstant.RESULT_SAVEPOINT_KEY, writtenSavepoint)
+        case executor =>
+          throw new JobExecutionException(
+            MessageFormat.format(NOT_SUPPORT_SAVEPOTION.getErrorDesc, executor.getClass.getSimpleName)
+          )
+      }
+    }
+    val map = new util.HashMap[String, Object]()
+    rsMap.foreach(entry => map.put(entry._1, entry._2))
+    map
+  }
+
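+  // Editor's note: callers are expected to reach this operator through the EC operator
+  // mechanism (an EngineConnOperateAction carrying the operator name "doSavepoint"); the
+  // request wiring lives in linkis-manager and is not part of this file.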
+}
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/clientmanager/FlinkRestClientManager.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/clientmanager/FlinkRestClientManager.scala
new file mode 100644
index 0000000000..d7a14beb44
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/clientmanager/FlinkRestClientManager.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator.clientmanager
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import org.apache.linkis.engineconnplugin.flink.executor.FlinkManagerConcurrentExecutor
+import org.apache.linkis.engineconnplugin.flink.factory.FlinkManagerExecutorFactory
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+
+import org.apache.flink.client.program.rest.RestClusterClient
+import org.apache.flink.configuration.Configuration
+import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus}
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.{
+  CacheBuilder,
+  CacheLoader,
+  LoadingCache,
+  RemovalListener,
+  RemovalNotification
+}
+
+object FlinkRestClientManager extends Logging {
+
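+  // Usage sketch (editor's illustration; the app id string is hypothetical):
+  //   val client = FlinkRestClientManager.getFlinkRestClient("application_1700000000000_0001")
+  //   val jobs = client.listJobs().get()
+  // Entries are built lazily by the CacheLoader below and evicted by size / idle time.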
+  // NOTE: weakKeys() was dropped here: it switches Guava to identity (eq) comparison of the
+  // String keys, so freshly parsed appId strings would never hit the cache; the size and
+  // expire-after-access eviction below are sufficient.
+  private lazy val restClientCache: LoadingCache[String, RestClusterClient[ApplicationId]] =
+    CacheBuilder
+      .newBuilder()
+      .maximumSize(FlinkEnvConfiguration.FLINK_MANAGER_CLIENT_MAX_NUM.getValue)
+      .expireAfterAccess(
+        FlinkEnvConfiguration.FLINK_MANAGER_CLIENT_EXPIRE_MILLS.getValue,
+        TimeUnit.MILLISECONDS
+      )
+      .removalListener(new RemovalListener[String, RestClusterClient[ApplicationId]]() {
+
+        override def onRemoval(
+            notification: RemovalNotification[String, RestClusterClient[ApplicationId]]
+        ): Unit = {
+          logger.info(s"RestClusterClient of AppId : ${notification.getKey} was removed.")
+        }
+
+      })
+      .build(new CacheLoader[String, RestClusterClient[ApplicationId]]() {
+
+        override def load(appIdStr: String): RestClusterClient[ApplicationId] = {
+
+          val appId: ApplicationId = YarnUtil.retrieveApplicationId(appIdStr)
+
+          val yarnClient = YarnUtil.getYarnClient()
+          val appReport = yarnClient.getApplicationReport(appId)
+
+          if (appReport.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) {
+            // Flink cluster is not running anymore
+            val msg =
+              s"The application ${appIdStr} is no longer running. It completed with final status: ${appReport.getFinalApplicationStatus.toString}"
+            throw logAndException(msg)
+          }
+
+          val executor = FlinkManagerExecutorFactory.getDefaultExecutor()
+          val tmpFlinkConf: Configuration = executor match {
+            case flinkManagerExecutor: FlinkManagerConcurrentExecutor =>
+              flinkManagerExecutor.getFlinkContext().getEnvironmentContext.getFlinkConfig.clone()
+            case _ =>
+              val msg = s"Default executor is not a FlinkManagerConcurrentExecutor : ${executor}"
+              throw logAndException(msg)
+          }
+          YarnUtil.setClusterEntrypointInfoToConfig(tmpFlinkConf, appReport)
+          new RestClusterClient[ApplicationId](tmpFlinkConf, appReport.getApplicationId)
+        }
+
+      })
+
+  def getFlinkRestClient(appIdStr: String): RestClusterClient[ApplicationId] =
+    restClientCache.get(appIdStr)
+
+  def setFlinkRestClient(appIdStr: String, client: RestClusterClient[ApplicationId]): Unit =
+    restClientCache.put(appIdStr, client)
+
+}
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ManagerUtil.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ManagerUtil.scala
new file mode 100644
index 0000000000..a4dec1cf82
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ManagerUtil.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.util
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.launch.EngineConnServer
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+
+object ManagerUtil extends Logging {
+
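+  // Editor's note: evaluated once when the object is first loaded, from the engine creation
+  // options; later changes to the options are not picked up.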
+  val isManager: Boolean = {
+    val options = EngineConnServer.getEngineCreationContext.getOptions
+    FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.getValue(options)
+  }
+
+}
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala
new file mode 100644
index 0000000000..a1c96619e6
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.util
+
+import org.apache.linkis.common.exception.ErrorException
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconn.executor.entity.YarnExecutor
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink
+import org.apache.flink.client.program.rest.RestClusterClient
+import org.apache.flink.configuration.{HighAvailabilityOptions, JobManagerOptions, RestOptions}
+import org.apache.flink.runtime.client.JobStatusMessage
+import org.apache.flink.yarn.configuration.YarnConfigOptions
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api.records.{
+  ApplicationId,
+  ApplicationReport,
+  FinalApplicationStatus,
+  YarnApplicationState
+}
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+
+import java.util
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.mutable.ArrayBuffer
+
+object YarnUtil extends Logging {
+
+  val CORE_SITE = "core-site.xml"
+  val YARN_SITE = "yarn-site.xml"
+  val HDFS_SITE = "hdfs-site.xml"
+  val MAPRED_SITE = "mapred-site.xml"
+
+  // @volatile is required for the double-checked locking below to be safe
+  @volatile private var yarnClient: YarnClient = _
+
+  def getYarnClient(): YarnClient = {
+    if (null == yarnClient) {
+      YarnUtil.getClass.synchronized {
+        if (null == yarnClient) {
+          yarnClient = createYarnClient()
+        }
+      }
+    }
+    yarnClient
+  }
+
+  private def createYarnClient(): YarnClient = {
+    val yarnClient = YarnClient.createYarnClient()
+    val hadoopConf = getHadoopConf()
+    val yarnConfiguration = new YarnConfiguration(hadoopConf)
+    yarnClient.init(yarnConfiguration)
+    yarnClient.start()
+    yarnClient
+  }
+
+  private def getHadoopConf(): Configuration = {
+    val conf = new Configuration()
+    var confRoot = FlinkEnvConfiguration.HADOOP_CONF_DIR.getValue
+    if (StringUtils.isBlank(confRoot)) {
+      throw new JobExecutionException("HADOOP_CONF_DIR or linkis.flink.hadoop.conf.dir not set!")
+    }
+    confRoot = confRoot + "/"
+    // addResource(String) resolves the name on the classpath, so wrap the absolute file
+    // locations in Path to load them from the configured directory
+    conf.addResource(new Path(confRoot + HDFS_SITE))
+    conf.addResource(new Path(confRoot + CORE_SITE))
+    conf.addResource(new Path(confRoot + MAPRED_SITE))
+    conf.addResource(new Path(confRoot + YARN_SITE))
+    conf
+  }
+
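+  // Editor's note: the setter below points the Flink client-side configuration at the
+  // JobManager / REST endpoint advertised in the YARN application report, and pins the
+  // application id (plus the HA cluster id, when unset) so a RestClusterClient built from
+  // this configuration finds the right cluster.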
+  def setClusterEntrypointInfoToConfig(
+      flinkConfiguration: flink.configuration.Configuration,
+      appReport: ApplicationReport
+  ): Unit = {
+    if (null == appReport) {
+      val msg = "Invalid null appReport"
+      logger.error(msg)
+      throw new JobExecutionException(msg)
+    }
+
+    val appId = appReport.getApplicationId
+    val host = appReport.getHost
+    val port = appReport.getRpcPort
+
+    logger.info(s"Found Web Interface ${host}:${port} of application '${appId}'.")
+
+    flinkConfiguration.setString(JobManagerOptions.ADDRESS, host)
+    flinkConfiguration.setInteger(JobManagerOptions.PORT, port)
+
+    flinkConfiguration.setString(RestOptions.ADDRESS, host)
+    flinkConfiguration.setInteger(RestOptions.PORT, port)
+
+    flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId))
+
+    if (!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
+      flinkConfiguration.set(HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId))
+    }
+  }
+
+  def logAndException(msg: String, t: Throwable = null): ErrorException = {
+    logger.error(msg, t)
+    new JobExecutionException(msg)
+  }
+
+  def retrieveApplicationId(appIdStr: String): ApplicationId = {
+    // Assumes the standard YARN id format "application_<clusterTimestamp>_<sequenceNumber>"
+    val parts = appIdStr.split("_")
+    val clusterTimestamp = parts(1).toLong
+    val sequenceNumber = parts(2).toInt
+    // Create an ApplicationId object using the newInstance method
+    val appId = ApplicationId.newInstance(clusterTimestamp, sequenceNumber)
+    appId
+  }
+
+  def triggerSavepoint(
+      appIdStr: String,
+      checkPointPath: String,
+      restClient: RestClusterClient[ApplicationId]
+  ): String = {
+    val jobs = restClient.listJobs().get()
+    if (null == jobs || jobs.size() > 1) {
+      val size = if (null == jobs) 0 else jobs.size()
+      val msg = s"App : ${appIdStr} has ${size} jobs, cannot trigger savepoint."
+      throw logAndException(msg)
+    }
+    if (StringUtils.isBlank(checkPointPath)) {
+      val msg = s"App : ${appIdStr} savepoint path is blank, cannot trigger savepoint."
+      throw logAndException(msg)
+    } else {
+      val firstJob: JobStatusMessage = jobs.asScala.headOption.orNull
+      if (null == firstJob) {
+        val msg = s"App : ${appIdStr} has no job, cannot trigger savepoint or cancel."
+        throw new JobExecutionException(msg)
+      }
+      restClient.triggerSavepoint(firstJob.getJobId, checkPointPath).get()
+    }
+  }
+
+  def convertYarnStateToNodeStatus(appIdStr: String, appStatus: String): NodeStatus = {
+    appStatus match {
+      case finalState if FinalApplicationStatus.values().map(_.toString).contains(finalState) =>
+        FinalApplicationStatus.valueOf(finalState) match {
+          case FinalApplicationStatus.KILLED | FinalApplicationStatus.FAILED =>
+            NodeStatus.Failed
+          case FinalApplicationStatus.SUCCEEDED =>
+            NodeStatus.Success
+          case _ =>
+            throw new JobExecutionException(
+              s"Application : ${appIdStr} has unsupported final state : ${finalState}"
+            )
+        }
+      case yarnState if YarnApplicationState.values().map(_.toString).contains(yarnState) =>
+        YarnApplicationState.valueOf(yarnState) match {
+          case YarnApplicationState.FINISHED =>
+            throw new JobExecutionException("Invalid yarn app state : FINISHED")
+          case YarnApplicationState.KILLED | YarnApplicationState.FAILED =>
+            NodeStatus.Failed
+          case _ =>
+            NodeStatus.Running
+        }
+      case _ =>
+        val msg = if (null != appStatus) {
+          s"Application : ${appIdStr} has unknown state : ${appStatus}"
+        } else {
+          s"Application : ${appIdStr} has null state"
+        }
+        throw new JobExecutionException(msg)
+    }
+  }
+
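+  // Editor's note: an engine is treated as detached when either the flink manager mode flag
+  // is switched on in the params, or the EC app manage mode resolves to the detach client
+  // type; both lookups fall back to their configured defaults.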
+  def isDetach(params: util.Map[String, Any]): Boolean = {
+    val managerOn = params.getOrDefault(
+      FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.key,
+      FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.getValue
+    )
+    if (null != managerOn && managerOn.toString.toBoolean) {
+      return true
+    }
+    val clientType = params
+      .getOrDefault(
+        GovernanceCommonConf.EC_APP_MANAGE_MODE.key,
+        GovernanceCommonConf.EC_APP_MANAGE_MODE.getValue
+      )
+      .toString
+    logger.info(s"clientType : ${clientType}")
+    clientType.toLowerCase() match {
+      case ECConstants.EC_CLIENT_TYPE_DETACH =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  def getAppIds: Array[String] = {
+    val ids = new ArrayBuffer[String]
+    // Collect the yarn application id of every executor that runs on yarn
+    ExecutorManager.getInstance.getExecutors.foreach {
+      case yarnExecutor: YarnExecutor =>
+        ids.append(yarnExecutor.getApplicationId)
+      case _ =>
+    }
+    if (ids.size > 1) {
+      logger.error(
+        "There is more than one yarn application running, please check it. Ids : " + ids
+          .mkString(",")
+      )
+    }
+    ids.toArray
+  }
+
+}
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
index b637da27e1..abae56eb7a 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
@@ -97,7 +97,7 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
       engineCreationContext: EngineCreationContext
   ): AbstractHiveSession = {
     // if hive engine support concurrent, return HiveConcurrentSession
-    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()) {
       return doCreateHiveConcurrentSession(engineCreationContext.getOptions)
     }
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
index cbfffc244d..85baceef62 100755
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
@@ -38,7 +38,7 @@ class ShellEngineConnFactory extends ComputationSingleExecutorEngineConnFactory
       engineCreationContext: EngineCreationContext,
       engineConn: EngineConn
   ): LabelExecutor = {
-    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue) {
       new ShellEngineConnConcurrentExecutor(
         id,
         ShellEngineConnConf.SHELL_ENGINECONN_CONCURRENT_LIMIT