Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 214 additions & 1 deletion src/core/ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
#endif

#include "lang/eval.h"
#include "lang/env.h"
#include "lang/internal.h"
#include "table/sym.h"

/* ===== Compression (delta + RLE) ===== */

Expand Down Expand Up @@ -182,6 +185,126 @@ static bool validate_creds(const uint8_t* buf, uint8_t cred_len,
return ct_eq(pw_buf, secret, 256);
}

/* ===== Connection hooks (.ipc.on.*) =====
*
* Five user-settable lambdas that intercept the connection lifecycle.
* Lookup is by interned sym id; we cache the ids in `hook_syms[]` so the
* fast path is a single ray_env_get + RAY_LAMBDA-type check per dispatch.
*
* `g_current_handle` is the thread-local value `.ipc.handle` reads back
* to Rayfall. HOOK_SCOPE saves/restores it around each invocation, so a
* hook that opens its own connection (calling `.ipc.handle` inside a
* nested `.ipc.send` round-trip) still sees the outer handle when its
* body resumes. Default value -1 means "no hook is currently on the
* stack" — exposed verbatim through the builtin.
*
* Errors:
* - on.open / on.close / on.async: logged to stderr, swallowed.
* - on.sync: error becomes the response (same as a raw `eval` error).
* - on.auth: error treated as reject (handshake refused). */

/* Hook indices must match the order in `src/lang/env.c`
* (g_ipc_hook_syms[]) — the `ray_sym_ipc_hook(idx)` getter assumes this
* mapping. Keep them in lockstep. */
enum {
IPC_HOOK_OPEN = 0,
IPC_HOOK_CLOSE = 1,
IPC_HOOK_SYNC = 2,
IPC_HOOK_ASYNC = 3,
IPC_HOOK_AUTH = 4,
IPC_HOOK_COUNT = 5,
};

static _Thread_local int64_t g_current_handle = -1;

int64_t ray_ipc_current_handle(void) {
return g_current_handle;
}

/* Fetch the hook lambda if one is installed and is in fact a lambda.
* Non-lambda bindings (cleared via `set .ipc.on.X 0` or never bound)
* yield NULL — caller falls back to default behaviour. Returns a
* borrowed ref; do not release. Sym IDs come from env.c's central
* cache, so a runtime destroy/recreate cycle invalidates them in one
* place and the lookup here always sees IDs from the current sym
* table. */
static ray_t* hook_lookup(int idx) {
int64_t sym = ray_sym_ipc_hook(idx);
if (sym < 0) return NULL;
ray_t* fn = ray_env_get(sym);
if (!fn || fn->type != RAY_LAMBDA) return NULL;
return fn;
}

/* Call a single-arg hook for lifecycle events (on.open / on.close).
* Errors are logged and swallowed — a buggy logging hook must never
* wedge connection teardown. */
static void hook_call_lifecycle(int idx, int64_t handle) {
ray_t* fn = hook_lookup(idx);
if (!fn) return;
ray_t* arg = make_i64(handle);
if (!arg || RAY_IS_ERR(arg)) { if (arg) ray_release(arg); return; }
int64_t prev = g_current_handle;
g_current_handle = handle;
ray_t* r = call_fn1(fn, arg);
g_current_handle = prev;
if (r && RAY_IS_ERR(r)) {
const char* name = (idx == IPC_HOOK_OPEN) ? ".ipc.on.open" : ".ipc.on.close";
fprintf(stderr, "ipc: %s hook raised an error (handle=%lld)\n",
name, (long long)handle);
}
ray_release(arg);
if (r && r != RAY_NULL_OBJ) ray_release(r);
}

/* Call the on.auth hook with (user, pass) string atoms. Returns:
* - 1 → hook ran and returned truthy; caller continues the handshake.
* - 0 → hook ran and returned falsy (or errored); caller rejects.
* - -1 → no hook installed; caller uses the existing pass-through.
* The constant-time secret compare in validate_creds always runs first,
* so this hook can only narrow access — never widen it. */
static int hook_call_auth(int64_t handle, const uint8_t* cred_buf,
uint8_t cred_len) {
ray_t* fn = hook_lookup(IPC_HOOK_AUTH);
if (!fn) return -1;

/* Split user:pass exactly the way validate_creds does — colon-
* separated, with the leading user part possibly empty. Strip the
* trailing NUL the client appends so the hook sees clean strings. */
const char* creds = (const char*)cred_buf;
const char* colon = memchr(creds, ':', cred_len);
const char* upart = creds;
size_t ulen = colon ? (size_t)(colon - creds) : 0;
const char* ppart = colon ? colon + 1 : creds;
size_t plen = colon ? (size_t)(cred_len - (ppart - creds))
: (size_t)cred_len;
if (plen > 0 && ppart[plen - 1] == '\0') plen--;

ray_t* u = ray_str(upart, ulen);
ray_t* p = ray_str(ppart, plen);
if (!u || !p || RAY_IS_ERR(u) || RAY_IS_ERR(p)) {
if (u && !RAY_IS_ERR(u)) ray_release(u);
if (p && !RAY_IS_ERR(p)) ray_release(p);
return 0; /* allocation failure → reject conservatively */
}
int64_t prev = g_current_handle;
g_current_handle = handle;
ray_t* r = call_fn2(fn, u, p);
g_current_handle = prev;
ray_release(u);
ray_release(p);

int ok;
if (!r || RAY_IS_ERR(r)) {
fprintf(stderr, "ipc: .ipc.on.auth hook raised an error — rejecting\n");
ok = 0;
} else {
ok = is_truthy(r) ? 1 : 0;
}
if (r && r != RAY_NULL_OBJ) ray_release(r);
return ok;
}

static void send_response(ray_sock_t fd, ray_t* result)
{
int64_t ser_size = ray_serde_size(result);
Expand Down Expand Up @@ -299,7 +422,28 @@ static ray_t* eval_payload_core(uint8_t* payload, size_t payload_len,

ray_t* result = NULL;
if (msg && !RAY_IS_ERR(msg)) {
if (msg->type == -RAY_STR) {
/* Dispatch through `.ipc.on.sync` / `.ipc.on.async` hook if
* installed; otherwise fall back to v1's inline-eval default.
* The hook receives the raw deserialised payload — same shape
* any other Rayfall lambda would see — so a hook installed
* as `{[m] eval m}` reproduces the default behaviour. */
int hook_idx = (hdr->msgtype == RAY_IPC_MSG_SYNC) ? IPC_HOOK_SYNC
: IPC_HOOK_ASYNC;
ray_t* hook = hook_lookup(hook_idx);
if (hook) {
result = call_fn1(hook, msg);
ray_release(msg);
/* Async errors have nowhere to go on the wire (async never
* sends a response), so log + drop here. Without this the
* caller would silently release the error and the operator
* would never see the hook misbehaving. */
if (result && RAY_IS_ERR(result) &&
hdr->msgtype == RAY_IPC_MSG_ASYNC) {
fprintf(stderr, "ipc: .ipc.on.async hook raised an error\n");
ray_release(result);
result = NULL;
}
} else if (msg->type == -RAY_STR) {
const char* str = ray_str_ptr(msg);
size_t slen = ray_str_len(msg);
if (str && slen > 0) {
Expand Down Expand Up @@ -492,6 +636,10 @@ static ray_t* ipc_read_handshake(ray_poll_t* poll, ray_selector_t* sel)
cd->phase = RAY_IPC_PHASE_HEADER;
sel->rx.read_fn = ipc_read_header;
ray_poll_rx_request(poll, sel, sizeof(ray_ipc_header_t));
/* No-auth path: connection is now fully ready for inbound messages.
* Fire `.ipc.on.open` AFTER we've requested the next read, so a
* hook that calls back into the server can't race the read pump. */
hook_call_lifecycle(IPC_HOOK_OPEN, sel->id);
return NULL;
}

Expand Down Expand Up @@ -525,6 +673,17 @@ static ray_t* ipc_read_creds(ray_poll_t* poll, ray_selector_t* sel)

bool ok = validate_creds(sel->rx.buf->data + 1, cred_len,
poll->auth_secret);

/* Secondary user-defined check via `.ipc.on.auth`. Only consulted
* when the constant-time secret compare already passed — this hook
* can narrow access (deny extras) but never widen it. Errors and
* falsy returns flip `ok` to false, triggering the same reject byte
* + deregister the secret-mismatch path would. */
if (ok) {
int hook_ok = hook_call_auth(sel->id, sel->rx.buf->data + 1, cred_len);
if (hook_ok == 0) ok = false;
}

uint8_t result = ok ? 0x00 : 0x01;
ray_sock_send((ray_sock_t)sel->fd, &result, 1);

Expand All @@ -536,6 +695,10 @@ static ray_t* ipc_read_creds(ray_poll_t* poll, ray_selector_t* sel)
cd->phase = RAY_IPC_PHASE_HEADER;
sel->rx.read_fn = ipc_read_header;
ray_poll_rx_request(poll, sel, sizeof(ray_ipc_header_t));
/* Auth path: fully handshaked and authed — connection is now ready
* for inbound messages. Same ordering as the no-auth branch above:
* fire AFTER the next read is requested. */
hook_call_lifecycle(IPC_HOOK_OPEN, sel->id);
return NULL;
}

Expand Down Expand Up @@ -573,10 +736,19 @@ static ray_t* ipc_read_payload(ray_poll_t* poll, ray_selector_t* sel)
bool prev_restricted = ray_eval_get_restricted();
ray_eval_set_restricted(cd->restricted);

/* Expose this connection's selector id to `.ipc.handle` for the
* duration of any `.ipc.on.sync` / `.ipc.on.async` hook that runs
* inside eval_payload. Save/restore so a hook that itself opens
* a nested IPC round-trip doesn't leave the wrong handle visible
* when its caller resumes. */
int64_t prev_handle = g_current_handle;
g_current_handle = sel->id;

/* Eval and produce result */
ray_t* result = eval_payload(sel->rx.buf->data,
(size_t)sel->rx.buf->offset, &cd->hdr);

g_current_handle = prev_handle;
ray_eval_set_restricted(prev_restricted);

/* Send response for sync messages */
Expand All @@ -595,7 +767,20 @@ static ray_t* ipc_read_payload(ray_poll_t* poll, ray_selector_t* sel)
static void ipc_on_close(ray_poll_t* poll, ray_selector_t* sel)
{
(void)poll;
/* Fire `.ipc.on.close` BEFORE tearing the per-conn state down so a
* hook reading `.ipc.handle` still sees this connection's id, and
* before the listener's own close path (which would otherwise also
* route through here) runs the hook with a stale fd. Guard on:
* - sel->data: the listener itself has no conn data.
* - phase ≥ HEADER: the connection actually completed handshake
* (otherwise no matching on.open was fired, so on.close must
* also stay silent to keep the pair balanced for the user). */
if (sel->data) {
ray_ipc_conn_data_t* cd = (ray_ipc_conn_data_t*)sel->data;
if (cd->phase == RAY_IPC_PHASE_HEADER ||
cd->phase == RAY_IPC_PHASE_PAYLOAD) {
hook_call_lifecycle(IPC_HOOK_CLOSE, sel->id);
}
ray_sys_free(sel->data);
sel->data = NULL;
}
Expand Down Expand Up @@ -630,6 +815,15 @@ int64_t ray_ipc_listen(ray_poll_t* poll, uint16_t port)

static void conn_close(ray_ipc_server_t* srv, ray_ipc_conn_t* c)
{
/* `.ipc.on.close` fires only for conns that were actually opened —
* a slot whose phase never advanced past HANDSHAKE/CREDS was never
* announced via on.open and so shouldn't be announced via on.close.
* Keeps the pair balanced for the user. */
if (c->phase == RAY_IPC_PHASE_HEADER ||
c->phase == RAY_IPC_PHASE_PAYLOAD) {
hook_call_lifecycle(IPC_HOOK_CLOSE, (int64_t)(c - srv->conns));
}

#if defined(__linux__)
epoll_ctl(srv->poll_fd, EPOLL_CTL_DEL, c->fd, NULL);
#elif defined(__APPLE__)
Expand Down Expand Up @@ -678,6 +872,8 @@ static void conn_on_handshake(ray_ipc_server_t* srv, ray_ipc_conn_t* c)

c->rx_need = sizeof(ray_ipc_header_t);
c->phase = RAY_IPC_PHASE_HEADER;
/* Legacy path mirror of the poll-path post-handshake fire. */
hook_call_lifecycle(IPC_HOOK_OPEN, (int64_t)(c - srv->conns));
}

static void conn_on_header(ray_ipc_server_t* srv, ray_ipc_conn_t* c)
Expand All @@ -702,8 +898,16 @@ static void conn_on_payload(ray_ipc_server_t* srv, ray_ipc_conn_t* c)
bool prev = ray_eval_get_restricted();
ray_eval_set_restricted(srv->restricted);

/* Conn-array index doubles as the handle on the legacy path —
* stable for the connection's lifetime, distinct across active
* connections, freed back to the pool on close. Mirrored shape
* of the poll path's sel->id. */
int64_t prev_handle = g_current_handle;
g_current_handle = (int64_t)(c - srv->conns);

ray_t* result = eval_payload(c->rx_buf, c->rx_len, &c->hdr);

g_current_handle = prev_handle;
ray_eval_set_restricted(prev);

if (c->hdr.msgtype == RAY_IPC_MSG_SYNC)
Expand Down Expand Up @@ -735,6 +939,14 @@ static void conn_on_creds(ray_ipc_server_t* srv, ray_ipc_conn_t* c)
uint8_t cred_len = c->rx_buf[0];
bool ok = validate_creds(c->rx_buf + 1, cred_len, srv->auth_secret);

/* Legacy path mirror of the poll-path on.auth call: same handle-as-
* conn-index convention, same narrowing semantics. */
if (ok) {
int hook_ok = hook_call_auth((int64_t)(c - srv->conns),
c->rx_buf + 1, cred_len);
if (hook_ok == 0) ok = false;
}

uint8_t result = ok ? 0x00 : 0x01;
ray_sock_send(c->fd, &result, 1);

Expand All @@ -748,6 +960,7 @@ static void conn_on_creds(ray_ipc_server_t* srv, ray_ipc_conn_t* c)
c->rx_len = 0;
c->rx_need = sizeof(ray_ipc_header_t);
c->phase = RAY_IPC_PHASE_HEADER;
hook_call_lifecycle(IPC_HOOK_OPEN, (int64_t)(c - srv->conns));
}

static void conn_on_readable(ray_ipc_server_t* srv, ray_ipc_conn_t* c)
Expand Down
10 changes: 10 additions & 0 deletions src/core/ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ size_t ray_ipc_decompress(const uint8_t* src, size_t clen,
#define RAY_IPC_FLAG_VERBOSE 0x04
#define RAY_IPC_MAX_CONNS 256

/* ===== Connection hooks (.ipc.on.*) ===== */

/* Current connection handle, readable from Rayfall via the `.ipc.handle`
* builtin while a `.ipc.on.*` hook is on the stack. Set/restored by the
* server around every hook invocation; defaults to -1 outside any hook.
* Thread-local — IPC dispatch is single-threaded today, but the storage
* class keeps the value scoped to the dispatch thread should that ever
* change. */
int64_t ray_ipc_current_handle(void);

/* ===== Poll-based IPC (new API) ===== */

/* Register IPC listener on poll. Returns selector id or -1. */
Expand Down
11 changes: 9 additions & 2 deletions src/lang/compile.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,18 @@ static void compile_list(compiler_t *c, ray_t *ast) {
* ray_env_set_local enforces on the tree-walking path.
* Setting c->error aborts bytecode emission; call_lambda
* then falls back to the tree-walking interpreter which
* raises the proper `reserve` error via ray_let_fn. */
* raises the proper `reserve` error via ray_let_fn.
* The five `.ipc.on.*` connection-hook names are exempt —
* they're user-settable, so a `let .ipc.on.open ...` in a
* compiled body must emit the same OP_STOREENV as any other
* local binding. Without this, the bytecode path would
* abort and silently fall back to the interpreter on every
* such write. */
if (sym_id == sf_let && n == 3) {
ray_t *name_obj = elems[1];
if (name_obj->type != -RAY_SYM ||
ray_sym_is_reserved(name_obj->i64)) {
(ray_sym_is_reserved(name_obj->i64) &&
!ray_sym_is_ipc_hook(name_obj->i64))) {
c->error = true;
return;
}
Expand Down
Loading
Loading