Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
* upstream/master:
  Fix bug with total count watermark whitelist (apache#3724)
  [GOBBLIN-1858] Fix logs relating to multi-active lease arbiter (apache#3720)
  [GOBBLIN-1838] Introduce total count based completion watermark (apache#3701)
  Correct num of failures (apache#3722)
  [GOBBLIN- 1856] Add flow trigger handler leasing metrics (apache#3717)
  [GOBBLIN-1857] Add override flag to force generate a job execution id based on gobbl… (apache#3719)
  [GOBBLIN-1855] Metadata writer tests do not work in isolation after upgrading to Iceberg 1.2.0 (apache#3718)
  Remove unused ORC writer code (apache#3710)
  [GOBBLIN-1853] Reduce # of Hive calls during schema related updates (apache#3716)
  [GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single Participant (apache#3715)
  [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (apache#3712)
  [GOBBLIN-1849] Add Flow Group & Name to Job Config for Job Scheduler (apache#3713)
  [GOBBLIN-1841] Move disabling of current live instances to the GobblinClusterManager startup (apache#3708)
  [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time (apache#3704)
  [GOBBLIN-1847] Exceptions in the JobLauncher should try to delete the existing workflow if it is launched (apache#3711)
  [GOBBLIN-1842] Add timers to GobblinMCEWriter (apache#3703)
  [GOBBLIN-1844] Ignore workflows marked for deletion when calculating container count (apache#3709)
  [GOBBLIN-1846] Validate Multi-active Scheduler with Logs (apache#3707)
  [GOBBLIN-1845] Changes parallelstream to stream in DatasetsFinderFilteringDecorator  to avoid classloader issues in spark (apache#3706)
  [GOBBLIN-1843] Utility for detecting non optional unions should convert dataset urn to hive compatible format (apache#3705)
  [GOBBLIN-1837] Implement multi-active, non blocking for leader host (apache#3700)
  [GOBBLIN-1835]Upgrade Iceberg Version from 0.11.1 to 1.2.0 (apache#3697)
  Update CHANGELOG to reflect changes in 0.17.0
  Reserving 0.18.0 version for next release
  [GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and Graceful Exits for Error-Free Completion (apache#3699)
  [GOBBLIN-1805] Check watermark for the most recent hour for quiet topics (apache#3698)
  [GOBBLIN-1825]Hive retention job should fail if deleting underlying files fail (apache#3687)
  [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (apache#3692)
  [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (apache#3693)
  [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (apache#3696)
  • Loading branch information
phet committed Aug 1, 2023
2 parents 90dd1ff + 8198404 commit d885952
Show file tree
Hide file tree
Showing 79 changed files with 3,613 additions and 1,202 deletions.
237 changes: 237 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
// Scheduler lease determination store configuration
public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";
public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = "gobblin_multi_active_scheduler_constants_store";
public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = "gobblin_scheduler_lease_determination_store";
public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY = "eventToRevisitTimestampMillis";
public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY = "triggerEventTimestampMillis";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
// Note: linger should be on the order of seconds even though we measure in millis
public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000;

// Job executor thread pool size
public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ServiceConfigKeys {
public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "multiActiveScheduler.enabled";
// If true, will mark up/down d2 servers on leadership so that all requests will be routed to the leader node
public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER = GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.cluster;

import java.time.Duration;

import org.apache.gobblin.annotation.Alpha;


Expand Down Expand Up @@ -178,6 +180,10 @@ public class GobblinClusterConfigurationKeys {
public static final String CANCEL_RUNNING_JOB_ON_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";

// Job Execution ID for Helix jobs is inferred from Flow Execution IDs, but there are scenarios in earlyStop jobs where
// this behavior needs to be avoided due to concurrent planning and actual jobs sharing the same execution ID
public static final String USE_GENERATED_JOBEXECUTION_IDS = GOBBLIN_CLUSTER_PREFIX + "job.useGeneratedJobExecutionIds";

// By default we cancel job by calling helix stop API. In some cases, jobs just hang in STOPPING state and preventing
// new job being launched. We provide this config to give an option to cancel jobs by calling Delete API. Directly delete
// a Helix workflow should be safe in Gobblin world, as Gobblin job is stateless for Helix since we implement our own state store
Expand Down Expand Up @@ -222,4 +228,11 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";

public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";

public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
public static final boolean DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;

public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = "helix.job.scheduling.throttle.timeout.seconds";
public static final long DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = Duration.ofMinutes(40).getSeconds();;

}
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,7 @@ public synchronized void start() {
LOGGER.info("Starting the Gobblin Cluster Manager");

this.eventBus.register(this);
this.multiManager.connect();

// Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone
// mode, such as YARN mode
if (!this.isStandaloneMode) {
this.multiManager.cleanUpJobs();
}

configureHelixQuotaBasedTaskScheduling();
setupHelix();

if (this.isStandaloneMode) {
// standalone mode starts non-daemon threads later, so need to have this thread to keep process up
Expand Down Expand Up @@ -316,6 +308,18 @@ public void run() {
this.started = true;
}

public synchronized void setupHelix() {
this.multiManager.connect();

// Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone
// mode, such as YARN mode
if (!this.isStandaloneMode) {
this.multiManager.cleanUpJobs();
}

configureHelixQuotaBasedTaskScheduling();
}

/**
* Stop the Gobblin Cluster Manager.
*/
Expand Down Expand Up @@ -427,11 +431,18 @@ boolean isHelixManagerConnected() {
*/
@VisibleForTesting
void initializeHelixManager() {
this.multiManager = new GobblinHelixMultiManager(
this.config, aVoid -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), this.eventBus, stopStatus) ;
this.multiManager = createMultiManager();
this.multiManager.addLeadershipChangeAwareComponent(this);
}

/***
* Can be overriden to inject mock GobblinHelixMultiManager
* @return a new GobblinHelixMultiManager
*/
public GobblinHelixMultiManager createMultiManager() {
return new GobblinHelixMultiManager(this.config, aVoid -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), this.eventBus, stopStatus);
}

@VisibleForTesting
void sendShutdownRequest() {
Criteria criteria = new Criteria();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException {
}

// TODO: Better error handling. The current impl swallows exceptions for jobs that were started by this method call.
// One potential way to improve the error handling is to make this error swallowing conifgurable
// One potential way to improve the error handling is to make this error swallowing configurable
} catch (Throwable t) {
errorInJobLaunching = t;
if (isLaunched) {
// Attempts to cancel the helix workflow if an error occurs during launch
cancelJob(jobListener);
}
} finally {
if (isLaunched) {
if (this.runningMap.replace(this.jobContext.getJobName(), true, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -111,14 +115,28 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;

/**
* The throttling timeout prevents helix workflows with the same job name / URI from being submitted
* more than once within the timeout period. This timeout is not reset by deletes / cancels, meaning that
* if you delete a workflow within the timeout period, you cannot reschedule until the timeout period is complete.
* However, if there is an error when launching the job, you can immediately reschedule the flow. <br><br>
*
* NOTE: This throttle timeout period starts when the job launcher thread picks up the runnable. Meaning that the
* time it takes to submit to Helix and start running the flow is also included as part of the timeout period
*/
private final Duration jobSchedulingThrottleTimeout;
private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
private boolean isThrottleEnabled;
private Clock clock;

public GobblinHelixJobScheduler(Config sysConfig,
HelixManager jobHelixManager,
Optional<HelixManager> taskDriverHelixManager,
EventBus eventBus,
Path appWorkDir, List<? extends Tag<?>> metadataTags,
SchedulerService schedulerService,
MutableJobCatalog jobCatalog) throws Exception {

MutableJobCatalog jobCatalog,
Clock clock) throws Exception {
super(ConfigUtils.configToProperties(sysConfig), schedulerService);
this.commonJobProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig, COMMON_JOB_PROPS));
this.jobHelixManager = jobHelixManager;
Expand Down Expand Up @@ -162,6 +180,27 @@ public GobblinHelixJobScheduler(Config sysConfig,
this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) * 1000;

this.jobSchedulingThrottleTimeout = Duration.of(ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY), ChronoUnit.SECONDS);

this.jobNameToNextSchedulableTime = new ConcurrentHashMap<>();

this.isThrottleEnabled = ConfigUtils.getBoolean(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY);

this.clock = clock;
}

public GobblinHelixJobScheduler(Config sysConfig,
HelixManager jobHelixManager,
Optional<HelixManager> taskDriverHelixManager,
EventBus eventBus,
Path appWorkDir, List<? extends Tag<?>> metadataTags,
SchedulerService schedulerService,
MutableJobCatalog jobCatalog) throws Exception {

this(sysConfig, jobHelixManager, taskDriverHelixManager, eventBus, appWorkDir, metadataTags,
schedulerService, jobCatalog, Clock.systemUTC());
}

@Override
Expand Down Expand Up @@ -206,9 +245,9 @@ protected void startServices() throws Exception {

if (cleanAllDistJobs) {
for (org.apache.gobblin.configuration.State state : this.jobsMapping.getAllStates()) {
String jobUri = state.getId();
LOGGER.info("Delete mapping for job " + jobUri);
this.jobsMapping.deleteMapping(jobUri);
String jobName = state.getId();
LOGGER.info("Delete mapping for job " + jobName);
this.jobsMapping.deleteMapping(jobName);
}
}
}
Expand Down Expand Up @@ -303,36 +342,70 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec
}

@Subscribe
public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
String jobUri = newJobArrival.getJobName();
LOGGER.info("Received new job configuration of job " + jobUri);
public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
String jobName = newJobArrival.getJobName();
LOGGER.info("Received new job configuration of job " + jobName);

Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
LOGGER.info("Adding new job is skipped for job {}. Current time is {} and the next schedulable time would be {}",
jobName,
clock.instant(),
nextSchedulableTime
);
return;
}

if (isThrottleEnabled) {
nextSchedulableTime = clock.instant().plus(jobSchedulingThrottleTimeout);
jobNameToNextSchedulableTime.put(jobName, nextSchedulableTime);
}

try {
Properties jobProps = new Properties();
jobProps.putAll(this.commonJobProperties);
jobProps.putAll(newJobArrival.getJobConfig());

// set uri so that we can delete this job later
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobName);

this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);

GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime)
: new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics));
LOGGER.info("Scheduling job " + jobName);
scheduleJob(jobProps, listener);
} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics)));
LOGGER.info("No job schedule found, so running job " + jobName);
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
}
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
LOGGER.error("Failed to schedule or run job {} . Reset the next scheduable time to {}",
jobName,
Instant.EPOCH,
je);
if (isThrottleEnabled) {
jobNameToNextSchedulableTime.put(jobName, Instant.EPOCH);
}
}
}

@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
String jobName = updateJobArrival.getJobName();

Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
LOGGER.info("Replanning is skipped for job {}. Current time is {} and the next schedulable time would be {}",
jobName,
clock.instant(),
nextSchedulableTime
);
return;
}

try {
handleDeleteJobConfigArrival(new DeleteJobConfigArrivalEvent(updateJobArrival.getJobName(),
updateJobArrival.getJobConfig()));
Expand All @@ -359,8 +432,17 @@ private void waitForJobCompletion(String jobName) {
}
}

/***
* Deleting a workflow with throttling enabled means that the next
* schedulable time for the workflow will remain unchanged.
* Note: In such case, it is required to wait until the throttle
* timeout period elapses before the workflow can be rescheduled.
*
* @param deleteJobArrival
* @throws InterruptedException
*/
@Subscribe
public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
LOGGER.info("Received delete for job configuration of job " + deleteJobArrival.getJobName());
try {
unscheduleJob(deleteJobArrival.getJobName());
Expand All @@ -373,8 +455,8 @@ public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobAr
@Subscribe
public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobArrival)
throws InterruptedException {
String jobUri = cancelJobArrival.getJoburi();
LOGGER.info("Received cancel for job configuration of job " + jobUri);
String jobName = cancelJobArrival.getJoburi();
LOGGER.info("Received cancel for job configuration of job " + jobName);
Optional<String> distributedJobMode;
Optional<String> planningJob = Optional.empty();
Optional<String> actualJob = Optional.empty();
Expand All @@ -384,14 +466,14 @@ public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobAr
this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();

try {
distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri);
distributedJobMode = this.jobsMapping.getDistributedJobMode(jobName);
if (distributedJobMode.isPresent() && Boolean.parseBoolean(distributedJobMode.get())) {
planningJob = this.jobsMapping.getPlanningJobId(jobUri);
planningJob = this.jobsMapping.getPlanningJobId(jobName);
} else {
actualJob = this.jobsMapping.getActualJobId(jobUri);
actualJob = this.jobsMapping.getActualJobId(jobName);
}
} catch (IOException e) {
LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri);
LOGGER.warn("jobsMapping could not be retrieved for job {}", jobName);
return;
}

Expand Down Expand Up @@ -466,7 +548,7 @@ public void run() {
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis());
GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
} catch (JobException je) {
LOGGER.error("Failed to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
LOGGER.error("Failed to schedule or run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class GobblinHelixTask implements Task {
private SingleTask task;
private String helixTaskId;
private EventBus eventBus;
private boolean isCanceled;

public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
TaskCallbackContext taskCallbackContext,
Expand Down Expand Up @@ -161,12 +162,20 @@ private void getInfoFromTaskConfig() {
@Override
public TaskResult run() {
this.taskMetrics.helixTaskTotalRunning.incrementAndGet();
this.isCanceled = false;
long startTime = System.currentTimeMillis();
log.info("Actual task {} started. [{} {}]", this.taskId, this.applicationName, this.instanceName);
try (Closer closer = Closer.create()) {
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName));
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey));
this.task.run();
// Since we enable gracefully cancel, when task get cancelled, we might not see any exception,
// so we check the isCanceled flag to make sure we return the correct task status
if (this.isCanceled) {
log.error("Actual task {} canceled.", this.taskId);
this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
return new TaskResult(TaskResult.Status.CANCELED, "");
}
log.info("Actual task {} completed.", this.taskId);
this.taskMetrics.helixTaskTotalCompleted.incrementAndGet();
return new TaskResult(TaskResult.Status.COMPLETED, "");
Expand Down Expand Up @@ -219,6 +228,7 @@ public void cancel() {
log.info("Gobblin helix task cancellation invoked for jobId {}.", jobId);
if (this.task != null ) {
try {
this.isCanceled = true;
this.task.cancel();
log.info("Gobblin helix task cancellation completed for jobId {}.", jobId);
} catch (Throwable t) {
Expand Down
Loading

0 comments on commit d885952

Please sign in to comment.