From 0a314c5a8f63acb2059a2e8799643ca5477887eb Mon Sep 17 00:00:00 2001 From: benjobs Date: Thu, 26 Oct 2023 12:23:17 +0800 Subject: [PATCH 1/7] [Improve] custom-code job support dependencies --- .../console/core/entity/Application.java | 17 ++------ .../service/impl/AppBuildPipeServiceImpl.java | 41 ++++++++---------- .../service/impl/ApplicationServiceImpl.java | 22 +++++++++- .../src/main/resources/db/data-h2.sql | 2 +- .../src/main/resources/db/schema-h2.sql | 1 + .../mapper/core/ApplicationMapper.xml | 1 + .../Application/src/AppDarkModeToggle.vue | 4 +- .../ContextMenu/src/ContextMenu.vue | 4 +- .../src/components/Form/src/BasicForm.vue | 2 +- .../src/components/Page/src/PageFooter.vue | 4 +- .../src/hooks/web/useLockPage.ts | 9 ++-- .../src/utils/props.ts | 2 +- .../src/views/flink/app/Add.vue | 25 +++++------ .../src/views/flink/app/EditFlink.vue | 30 ++++++++++++- .../src/views/flink/app/EditStreamPark.vue | 2 +- .../flink/app/hooks/useCreateAndEditSchema.ts | 16 +++++-- .../views/flink/app/hooks/useCreateSchema.ts | 12 +----- .../flink/app/hooks/useEditFlinkSchema.ts | 8 ++++ .../views/flink/app/hooks/useFlinkRender.tsx | 2 +- .../flink/packer/maven/MavenTool.scala | 43 +++++++++++++------ 20 files changed, 149 insertions(+), 98 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 81f4ccc4cc..3e579c4edd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -220,7 +220,7 @@ public class Application implements Serializable { /** running job */ private transient JobsOverview.Task overview; - private transient String dependency; + private String dependency; private transient Long sqlId; private transient String flinkSql; @@ -390,15 +390,6 @@ public boolean cpFailedTrigger() { && this.cpFailureAction != null; } - public boolean eqFlinkJob(Application other) { - if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) { - if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) { - return this.getDependencyObject().eq(other.getDependencyObject()); - } - } - return false; - } - /** Local compilation and packaging working directory */ @JsonIgnore public String getDistHome() { @@ -752,11 +743,11 @@ public int hashCode() { @Override public String toString() { - return groupId + ":" + artifactId + ":" + version + getClassifier(":"); + return groupId + ":" + artifactId + ":" + version + getClassifier(); } - private String getClassifier(String joiner) { - return StringUtils.isEmpty(classifier) ? "" : joiner + classifier; + private String getClassifier() { + return StringUtils.isEmpty(classifier) ? "" : ":" + classifier; } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 6f8890bd23..8dc14141dc 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -184,32 +184,24 @@ public void onStart(PipeSnapshot snapshot) { if (app.isCustomCodeJob()) { // customCode upload jar to appHome... - String appHome = app.getAppHome(); FsOperator fsOperator = app.getFsOperator(); - fsOperator.delete(appHome); - if (app.isUploadJob()) { + if (app.isCICDJob()) { + String appHome = app.getAppHome(); + fsOperator.mkCleanDirs(appHome); + fsOperator.upload(app.getDistHome(), appHome); + } else { File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); // upload jar copy to appHome String uploadJar = appUploads.concat("/").concat(app.getJar()); checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads); - switch (app.getApplicationType()) { - case STREAMPARK_FLINK: - fsOperator.mkdirs(app.getAppLib()); - fsOperator.copy(uploadJar, app.getAppLib(), false, true); - break; - case APACHE_FLINK: - fsOperator.mkdirs(appHome); - fsOperator.copy(uploadJar, appHome, false, true); - break; - default: - throw new IllegalArgumentException( - "[StreamPark] unsupported ApplicationType of custom code: " - + app.getApplicationType()); + if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) { + fsOperator.mkdirs(app.getAppLib()); + fsOperator.copy(uploadJar, app.getAppLib(), false, true); } - } else { - fsOperator.upload(app.getDistHome(), appHome); } - } else { + } + + if (app.isFlinkSqlJob() || app.isUploadJob()) { if (!app.getDependencyObject().getJar().isEmpty()) { String localUploads = Workspace.local().APP_UPLOADS(); // copy jar to local upload dir @@ -335,7 +327,8 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId()); String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app); ExecutionMode executionMode = app.getExecutionModeEnum(); - String mainClass = ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS(); + String mainClass = + app.isCustomCodeJob() ? app.getMainClass() : ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS(); switch (executionMode) { case YARN_APPLICATION: String yarnProvidedPath = app.getAppLib(); @@ -358,13 +351,17 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { case YARN_PER_JOB: case YARN_SESSION: case REMOTE: + boolean skipBuild = app.isCustomCodeJob(); + if (skipBuild && app.isUploadJob()) { + skipBuild = app.getDependencyObject().isEmpty(); + } FlinkRemotePerJobBuildRequest buildRequest = new FlinkRemotePerJobBuildRequest( app.getJobName(), app.getLocalAppHome(), mainClass, flinkUserJar, - app.isCustomCodeJob(), + skipBuild, app.getExecutionModeEnum(), app.getDevelopmentMode(), flinkEnv.getFlinkVersion(), @@ -424,7 +421,7 @@ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) { case STREAMPARK_FLINK: return String.format("%s/%s", app.getAppLib(), app.getModule().concat(".jar")); case APACHE_FLINK: - return String.format("%s/%s", app.getAppHome(), app.getJar()); + return String.format("%s/%s", WebUtils.getAppTempDir(), app.getJar()); default: throw new IllegalArgumentException( "[StreamPark] unsupported ApplicationType of custom code: " diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 2242b897e2..4dc07ebaba 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -703,6 +703,7 @@ else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode()) public boolean create(Application appParam) { ApiAlertException.throwIfNull( appParam.getTeamId(), "The teamId can't be null. Create application failed."); + appParam.setBuild(true); appParam.setUserId(commonService.getUserId()); appParam.setState(FlinkAppState.ADDED.getValue()); appParam.setRelease(ReleaseState.NEED_RELEASE.get()); @@ -736,6 +737,17 @@ public boolean create(Application appParam) { } } + @Override + public boolean save(Application entity) { + String dependency = entity.getDependency(); + if (entity.isFlinkSqlJob()) { + entity.setDependency(null); + } + boolean flag = super.save(entity); + entity.setDependency(dependency); + return flag; + } + private boolean existsByJobName(String jobName) { return this.baseMapper.existsByJobName(jobName); } @@ -837,7 +849,15 @@ public boolean update(Application appParam) { application.setRelease(ReleaseState.NEED_RELEASE.get()); if (application.isUploadJob()) { - if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) { + Application.Dependency thisDependency = + Application.Dependency.toDependency(appParam.getDependency()); + Application.Dependency targetDependency = + Application.Dependency.toDependency(application.getDependency()); + + boolean depDifference = !thisDependency.eq(targetDependency); + if (depDifference) { + application.setBuild(true); + } else if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) { application.setBuild(true); } else { File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar()); diff --git a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql index 77494579a0..7a450f1292 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql @@ -25,7 +25,7 @@ insert into `t_team` values (100001, 'test', 'The test team', now(), now()); -- ---------------------------- -- Records of t_flink_app -- ---------------------------- -insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test'); +insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test'); -- ---------------------------- -- Records of t_flink_effective diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index 3f7025a83b..d06f907e07 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -46,6 +46,7 @@ create table if not exists `t_flink_app` ( `jar` varchar(255) default null, `jar_check_sum` bigint default null, `main_class` varchar(255) default null, + `dependency` text , `args` text, `options` text, `hot_params` text , diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml index a6c97121cb..9096158b55 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml @@ -40,6 +40,7 @@ + diff --git a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue index abf6c47ea6..40b5c4964b 100644 --- a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue +++ b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue @@ -79,9 +79,7 @@ height: 18px; background-color: #fff; border-radius: 50%; - transition: - transform 0.5s, - background-color 0.5s; + transition: transform 0.5s, background-color 0.5s; will-change: transform; } diff --git a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue index 61cc99b7ff..122b6e711d 100644 --- a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue +++ b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue @@ -195,9 +195,7 @@ background-color: @component-background; border: 1px solid rgb(0 0 0 / 8%); border-radius: 0.25rem; - box-shadow: - 0 2px 2px 0 rgb(0 0 0 / 14%), - 0 3px 1px -2px rgb(0 0 0 / 10%), + box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%), 0 1px 5px 0 rgb(0 0 0 / 6%); background-clip: padding-box; user-select: none; diff --git a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue index ab4c106383..0169b4c868 100644 --- a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue +++ b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue @@ -129,7 +129,7 @@ }); const getBindValue = computed( - () => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable, + () => ({ ...attrs, ...props, ...unref(getProps) } as Recordable), ); const getSchema = computed((): FormSchema[] => { diff --git a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue index c499ccf65c..1533026465 100644 --- a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue +++ b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue @@ -55,9 +55,7 @@ line-height: 44px; background-color: @component-background; border-top: 1px solid @border-color-base; - box-shadow: - 0 -6px 16px -8px rgb(0 0 0 / 8%), - 0 -9px 28px 0 rgb(0 0 0 / 5%), + box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 / 5%), 0 -12px 48px 16px rgb(0 0 0 / 3%); transition: width 0.2s; diff --git a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts index 47aa6eb5f0..de03808115 100644 --- a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts +++ b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts @@ -48,12 +48,9 @@ export function useLockPage() { } clear(); - timeId = setTimeout( - () => { - lockPage(); - }, - lockTime * 60 * 1000, - ); + timeId = setTimeout(() => { + lockPage(); + }, lockTime * 60 * 1000); } function lockPage(): void { diff --git a/streampark-console/streampark-console-webapp/src/utils/props.ts b/streampark-console/streampark-console-webapp/src/utils/props.ts index 5828c8cf79..92fe2810b1 100644 --- a/streampark-console/streampark-console-webapp/src/utils/props.ts +++ b/streampark-console/streampark-console-webapp/src/utils/props.ts @@ -191,7 +191,7 @@ export const buildProps = < : never; }; -export const definePropType = (val: any) => ({ [wrapperKey]: val }) as PropWrapper; +export const definePropType = (val: any) => ({ [wrapperKey]: val } as PropWrapper); export const keyOf = (arr: T) => Object.keys(arr) as Array; export const mutable = >(val: T) => diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue index 541503f693..071231e0ca 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue @@ -185,7 +185,7 @@ // common params... const resourceFrom = values.resourceFrom; if (resourceFrom) { - if (resourceFrom === 'csv') { + if (resourceFrom === 'cvs') { params['resourceFrom'] = ResourceFromEnum.CICD; //streampark flink if (values.appType == AppTypeEnum.STREAMPARK_FLINK) { @@ -210,15 +210,15 @@ appType: AppTypeEnum.APACHE_FLINK, jar: unref(uploadJar), mainClass: values.mainClass, + dependency: await getDependency(), }); handleCreateApp(params); } } } - /* flink sql mode */ - async function handleSubmitSQL(values: Recordable) { + async function getDependency() { // Trigger a pom confirmation operation. - await unref(dependencyRef)?.handleApplyPom(); + unref(dependencyRef)?.handleApplyPom(); // common params... const dependency: { pom?: string; jar?: string } = {}; const dependencyRecords = unref(dependencyRef)?.dependencyRecords; @@ -233,14 +233,18 @@ jar: unref(uploadJars), }); } - + return dependency.pom === undefined && dependency.jar === undefined + ? null + : JSON.stringify(dependency); + } + /* flink sql mode */ + async function handleSubmitSQL(values: Recordable) { let config = values.configOverride; - if (config != null && config !== undefined && config.trim() != '') { + if (config != null && config.trim() != '') { config = encryptByBase64(config); } else { config = null; } - handleCluster(values); const params = { jobType: JobTypeEnum.SQL, @@ -248,10 +252,7 @@ appType: AppTypeEnum.STREAMPARK_FLINK, config, format: values.isSetConfig ? 1 : null, - dependency: - dependency.pom === undefined && dependency.jar === undefined - ? null - : JSON.stringify(dependency), + dependency: await getDependency(), }; handleSubmitParams(params, values, k8sTemplate); handleCreateApp(params); @@ -285,7 +286,7 @@ const param = {}; for (const k in params) { const v = params[k]; - if (v != null && v !== undefined) { + if (v != null) { param[k] = v; } } diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue index 0213540204..265c66e52f 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue @@ -39,6 +39,7 @@ import VariableReview from './components/VariableReview.vue'; import { useDrawer } from '/@/components/Drawer'; import { ExecModeEnum, ResourceFromEnum } from '/@/enums/flinkEnum'; + import Dependency from '/@/views/flink/app/components/Dependency.vue'; const route = useRoute(); const { t } = useI18n(); @@ -52,6 +53,7 @@ const uploadJar = ref(''); const programArgRef = ref(); const podTemplateRef = ref(); + const dependencyRef = ref(); const k8sTemplate = reactive({ podTemplate: '', @@ -116,6 +118,7 @@ setFieldsValue(defaultParams); app.args && programArgRef.value?.setContent(app.args); setTimeout(() => { + unref(dependencyRef)?.setDefaultValue(JSON.parse(app.dependency || '{}')); unref(podTemplateRef)?.handleChoicePodTemplate('ptVisual', app.k8sPodTemplate); unref(podTemplateRef)?.handleChoicePodTemplate('jmPtVisual', app.k8sJmPodTemplate); unref(podTemplateRef)?.handleChoicePodTemplate('tmPtVisual', app.k8sTmPodTemplate); @@ -142,15 +145,34 @@ /* Handling update parameters */ function handleAppUpdate(values: Recordable) { + // Trigger a pom confirmation operation. + unref(dependencyRef)?.handleApplyPom(); + // common params... + const dependency: { pom?: string; jar?: string } = {}; + const dependencyRecords = unref(dependencyRef)?.dependencyRecords; + const uploadJars = unref(dependencyRef)?.uploadJars; + if (unref(dependencyRecords) && unref(dependencyRecords).length > 0) { + Object.assign(dependency, { + pom: unref(dependencyRecords), + }); + } + if (uploadJars && unref(uploadJars).length > 0) { + Object.assign(dependency, { + jar: unref(uploadJars), + }); + } submitLoading.value = true; try { const params = { id: app.id, jar: values.jar, mainClass: values.mainClass, + dependency: + dependency.pom === undefined && dependency.jar === undefined + ? null + : JSON.stringify(dependency), }; handleSubmitParams(params, values, k8sTemplate); - handleUpdateApp(params); } catch (error) { submitLoading.value = false; @@ -212,13 +234,17 @@ + +