Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ AC_ARG_ENABLE([sync-daemon-mpi],
[enable_sync_daemon_mpi="$enableval" user_set_sync_daemon_mpi="yes_$enableval"],
[enable_sync_daemon_mpi="yes" user_set_sync_daemon_mpi="no"])
AC_SUBST([user_set_sync_daemon_mpi])
AC_LANG_PUSH([C++])
AS_IF([test "x$enable_sync_daemon_mpi" != xno],
[AX_MPI([enable_sync_daemon_mpi=yes], [enable_sync_daemon_mpi=no])])
AS_IF([test "x$enable_sync_daemon_mpi" != xno],
[AX_MPI_SESSION([enable_sync_daemon_mpi=yes], [enable_sync_daemon_mpi=no])])
AC_LANG_POP([C++])
AS_IF([test "x$user_set_sync_daemon_mpi" = "xyes_yes" && test "x$enable_sync_daemon_mpi" = "xno"],
[AC_MSG_ERROR([sync-daemon with MPI requested but MPI Sessions are not supported!])])
AS_IF([test "x$user_set_sync_daemon_mpi" != "xyes_no" && test "x$enable_sync_daemon_mpi" = "xno"],
Expand Down
42 changes: 42 additions & 0 deletions integration_tests/light_iprof_only_sync.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env ruby

# Stand-in for the iprof Ruby driver: spawn the configured sync daemon,
# drive the socketpair protocol manually, run the user command in between.
# Used by integration tests to exercise daemon binaries without the rest
# of iprof.

require 'socket'
require 'io/nonblock'

daemon_kind = ENV.fetch('THAPI_SYNC_DAEMON')
daemon = "sync_daemon_#{daemon_kind}"

MSG_INIT = 'INIT'
MSG_LOCAL_BARRIER = 'LOCAL_BARRIER'
MSG_GLOBAL_BARRIER = 'GLOBAL_BARRIER'
MSG_FINISH = 'FINISH'
MSG_READY = 'READY'

parent, child = UNIXSocket.pair(:SEQPACKET)
# Ruby sets O_NONBLOCK on sockets by default; the daemon uses blocking
# read, so clear it on the inherited fd.
child.nonblock = false
pid = Process.spawn(daemon, child.fileno.to_s, child.fileno => child.fileno)
child.close

send_and_wait = lambda do |msg|
parent.sendmsg(msg)
reply, = parent.recvmsg(64)
raise "expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY
end

send_and_wait.call(MSG_INIT)
send_and_wait.call(MSG_LOCAL_BARRIER)

system(*ARGV) || exit($?.exitstatus || 1)

send_and_wait.call(MSG_LOCAL_BARRIER)
send_and_wait.call(MSG_GLOBAL_BARRIER)
send_and_wait.call(MSG_FINISH)
parent.close
Process.wait(pid)
64 changes: 0 additions & 64 deletions integration_tests/light_iprof_only_sync.sh

This file was deleted.

9 changes: 7 additions & 2 deletions integration_tests/parallel_execution.bats
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ launch_mpi() {
# THAPI_SYNC_DAEMON=fs Tests

@test "sync_daemon_fs" {
THAPI_SYNC_DAEMON=fs launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.sh clinfo
THAPI_SYNC_DAEMON=fs launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.rb clinfo
}

@test "iprof_fs" {
Expand All @@ -27,7 +27,7 @@ launch_mpi() {

# bats test_tags=mpi_sync_daemon
@test "sync_daemon_mpi" {
THAPI_SYNC_DAEMON=mpi launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.sh clinfo
THAPI_SYNC_DAEMON=mpi launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.rb clinfo
}

# bats test_tags=mpi_sync_daemon
Expand All @@ -45,6 +45,11 @@ launch_mpi() {
THAPI_SYNC_DAEMON_MPI_NO_FINALIZE=1 THAPI_SYNC_DAEMON=mpi launch_mpi -n 2 iprof ./mpi_helloworld
}

# Non-MPI runs should ignore THAPI_SYNC_DAEMON entirely (no validation, no spawn).
@test "sync_daemon_ignored_without_mpi" {
THAPI_SYNC_DAEMON=whatever-not-in-list iprof -- true
}

# Test Traced Rank

@test "iprof_mpi+traced_ranks" {
Expand Down
11 changes: 7 additions & 4 deletions sampling/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ else
WERROR =
endif

AM_CFLAGS = -Wall -Wextra $(WERROR)
AM_CXXFLAGS = -std=c++17 -Wall -Wextra $(WERROR)

TRACEPOINT_GEN = \
$(srcdir)/sampling_events.yaml

Expand All @@ -29,7 +32,7 @@ nodist_libsamplingtracepoints_la_SOURCES = \
$(SAMPLING_STATIC_PROBES_SRC)

libsamplingtracepoints_la_CPPFLAGS = -I$(top_srcdir)/utils -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./
libsamplingtracepoints_la_CFLAGS = -fPIC -Wall -Wextra -Wno-unused-parameter -Wno-type-limits -Wno-sign-compare $(WERROR) $(LTTNG_UST_CFLAGS)
libsamplingtracepoints_la_CFLAGS = $(AM_CFLAGS) -fPIC -Wno-unused-parameter -Wno-type-limits -Wno-sign-compare $(LTTNG_UST_CFLAGS)
libsamplingtracepoints_la_LDFLAGS = $(LTTNG_UST_LIBS)


Expand All @@ -50,7 +53,7 @@ libThapiSampling_la_SOURCES = \
thapi_sampling.c

libThapiSampling_la_CPPFLAGS = -I$(top_srcdir)/utils/include
libThapiSampling_la_CFLAGS = -Wall -Wextra $(WERROR)
libThapiSampling_la_CFLAGS = $(AM_CFLAGS)
libThapiSampling_la_LDFLAGS = -lpthread -version-info 1:0:0 $(LTTNG_UST_LIBS)

bin_PROGRAMS = thapi_sampling_daemon
Expand All @@ -60,15 +63,15 @@ thapi_sampling_daemon_SOURCES = \
thapi_sampling_daemon.cpp

thapi_sampling_daemon_CPPFLAGS = -I$(top_srcdir)/utils/include
thapi_sampling_daemon_CXXFLAGS = -Wall -Wextra $(WERROR)
thapi_sampling_daemon_CXXFLAGS = $(AM_CXXFLAGS)

libHeartbeatSampling_la_SOURCES = heartbeat_sampling_plugin.c

nodist_libHeartbeatSampling_la_SOURCES = \
$(SAMPLING_STATIC_PROBES_INCL)

libHeartbeatSampling_la_CPPFLAGS = -I$(top_srcdir)/utils/include
libHeartbeatSampling_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) $(LTTNG_UST_CFLAGS)
libHeartbeatSampling_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter $(LTTNG_UST_CFLAGS)
libHeartbeatSampling_la_LDFLAGS = -avoid-version -module
libHeartbeatSampling_la_LIBADD = libThapiSampling.la libsamplingtracepoints.la -ldl $(LTTNG_UST_LIBS)

Expand Down
53 changes: 26 additions & 27 deletions sampling/thapi_sampling_daemon.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
#include <csignal>
#include "daemon_proto.hpp"
#include <cstdlib>
#include <dlfcn.h>
#include <iostream>
#include <vector>
#define RT_SIGNAL_READY (SIGRTMIN)
#define RT_SIGNAL_FINISH (SIGRTMIN + 3)

typedef void (*plugin_initialize_func)(void);
typedef void (*plugin_finalize_func)(void);
using namespace daemon_proto;

int main(int argc, char **argv) {
using plugin_initialize_func = void (*)();
using plugin_finalize_func = void (*)();

// Setup signaling, to exit the sampling loop
int parent_pid = std::atoi(argv[1]);
sigset_t signal_set;
sigemptyset(&signal_set);
sigaddset(&signal_set, RT_SIGNAL_FINISH);
sigprocmask(SIG_BLOCK, &signal_set, NULL);
constexpr auto WHO = "thapi_sampling_daemon";

int main(int argc, char **argv) {
if (argc < 2) {
std::cerr << "usage: " << argv[0] << " <fd> [plugin.so ...]" << std::endl;
return 1;
}
const int fd = std::atoi(argv[1]);

// DL Open
struct Plugin {
Expand All @@ -33,30 +33,26 @@ int main(int argc, char **argv) {
std::cerr << "Failed to load " << argv[i] << ": " << dlerror() << std::endl;
continue;
}
plugin_initialize_func init_func =
auto init_func =
reinterpret_cast<plugin_initialize_func>(dlsym(handle, "thapi_initialize_sampling_plugin"));

plugin_finalize_func fini_func =
auto fini_func =
reinterpret_cast<plugin_finalize_func>(dlsym(handle, "thapi_finalize_sampling_plugin"));

plugins.push_back({handle, init_func, fini_func});
}

// User pluging
for (const auto &plugin : plugins) {
for (const auto &plugin : plugins)
plugin.initialize();
}

// Signal Ready to manager
kill(parent_pid, RT_SIGNAL_READY);
// Handshake: parent → INIT, daemon → READY
if (recv_expect(WHO, fd, MSG_INIT) < 0)
return 1;
if (send_msg(WHO, fd, MSG_READY) < 0)
return 1;

// Wait for to finish
while (true) {
int signum;
sigwait(&signal_set, &signum);
if (signum == RT_SIGNAL_FINISH)
break;
}
// Wait for shutdown: parent → FINISH
if (recv_expect(WHO, fd, MSG_FINISH) < 0)
return 1;

// Finalization
for (const auto &plugin : plugins) {
Expand All @@ -65,6 +61,9 @@ int main(int argc, char **argv) {
dlclose(plugin.handle);
}

if (send_msg(WHO, fd, MSG_READY) < 0)
return 1;
close(fd);
// Will call the destructor, who will finalize all the not unregistered plugin
return 0;
}
1 change: 1 addition & 0 deletions utils/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ noinst_HEADERS = \
include/utlist.h \
include/json.hpp \
include/magic_enum.hpp \
include/daemon_proto.hpp \
xprof_utils.hpp

nodist_noinst_HEADERS = lttng/tracepoint_gen.h
Expand Down
53 changes: 53 additions & 0 deletions utils/include/daemon_proto.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Wire protocol shared between iprof (parent) and the C/C++ daemons
// (sync_daemon_mpi, thapi_sampling_daemon). Messages are short ASCII
// tokens exchanged over a SOCK_SEQPACKET socketpair; each parent
// command is acknowledged by the daemon with MSG_READY.

#pragma once

#include <cstdio>
#include <iostream>
#include <string_view>
#include <sys/socket.h>
#include <unistd.h>

namespace daemon_proto {

using namespace std::string_view_literals;

constexpr auto MSG_INIT = "INIT"sv;
constexpr auto MSG_LOCAL_BARRIER = "LOCAL_BARRIER"sv;
constexpr auto MSG_GLOBAL_BARRIER = "GLOBAL_BARRIER"sv;
constexpr auto MSG_FINISH = "FINISH"sv;
constexpr auto MSG_READY = "READY"sv;

inline int send_msg(const char *who, int fd, std::string_view msg) {
if (write(fd, msg.data(), msg.size()) < 0) {
std::perror(who);
return -1;
}
return 0;
}

// Read one message from fd and verify it matches `want`. Returns 0 on
// match, -1 on syscall failure, EOF, or mismatch.
inline int recv_expect(const char *who, int fd, std::string_view want) {
char buf[64];
const ssize_t n = read(fd, buf, sizeof(buf));
if (n < 0) {
std::perror(who);
return -1;
}
if (n == 0) {
std::cerr << who << ": parent closed socket unexpectedly" << std::endl;
return -1;
}
const std::string_view got(buf, n);
if (got != want) {
std::cerr << who << ": expected " << want << ", got " << got << std::endl;
return -1;
}
return 0;
}

} // namespace daemon_proto
Loading
Loading