Skip to content

Commit

Permalink
Logs to improve multi-active debugging (apache#3735)
Browse files Browse the repository at this point in the history
Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
2 people authored and phet committed Aug 15, 2023
1 parent e35f8c0 commit ae11e3c
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ private void initializeConstantsTable() throws IOException {
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis)
throws IOException {
log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction,
eventTimeMillis);
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public HighLevelConsumer(String topic, Config config, int numThreads) {

protected GobblinKafkaConsumerClient createConsumerClient(Config config) {
String kafkaConsumerClientClass = config.getString(CONSUMER_CLIENT_FACTORY_CLASS_KEY);
log.info("Creating consumer client of class {}", kafkaConsumerClientClass, config);
log.info("Creating consumer client of class {} with config {}", kafkaConsumerClientClass, config);

try {
Class clientFactoryClass = Class.forName(kafkaConsumerClientClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> lease
public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction,
eventTimeMillis);
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ public static boolean isWithinRange(String cronExpression, int maxNumDaysToSched
cron = new CronExpression(cronExpression);
cron.setTimeZone(TimeZone.getTimeZone("UTC"));
Date nextValidTimeAfter = cron.getNextValidTimeAfter(new Date());
if (nextValidTimeAfter == null) {
log.warn("Calculation issue for next valid time for expression: {}. Will default to true for within range",
cronExpression);
return true;
}
cal.setTime(nextValidTimeAfter);
long diff = cal.getTimeInMillis() - System.currentTimeMillis();
return (int) Math.round(diff / numMillisInADay) < maxNumDaysToScheduleWithin;
Expand Down

0 comments on commit ae11e3c

Please sign in to comment.