From bd1465113a6b1139e32e9dcce44b5b3297eb8530 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Wed, 9 Aug 2023 15:42:16 -0700 Subject: [PATCH] Logs to improve multi-active debugging --- .../gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java | 2 ++ .../org/apache/gobblin/runtime/kafka/HighLevelConsumer.java | 2 +- .../service/modules/orchestration/FlowTriggerHandler.java | 2 -- .../modules/scheduler/GobblinServiceJobScheduler.java | 5 +++++ 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 2cdcf71ce1..cce2c35bb3 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -186,6 +186,8 @@ private void initializeConstantsTable() throws IOException { @Override public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException { + log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction, + eventTimeMillis); // Check table for an existing entry for this flow action and event time Optional getResult = withPreparedStatement(thisTableGetInfoStatement, getInfoStatement -> { diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java index 0e68ea9aae..5e8daaa26a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java @@ -139,7 +139,7 @@ public HighLevelConsumer(String topic, Config config, int numThreads) { protected GobblinKafkaConsumerClient createConsumerClient(Config config) { String kafkaConsumerClientClass = config.getString(CONSUMER_CLIENT_FACTORY_CLASS_KEY); - log.info("Creating consumer client of class {}", kafkaConsumerClientClass, config); + log.info("Creating consumer client of class {} with config {}", kafkaConsumerClientClass, config); try { Class clientFactoryClass = Class.forName(kafkaConsumerClientClass); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index 410350b885..90379e730f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -104,8 +104,6 @@ public FlowTriggerHandler(Config config, Optional lease public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException { if (multiActiveLeaseArbiter.isPresent()) { - log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction, - eventTimeMillis); MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis); if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) { MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index f71093f528..89fde16949 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -298,6 +298,11 @@ public static boolean isWithinRange(String cronExpression, int maxNumDaysToSched cron = new CronExpression(cronExpression); cron.setTimeZone(TimeZone.getTimeZone("UTC")); Date nextValidTimeAfter = cron.getNextValidTimeAfter(new Date()); + if (nextValidTimeAfter == null) { + log.warn("Calculation issue for next valid time for expression: {}. Will default to true for within range", + cronExpression); + return true; + } cal.setTime(nextValidTimeAfter); long diff = cal.getTimeInMillis() - System.currentTimeMillis(); return (int) Math.round(diff / numMillisInADay) < maxNumDaysToScheduleWithin;