Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake")
find_package(Boost REQUIRED COMPONENTS unit_test_framework program_options system filesystem)
find_package(Git QUIET)
find_package(ApMon MODULE)
find_package(CURL MODULE)
find_package(RdKafka CONFIG)

####################################
Expand Down Expand Up @@ -105,11 +106,12 @@ add_library(Monitoring SHARED
src/Exceptions/MonitoringException.cxx
$<$<BOOL:${ApMon_FOUND}>:src/Backends/ApMonBackend.cxx>
$<$<BOOL:${RdKafka_FOUND}>:src/Transports/Kafka.cxx>
$<$<BOOL:${CURL_FOUND}>:src/Transports/HTTP.cxx>
)

target_include_directories(Monitoring
PUBLIC
$<INSTALL_INTERFACE:include>
PUBLIC
$<INSTALL_INTERFACE:include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/src
Expand All @@ -127,6 +129,7 @@ target_link_libraries(Monitoring
pthread
$<$<BOOL:${ApMon_FOUND}>:ApMon::ApMon>
$<$<BOOL:${RdKafka_FOUND}>:RdKafka::rdkafka++>
$<$<BOOL:${CURL_FOUND}>:CURL::libcurl>
)

# Handle ApMon optional dependency
Expand All @@ -138,6 +141,10 @@ if(RdKafka_FOUND)
message(STATUS " Compiling Kafka transport")
endif()

if(CURL_FOUND)
message(STATUS " Compiling HTTP transport/InfluxDB 2.x backend")
endif()

# Detect operating system
if (UNIX AND NOT APPLE)
message(STATUS "Detected Linux: Process monitor enabled")
Expand All @@ -155,6 +162,7 @@ target_compile_definitions(Monitoring
$<$<BOOL:${LINUX}>:O2_MONITORING_OS_LINUX>
$<$<BOOL:${ApMon_FOUND}>:O2_MONITORING_WITH_APPMON>
$<$<BOOL:${RdKafka_FOUND}>:O2_MONITORING_WITH_KAFKA>
$<$<BOOL:${CURL_FOUND}>:O2_MONITORING_WITH_CURL>
)

# Use C++17
Expand All @@ -173,6 +181,7 @@ set(EXAMPLES
examples/5-Benchmark.cxx
examples/6-Increment.cxx
examples/7-InternalBenchamrk.cxx
examples/8-DbFiller.cxx
examples/10-Buffering.cxx
)

Expand All @@ -187,6 +196,7 @@ foreach (example ${EXAMPLES})
endforeach()

set_target_properties(5-Benchmark PROPERTIES OUTPUT_NAME "o2-monitoring-benchmark")
set_target_properties(8-DbFiller PROPERTIES OUTPUT_NAME "o2-monitoring-dbfiller")

####################################
# Tests
Expand Down Expand Up @@ -217,7 +227,7 @@ foreach (test ${TEST_SRCS})

add_executable(${test_name} ${test})
target_link_libraries(${test_name}
PRIVATE
PRIVATE
Monitoring Boost::unit_test_framework Boost::filesystem
)
add_test(NAME ${test_name} COMMAND ${test_name})
Expand All @@ -230,7 +240,7 @@ endforeach()
####################################

# Install library
install(TARGETS Monitoring 5-Benchmark
install(TARGETS Monitoring 5-Benchmark 8-DbFiller
EXPORT MonitoringTargets
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ See the table below to find `URI`s for supported backends:
| InfluxDB | Unix socket | `influxdb-unix` | - | `info` |
| InfluxDB | StdOut | `influxdb-stdout` | - | `info` |
| InfluxDB | Kafka | `influxdb-kafka` | Kafka topic | `info` |
| InfluxDB 2.x | HTTP | `influxdbv2` | `org=ORG&bucket=BUCKET&token=TOKEN` | `info` |
| ApMon | UDP | `apmon` | - | `info` |
| StdOut | - | `stdout`, `infologger` | [Prefix] | `debug` |

Expand All @@ -62,7 +63,7 @@ A metric consist of 5 parameters:
| Parameter name | Type | Required | Default |
| -------------- |:--------------------------------:|:--------:| -----------------------:|
| name | string | yes | - |
| values | map&lt;string, int/double/string/uint64_t&gt; | no/1 | - |
| values | map&lt;string, int/double/string/uint64_t&gt; | no/1 | - |
| timestamp | time_point&lt;system_clock&gt; | no | current time |
| verbosity | Enum (Debug/Info/Prod) | no | Verbosity::Info |
| tags | map | no | host and process names |
Expand Down
69 changes: 35 additions & 34 deletions examples/5-Benchmark.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,34 @@ using namespace o2::monitoring;

int main(int argc, char* argv[])
{
int sleep = 1000000;
int count = 1;
int measurements = 1;
int flps = 1;

std::srand(std::time(nullptr));

std::random_device rd;
std::mt19937 mt(rd());

std::uniform_real_distribution<double> doubleDist(1.0, 100.0);
std::uniform_int_distribution<> intDist(1, 100);

boost::program_options::options_description desc("Allowed options");
desc.add_options()("sleep", boost::program_options::value<int>(), "Thread sleep in microseconds")("url", boost::program_options::value<std::string>()->required(), "URL to monitoring backend (or list of comma seperated URLs)")("id", boost::program_options::value<std::string>(), "Instance ID")("count", boost::program_options::value<int>(), "Number of loop cycles")("multiple", boost::program_options::bool_switch()->default_value(false), "Sends multiple metrics per measurement")("latency", boost::program_options::bool_switch()->default_value(false), "Sends timestamp as a value")("monitor", boost::program_options::bool_switch()->default_value(false), "Enabled process monitor")("buffer", boost::program_options::value<int>(), "Creates buffr of given size")("measurements", boost::program_options::value<int>(), "Number of different measurements")("flps", boost::program_options::value<int>(), "Number of FLPs (tags)");
desc.add_options()
("sleep", boost::program_options::value<int>()->default_value(1000000), "Thread sleep in microseconds")
("url", boost::program_options::value<std::string>()->required(), "URL to monitoring backend (or list of comma seperated URLs)")
("count", boost::program_options::value<int>()->default_value(1), "Number of loop cycles")
("multiple", boost::program_options::bool_switch()->default_value(false), "Sends multiple metrics per measurement")
("latency", boost::program_options::bool_switch()->default_value(false), "Sends timestamp as a value")
("monitor", boost::program_options::bool_switch()->default_value(false), "Enabled process monitor")
("buffer", boost::program_options::value<int>(), "Creates buffr of given size")
("measurements", boost::program_options::value<int>()->default_value(1), "Number of different measurements")
("flps", boost::program_options::value<int>()->default_value(1), "Number of FLPs")
("crus", boost::program_options::value<int>()->default_value(1), "Number of CRUss (optional)");

boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
boost::program_options::notify(vm);

if (vm.count("flps")) {
flps = vm["flps"].as<int>();
}

if (vm.count("sleep")) {
sleep = vm["sleep"].as<int>();
}

if (vm.count("count")) {
count = vm["count"].as<int>();
}

if (vm.count("measurements")) {
measurements = vm["measurements"].as<int>();
}
int flps = vm["flps"].as<int>();
int crus = vm["crus"].as<int>();
int sleep = vm["sleep"].as<int>();
int count = vm["count"].as<int>();
int measurements = vm["measurements"].as<int>();

auto monitoring = MonitoringFactory::Get(vm["url"].as<std::string>());
if (vm["monitor"].as<bool>()) {
Expand All @@ -55,12 +48,18 @@ int main(int argc, char* argv[])
if (vm["multiple"].as<bool>()) {
for (int j = 1; j <= count; j++) {
for (int i = 1; i <= measurements; i++) {
monitoring->send(Metric{"measurement" + std::to_string(i)}
.addValue(doubleDist(mt), "doubleMetric")
.addValue(intDist(mt), "intMetric")
.addValue(std::rand() % 2, "onOffMetric")
);
std::this_thread::sleep_for(std::chrono::microseconds(sleep));
for (int k = 1; k <= flps; k++) {
for (int l = 1; l <= crus; l++) {
monitoring->send(Metric{"measurement" + std::to_string(i)}
.addValue(doubleDist(mt), "double_metric")
.addValue(intDist(mt), "int_metric")
.addValue(std::rand() % 2, "on_off_metric")
.addTag(tags::Key::FLP, k)
.addTag(tags::Key::CRU, l)
);
}
std::this_thread::sleep_for(std::chrono::microseconds(sleep));
}
}
if (!vm.count("count"))
j--;
Expand All @@ -84,12 +83,14 @@ int main(int argc, char* argv[])
for (int j = 1; j <= count; j++) {
for (int i = 1; i <= measurements; i++) {
for (int k = 1; k <= flps; k++) {
monitoring->send(Metric{doubleDist(mt), "doubleMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k));
monitoring->send(Metric{intDist(mt), "intMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k));
monitoring->send(Metric{std::rand() % 2, "onOffMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k));
std::this_thread::sleep_for(std::chrono::microseconds(10));
for (int l = 1; l <= crus; l++) {
monitoring->send(Metric{doubleDist(mt), "doubleMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k).addTag(tags::Key::CRU, l));
monitoring->send(Metric{intDist(mt), "intMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k).addTag(tags::Key::CRU, l));
monitoring->send(Metric{std::rand() % 2, "onOffMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k).addTag(tags::Key::CRU, l));
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
std::this_thread::sleep_for(std::chrono::microseconds(sleep));
}
std::this_thread::sleep_for(std::chrono::microseconds(sleep));
}
if (!vm.count("count"))
j--;
Expand Down
57 changes: 57 additions & 0 deletions examples/8-DbFiller.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
///
/// \file 8-DbFiller.cxx
/// \author Adam Wegrzynek <adam.wegrzynek@cern.ch>
///

#include "Monitoring/MonitoringFactory.h"
#include <boost/program_options.hpp>
#include <random>

using o2::monitoring::Metric;
using namespace o2::monitoring;

int main(int argc, char* argv[])
{
std::srand(std::time(nullptr));

std::random_device rd;
std::mt19937 mt(rd());

std::uniform_real_distribution<double> doubleDist(1.0, 100.0);
std::uniform_int_distribution<> intDist(1, 100);

boost::program_options::options_description desc("Allowed options");
desc.add_options()
("url", boost::program_options::value<std::string>()->required(), "URL to monitoring backend (or list of comma seperated URLs)")
("measurements", boost::program_options::value<int>()->default_value(1), "Number of different measurements")
("flps", boost::program_options::value<int>()->default_value(1), "Number of FLPs")
("since", boost::program_options::value<int>()->default_value(60), "Start filling since (s)")
("interval", boost::program_options::value<int>()->default_value(1), "Interval between metrics (s)");

boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
boost::program_options::notify(vm);

auto monitoring = MonitoringFactory::Get(vm["url"].as<std::string>());
monitoring->addGlobalTag(tags::Key::Subsystem, tags::Value::QC);

auto now = Metric::getCurrentTimestamp();
auto interval = std::chrono::seconds(vm["interval"].as<int>());
auto since = now - std::chrono::seconds(vm["since"].as<int>());

for (;;) {
since = since + interval;
for (int i = 1; i <= vm["measurements"].as<int>(); i++) {
for (int k = 1; k <= vm["flps"].as<int>(); k++) {
auto metric = Metric{"metric" + std::to_string(i), Metric::DefaultVerbosity, since}
.addValue(doubleDist(mt), "double")
.addValue(intDist(mt), "int")
.addValue(std::rand() % 2, "onOff")
.addTag(tags::Key::FLP, k);
monitoring->send(std::move(metric));
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
}
if (since > now) break;
}
}
2 changes: 1 addition & 1 deletion include/Monitoring/Metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Metric

/// Constructor that does not require any value to be specified, .addValue needs to be used
/// \param name metric name
Metric(const std::string& name, Verbosity verbosity = Metric::DefaultVerbosity);
Metric(const std::string& name, Verbosity verbosity = Metric::DefaultVerbosity, const std::chrono::time_point<std::chrono::system_clock>& timestamp = Metric::getCurrentTimestamp());

/// Adds additional int value to metric
/// \param value
Expand Down
2 changes: 1 addition & 1 deletion src/Metric.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Metric&& Metric::addValue(const std::variant<int, std::string, double, uint64_t>
return std::move(*this);
}

Metric::Metric(const std::string& name, Verbosity verbosity) : mName(name), mTimestamp(Metric::getCurrentTimestamp()), mVerbosity(verbosity)
Metric::Metric(const std::string& name, Verbosity verbosity, const std::chrono::time_point<std::chrono::system_clock>& timestamp) : mName(name), mTimestamp(timestamp), mVerbosity(verbosity)
{
overwriteVerbosity();
}
Expand Down
36 changes: 36 additions & 0 deletions src/MonitoringFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
#include "Transports/Kafka.h"
#endif

#ifdef O2_MONITORING_WITH_CURL
#include "Transports/HTTP.h"
#endif

namespace o2
{
/// ALICE O2 Monitoring system
Expand All @@ -56,6 +60,37 @@ std::unique_ptr<Backend> getStdOut(http::url uri)
}
}

/// Extracts token from header add sets it as addition HTTP header
/// http://localhost:9999/?org=YOUR_ORG&bucket=YOUR_BUCKET&token=AUTH_TOKEN
/// ->
/// http://localhost:9999/api/v2/write?org=YOUR_ORG&bucket=YOUR_BUCKET
/// --header "Authorization: Token YOURAUTHTOKEN"
std::unique_ptr<Backend> getInfluxDbv2(http::url uri)
{
#ifdef O2_MONITORING_WITH_CURL
std::string tokenLabel = "token=";
std::string path = "/api/v2/write";
std::string query = uri.search;

auto tokenStart = query.find(tokenLabel);
auto tokenEnd = query.find('&', tokenStart);
if (tokenEnd == std::string::npos) {
tokenEnd = query.length();
}
std::string token = query.substr(tokenStart + tokenLabel.length(), tokenEnd-(tokenStart + tokenLabel.length()));
// make sure ampersand is removed
if (tokenEnd < query.length() && query.at(tokenEnd) == '&') tokenEnd++;
if (tokenStart > 0 && query.at(tokenStart-1) == '&') tokenStart--;
query.erase(tokenStart, tokenEnd - tokenStart);

auto transport = std::make_unique<transports::HTTP>("http://" + uri.host + ':' + std::to_string(uri.port) + path + '?' + query);
transport->addHeader("Authorization: Token " + token);
return std::make_unique<backends::InfluxDB>(std::move(transport));
#else
throw std::runtime_error("HTTP transport is not enabled");
#endif
}

std::unique_ptr<Backend> getInfluxDb(http::url uri)
{
auto const position = uri.protocol.find_last_of('-');
Expand Down Expand Up @@ -129,6 +164,7 @@ std::unique_ptr<Backend> MonitoringFactory::GetBackend(std::string& url)
{"influxdb-unix", getInfluxDb},
{"influxdb-stdout", getInfluxDb},
{"influxdb-kafka", getInfluxDb},
{"influxdbv2", getInfluxDbv2},
{"apmon", getApMon},
{"no-op", getNoop}
};
Expand Down
Loading