Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1846] Validate Multi-active Scheduler with Logs #3707

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void scheduleJob(Properties jobProps, JobListener jobListener, Map<String
// Schedule the Quartz job with a trigger built from the job configuration
Trigger trigger = createTriggerForJob(job.getKey(), jobProps);
this.scheduler.getScheduler().scheduleJob(job, trigger);
LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(), trigger.getNextFireTime()));
scheduleJobLogUtil(job, trigger);
} catch (SchedulerException se) {
LOG.error("Failed to schedule job " + jobName, se);
throw new JobException("Failed to schedule job " + jobName, se);
Expand All @@ -407,6 +407,10 @@ public void scheduleJob(Properties jobProps, JobListener jobListener, Map<String
this.scheduledJobs.put(jobName, job.getKey());
}

public void scheduleJobLogUtil(JobDetail job, Trigger trigger) {
umustafi marked this conversation as resolved.
Show resolved Hide resolved
LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(), trigger.getNextFireTime()));
}

/**
* Unschedule and delete a job.
*
Expand Down Expand Up @@ -606,13 +610,6 @@ public void executeImpl(JobExecutionContext context)
JobScheduler jobScheduler = (JobScheduler) dataMap.get(JOB_SCHEDULER_KEY);
Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
// Obtain trigger timestamp from trigger to pass to jobProps
Trigger trigger = context.getTrigger();
// THIS current event has already fired if this method is called, so it now exists in <previousFireTime>
long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
String.valueOf(triggerTimestampMillis));

try {
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
import java.util.TimeZone;

import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.helix.HelixManager;
import org.quartz.CronExpression;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -96,6 +99,7 @@
*/
@Alpha
@Singleton
@Slf4j
public class GobblinServiceJobScheduler extends JobScheduler implements SpecCatalogListener {

// Scheduler related configuration
Expand Down Expand Up @@ -442,6 +446,14 @@ public synchronized void scheduleJob(Properties jobProps, JobListener jobListene
}
}

@Override
public void scheduleJobLogUtil(JobDetail job, Trigger trigger) {
Properties jobProps = (Properties) job.getJobDataMap().get(JobScheduler.JOB_SCHEDULER_KEY);
log.info("Scheduler trigger tracing: [flowName: {} flowGroup: {}] - nextTriggerTime: {} - Job newly scheduled",
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY, ""),
umustafi marked this conversation as resolved.
Show resolved Hide resolved
jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, ""), trigger.getNextFireTime());
}

@Override
public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
try {
Expand Down Expand Up @@ -576,6 +588,15 @@ private void unscheduleSpec(URI specURI, String specVersion) throws JobException
this.scheduledFlowSpecs.remove(specURI.toString());
this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
unscheduleJob(specURI.toString());
try {
FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI);
Properties properties = spec.getConfigAsProperties();
_log.info("Scheduler trigger tracing: [flowName: {} flowGroup: {}] - Unscheduled Spec",
properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties.getProperty(ConfigurationKeys.JOB_GROUP_KEY));
} catch (SpecNotFoundException e) {
_log.warn("Unable to retrieve spec for URI {}", specURI);
}
} else {
throw new JobException(String.format(
"Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually",
Expand Down Expand Up @@ -666,13 +687,24 @@ public static class GobblinServiceJob extends BaseGobblinJob implements Interrup

@Override
public void executeImpl(JobExecutionContext context) throws JobExecutionException {
_log.info("Starting FlowSpec " + context.getJobDetail().getKey());
JobDetail jobDetail = context.getJobDetail();
_log.info("Starting FlowSpec " + jobDetail.getKey());

JobDataMap dataMap = context.getJobDetail().getJobDataMap();
JobDataMap dataMap = jobDetail.getJobDataMap();
JobScheduler jobScheduler = (JobScheduler) dataMap.get(JOB_SCHEDULER_KEY);
Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);

// Obtain trigger timestamp from trigger to pass to jobProps
Trigger trigger = context.getTrigger();
// THIS current event has already fired if this method is called, so it now exists in <previousFireTime>
long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
String.valueOf(triggerTimestampMillis));
_log.info("Scheduler trigger tracing: [flowName: {} flowGroup: {}] - triggerTime: {} nextTriggerTime: {} - "
+ "Job triggered by scheduler",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY),
triggerTimestampMillis, trigger.getNextFireTime().getTime());
umustafi marked this conversation as resolved.
Show resolved Hide resolved
try {
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {
Expand Down