Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;


/**
Expand Down Expand Up @@ -100,12 +103,14 @@ public void setupMockMonitor() {

@BeforeClass
public void setUp() throws Exception {
doNothing().when(dagActionReminderScheduler).unscheduleReminderJob(any(), anyBoolean());

doReturn(0).when(dagActionReminderScheduler).unscheduleRemindersForDagAction(any(), anyBoolean());
}

/**
* Tests process message with a DELETE type message.
* Tests that a DELETE for a deadline-type DagAction clears both deadline-group and retry-group reminders for that
* DagAction on the local Quartz scheduler. Cross-host clearing follows from the change-monitor's broadcast consumer
* group semantics (see DagActionStoreChangeMonitor's UUID-suffixed group id) — every GaaS instance independently
* receives the DELETE and runs the same clear against its in-memory RAMJobStore.
*/
@Test
public void testProcessMessageWithDelete() throws SchedulerException {
Expand All @@ -114,12 +119,10 @@ public void testProcessMessageWithDelete() throws SchedulerException {
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, Long.parseLong(FLOW_EXECUTION_ID), JOB_NAME,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
/* TODO: skip deadline removal for now and let them fire
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(), times(1))
.unscheduleReminderJob(eq(dagAction), eq(true));
.unscheduleRemindersForDagAction(eq(dagAction), eq(true));
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(), times(1))
.unscheduleReminderJob(eq(dagAction), eq(false));
*/
.unscheduleRemindersForDagAction(eq(dagAction), eq(false));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.gobblin.service.modules.orchestration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;

import org.quartz.Job;
Expand All @@ -34,6 +37,7 @@
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.spi.JobFactory;
import org.quartz.spi.TriggerFiredBundle;

Expand Down Expand Up @@ -108,6 +112,49 @@ public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams, boolea
}
}

/**
* Clears all reminders whose job-key matches the {@link DagActionStore.DagAction} (any eventTimeMillis) within the
* given reminder group (deadline or retry). Used by the DagActionStore DELETE handler, which receives only the
* {@link DagActionStore.DagAction} fields and does not have access to the {@code eventTimeMillis} that was baked into
* the originally-scheduled key.
*
* @return number of reminders cleared from the local Quartz scheduler
*/
public int unscheduleRemindersForDagAction(DagActionStore.DagAction dagAction, boolean isDeadlineReminder)
throws SchedulerException {
String group = isDeadlineReminder ? DeadlineReminderKeyGroup : RetryReminderKeyGroup;
String keyNamePrefix = createDagActionKeyNamePrefix(dagAction);
Set<JobKey> jobKeysInGroup = quartzScheduler.getJobKeys(GroupMatcher.jobGroupEquals(group));
List<JobKey> toDelete = new ArrayList<>();
for (JobKey k : jobKeysInGroup) {
if (k.getName().startsWith(keyNamePrefix)) {
toDelete.add(k);
}
}
if (toDelete.isEmpty()) {
log.debug("No {} reminders to clear for {}.", group, dagAction);
return 0;
}
log.info("Clearing {} {} reminders for {}.", toDelete.size(), group, dagAction);
quartzScheduler.deleteJobs(toDelete);
return toDelete.size();
}

/**
* Returns the job-key-name prefix shared by every reminder scheduled for the given {@link DagActionStore.DagAction},
* regardless of {@code eventTimeMillis}. Mirrors the first five segments of {@link #createDagActionReminderKey}
* with a trailing separator so prefix matches cannot span unrelated keys.
*/
@VisibleForTesting
static String createDagActionKeyNamePrefix(DagActionStore.DagAction dagAction) {
return String.join(".",
dagAction.getFlowGroup(),
dagAction.getFlowName(),
String.valueOf(dagAction.getFlowExecutionId()),
dagAction.getJobName(),
String.valueOf(dagAction.getDagActionType())) + ".";
}

/**
* Creates a key for the reminder job by concatenating all dagAction fields and the eventTime of the dagAction.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ protected void handleDagAction(String operation, DagActionStore.DagAction dagAct
break;
case "DELETE":
log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
/* TODO: skip deadline removal for now and let them fire
if (dagActionType == DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE
|| dagActionType == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, true);
// clear any deadline reminders as well as any retry reminders
this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, false);
// Clear both deadline-group and retry-group reminders for this DagAction. The wildcard variant is required
// because the Quartz key embeds the per-lease-attempt eventTimeMillis (added by GOBBLIN-2016) which is not
// carried on the DagActionStore change event; see DagActionReminderScheduler#unscheduleRemindersForDagAction.
this.dagActionReminderScheduler.unscheduleRemindersForDagAction(dagAction, true);
this.dagActionReminderScheduler.unscheduleRemindersForDagAction(dagAction, false);
}
*/
break;
default:
log.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,68 @@ public void testRemindersForMultipleFlowExecutions() throws SchedulerException {
this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2, true);
}

@Test
public void testCreateDagActionKeyNamePrefixSharedAcrossEventTimes() {
String prefix = DagActionReminderScheduler.createDagActionKeyNamePrefix(launchDagAction);
Assert.assertTrue(DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams).startsWith(prefix),
"Both eventTime-keyed reminder keys for the same DagAction must share the prefix");
Assert.assertTrue(DagActionReminderScheduler.createDagActionReminderKey(launchLeaseParams2).startsWith(prefix),
"Both eventTime-keyed reminder keys for the same DagAction must share the prefix");
Assert.assertTrue(prefix.endsWith("."),
"Prefix must end with the segment separator so it cannot match unrelated keys (e.g., LAUNCH vs LAUNCH2)");
}

/*
Schedule deadline reminders at two distinct eventTimes for the same DagAction (simulating multiple lease attempts),
then assert the wildcard clear removes both — i.e., the DagActionStore DELETE path no longer needs to know the
original eventTimeMillis to find the scheduled reminders.
*/
@Test
public void testUnscheduleRemindersForDagActionClearsAllEventTimes() throws SchedulerException {
this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 50000, true);
this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams2, 50000, true);
Assert.assertTrue(this.dagActionReminderScheduler.quartzScheduler.checkExists(
DagActionReminderScheduler.createJobKey(launchLeaseParams, true)));
Assert.assertTrue(this.dagActionReminderScheduler.quartzScheduler.checkExists(
DagActionReminderScheduler.createJobKey(launchLeaseParams2, true)));

int cleared = this.dagActionReminderScheduler.unscheduleRemindersForDagAction(launchDagAction, true);
Assert.assertEquals(cleared, 2,
"Wildcard clear must remove both eventTime-keyed reminders for the DagAction");
Assert.assertFalse(this.dagActionReminderScheduler.quartzScheduler.checkExists(
DagActionReminderScheduler.createJobKey(launchLeaseParams, true)));
Assert.assertFalse(this.dagActionReminderScheduler.quartzScheduler.checkExists(
DagActionReminderScheduler.createJobKey(launchLeaseParams2, true)));
}

/*
Reminders scheduled in the deadline group must not be cleared when the wildcard clear is invoked for the retry group,
and vice-versa. Guards against the prefix scan accidentally crossing groups.
*/
@Test
public void testUnscheduleRemindersForDagActionScopedToGroup() throws SchedulerException {
this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 50000, true);
this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams2, 50000, false);

int clearedRetry = this.dagActionReminderScheduler.unscheduleRemindersForDagAction(launchDagAction, false);
Assert.assertEquals(clearedRetry, 1, "Should clear only the retry-group reminder");
Assert.assertTrue(this.dagActionReminderScheduler.quartzScheduler.checkExists(
DagActionReminderScheduler.createJobKey(launchLeaseParams, true)),
"Deadline-group reminder must remain after a retry-group clear");

int clearedDeadline = this.dagActionReminderScheduler.unscheduleRemindersForDagAction(launchDagAction, true);
Assert.assertEquals(clearedDeadline, 1, "Should clear the remaining deadline-group reminder");
}

@Test
public void testUnscheduleRemindersForDagActionNoopWhenNoneScheduled() throws SchedulerException {
DagActionStore.DagAction other = new DagActionStore.DagAction(
flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.RESUME);
int cleared = this.dagActionReminderScheduler.unscheduleRemindersForDagAction(other, true);
Assert.assertEquals(cleared, 0,
"Clearing reminders for a DagAction that never scheduled any must return 0, not throw");
}

// Test multiple schedulers can co-exist and run their jobs of different types
@Test
public void testMultipleSchedules() throws SchedulerException, InterruptedException, IOException {
Expand Down