Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions defaultEnvironment.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ subprojects {
mavenCentral()
maven {
url "https://repository.cloudera.com/artifactory/cloudera-repos/"
content {
// io.grpc artifacts are available from Maven Central; excluding them here avoids querying the Cloudera repository for them.
excludeGroup "io.grpc"
}
}

// Conjars is a read only repository that hosts older artifacts, necessary for hive 1.0.1 and hadoop 2.10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ public class ConfigurationKeys {
public static final String FLOW_EDGE_ID_KEY = "flow.edgeId";
public static final String FLOW_DESCRIPTION_KEY = "flow.description";
public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";
// Stamped onto the JobSpec config by LaunchDagProc, carrying the DagAction store row-insert time in millis
// (sourced upstream from the CDC binlog event timestamp). Enables downstream executors to measure end-to-end
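// LAUNCH-to-submission latency including CDC propagation. The key is left absent when the source timestamp is
// unknown: LaunchDagProc only stamps it when the value differs from the -1 sentinel (UNKNOWN_STORE_INSERT_TIME_MILLIS).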
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are already guarding this with `if (storeInsertTimeMillis != DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS)` — will this ever be -1L?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the default and fallback value:

```java
      long storeInsertTimeMillis = jobDataMap.containsKey(FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY)
          ? jobDataMap.getLong(FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY)
          : DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS;
```

public static final String DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY = "dagAction.launch.storeInsertTimeMillis";
public static final String FLOW_FAILURE_OPTION = "flow.failureOption";
public static final String FLOW_APPLY_RETENTION = "flow.applyRetention";
public static final String FLOW_APPLY_INPUT_RETENTION = "flow.applyInputRetention";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
},
"doc" : "type of dag action",
"compliance" : "NONE"
}
]
}, {
"name" : "storeInsertTimeMillis",
"type" : [ "null", "long" ],
"default" : null,
"doc" : "DagAction store row-insert time in epoch millis, sourced upstream from the MySQL binlog EventTimestamp on the dag_action row INSERT. Distinct from the kafka publish time on changeEventIdentifier — this is the original DB-side timestamp, used by downstream consumers to measure end-to-end CDC+orchestration latency. Null when the producer cannot determine it.",
"compliance" : "NONE"
} ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType
new GenericStoreChangeEvent(key, String.valueOf(txidCounter), System.currentTimeMillis(), operationType);
txidCounter++;
return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, String.valueOf(flowExecutionId),
jobName, dagAction);
jobName, dagAction, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ public static JobDetail createReminderJobDetail(DagActionStore.LeaseParams lease
dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagAction.getFlowExecutionId());
dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, dagAction.getDagActionType());
dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY, leaseParams.getEventTimeMillis());
// Carry the source DagActionStore row-insert time through the Quartz reminder so a host-failure-driven
// reattempt still preserves the original timestamp for end-to-end latency instrumentation.
dataMap.put(ReminderJob.FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY, leaseParams.getStoreInsertTimeMillis());

return JobBuilder.newJob(ReminderJob.class)
.withIdentity(createJobKey(leaseParams, isDeadlineReminder))
Expand Down Expand Up @@ -190,6 +193,7 @@ public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) {
public static class ReminderJob implements Job {
public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
public static final String FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY = "flow.storeInsertTimeMillis";
private final DagManagement dagManagement;

@Override
Expand All @@ -202,10 +206,16 @@ public void execute(JobExecutionContext context) {
long flowExecutionId = jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType = (DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);
// Restore the original DagActionStore row-insert time so downstream LaunchDagProc can still stamp
// the JobSpec on a reminder-driven reattempt. Defaults to UNKNOWN for reminders scheduled by older
// code paths that did not populate the key.
long storeInsertTimeMillis = jobDataMap.containsKey(FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY)
? jobDataMap.getLong(FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY)
: DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS;

DagActionStore.LeaseParams reminderLeaseParams = new DagActionStore.LeaseParams(
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, dagActionType),
true, eventTimeMillis);
true, eventTimeMillis, storeInsertTimeMillis);
log.info("DagProc reminder triggered for dagAction event: {}", reminderLeaseParams);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,37 @@ public Dag.DagId getDagId() {
* {@link DagAction} along with the time it was requested, denoted by the `eventTimeMillis` field. It also tracks
* whether it has been previously passed to the {@link MultiActiveLeaseArbiter} to attempt ownership over the flow
* event, indicated by the 'isReminder' field (true when it has been previously attempted).
*
* The `storeInsertTimeMillis` field carries the original DagAction store row-insert time (sourced upstream
* from the CDC binlog event timestamp) when known. It is independent of `eventTimeMillis`, which the lease
* arbiter may rewrite via consensus. A value of {@link LeaseParams#UNKNOWN_STORE_INSERT_TIME_MILLIS} signals "not provided"
* and is the default for callers that do not have access to the source timestamp.
*/
@Data
@RequiredArgsConstructor
class LeaseParams {
public static final long UNKNOWN_STORE_INSERT_TIME_MILLIS = -1L;

final DagAction dagAction;
final boolean isReminder;
final long eventTimeMillis;
final long storeInsertTimeMillis;

public LeaseParams(DagAction dagAction, boolean isReminder, long eventTimeMillis, long storeInsertTimeMillis) {
this.dagAction = dagAction;
this.isReminder = isReminder;
this.eventTimeMillis = eventTimeMillis;
this.storeInsertTimeMillis = storeInsertTimeMillis;
}

public LeaseParams(DagAction dagAction, boolean isReminder, long eventTimeMillis) {
this(dagAction, isReminder, eventTimeMillis, UNKNOWN_STORE_INSERT_TIME_MILLIS);
}

/**
* Creates a lease object for a dagAction and eventTimeMillis representing an original event (isReminder is False)
*/
public LeaseParams(DagAction dagAction, long eventTimeMillis) {
this(dagAction, false, eventTimeMillis);
this(dagAction, false, eventTimeMillis, UNKNOWN_STORE_INSERT_TIME_MILLIS);
}

public LeaseParams(DagAction dagAction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,19 @@ private LeaseAttemptStatus doTryAcquireLease(DagActionStore.LeaseParams leasePar
if (isWithinEpsilon) {
DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ? leaseParams.updateDagActionFlowExecutionId(dbEventTimestamp.getTime()) : leaseParams.getDagAction();
// Preserve storeInsertTimeMillis through consensus so downstream consumers can measure end-to-end latency.
DagActionStore.LeaseParams updatedLeaseParams = new DagActionStore.LeaseParams(updatedDagAction,
dbEventTimestamp.getTime());
false, dbEventTimestamp.getTime(), leaseParams.getStoreInsertTimeMillis());
log.debug("tryAcquireLease for {} - CASE 2: Same event, lease is valid", contextualizeLeasing(updatedLeaseParams));
// Utilize db timestamp for reminder
return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ? leaseParams.getDagAction().updateFlowExecutionId(dbCurrentTimestamp.getTime()) : leaseParams.getDagAction();
// Preserve storeInsertTimeMillis through consensus so downstream consumers can measure end-to-end latency.
DagActionStore.LeaseParams updatedLeaseParams = new DagActionStore.LeaseParams(updatedDagAction,
dbCurrentTimestamp.getTime());
false, dbCurrentTimestamp.getTime(), leaseParams.getStoreInsertTimeMillis());
log.debug("tryAcquireLease for {} - CASE 3: Distinct event, lease is valid", contextualizeLeasing(updatedLeaseParams));
// Utilize db lease acquisition timestamp for wait time and currentTimestamp as the new eventTimestamp
return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams,
Expand Down Expand Up @@ -537,8 +539,11 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
}
DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ? leaseParams.updateDagActionFlowExecutionId(selectInfoResult.eventTimeMillis) : leaseParams.dagAction;
// Preserve storeInsertTimeMillis through consensus so downstream LaunchDagProc can stamp the JobSpec
// for end-to-end LAUNCH-to-submission latency instrumentation that includes CDC propagation.
DagActionStore.LeaseParams consensusLeaseParams =
new DagActionStore.LeaseParams(updatedDagAction, selectInfoResult.getEventTimeMillis());
new DagActionStore.LeaseParams(updatedDagAction, false, selectInfoResult.getEventTimeMillis(),
leaseParams.getStoreInsertTimeMillis());
// If no db current timestamp is present, then use the full db linger value for duration
long minimumLingerDurationMillis = dbCurrentTimestamp.isPresent() ?
selectInfoResult.getLeaseAcquisitionTimeMillis().get() + selectInfoResult.getDbLinger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagUtils;
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
Expand Down Expand Up @@ -69,6 +70,11 @@ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dag
try {
FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(getDagId().getFlowId()));
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, getDagId().getFlowExecutionId());
long storeInsertTimeMillis = getDagTask().getLeaseParams().getStoreInsertTimeMillis();
if (storeInsertTimeMillis != DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS) {
flowSpec.addProperty(ConfigurationKeys.DAG_ACTION_LAUNCH_STORE_INSERT_TIME_MILLIS_KEY,
storeInsertTimeMillis);
}
Optional<Dag<JobExecutionPlan>> dag = this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
if (dag.isPresent()) {
dagManagementStateStore.addDag(dag.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public abstract class DagTask {
private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
private final DagProcessingEngineMetrics dagProcEngineMetrics;

/**
 * Accessor for the consensus {@link DagActionStore.LeaseParams} attached to this task's obtained lease.
 * Callers read per-event metadata from it, for example {@code storeInsertTimeMillis} used for downstream
 * latency instrumentation.
 *
 * @return the consensus lease params reported by this task's {@code leaseObtainedStatus}
 */
public DagActionStore.LeaseParams getLeaseParams() {
  final LeaseAttemptStatus.LeaseObtainedStatus obtainedStatus = this.leaseObtainedStatus;
  return obtainedStatus.getConsensusLeaseParams();
}

public DagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
DagManagementStateStore dagManagementStateStore, DagProcessingEngineMetrics dagProcEngineMetrics) {
this.dagAction = dagAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.function.Supplier;

import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.quartz.Job;
import org.quartz.JobBuilder;
Expand All @@ -46,6 +47,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class DagActionReminderSchedulerTest {
Expand Down Expand Up @@ -104,6 +106,69 @@ public void testCreateReminderJobDetail() {
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_EVENT_TIME_KEY), launchLeaseParams.getEventTimeMillis());
}

/**
 * A reminder built from lease params that carry a known store-insert time must expose that same value in its
 * {@link JobDataMap} under {@code FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY}.
 */
@Test
public void testCreateReminderJobDetailStashesStoreInsertTimeMillis() {
  final long expectedStoreInsertTime = 1700000000000L;
  DagActionStore.LeaseParams leaseParams =
      new DagActionStore.LeaseParams(launchDagAction, false, eventTimeMillis, expectedStoreInsertTime);

  JobDataMap reminderData =
      DagActionReminderScheduler.createReminderJobDetail(leaseParams, false).getJobDataMap();
  long stashedStoreInsertTime =
      reminderData.getLong(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY);

  Assert.assertEquals(stashedStoreInsertTime, expectedStoreInsertTime,
      "Reminder JobDataMap should stash storeInsertTimeMillis so host-failure reattempts preserve it");
}

/**
 * Fires a {@code ReminderJob} whose data map holds a known store-insert time and checks that the
 * {@link DagActionStore.LeaseParams} handed to {@code DagManagement#addDagAction} carries that value and is
 * flagged as a reminder.
 */
@Test
public void testReminderJobExecuteCarriesStoreInsertTimeMillis() {
  final long expectedStoreInsertTime = 1700000000000L;

  // Build the reminder exactly as the scheduler would, then serve its data map through a mocked context.
  JobDetail reminderDetail = DagActionReminderScheduler.createReminderJobDetail(
      new DagActionStore.LeaseParams(launchDagAction, false, eventTimeMillis, expectedStoreInsertTime), false);
  JobExecutionContext executionContext = mock(JobExecutionContext.class);
  when(executionContext.getMergedJobDataMap()).thenReturn(reminderDetail.getJobDataMap());

  // Capture whatever lease params the job forwards to DagManagement.
  DagManagement dagManagementMock = mock(DagManagement.class);
  ArgumentCaptor<DagActionStore.LeaseParams> leaseParamsCaptor =
      ArgumentCaptor.forClass(DagActionStore.LeaseParams.class);
  try {
    doNothing().when(dagManagementMock).addDagAction(leaseParamsCaptor.capture());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  DagActionReminderScheduler.ReminderJob reminderJob =
      new DagActionReminderScheduler.ReminderJob(dagManagementMock);
  reminderJob.execute(executionContext);

  DagActionStore.LeaseParams forwarded = leaseParamsCaptor.getValue();
  Assert.assertEquals(forwarded.getStoreInsertTimeMillis(), expectedStoreInsertTime,
      "Reminder fire path should restore storeInsertTimeMillis from JobDataMap");
  Assert.assertTrue(forwarded.isReminder(), "Reminder-driven LeaseParams must carry isReminder=true");
}

/**
 * Removes the store-insert-time key from a reminder's data map, as if the reminder had been scheduled by code
 * that never wrote it, and checks that the fired job forwards {@code UNKNOWN_STORE_INSERT_TIME_MILLIS}.
 */
@Test
public void testReminderJobExecuteFallsBackToUnknownWhenKeyAbsent() {
  // Start from a normally built reminder and strip the key to mimic the legacy payload.
  JobDataMap legacyDataMap =
      DagActionReminderScheduler.createReminderJobDetail(launchLeaseParams, false).getJobDataMap();
  legacyDataMap.remove(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_STORE_INSERT_TIME_MILLIS_KEY);

  JobExecutionContext executionContext = mock(JobExecutionContext.class);
  when(executionContext.getMergedJobDataMap()).thenReturn(legacyDataMap);

  DagManagement dagManagementMock = mock(DagManagement.class);
  ArgumentCaptor<DagActionStore.LeaseParams> leaseParamsCaptor =
      ArgumentCaptor.forClass(DagActionStore.LeaseParams.class);
  try {
    doNothing().when(dagManagementMock).addDagAction(leaseParamsCaptor.capture());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  DagActionReminderScheduler.ReminderJob reminderJob =
      new DagActionReminderScheduler.ReminderJob(dagManagementMock);
  reminderJob.execute(executionContext);

  long forwardedStoreInsertTime = leaseParamsCaptor.getValue().getStoreInsertTimeMillis();
  Assert.assertEquals(forwardedStoreInsertTime,
      DagActionStore.LeaseParams.UNKNOWN_STORE_INSERT_TIME_MILLIS,
      "Reminders scheduled by older code paths (no key) must fall back to UNKNOWN, not throw");
}

/*
Add deadline reminders for multiple launches of the same flow and assert no exception is thrown and they can be
deleted as well.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,46 @@ public DagActionStore.LeaseParams getUniqueLaunchLeaseParams() {
return new DagActionStore.LeaseParams(getUniqueLaunchDagAction(), false, eventTimeMillis);
}

/**
 * Checks that the consensus {@link DagActionStore.LeaseParams} returned by {@code tryAcquireLease} keeps the
 * caller-supplied {@code storeInsertTimeMillis} on three successive attempts with the same lease params:
 * <ol>
 *   <li>first acquisition, expected to yield {@code LeaseObtainedStatus};</li>
 *   <li>an immediate repeat, expected to yield {@code LeasedToAnotherStatus} (same event, within epsilon);</li>
 *   <li>a repeat after sleeping {@code MORE_THAN_EPSILON}, expected to yield {@code LeasedToAnotherStatus}
 *       (distinct event, lease still valid).</li>
 * </ol>
 * The arbiter may rewrite {@code eventTimeMillis} during consensus; {@code storeInsertTimeMillis} must survive.
 */
@Test
public void testStoreInsertTimeMillisPreservedThroughConsensus() throws Exception {
  final long expectedStoreInsertTime = 1700000000000L;
  DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(
      getUniqueLaunchDagAction(), false, eventTimeMillis, expectedStoreInsertTime);

  // Attempt 1: fresh lease, the value should travel onto the consensus LeaseParams.
  LeaseAttemptStatus freshAttempt = mysqlMultiActiveLeaseArbiter.tryAcquireLease(leaseParams, true);
  boolean freshObtained = freshAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus;
  Assert.assertTrue(freshObtained, freshAttempt.getClass().getSimpleName() + " expected LeaseObtainedStatus");
  long freshStoreInsertTime = freshAttempt.getConsensusLeaseParams().getStoreInsertTimeMillis();
  Assert.assertEquals(freshStoreInsertTime, expectedStoreInsertTime,
      "Fresh-lease consensus path dropped storeInsertTimeMillis");

  // Attempt 2: same event again within epsilon, so the lease is reported as held by another.
  LeaseAttemptStatus withinEpsilonAttempt = mysqlMultiActiveLeaseArbiter.tryAcquireLease(leaseParams, true);
  boolean withinEpsilonLeasedToAnother = withinEpsilonAttempt instanceof LeaseAttemptStatus.LeasedToAnotherStatus;
  Assert.assertTrue(withinEpsilonLeasedToAnother,
      withinEpsilonAttempt.getClass().getSimpleName() + " expected LeasedToAnotherStatus");
  long withinEpsilonStoreInsertTime = withinEpsilonAttempt.getConsensusLeaseParams().getStoreInsertTimeMillis();
  Assert.assertEquals(withinEpsilonStoreInsertTime, expectedStoreInsertTime,
      "CASE 2 (within-epsilon) dropped storeInsertTimeMillis");

  // Attempt 3: wait past epsilon so the arbiter treats this as a distinct event while the lease is still valid.
  Thread.sleep(MORE_THAN_EPSILON);
  LeaseAttemptStatus distinctEventAttempt = mysqlMultiActiveLeaseArbiter.tryAcquireLease(leaseParams, true);
  boolean distinctEventLeasedToAnother = distinctEventAttempt instanceof LeaseAttemptStatus.LeasedToAnotherStatus;
  Assert.assertTrue(distinctEventLeasedToAnother,
      distinctEventAttempt.getClass().getSimpleName() + " expected LeasedToAnotherStatus");
  long distinctEventStoreInsertTime = distinctEventAttempt.getConsensusLeaseParams().getStoreInsertTimeMillis();
  Assert.assertEquals(distinctEventStoreInsertTime, expectedStoreInsertTime,
      "CASE 3 (distinct event, lease valid) dropped storeInsertTimeMillis");
}

/**
* Marks the lease associated with the dagAction as completed by fabricating a LeaseObtainedStatus
* @return SelectInfoResult object containing the event information used to complete the lease
Expand Down
Loading
Loading