97 changes: 79 additions & 18 deletions src/ops/group.c
@@ -288,24 +288,48 @@ static void cd_part_dedup_fn(void* ctx, uint32_t worker_id,
/* Width-specialised value extraction for the partition pass. Reading
* row-by-row through read_col_i64 was the dispatch overhead in the
* sequential path; specialising on the column width lets the autovec
* pass tighten the loop. */
* pass tighten the loop.
*
* Indexing note: histograms and cursors are keyed by *task index*, not
* worker id. ray_pool_dispatch's ring is work-stealing — the same
* worker_id can claim different tasks across two consecutive
* dispatches, so the row range processed by worker w in pass 1
* (histogram) need not match the range processed by worker w in pass 2
* (scatter). Using worker_id as the cursor key would let pass 2
* scatter writes overshoot the slot reserved by pass 1, mangle the
* partition layout, and over- or under-count distinct values
* non-deterministically. Task index is stable across passes because
* the row range tied to task t is fixed at dispatch-fill time. */
typedef struct {
const void* base;
int64_t* counts; /* P per-partition row counts (per worker) */
int64_t* counts; /* P per-partition row counts (per task) */
uint32_t p_bits;
uint64_t p_mask;
int64_t grain; /* rows per task (last task may have fewer) */
int64_t total; /* total row count */
uint8_t stride_log2; /* log2(elem size) for plain int paths */
uint8_t is_f64;
int8_t type;
uint8_t attrs;
} cd_count_ctx_t;

/* Count rows per partition (per worker, into worker-local slot). Two
* passes: this one fills the histograms; the next does the scatter. */
/* Count rows per partition (per task, into task-local slot). Two
* passes: this one fills the histograms; the next does the scatter.
* Dispatched via ray_pool_dispatch_n with start=task_idx so the
* cursor key is stable across the histogram and scatter passes. */
static void cd_hist_fn(void* ctx, uint32_t worker_id,
int64_t start, int64_t end) {
(void)worker_id;
(void)end;
cd_count_ctx_t* x = (cd_count_ctx_t*)ctx;
int64_t* hist = x->counts + (size_t)worker_id * (x->p_mask + 1);
int64_t task_idx = start;
int64_t row_start = task_idx * x->grain;
int64_t row_end = row_start + x->grain;
if (row_end > x->total) row_end = x->total;
int64_t* hist = x->counts + (size_t)task_idx * (x->p_mask + 1);
/* Reuse the existing tight loops by aliasing the local names. */
start = row_start;
end = row_end;
const void* base = x->base;
int8_t in_type = x->type;
uint8_t in_attrs = x->attrs;
@@ -373,18 +397,29 @@ static void cd_hist_fn(void* ctx, uint32_t worker_id,
typedef struct {
const void* base;
int64_t* out_buf; /* concatenated payloads (output) */
int64_t* cursor; /* per-worker × P; advances per scatter */
int64_t* cursor; /* per-task × P; advances per scatter */
uint32_t p_bits;
uint64_t p_mask;
int64_t grain; /* rows per task (last task may have fewer) */
int64_t total; /* total row count */
uint8_t is_f64;
int8_t type;
uint8_t attrs;
} cd_scatter_ctx_t;

static void cd_scatter_fn(void* ctx, uint32_t worker_id,
int64_t start, int64_t end) {
(void)worker_id;
(void)end;
cd_scatter_ctx_t* x = (cd_scatter_ctx_t*)ctx;
int64_t* cur = x->cursor + (size_t)worker_id * (x->p_mask + 1);
int64_t task_idx = start;
int64_t row_start = task_idx * x->grain;
int64_t row_end = row_start + x->grain;
if (row_end > x->total) row_end = x->total;
int64_t* cur = x->cursor + (size_t)task_idx * (x->p_mask + 1);
/* Reuse the existing tight loops by aliasing the local names. */
start = row_start;
end = row_end;
int64_t* out = x->out_buf;
const void* base = x->base;
int8_t in_type = x->type;
@@ -541,31 +576,51 @@ ray_t* exec_count_distinct(ray_graph_t* g, ray_op_t* op, ray_t* input) {
uint64_t P = (uint64_t)1 << p_bits;
uint64_t p_mask = P - 1;

/* Pass 1: per-worker histogram (P × nw int64 cells). */
/* Histograms and cursors are keyed by *task* index, not worker id, so
* pass-2 scatter writes land in the slot that pass-1 histogram
* reserved. A worker may execute different tasks in the two passes
* (the dispatch ring is work-stealing); the row range tied to a task
* is fixed when ray_pool_dispatch_n fills the ring. */
int64_t grain = (int64_t)RAY_DISPATCH_MORSELS * RAY_MORSEL_ELEMS;
if (grain <= 0) grain = 8192;
int64_t n_tasks_64 = (len + grain - 1) / grain;
if (n_tasks_64 <= 0) n_tasks_64 = 1;
/* MAX_RING_CAP guards against pathological len; if we'd exceed it,
* fall back to the sequential kernel — the cap is high enough that
* this only fires on absurd inputs. */
if (n_tasks_64 > (1u << 16)) {
int64_t cnt = cd_seq_count(in_type, input->attrs, base, len);
if (cnt < 0) return ray_error("oom", NULL);
return ray_i64(cnt);
}
uint32_t n_tasks = (uint32_t)n_tasks_64;

/* Pass 1: per-task histogram (P × n_tasks int64 cells). */
ray_t* hist_hdr = NULL;
int64_t* hist = (int64_t*)scratch_calloc(&hist_hdr,
(size_t)P * nw * sizeof(int64_t));
(size_t)P * n_tasks * sizeof(int64_t));
if (!hist) {
return ray_error("oom", NULL);
}
cd_count_ctx_t hctx = {
.base = base, .counts = hist,
.p_bits = p_bits, .p_mask = p_mask,
.grain = grain, .total = len,
.stride_log2 = 0, .is_f64 = (in_type == RAY_F64),
.type = in_type, .attrs = input->attrs,
};
ray_pool_dispatch(pool, cd_hist_fn, &hctx, len);
ray_pool_dispatch_n(pool, cd_hist_fn, &hctx, n_tasks);

/* Convert per-worker histograms into a global prefix sum. Order:
* partition_0_worker_0, partition_0_worker_1, …, partition_1_worker_0, …
* so each (worker, partition) range is a contiguous slice of out_buf. */
/* Convert per-task histograms into a global prefix sum. Order:
* partition_0_task_0, partition_0_task_1, …, partition_1_task_0, …
* so each (task, partition) range is a contiguous slice of out_buf. */
ray_t* off_hdr = NULL;
int64_t* part_off = (int64_t*)scratch_alloc(&off_hdr,
(size_t)(P + 1) * sizeof(int64_t));
if (!part_off) { scratch_free(hist_hdr); return ray_error("oom", NULL); }
ray_t* cur_hdr = NULL;
int64_t* cursor = (int64_t*)scratch_alloc(&cur_hdr,
(size_t)P * nw * sizeof(int64_t));
(size_t)P * n_tasks * sizeof(int64_t));
if (!cursor) {
scratch_free(off_hdr); scratch_free(hist_hdr);
return ray_error("oom", NULL);
@@ -574,9 +629,9 @@
int64_t total = 0;
for (uint64_t p = 0; p < P; p++) {
part_off[p] = total;
for (uint32_t w = 0; w < nw; w++) {
cursor[(size_t)w * P + p] = total;
total += hist[(size_t)w * P + p];
for (uint32_t t = 0; t < n_tasks; t++) {
cursor[(size_t)t * P + p] = total;
total += hist[(size_t)t * P + p];
}
}
part_off[P] = total;
Expand All @@ -598,10 +653,11 @@ ray_t* exec_count_distinct(ray_graph_t* g, ray_op_t* op, ray_t* input) {
cd_scatter_ctx_t sctx = {
.base = base, .out_buf = out_buf, .cursor = cursor,
.p_bits = p_bits, .p_mask = p_mask,
.grain = grain, .total = len,
.is_f64 = (in_type == RAY_F64),
.type = in_type, .attrs = input->attrs,
};
ray_pool_dispatch(pool, cd_scatter_fn, &sctx, len);
ray_pool_dispatch_n(pool, cd_scatter_fn, &sctx, n_tasks);

/* Pass 3: dedup each partition in parallel. Each partition gets one
* task — distinct values land in the same partition, so per-partition
@@ -3414,6 +3470,11 @@ ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl,
key_owned[k] = 1;
}
}
if (!key_vecs[k]) {
for (uint8_t j = 0; j < k; j++)
if (key_owned[j] && key_vecs[j]) ray_release(key_vecs[j]);
return ray_error("domain", "by: column not found in table");
}
}

/* Resolve agg input columns (VLA — n_aggs ≤ 8; use ≥1 to avoid zero-size VLA UB) */
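To make the task-keyed layout concrete, here is a minimal single-threaded sketch of the two-pass scheme this diff adopts: per-task histograms, a partition-major/task-minor prefix sum, then a scatter in which each task advances only its own cursor slots. It deliberately replaces the ray_* pool with plain loops, and every name in it (P, GRAIN, cursor, ...) is illustrative rather than part of the real API; the point is only that the out_buf slice assigned to each (task, partition) pair is fixed before pass 2 runs, so it is safe no matter which worker ends up executing which task.

    /* Standalone sketch of task-keyed two-pass partitioning.  Compile
     * with `cc -std=c11 sketch.c`.  Illustrative only: plain loops
     * stand in for the ray_* pool, and P / GRAIN are made-up values. */
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define P      4          /* partitions (power of two) */
    #define P_MASK (P - 1)
    #define GRAIN  8          /* rows per task */

    int main(void) {
        int64_t data[30], len = 30;
        for (int64_t i = 0; i < len; i++) data[i] = rand() % 100;
        int64_t n_tasks = (len + GRAIN - 1) / GRAIN;

        /* Pass 1: per-task histogram, keyed by task index t.  The row
         * range [t*GRAIN, min((t+1)*GRAIN, len)) is a pure function of
         * t, so it is identical in both passes. */
        int64_t* hist = calloc((size_t)(n_tasks * P), sizeof *hist);
        for (int64_t t = 0; t < n_tasks; t++) {
            int64_t lo = t * GRAIN, hi = lo + GRAIN > len ? len : lo + GRAIN;
            for (int64_t i = lo; i < hi; i++)
                hist[t * P + (int64_t)((uint64_t)data[i] & P_MASK)]++;
        }

        /* Prefix sum in partition-major, task-minor order: the rows
         * task t owns inside partition p become a fixed contiguous
         * slice of the output, so pass 2 cannot collide across tasks. */
        int64_t* cursor = malloc((size_t)(n_tasks * P) * sizeof *cursor);
        int64_t total = 0;
        for (int64_t p = 0; p < P; p++)
            for (int64_t t = 0; t < n_tasks; t++) {
                cursor[t * P + p] = total;
                total += hist[t * P + p];
            }

        /* Pass 2: scatter.  Keyed by t, not by whichever thread would
         * run it — this is the fix the diff above makes. */
        int64_t* out = malloc((size_t)total * sizeof *out);
        for (int64_t t = 0; t < n_tasks; t++) {
            int64_t lo = t * GRAIN, hi = lo + GRAIN > len ? len : lo + GRAIN;
            for (int64_t i = lo; i < hi; i++) {
                int64_t p = (int64_t)((uint64_t)data[i] & P_MASK);
                out[cursor[t * P + p]++] = data[i];
            }
        }
        for (int64_t i = 0; i < total; i++) printf("%lld ", (long long)out[i]);
        putchar('\n');
        free(hist); free(cursor); free(out);
        return 0;
    }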
30 changes: 21 additions & 9 deletions src/ops/query.c
@@ -2224,10 +2224,20 @@ static ray_t* atom_broadcast_vec(ray_t* a, int64_t n) {
int8_t vec_type = (int8_t)(-a->type);
if (vec_type <= 0) return NULL;

/* SYM atoms produced by ray_sym(id) carry no width attr (always 0 →
* W8), so we can't trust a->attrs when the id exceeds one byte.
* Pick the narrowest width that fits a->i64. */
uint8_t sym_w = 0;
if (vec_type == RAY_SYM) {
uint64_t id = (uint64_t)a->i64;
sym_w = id <= 0xFFu ? RAY_SYM_W8
: id <= 0xFFFFu ? RAY_SYM_W16
: id <= 0xFFFFFFFFu ? RAY_SYM_W32
: RAY_SYM_W64;
}
ray_t* v;
if (vec_type == RAY_SYM) {
uint8_t w = (uint8_t)(a->attrs & RAY_SYM_W_MASK);
v = ray_sym_vec_new(w, n);
v = ray_sym_vec_new(sym_w, n);
} else {
v = ray_vec_new(vec_type, n);
}
@@ -2269,20 +2279,22 @@
break;
}
case RAY_SYM: {
/* SYM stores the ID in `i64` regardless of width; truncate per
* the vector's width attribute. Width came from the atom and
* was carried by ray_sym_vec_new above. */
uint8_t w = (uint8_t)(a->attrs & RAY_SYM_W_MASK);
if (w == RAY_SYM_W8) {
/* Width was selected above to fit a->i64, not read from a->attrs
* (atom-built syms never set the width attr). */
if (sym_w == RAY_SYM_W8) {
memset(dst, (uint8_t)a->i64, (size_t)n);
} else if (w == RAY_SYM_W16) {
} else if (sym_w == RAY_SYM_W16) {
uint16_t val = (uint16_t)a->i64;
uint16_t* d = (uint16_t*)dst;
for (int64_t i = 0; i < n; i++) d[i] = val;
} else { /* W32 — default */
} else if (sym_w == RAY_SYM_W32) {
uint32_t val = (uint32_t)a->i64;
uint32_t* d = (uint32_t*)dst;
for (int64_t i = 0; i < n; i++) d[i] = val;
} else { /* W64 */
int64_t val = a->i64;
int64_t* d = (int64_t*)dst;
for (int64_t i = 0; i < n; i++) d[i] = val;
}
break;
}
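As a quick check of the width-selection logic above, the following self-contained sketch reproduces the ternary chain in isolation. The enum values are stand-ins, not ray's actual RAY_SYM_W* constants; the boundary cases in main exercise the thresholds where each width stops fitting.

    /* Sketch: pick the narrowest sym width that holds an id, ignoring
     * any attr bits (atom-built syms never set the width attr). */
    #include <stdint.h>
    #include <assert.h>

    enum { SYM_W8, SYM_W16, SYM_W32, SYM_W64 };  /* illustrative values */

    static uint8_t sym_width_for(uint64_t id) {
        return id <= 0xFFu       ? SYM_W8
             : id <= 0xFFFFu     ? SYM_W16
             : id <= 0xFFFFFFFFu ? SYM_W32
             :                     SYM_W64;
    }

    int main(void) {
        assert(sym_width_for(0)          == SYM_W8);
        assert(sym_width_for(255)        == SYM_W8);   /* last 1-byte id */
        assert(sym_width_for(256)        == SYM_W16);  /* first 2-byte id */
        assert(sym_width_for(1u << 16)   == SYM_W32);
        assert(sym_width_for(1ull << 32) == SYM_W64);
        return 0;
    }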
2 changes: 2 additions & 0 deletions test/main.c
@@ -111,6 +111,7 @@ extern const test_entry_t graph_entries[];
extern const test_entry_t graph_builtin_entries[];
extern const test_entry_t group_extra_entries[];
extern const test_entry_t fused_group_entries[];
extern const test_entry_t fused_topk_entries[];
extern const test_entry_t hash_entries[];
extern const test_entry_t heap_entries[];
extern const test_entry_t index_entries[];
@@ -155,6 +156,7 @@ static const test_entry_t* const compiled_groups[] = {
format_entries, fvec_entries, graph_entries, graph_builtin_entries,
group_extra_entries,
fused_group_entries,
fused_topk_entries,
hash_entries,
heap_entries,
index_entries, ipc_entries,
84 changes: 84 additions & 0 deletions test/rfl/agg/count_distinct.rfl
@@ -0,0 +1,84 @@
;; Coverage tests for count(distinct) — exec_count_distinct + the
;; parallel partitioned kernel (cd_hist_fn / cd_scatter_fn /
;; cd_part_dedup_fn) in src/ops/group.c.
;;
;; Trigger conditions for the parallel path:
;; - len >= 65536 (1 << 16) AND a worker pool is available.
;;
;; Type coverage (flat numeric only — owned by this agent per the brief):
;; I64, I32, I16, U8, BOOL, F64.
;; SYM/STR/GUID are owned by other agents.
;;
;; Each test feeds (count (distinct ...)) which the optimiser rewrites
;; to OP_COUNT_DISTINCT (see ops/idiom.c rw_count_distinct), routing
;; through exec_count_distinct.
;;
;; The parallel kernel keys its histograms and scatter cursors by *task*
;; index (stable across passes), not worker id (work-stealing means a
;; worker can claim different tasks across two consecutive dispatches).
;; The earlier worker-id keying caused non-deterministic under-counts;
;; tests below assert exact distinct counts.

;; ────────────── 1. Sub-threshold sequential path (len < 65536) ──────────────
;; Guarantees coverage of cd_seq_count for each base type. Existing
;; ops/idiom.rfl covers I64; here we add the small-input arms for the
;; narrow types that were missing.
(count (distinct (as 'I32 [1 2 3 1 2]))) -- 3
(count (distinct (as 'I16 [10 20 10 30]))) -- 3
(count (distinct (as 'U8 [0 1 2 1 0 2]))) -- 3
(count (distinct (as 'BOOL [true false true false]))) -- 2
(count (distinct (as 'F64 [1.5 2.5 1.5 3.5 2.5]))) -- 3

;; Just below the threshold: sequential cd_seq_count for I64 / F64 /
;; narrow ints with a meaningful payload (sub-65536 hits the seq path).
(count (distinct (til 65535))) -- 65535
(count (distinct (% (til 65535) 999))) -- 999
(count (distinct (as 'F64 (til 65535)))) -- 65535
(count (distinct (as 'I32 (% (til 65535) 500)))) -- 500
(count (distinct (as 'I16 (% (til 65535) 250)))) -- 250
(count (distinct (as 'U8 (% (til 65535) 200)))) -- 200

;; ────────────── 2. Parallel path: I64 (len >= 65536) ──────────────
;; Triggers the cd_hist_fn / cd_scatter_fn I64 arm and cd_part_dedup_fn
;; over int64 payloads.
(count (distinct (til 65600))) -- 65600
(count (distinct (% (til 65600) 1000))) -- 1000

;; ────────────── 3. Parallel path: I32 (len >= 65536) ──────────────
(count (distinct (as 'I32 (% (til 65600) 500)))) -- 500

;; ────────────── 4. Parallel path: I16 (len >= 65536) ──────────────
(count (distinct (as 'I16 (% (til 65600) 250)))) -- 250

;; ────────────── 5. Parallel path: U8 (len >= 65536) ──────────────
;; U8 cannot hold more than 256 distinct values, so cap at 200.
(count (distinct (as 'U8 (% (til 65600) 200)))) -- 200

;; ────────────── 6. Parallel path: BOOL (len >= 65536) ──────────────
;; BOOL is exactly two values. Alternating pattern → both values appear.
(count (distinct (== 0 (% (til 65600) 2)))) -- 2

;; ────────────── 7. Parallel path: F64 (len >= 65536) ──────────────
;; Triggers cd_hist_fn / cd_scatter_fn F64 arms including the
;; NaN/0.0 normalisation.
(count (distinct (as 'F64 (til 65600)))) -- 65600
(count (distinct (as 'F64 (% (til 65600) 100)))) -- 100

;; ────────────── 8. Per-group count(distinct) over >= 200000 rows ──────────────
;; Routes through ray_count_distinct_per_group → count_distinct_per_group_parallel
;; (group.c L840-949: cdpg_hist_fn, cdpg_scat_fn, cdpg_dedup_fn, cdpg_read).
;; query.c:5703 selects ray_count_distinct_per_group when n_groups > 50000;
;; group.c:991 dispatches the parallel kernel when n_rows >= 200000.
;; So we need: n_rows >= 200000 AND n_groups > 50000, with a numeric key.
;; Note: a more thorough C-level test exists at
;; test/test_group_extra.c::test_count_distinct_per_group_parallel —
;; it bypasses rfl plumbing for ~100x speed. This rfl form provides
;; end-to-end planner coverage on the same dispatch.
;; Use exactly the threshold (200000) — anything below skips the parallel
;; kernel. Use 51000 groups (>50000 routes through ray_count_distinct_per_group).
(set Ncdpg 200000)
(set Tcdpg (table [g v] (list (% (til Ncdpg) 51000) (% (til Ncdpg) 4))))
;; n_groups = 51000, n_rows = 200000 → count_distinct_per_group_parallel.
;; Reduced to 4 distinct per group so cdpg_dedup_fn HT settles fast.
(set Rcdpg (select {n: (count (distinct v)) from: Tcdpg by: g}))
(count Rcdpg) -- 51000
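The thresholds these tests straddle come from the dispatch-sizing arithmetic in exec_count_distinct. Below is a small sketch of that arithmetic, with GRAIN as a placeholder for RAY_DISPATCH_MORSELS * RAY_MORSEL_ELEMS (whose real values this diff doesn't show) and 65536 as the parallel-path trigger documented at the top of this file:

    /* Sketch: which path a given len takes, under placeholder sizes. */
    #include <stdint.h>
    #include <stdio.h>

    #define GRAIN    8192        /* placeholder rows-per-task */
    #define RING_CAP (1u << 16)  /* task cap from exec_count_distinct */

    int main(void) {
        int64_t lens[] = {65535, 65600, 200000};
        for (int i = 0; i < 3; i++) {
            int64_t len = lens[i];
            int64_t n_tasks = (len + GRAIN - 1) / GRAIN;  /* ceiling division */
            if (n_tasks <= 0) n_tasks = 1;
            const char* path = len < 65536                 ? "sequential (below threshold)"
                             : n_tasks > (int64_t)RING_CAP ? "sequential (ring-cap fallback)"
                             :                               "parallel";
            printf("len=%-7lld n_tasks=%-3lld -> %s\n",
                   (long long)len, (long long)n_tasks, path);
        }
        return 0;
    }

With GRAIN = 8192 this prints the split the tests rely on: 65535 stays sequential, while 65600 and 200000 go parallel with 9 and 25 tasks respectively.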