[GOBBLIN-2264] Key deadline reminders by dagAction only so DELETE events can unschedule them#4194
Merged
Merged
Conversation
d9101bd to
b272cca
Compare
…nts can unschedule them Deadline reminder JobKeys previously embedded the lease event time, but the DagActionStore DELETE event payload omits event time, so the change monitor could not cancel a still-pending deadline reminder for a dagAction whose row had already been resolved. Re-key deadline reminders by (flowGroup, flowName, flowExecutionId, jobName, dagActionType) only and add an unscheduleReminderJob(DagAction) overload that the DELETE handler can invoke. Replace any pre-existing deadline reminder on schedule to avoid ObjectAlreadyExistsException when the delete-then-reinsert duplicate-insert path in DagProcUtils.sendEnforce*DeadlineDagAction races with an out-of-order change event. Retry reminders still embed event time and are intentionally not unscheduled on DELETE; their JobKey is not derivable from the DELETE payload, and orphaned retries fire harmlessly because the lease arbiter sees the row is gone. Re-enable the previously TODO-disabled DELETE assertion in DagManagementDagActionStoreChangeMonitorTest to cover the activated code path.
b272cca to
5e75747
Compare
agam-99
approved these changes
May 19, 2026
Blazer-007
approved these changes
May 19, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Problem. Deadline reminders for
ENFORCE_JOB_START_DEADLINE/ENFORCE_FLOW_FINISH_DEADLINEcould not be cancelled byDagManagementDagActionStoreChangeMonitoron a DELETE event:JobKeyfor every reminder embedded the lease event time (seecreateDagActionReminderKeyon master), but theDagActionStoreChangeEventpayload for a DELETE row carries no event time. The DELETE branch in the change monitor was therefore commented out with a TODO ("skip deadline removal for now and let them fire"), leaving deadline reminders to fire on already-resolved actions.DagProcUtils.sendEnforceJobStartDeadlineDagAction/sendEnforceFlowFinishDeadlineDagActionrecover fromSQLIntegrityConstraintViolationExceptionby deleting the existing row and reinserting it. If the resulting DELETE + INSERT change events arrived out-of-order on the consumer, an unschedule attempt could miss the still-live old reminder and the new INSERT would then collide on the Quartz key, throwingObjectAlreadyExistsExceptionand losing the new deadline.Change.
DagActionReminderScheduler.createDagActionReminderKey(LeaseParams, boolean isDeadlineReminder): deadline reminders are now keyed only by(flowGroup, flowName, flowExecutionId, jobName, dagActionType)— event time is intentionally omitted. TheDagActionStoreprimary key already guarantees uniqueness over that tuple forENFORCE_*_DEADLINE, so at most one deadline reminder can ever be in flight for that tuple. Retry reminders keep the event-time suffix (multi-KILL / multi-RESUME still need to coexist by event time).unscheduleReminderJob(DagActionStore.DagAction)that the DELETE handler can invoke without needing the original event time. The existing(LeaseParams, boolean)overload is preserved.scheduleReminderperforms a replace-on-conflict for deadline reminders: if a JobKey already exists at schedule time, the pre-existing job is deleted before the new one is scheduled — defends against the duplicate-insert + out-of-order race described above.DagManagementDagActionStoreChangeMonitorDELETE branch is activated forENFORCE_*_DEADLINE. Retry reminders are intentionally not unscheduled here because their key embeds event time; orphaned retries fire harmlessly — when the reminder fires,ReminderJob.executecallsDagManagement.addDagActionwhich routes throughMysqlMultiActiveLeaseArbiter.tryAcquireLease; with the row already gone, the arbiter completes the lifecycle without re-triggering downstream work.Backwards compatibility.
StdSchedulerFactory(properties)setting onlyinstanceNameandthreadCount; the in-treeconf/*/quartz.propertiesall useorg.quartz.simpl.RAMJobStore. No reminders are persisted across restart, so the JobKey format change has zero migration risk.createDagActionReminderKey(LeaseParams)signature gained a requiredbooleanparameter. All in-repo callers (createJobKey,createTriggerKey, andDagActionReminderSchedulerTest) have been updated. No usages outsidegobblin-service/and the gobblin-kafka-09 test.unscheduleReminderJob(LeaseParams, boolean)is preserved (still used internally for retry-reminder lifecycle).Tests
My PR adds the following unit tests OR does not need testing for this extremely good reason:
DagActionReminderSchedulerTest.testCreateDagActionReminderKeyForDeadlineOmitsEventTime— asserts the deadline key is(flowGroup, flowName, flowExecutionId, jobName, dagActionType)only and that twoLeaseParamsfor the same dagAction with different event times produce the same deadline key.DagActionReminderSchedulerTest.testScheduleDeadlineReminderReplacesExistingEntry— exercises the duplicate-insert + out-of-order path: schedules a deadline reminder, then schedules a second reminder with a different event time for the same dagAction, and asserts no exception is thrown and the reminder is still present after the replace.DagActionReminderSchedulerTest.testUnscheduleDeadlineReminderByDagAction— covers the newunscheduleReminderJob(DagAction)overload and asserts idempotency (a second unschedule of an already-removed reminder must not throw).DagActionReminderSchedulerTest.testCreateDagActionReminderKeyupdated to call the new(LeaseParams, boolean)signature for retry-reminder keys.DagManagementDagActionStoreChangeMonitorTest.testProcessMessageWithDelete— re-enables the previously TODO-disabledverify(...)and adapts it to the newunscheduleReminderJob(DagAction)overload, now that the DELETE branch is no longer commented out.Commits