@@ -529,12 +529,72 @@ o2::framework::ServiceSpec CommonServices::ccdbSupportSpec()
529529 } },
530530 .kind = ServiceKind::Global};
531531}
532+ struct DecongestionContext {
533+ ServiceRegistryRef ref;
534+ TimesliceIndex::OldestOutputInfo oldestPossibleOutput;
535+ };
536+
537+ auto decongestionCallback = [](AsyncTask& task, size_t id) -> void {
538+ auto & oldestPossibleOutput = task.user <DecongestionContext>().oldestPossibleOutput ;
539+ auto & ref = task.user <DecongestionContext>().ref ;
540+
541+ auto & decongestion = ref.get <DecongestionService>();
542+ auto & proxy = ref.get <FairMQDeviceProxy>();
543+
544+ O2_SIGNPOST_ID_GENERATE (cid, async_queue);
545+ cid.value = id;
546+ if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice .value ) {
547+ O2_SIGNPOST_EVENT_EMIT (async_queue, cid, " oldest_possible_timeslice" , " Not sending already sent value: %" PRIu64 " > %" PRIu64,
548+ decongestion.lastTimeslice , (uint64_t )oldestPossibleOutput.timeslice .value );
549+ return ;
550+ }
551+ O2_SIGNPOST_EVENT_EMIT (async_queue, cid, " oldest_possible_timeslice" , " Running oldest possible timeslice %" PRIu64 " propagation." ,
552+ (uint64_t )oldestPossibleOutput.timeslice .value );
553+ DataProcessingHelpers::broadcastOldestPossibleTimeslice (ref, oldestPossibleOutput.timeslice .value );
554+
555+ for (int fi = 0 ; fi < proxy.getNumForwardChannels (); fi++) {
556+ auto & info = proxy.getForwardChannelInfo (ChannelIndex{fi});
557+ auto & state = proxy.getForwardChannelState (ChannelIndex{fi});
558+ // TODO: this we could cache in the proxy at the bind moment.
559+ if (info.channelType != ChannelAccountingType::DPL) {
560+ O2_SIGNPOST_EVENT_EMIT (async_queue, cid, " oldest_possible_timeslice" , " Skipping channel %{public}s" , info.name .c_str ());
561+ continue ;
562+ }
563+ if (DataProcessingHelpers::sendOldestPossibleTimeframe (ref, info, state, oldestPossibleOutput.timeslice .value )) {
564+ O2_SIGNPOST_EVENT_EMIT (async_queue, cid, " oldest_possible_timeslice" ,
565+ " Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 " , priority %d" ,
566+ info.name .c_str (), (uint64_t )oldestPossibleOutput.timeslice .value , 20 );
567+ }
568+ }
569+ decongestion.lastTimeslice = oldestPossibleOutput.timeslice .value ;
570+ };
571+
572+ auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void {
573+ auto & oldestPossibleOutput = task.user <DecongestionContext>().oldestPossibleOutput ;
574+ auto & ref = task.user <DecongestionContext>().ref ;
575+
576+ auto & decongestion = ref.get <DecongestionService>();
577+ auto & state = ref.get <DeviceState>();
578+ auto & timesliceIndex = ref.get <TimesliceIndex>();
579+ O2_SIGNPOST_ID_GENERATE (cid, async_queue);
580+ int64_t oldNextTimeslice = decongestion.nextTimeslice ;
581+ decongestion.nextTimeslice = std::max (decongestion.nextTimeslice , (int64_t )oldestPossibleOutput.timeslice .value );
582+ if (oldNextTimeslice != decongestion.nextTimeslice ) {
583+ if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode ()) {
584+ O2_SIGNPOST_EVENT_EMIT_WARN (async_queue, cid, " oldest_possible_timeslice" , " Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
585+ } else {
586+ O2_SIGNPOST_EVENT_EMIT_ERROR (async_queue, cid, " oldest_possible_timeslice" , " Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
587+ }
588+ timesliceIndex.rescan ();
589+ }
590+ };
532591
533592// Decongestion service
534593// If we do not have any Timeframe input, it means we must be creating timeslices
535594// in order and that we should propagate the oldest possible timeslice at the end
536595// of each processing step.
537- o2::framework::ServiceSpec CommonServices::decongestionSpec ()
596+ o2::framework::ServiceSpec
597+ CommonServices::decongestionSpec ()
538598{
539599 return ServiceSpec{
540600 .name = " decongestion" ,
@@ -548,7 +608,7 @@ o2::framework::ServiceSpec CommonServices::decongestionSpec()
548608 }
549609 }
550610 auto & queue = services.get <AsyncQueue>();
551- decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create (queue, {" oldest-possible-timeslice" , 100 });
611+ decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create (queue, {. name = " oldest-possible-timeslice" , . score = 100 });
552612 return ServiceHandle{TypeIdHelpers::uniqueId<DecongestionService>(), decongestion, ServiceKind::Serial};
553613 },
554614 .postForwarding = [](ProcessingContext& ctx, void * service) {
@@ -632,7 +692,6 @@ o2::framework::ServiceSpec CommonServices::decongestionSpec()
632692 auto & decongestion = services.get <DecongestionService>();
633693 auto & relayer = services.get <DataRelayer>();
634694 auto & timesliceIndex = services.get <TimesliceIndex>();
635- auto & proxy = services.get <FairMQDeviceProxy>();
636695 O2_SIGNPOST_ID_FROM_POINTER (cid, data_processor_context, &decongestion);
637696 O2_SIGNPOST_EVENT_EMIT (data_processor_context, cid, " oldest_possible_timeslice" , " Received oldest possible timeframe %" PRIu64 " from channel %d" ,
638697 (uint64_t )oldestPossibleTimeslice, channel.value );
@@ -650,59 +709,22 @@ o2::framework::ServiceSpec CommonServices::decongestionSpec()
650709 return ;
651710 }
652711 auto & queue = services.get <AsyncQueue>();
653- const auto & spec = services.get <DeviceSpec const >();
654712 const auto & state = services.get <DeviceState>();
655- auto *device = services.get <RawDeviceService>().device ();
656713 // / We use the oldest possible timeslice to debounce, so that only the latest one
657714 // / at the end of one iteration is sent.
658715 O2_SIGNPOST_EVENT_EMIT (data_processor_context, cid, " oldest_possible_timeslice" , " Queueing oldest possible timeslice %" PRIu64 " propagation for execution." ,
659716 (uint64_t )oldestPossibleOutput.timeslice .value );
660717 AsyncQueueHelpers::post (
661- queue, decongestion.oldestPossibleTimesliceTask , [ref = services, oldestPossibleOutput, &decongestion, &proxy, &spec, device, ×liceIndex](size_t id) {
662- O2_SIGNPOST_ID_GENERATE (cid, async_queue);
663- cid.value = id;
664- if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice .value ) {
665- O2_SIGNPOST_EVENT_EMIT (async_queue, cid, " oldest_possible_timeslice" , " Not sending already sent value: %" PRIu64 " > %" PRIu64,
666- decongestion.lastTimeslice , (uint64_t )oldestPossibleOutput.timeslice .value );
667- return ;
668- }
669- O2_SIGNPOST_EVENT_EMIT (async_queue, cid, " oldest_possible_timeslice" , " Running oldest possible timeslice %" PRIu64 " propagation." ,
670- (uint64_t )oldestPossibleOutput.timeslice .value );
671- DataProcessingHelpers::broadcastOldestPossibleTimeslice (ref, oldestPossibleOutput.timeslice .value );
672-
673- for (int fi = 0 ; fi < proxy.getNumForwardChannels (); fi++) {
674- auto & info = proxy.getForwardChannelInfo (ChannelIndex{fi});
675- auto & state = proxy.getForwardChannelState (ChannelIndex{fi});
676- // TODO: this we could cache in the proxy at the bind moment.
677- if (info.channelType != ChannelAccountingType::DPL) {
678- O2_SIGNPOST_EVENT_EMIT (async_queue, cid, " oldest_possible_timeslice" , " Skipping channel %{public}s" , info.name .c_str ());
679- continue ;
680- }
681- if (DataProcessingHelpers::sendOldestPossibleTimeframe (ref, info, state, oldestPossibleOutput.timeslice .value )) {
682- O2_SIGNPOST_EVENT_EMIT (async_queue, cid, " oldest_possible_timeslice" ,
683- " Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 " , priority %d" ,
684- info.name .c_str (), (uint64_t )oldestPossibleOutput.timeslice .value , 20 );
685- }
686- }
687- decongestion.lastTimeslice = oldestPossibleOutput.timeslice .value ;
688- },
689- TimesliceId{oldestPossibleTimeslice}, -1 );
718+ queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
719+ .id = decongestion.oldestPossibleTimesliceTask ,
720+ .debounce = -1 , .callback = decongestionCallback}
721+ .user <DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
722+
690723 if (decongestion.orderedCompletionPolicyActive ) {
691724 AsyncQueueHelpers::post (
692- queue, decongestion.oldestPossibleTimesliceTask , [ref = services, oldestPossibleOutput, &decongestion, &proxy, &spec, &state, device, ×liceIndex](size_t id) {
693- O2_SIGNPOST_ID_GENERATE (cid, async_queue);
694- int64_t oldNextTimeslice = decongestion.nextTimeslice ;
695- decongestion.nextTimeslice = std::max (decongestion.nextTimeslice , (int64_t )oldestPossibleOutput.timeslice .value );
696- if (oldNextTimeslice != decongestion.nextTimeslice ) {
697- if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode ()) {
698- O2_SIGNPOST_EVENT_EMIT_WARN (async_queue, cid, " oldest_possible_timeslice" , " Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
699- } else {
700- O2_SIGNPOST_EVENT_EMIT_ERROR (async_queue, cid, " oldest_possible_timeslice" , " Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
701- }
702- timesliceIndex.rescan ();
703- }
704- },
705- TimesliceId{oldestPossibleOutput.timeslice .value }, -1 );
725+ queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice .value },.id = decongestion.oldestPossibleTimesliceTask , .debounce = -1 ,
726+ .callback = decongestionCallbackOrdered}
727+ .user <DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
706728 } },
707729 .kind = ServiceKind::Serial};
708730}
0 commit comments