Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions src/runtime/object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,41 @@ Author: Leonardo de Moura
#define LEAN_SUPPORTS_BACKTRACE 0
#endif

// POSIX named semaphores for the experimental cross-process jobserver
// (env var `LEAN_JOB_SEMAPHORE=/name`, or `LEAN_JOB_SEMAPHORE_AUTO=N` to
// auto-create one and propagate it to children). Linux + macOS only.
#if !defined(_WIN32) && !defined(LEAN_EMSCRIPTEN)
#define LEAN_JOBSERVER_POSIX 1
#include <semaphore.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <cstring>
#else
#define LEAN_JOBSERVER_POSIX 0
#endif

#if LEAN_JOBSERVER_POSIX
// Name of a semaphore created by this process; unlinked on exit.
static char * g_owned_sem_name = nullptr;

static void unlink_owned_sem() {
if (g_owned_sem_name) {
sem_unlink(g_owned_sem_name);
free(g_owned_sem_name);
g_owned_sem_name = nullptr;
}
}

// Per-worker-thread flag: does this thread currently hold a jobserver token?
// Set by `acquire_token` / `wait_for`'s reclaim, cleared by `release_token`
// or by `wait_for`'s release. When `wait_for` cannot reclaim a token
// non-blockingly, the worker continues running its task un-gated and this
// flag stays false; the worker-loop's `release_token` then skips its
// `sem_post`, keeping per-worker token accounting balanced.
static thread_local bool g_holds_token = false;
#endif

#if LEAN_SUPPORTS_BACKTRACE
#include <execinfo.h>
#include <unistd.h>
Expand Down Expand Up @@ -719,6 +754,41 @@ class task_manager {
condition_variable m_task_finished_cv;
condition_variable m_dedicated_finished_cv;
bool m_shutting_down{false};
#if LEAN_JOBSERVER_POSIX
// Optional cross-process token pool. When non-null, standard workers must
// acquire a token before running a task and release it after, so that the
// total number of concurrently running standard workers across all
// processes sharing the semaphore does not exceed the initial count.
sem_t * m_jobserver_sem{nullptr};
#endif

// Acquire a token before running a task. Blocks on the global semaphore.
// Must be called with `lock` held; temporarily releases it.
void acquire_token(unique_lock<mutex> & lock) {
#if LEAN_JOBSERVER_POSIX
if (!m_jobserver_sem) return;
lock.unlock();
while (sem_wait(m_jobserver_sem) != 0 && errno == EINTR) {}
lock.lock();
g_holds_token = true;
#else
(void)lock;
#endif
}

// Release the token currently held by this worker to the global semaphore,
// if any. A `wait_for` that couldn't reclaim its token non-blockingly
// continues un-gated and leaves `g_holds_token == false`, in which case
// we skip the `sem_post` to keep accounting balanced.
void release_token() {
#if LEAN_JOBSERVER_POSIX
if (!m_jobserver_sem) return;
if (g_holds_token) {
sem_post(m_jobserver_sem);
g_holds_token = false;
}
#endif
}

lean_task_object * dequeue() {
lean_assert(m_queues_size != 0);
Expand Down Expand Up @@ -808,9 +878,23 @@ class task_manager {
continue;
}

// Acquire a jobserver token (drops the mutex while blocked).
// After waking we must re-check conditions, because the queue
// may have been drained or shutdown may have been requested.
acquire_token(lock);
#if LEAN_JOBSERVER_POSIX
if (m_jobserver_sem &&
(m_queues_size == 0 || m_shutting_down ||
m_std_workers.size() - m_idle_std_workers >= m_max_std_workers)) {
release_token();
continue;
}
#endif

lean_task_object * t = dequeue();
m_idle_std_workers--;
run_task(lock, t);
release_token();
m_idle_std_workers++;
reset_heartbeat();
}
Expand Down Expand Up @@ -914,6 +998,38 @@ class task_manager {
public:
task_manager(unsigned max_std_workers):
m_max_std_workers(max_std_workers) {
#if LEAN_JOBSERVER_POSIX
if (char const * name = std::getenv("LEAN_JOB_SEMAPHORE")) {
// Attach as a participant in an existing jobserver.
sem_t * s = sem_open(name, 0);
if (s != SEM_FAILED) {
m_jobserver_sem = s;
}
} else {
// No jobserver set up yet; create one with `max_std_workers`
// slots (overridable via `LEAN_JOB_SEMAPHORE_AUTO=N`) and hand it
// to children via env. Do NOT gate this process: the creator is
// typically an orchestrator (e.g. `lake`) whose workers block on
// subprocesses, and gating it would deadlock the pool.
int count = (int)max_std_workers;
if (char const * auto_n = std::getenv("LEAN_JOB_SEMAPHORE_AUTO")) {
count = std::atoi(auto_n);
}
if (count > 0) {
char buf[64];
std::snprintf(buf, sizeof buf, "/lean-jobs-%d", (int)getpid());
sem_unlink(buf);
sem_t * s = sem_open(buf, O_CREAT | O_EXCL, 0600, (unsigned)count);
if (s != SEM_FAILED) {
sem_close(s); // the named semaphore persists until unlink
setenv("LEAN_JOB_SEMAPHORE", buf, 1);
unsetenv("LEAN_JOB_SEMAPHORE_AUTO");
g_owned_sem_name = strdup(buf);
std::atexit(unlink_owned_sem);
}
}
}
#endif
}

~task_manager() {
Expand All @@ -931,6 +1047,12 @@ class task_manager {
unique_lock<mutex> lock(m_mutex);
m_dedicated_finished_cv.wait(lock, [&]() { return m_num_dedicated_workers == 0; });
// never seems to terminate under Emscripten
#endif
#if LEAN_JOBSERVER_POSIX
if (m_jobserver_sem) {
sem_close(m_jobserver_sem);
m_jobserver_sem = nullptr;
}
#endif
}

Expand Down Expand Up @@ -986,10 +1108,37 @@ class task_manager {
spawn_worker();
else
m_queue_cv.notify_one();
#if LEAN_JOBSERVER_POSIX
// Release our token globally so a sibling worker (in this or
// another process) can pick up the sub-task while we are
// blocked. If we don't currently hold a token (e.g. a previous
// un-reclaimed `wait_for` left us running un-gated), skip:
// there's no token to release.
if (m_jobserver_sem && g_holds_token) {
sem_post(m_jobserver_sem);
g_holds_token = false;
}
#endif
}
m_task_finished_cv.wait(lock, [&]() { return t->m_value != nullptr; });
if (in_pool) {
m_max_std_workers--;
#if LEAN_JOBSERVER_POSIX
// Try to reclaim a token non-blockingly. If one isn't
// immediately available, continue running un-gated rather than
// blocking in `sem_wait` — that's what previously let new
// `wait_for` calls spawn more workers that piled up in the same
// blocking call. The worker-loop's `release_token` will see
// `g_holds_token == false` and skip its `sem_post`, so the
// initial `sem_post`/`sem_wait` pair stays balanced. The cost is
// brief inter-process oversubscription, bounded by the depth of
// nested `Task.get`.
if (m_jobserver_sem && !g_holds_token) {
if (sem_trywait(m_jobserver_sem) == 0) {
g_holds_token = true;
}
}
#endif
}
}

Expand Down
Loading