Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1872] Modify logs to improve multi-active debugging #3735

Merged
merged 2 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ private void initializeConstantsTable() throws IOException {
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis)
throws IOException {
log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction,
eventTimeMillis);
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public HighLevelConsumer(String topic, Config config, int numThreads) {

protected GobblinKafkaConsumerClient createConsumerClient(Config config) {
String kafkaConsumerClientClass = config.getString(CONSUMER_CLIENT_FACTORY_CLASS_KEY);
log.info("Creating consumer client of class {}", kafkaConsumerClientClass, config);
log.info("Creating consumer client of class {} with config {}", kafkaConsumerClientClass, config);

try {
Class clientFactoryClass = Class.forName(kafkaConsumerClientClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> lease
public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction,
eventTimeMillis);
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ public static boolean isWithinRange(String cronExpression, int maxNumDaysToSched
cron = new CronExpression(cronExpression);
cron.setTimeZone(TimeZone.getTimeZone("UTC"));
Date nextValidTimeAfter = cron.getNextValidTimeAfter(new Date());
if (nextValidTimeAfter == null) {
log.warn("Calculation issue for next valid time for expression: {}. Will default to true for within range",
cronExpression);
return true;
}
cal.setTime(nextValidTimeAfter);
long diff = cal.getTimeInMillis() - System.currentTimeMillis();
return (int) Math.round(diff / numMillisInADay) < maxNumDaysToScheduleWithin;
Expand Down
Loading