From 3711276e4f073759a742f5c0e4c4a189f1ac3fcc Mon Sep 17 00:00:00 2001 From: William Lo Date: Wed, 19 Jul 2023 15:55:36 -0400 Subject: [PATCH 1/2] Add override flag to force generate a job execution id based on gobblin cluster system time --- .../GobblinClusterConfigurationKeys.java | 4 ++ .../gobblin/cluster/HelixJobsMapping.java | 12 +++-- .../cluster/GobblinHelixJobMappingTest.java | 53 +++++++++++++++++++ 3 files changed, 64 insertions(+), 5 deletions(-) create mode 100644 gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index ef83ab029a2..111cebe402c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -180,6 +180,10 @@ public class GobblinClusterConfigurationKeys { public static final String CANCEL_RUNNING_JOB_ON_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete"; public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false"; + // Job Execution ID for Helix jobs is inferred from Flow Execution IDs, but there are scenarios in earlyStop jobs where + // this behavior needs to be avoided due to concurrent planning and acutal jobs sharing the same execution ID + public static final String USE_GENERATED_JOBEXECUTION_IDS = GOBBLIN_CLUSTER_PREFIX + "job.useGeneratedJobExecutionIds"; + // By default we cancel job by calling helix stop API. In some cases, jobs just hang in STOPPING state and preventing // new job being launched. We provide this config to give an option to cancel jobs by calling Delete API. Directly delete // a Helix workflow should be safe in Gobblin world, as Gobblin job is stateless for Helix since we implement our own state store diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java index 8128aa0aab1..b8fb436e4ce 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java @@ -96,15 +96,17 @@ public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) { } public static String createPlanningJobId (Properties jobPlanningProps) { + long planningJobId = PropertiesUtils.getPropAsBoolean(jobPlanningProps, GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ? + System.currentTimeMillis() : PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()); return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX - + JobState.getJobNameFromProps(jobPlanningProps), - PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis())); + + JobState.getJobNameFromProps(jobPlanningProps), planningJobId); } public static String createActualJobId (Properties jobProps) { - return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX - + JobState.getJobNameFromProps(jobProps), - PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis())); + long actualJobId = PropertiesUtils.getPropAsBoolean(jobProps, GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ? + System.currentTimeMillis() : PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()); + return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX + + JobState.getJobNameFromProps(jobProps), actualJobId); } @Nullable diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java new file mode 100644 index 00000000000..022a9fb8a66 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.util.Properties; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.configuration.ConfigurationKeys; + + +public class GobblinHelixJobMappingTest { + + @Test + void testMapJobNameWithFlowExecutionId() { + Properties props = new Properties(); + props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1234"); + props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1"); + String planningJobId = HelixJobsMapping.createPlanningJobId(props); + String actualJobId = HelixJobsMapping.createActualJobId(props); + Assert.assertEquals(planningJobId, "job_PlanningJobjob1_1234"); + Assert.assertEquals(actualJobId, "job_ActualJobjob1_1234"); + } + + @Test + void testMapJobNameWithOverride() { + Properties props = new Properties(); + props.setProperty(GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "true"); + props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1234"); + props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1"); + String planningJobId = HelixJobsMapping.createPlanningJobId(props); + String actualJobId = HelixJobsMapping.createActualJobId(props); + // The jobID will be the system timestamp instead of the flow execution ID + Assert.assertNotEquals(planningJobId, "job_PlanningJobjob1_1234"); + Assert.assertNotEquals(actualJobId, "job_ActualJobjob1_1234"); + } +} From f73b1ee8221835c416a3b4feca6325d3f97335e0 Mon Sep 17 00:00:00 2001 From: William Lo Date: Thu, 20 Jul 2023 11:57:43 -0400 Subject: [PATCH 2/2] fix typo --- .../apache/gobblin/cluster/GobblinClusterConfigurationKeys.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 111cebe402c..a2e8a4b5654 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -181,7 +181,7 @@ public class GobblinClusterConfigurationKeys { public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false"; // Job Execution ID for Helix jobs is inferred from Flow Execution IDs, but there are scenarios in earlyStop jobs where - // this behavior needs to be avoided due to concurrent planning and acutal jobs sharing the same execution ID + // this behavior needs to be avoided due to concurrent planning and actual jobs sharing the same execution ID public static final String USE_GENERATED_JOBEXECUTION_IDS = GOBBLIN_CLUSTER_PREFIX + "job.useGeneratedJobExecutionIds"; // By default we cancel job by calling helix stop API. In some cases, jobs just hang in STOPPING state and preventing