Skip to content

Commit fe434a6

Browse files
committed
Ensure that kafka config parameters are propagated to post-processing
Feature introduced by QC-271 could not really work without allowing the user to propagate the kafka broker url and the right topic to the SOR/EOR triggers. This commit does the necessary plumbering.
1 parent 35aeb25 commit fe434a6

File tree

6 files changed

+14
-3
lines changed

6 files changed

+14
-3
lines changed

Framework/include/QualityControl/CommonSpec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ struct CommonSpec {
4444
LogDiscardParameters infologgerDiscardParameters;
4545
double postprocessingPeriod = 30.0;
4646
std::string bookkeepingUrl;
47+
std::string kafkaBrokersUrl;
48+
std::string kafkaTopicAliECSRun = "aliecs.run";
4749
};
4850

4951
} // namespace o2::quality_control::core

Framework/include/QualityControl/PostProcessingConfig.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ struct PostProcessingConfig : public o2::quality_control::core::UserCodeConfig {
3939
std::vector<std::string> updateTriggers = {};
4040
std::vector<std::string> stopTriggers = {};
4141
std::string kafkaBrokersUrl;
42-
std::string kafkaTopic;
42+
std::string kafkaTopicAliECSRun;
4343
core::Activity activity;
4444
bool matchAnyRunNumber = false;
4545
bool critical;

Framework/src/InfrastructureSpecReader.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ CommonSpec InfrastructureSpecReader::readSpecEntry<CommonSpec>(const std::string
8181
};
8282
spec.postprocessingPeriod = commonTree.get<double>("postprocessing.periodSeconds", spec.postprocessingPeriod);
8383
spec.bookkeepingUrl = commonTree.get<std::string>("bookkeeping.url", spec.bookkeepingUrl);
84+
spec.kafkaBrokersUrl = commonTree.get<std::string>("kafka.url", spec.kafkaBrokersUrl);
85+
spec.kafkaTopicAliECSRun = commonTree.get<std::string>("kafka.topicAliecsRun", spec.kafkaTopicAliECSRun);
8486

8587
return spec;
8688
}

Framework/src/PostProcessingConfig.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ PostProcessingConfig::PostProcessingConfig(const std::string& id, const boost::p
3939
detectorName = config.get<std::string>("qc.postprocessing." + id + ".detectorName", "MISC");
4040
ccdbUrl = config.get<std::string>("qc.config.conditionDB.url", "");
4141
consulUrl = config.get<std::string>("qc.config.consul.url", "");
42+
kafkaBrokersUrl = config.get<std::string>("qc.config.kafka.url", "");
43+
kafkaTopicAliECSRun = config.get<std::string>("qc.config.kafka.topicAliecsRun", "aliecs.run");
44+
4245
// if available, use the source repo as defined in the postprocessing task, otherwise the general QCDB
4346
auto sourceRepo = config.get_child_optional("qc.postprocessing." + id + ".sourceRepo");
4447
auto databasePath = sourceRepo ? "qc.postprocessing." + id + ".sourceRepo" : "qc.config.database";

Framework/src/TriggerHelpers.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ TriggerFcn triggerFactory(const std::string& trigger, const PostProcessingConfig
9595
} else if (triggerLowerCase == "always") {
9696
return triggers::Always(activity);
9797
} else if (triggerLowerCase == "sor" || triggerLowerCase == "startofrun") {
98-
return triggers::StartOfRun(config.kafkaBrokersUrl, config.kafkaTopic, config.detectorName, config.taskName, activity);
98+
return triggers::StartOfRun(config.kafkaBrokersUrl, config.kafkaTopicAliECSRun, config.detectorName, config.taskName, activity);
9999
} else if (triggerLowerCase == "eor" || triggerLowerCase == "endofrun") {
100-
return triggers::EndOfRun(config.kafkaBrokersUrl, config.kafkaTopic, config.detectorName, config.taskName, activity);
100+
return triggers::EndOfRun(config.kafkaBrokersUrl, config.kafkaTopicAliECSRun, config.detectorName, config.taskName, activity);
101101
} else if (triggerLowerCase == "sof" || triggerLowerCase == "startoffill") {
102102
return triggers::StartOfFill(activity);
103103
} else if (triggerLowerCase == "eof" || triggerLowerCase == "endoffill") {

doc/Advanced.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,6 +1670,10 @@ should not be present in real configuration files.
16701670
"bookkeeping": { "": "Configuration of the bookkeeping (optional)",
16711671
"url": "localhost:4001", "": "Url of the bookkeeping API (port is usually different from web interface)"
16721672
},
1673+
"kafka": {
1674+
"url": "kafka-broker:123", "": "url of the kafka broker",
1675+
"topicAliecsRun":"aliecs.run", "": "the topic where AliECS publishes Run Events, 'aliecs.run' by default"
1676+
},
16731677
"postprocessing": { "": "Configuration parameters for post-processing",
16741678
"periodSeconds": 10.0, "": "Sets the interval of checking all the triggers. One can put a very small value",
16751679
"": "for async processing, but use 10 or more seconds for synchronous operations",

0 commit comments

Comments
 (0)