Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/mem/heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,35 @@ void ray_heap_destroy(void) {
* from other heaps' freelists, which races with concurrent worker
* destruction during ray_pool_free(). */

/* Purge any of h's blocks from every other heap's foreign list
* BEFORE we munmap. Without this, foreign lists outlive h with
* dangling pointers into unmapped memory and crash on the next
* ray_heap_gc has_foreign walk or heap_flush_foreign. */
for (int fh_id = 0; fh_id < RAY_HEAP_REGISTRY_SIZE; fh_id++) {
ray_heap_t* fh_heap = ray_heap_registry[fh_id];
if (!fh_heap || fh_heap == h) continue;
ray_t** pp = &fh_heap->foreign;
ray_t* curr = *pp;
while (curr) {
ray_t* next = curr->fl_next;
bool in_h = false;
for (uint32_t i = 0; i < h->pool_count; i++) {
uintptr_t pb = (uintptr_t)h->pools[i].base;
uintptr_t pe = pb + BSIZEOF(h->pools[i].pool_order);
if ((uintptr_t)curr >= pb && (uintptr_t)curr < pe) {
in_h = true;
break;
}
}
if (in_h) {
*pp = next;
} else {
pp = &curr->fl_next;
}
curr = next;
}
}

/* Munmap all tracked pools. File-backed pools also need their fd
* closed and their tempfile unlinked so the swap directory doesn't
* accumulate orphans. */
Expand Down Expand Up @@ -1404,6 +1433,28 @@ void ray_heap_gc(void) {
gh->slabs[si].count = dst;
}

/* Purge any [pb, pe) blocks from every other heap's foreign
* list BEFORE munmap. The has_foreign check above is racy
* vs concurrent ray_free, and any block left here becomes
* a dangling pointer that crashes subsequent Pass 4 walks
* at fb->fl_next. Reading fl_next is safe here because
* the pool is still mapped. */
for (int fh_id = 0; fh_id < RAY_HEAP_REGISTRY_SIZE; fh_id++) {
ray_heap_t* fh_heap = ray_heap_registry[fh_id];
if (!fh_heap || fh_heap == gh) continue;
ray_t** pp = &fh_heap->foreign;
ray_t* curr = *pp;
while (curr) {
ray_t* next = curr->fl_next;
if ((uintptr_t)curr >= pb && (uintptr_t)curr < pe) {
*pp = next;
} else {
pp = &curr->fl_next;
}
curr = next;
}
}

ray_vm_free(phdr->vm_base, BSIZEOF(po));
/* File-backed pools also need their fd closed and tempfile
* unlinked, mirroring the heap_destroy path. */
Expand Down
151 changes: 151 additions & 0 deletions test/rfl/agg/atom_i64_med_topk.rfl
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
;; Targeted coverage for three small low-coverage helpers:
;;
;; - src/ops/agg.c::agg_atom_i64_for_type — per-base-type atom builder
;; used by agg_parted_minmax to emit a typed scalar for the
;; min/max of a PARTED column. Each switch arm corresponds to a
;; distinct narrow integer / temporal base type. Reached by
;; constructing parted tables via .db.splayed.set + .db.parted.get
;; and reducing the narrow column with (min ...) / (max ...).
;;
;; - src/ops/group.c::med_is_null — per-(group, row) null check used
;; by the parallel median kernel ray_median_per_group_buf and the
;; per-group topk kernel ray_topk_per_group_buf. Each switch arm
;; (F64 / I64 / I32 / I16 / U8) is driven by feeding a null-bearing
;; column of that type into (med col) by:g. Multi-key by: forces
;; the LIST-cell topk path that also calls med_is_null.
;;
;; - src/ops/group.c::topk_read_i64 — per-(row) reader used by the
;; ray_topk_per_group_buf integer arm. Each switch arm
;; (I64/TIMESTAMP, I32/DATE/TIME, I16, BOOL/U8) is driven by routing
;; (top col K) / (bot col K) through the LIST-cell path (multi-key
;; by:) over each width.
;;
;; Pre-flight fixture scrub: the PARTED tests reuse /tmp/rfl_atomi64_*
;; subtrees from prior runs; remove them or .db.splayed.set fails on
;; pre-existing sym.lk files.
(.sys.exec "rm -rf /tmp/rfl_atomi64_bool /tmp/rfl_atomi64_u8 /tmp/rfl_atomi64_i16 /tmp/rfl_atomi64_i32 /tmp/rfl_atomi64_date /tmp/rfl_atomi64_time /tmp/rfl_atomi64_ts /tmp/rfl_atomi64_i64")

;; ════════════════════════════════════════════════════════════════════
;; agg_atom_i64_for_type — exercise each base-type arm via PARTED
;; min/max. Building a 2-segment parted table forces both segments
;; through agg_parted_minmax → agg_atom_i64_for_type(base, best_i).
;; ════════════════════════════════════════════════════════════════════

;; RAY_BOOL arm — parted BOOL column, max → ray_bool(v != 0).
(set Tb-A (table [b] (list (as 'BOOL [false false true]))))
(set Tb-B (table [b] (list (as 'BOOL [false false false]))))
(.db.splayed.set "/tmp/rfl_atomi64_bool/2024.01.01/t/" Tb-A)
(.db.splayed.set "/tmp/rfl_atomi64_bool/2024.01.02/t/" Tb-B)
(max (at (.db.parted.get "/tmp/rfl_atomi64_bool/" 't) 'b)) -- true

;; RAY_U8 arm — parted U8 column.
(set Tu-A (table [u] (list (as 'U8 [3 5 7]))))
(set Tu-B (table [u] (list (as 'U8 [2 9 4]))))
(.db.splayed.set "/tmp/rfl_atomi64_u8/2024.01.01/t/" Tu-A)
(.db.splayed.set "/tmp/rfl_atomi64_u8/2024.01.02/t/" Tu-B)
(max (at (.db.parted.get "/tmp/rfl_atomi64_u8/" 't) 'u)) -- (as 'U8 9)

;; RAY_I16 arm — parted I16 column.
(set Th-A (table [h] (list (as 'I16 [10 20 30]))))
(set Th-B (table [h] (list (as 'I16 [5 15 25]))))
(.db.splayed.set "/tmp/rfl_atomi64_i16/2024.01.01/t/" Th-A)
(.db.splayed.set "/tmp/rfl_atomi64_i16/2024.01.02/t/" Th-B)
(min (at (.db.parted.get "/tmp/rfl_atomi64_i16/" 't) 'h)) -- (as 'I16 5)

;; RAY_I32 arm — parted I32 column.
(set Ti-A (table [i] (list (as 'I32 [100 200 300]))))
(set Ti-B (table [i] (list (as 'I32 [400 50 250]))))
(.db.splayed.set "/tmp/rfl_atomi64_i32/2024.01.01/t/" Ti-A)
(.db.splayed.set "/tmp/rfl_atomi64_i32/2024.01.02/t/" Ti-B)
(max (at (.db.parted.get "/tmp/rfl_atomi64_i32/" 't) 'i)) -- (as 'I32 400)

;; RAY_TIME arm — parted TIME column (DATE is excluded by agg_parted_sum
;; but not by minmax; use TIME here to exercise the ray_time arm).
(set Tt-A (table [t] (list [00:01:00 00:02:30 00:03:45])))
(set Tt-B (table [t] (list [00:00:15 00:05:00 00:04:00])))
(.db.splayed.set "/tmp/rfl_atomi64_time/2024.01.01/t/" Tt-A)
(.db.splayed.set "/tmp/rfl_atomi64_time/2024.01.02/t/" Tt-B)
(max (at (.db.parted.get "/tmp/rfl_atomi64_time/" 't) 't)) -- 00:05:00

;; RAY_DATE arm — parted DATE *data* column (distinct from the partition
;; key column). Per agg.c:128 sum on RAY_DATE returns type-error; minmax
;; does not — so use min/max here.
(set Td-A (table [dv] (list [2024.01.01 2024.06.15 2024.03.10])))
(set Td-B (table [dv] (list [2023.12.31 2024.12.01 2024.08.20])))
(.db.splayed.set "/tmp/rfl_atomi64_date/2024.01.01/t/" Td-A)
(.db.splayed.set "/tmp/rfl_atomi64_date/2024.01.02/t/" Td-B)
(min (at (.db.parted.get "/tmp/rfl_atomi64_date/" 't) 'dv)) -- 2023.12.31

;; RAY_TIMESTAMP arm — parted TIMESTAMP column. Literal form is
;; `YYYY.MM.DDDhh:mm:ss.fffffffff` (D-separator), not ISO-T.
(set Tts-A (table [ts] (list [2024.01.01D00:00:01.000000000 2024.01.01D00:00:05.000000000])))
(set Tts-B (table [ts] (list [2024.01.01D00:00:03.000000000 2024.01.01D00:00:07.000000000])))
(.db.splayed.set "/tmp/rfl_atomi64_ts/2024.01.01/t/" Tts-A)
(.db.splayed.set "/tmp/rfl_atomi64_ts/2024.01.02/t/" Tts-B)
(max (at (.db.parted.get "/tmp/rfl_atomi64_ts/" 't) 'ts)) -- 2024.01.01D00:00:07.000000000

;; default (RAY_I64) arm — parted I64 column.
(set Tl-A (table [l] (list [10 20 30 40])))
(set Tl-B (table [l] (list [5 15 25 35])))
(.db.splayed.set "/tmp/rfl_atomi64_i64/2024.01.01/t/" Tl-A)
(.db.splayed.set "/tmp/rfl_atomi64_i64/2024.01.02/t/" Tl-B)
(min (at (.db.parted.get "/tmp/rfl_atomi64_i64/" 't) 'l)) -- 5

;; ════════════════════════════════════════════════════════════════════
;; med_is_null — null-bearing per-group median for each integer/F64
;; src type. (med v) by: g routes to ray_median_per_group_buf, which
;; loops over rows and calls med_is_null(c->base, c->src_type, row)
;; once per (group, row). Distinct groups + at least one null per
;; group force the true/false branches per arm.
;; ════════════════════════════════════════════════════════════════════

;; F64 arm — NaN/null check via memcpy + v != v.
(set Tmf (table [g v] (list [0 0 0 1 1 1] (as 'F64 [1.0 0Nf 3.0 0Nf 5.0 7.0]))))
(sum (at (select {m: (med v) from: Tmf by: g}) 'm)) -- 8.0

;; I64 arm — NULL_I64 sentinel.
(set Tml (table [g v] (list [0 0 0 1 1 1] [10 0N 30 0N 50 70])))
(sum (at (select {m: (med v) from: Tml by: g}) 'm)) -- 80.0

;; I32 arm — NULL_I32 sentinel.
(set Tmi (table [g v] (list [0 0 0 1 1 1] (as 'I32 [10 0N 30 0N 50 70]))))
(sum (at (select {m: (med v) from: Tmi by: g}) 'm)) -- 80.0

;; I16 arm — NULL_I16 sentinel.
(set Tmh (table [g v] (list [0 0 0 1 1 1] (as 'I16 [10 0N 30 0N 50 70]))))
(sum (at (select {m: (med v) from: Tmh by: g}) 'm)) -- 80.0

;; U8 arm — non-nullable, med_is_null returns false unconditionally.
(set Tmu (table [g v] (list [0 0 0 1 1 1] (as 'U8 [10 20 30 40 50 60]))))
(sum (at (select {m: (med v) from: Tmu by: g}) 'm)) -- 70.0

;; ════════════════════════════════════════════════════════════════════
;; topk_read_i64 — per-group LIST-cell top-K over each integer width.
;; Multi-key by: [g h] forces the rowform gate (single-key only) to
;; bypass, dropping into ray_topk_per_group_buf which calls
;; topk_read_i64(base, t, row) for each kept candidate.
;; ════════════════════════════════════════════════════════════════════

;; RAY_I64 arm — 4 (g,h) groups, each has 2 rows; top-2 returns both.
;; Total kept = 10+20+30+40+50+60+70+80 = 360.
(set Tk64 (table [g h v] (list [0 0 0 0 1 1 1 1] [X Y X Y X Y X Y] [10 20 30 40 50 60 70 80])))
(sum (raze (at (select {t: (top v 2) by: [g h] from: Tk64}) 't))) -- 360

;; RAY_I32 arm — DATE and TIME share this case (4-byte memcpy).
(set Tk32 (table [g h v] (list [0 0 0 0 1 1 1 1] [X Y X Y X Y X Y] (as 'I32 [10 20 30 40 50 60 70 80]))))
(sum (raze (at (select {t: (top v 2) by: [g h] from: Tk32}) 't))) -- (as 'I32 360)

;; RAY_I16 arm — bot-1 per (g,h) group = min of {10,30},{20,40},{50,70},{60,80}
;; = 10+20+50+60 = 140.
(set Tk16 (table [g h v] (list [0 0 0 0 1 1 1 1] [X Y X Y X Y X Y] (as 'I16 [10 20 30 40 50 60 70 80]))))
(sum (raze (at (select {b: (bot v 1) by: [g h] from: Tk16}) 'b))) -- (as 'I16 140)

;; RAY_U8 arm — BOOL shares this case (1-byte direct read). bot-2 per
;; (g,h) group keeps both rows; sum = 1+..+8 = 36 (sum on U8 returns I64).
(set Tku8 (table [g h v] (list [0 0 0 0 1 1 1 1] [X Y X Y X Y X Y] (as 'U8 [1 2 3 4 5 6 7 8]))))
(sum (raze (at (select {b: (bot v 2) by: [g h] from: Tku8}) 'b))) -- 36

;; ════════════════════════════════════════════════════════════════════
;; teardown — leave /tmp clean for the next run.
;; ════════════════════════════════════════════════════════════════════
(.sys.exec "rm -rf /tmp/rfl_atomi64_bool /tmp/rfl_atomi64_u8 /tmp/rfl_atomi64_i16 /tmp/rfl_atomi64_i32 /tmp/rfl_atomi64_date /tmp/rfl_atomi64_time /tmp/rfl_atomi64_ts /tmp/rfl_atomi64_i64")
Loading
Loading