Skip to content

Commit 3db6cca

Browse files
committed
GPU Workflow: Pop next tf from completion policy queue only when actually running and add sanity checks
1 parent ac05dee commit 3db6cca

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

GPU/GPUTracking/Global/GPUChainTrackingClusterizer.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,9 @@ int32_t GPUChainTracking::RunTPCClusterizer(bool synchronizeOutput)
751751
if (buildNativeGPU) {
752752
AllocateRegisteredMemory(mInputsHost->mResourceClusterNativeBuffer);
753753
}
754+
if (mWaitForFinalInputs && GetProcessingSettings().nTPCClustererLanes > 6) {
755+
GPUFatal("ERROR, mWaitForFinalInputs cannot be called with nTPCClustererLanes > 6");
756+
}
754757
if (buildNativeHost && !(buildNativeGPU && GetProcessingSettings().delayedOutput)) {
755758
if (mWaitForFinalInputs) {
756759
GPUFatal("Cannot use waitForFinalInput callback without delayed output");

GPU/Workflow/src/GPUWorkflowPipeline.cxx

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,7 @@ void GPURecoWorkflowSpec::initPipeline(o2::framework::InitContext& ic)
6666
mPolicyOrder = [this](o2::framework::DataProcessingHeader::StartTime timeslice) {
6767
std::unique_lock lk(mPipeline->completionPolicyMutex);
6868
mPipeline->completionPolicyNotify.wait(lk, [pipeline = mPipeline.get()] { return pipeline->pipelineSenderTerminating || !pipeline->completionPolicyQueue.empty(); });
69-
if (mPipeline->completionPolicyQueue.front() == timeslice) {
70-
mPipeline->completionPolicyQueue.pop();
71-
return true;
72-
}
73-
return false;
69+
return !mPipeline->completionPolicyQueue.empty() && mPipeline->completionPolicyQueue.front() == timeslice;
7470
};
7571
mPipeline->receiveThread = std::thread([this]() { RunReceiveThread(); });
7672
for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
@@ -175,6 +171,14 @@ int32_t GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingIn
175171
tpcZSmeta = std::move(context->tpcZSmeta);
176172
tpcZS = context->tpcZS;
177173
ptrs.tpcZS = &tpcZS;
174+
175+
{
176+
std::lock_guard lk(mPipeline->completionPolicyMutex);
177+
if (mPipeline->completionPolicyQueue.empty() || mPipeline->completionPolicyQueue.front() != tinfo.timeslice) {
178+
LOG(fatal) << "Time frame processed does not equal the timeframe at the top of the queue, time frames seem out of sync";
179+
}
180+
mPipeline->completionPolicyQueue.pop();
181+
}
178182
}
179183
if (mSpecConfig.enableDoublePipeline == 2) {
180184
auto prepareDummyMessage = pc.outputs().make<DataAllocator::UninitializedVector<char>>(Output{gDataOriginGPU, "PIPELINEPREPARE", 0}, 0u);

0 commit comments

Comments
 (0)