Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
51fc9ba
feat(perf): Phase A — OP_PEARSON_CORR opcode + planner integration
ser-vasilich May 11, 2026
2488c30
wip(perf): Phase B partial — group.c layout/need-flag for OP_PEARSON_…
ser-vasilich May 11, 2026
993988e
feat(perf): Phase B — OP_PEARSON_CORR vectorized hash-agg
ser-vasilich May 11, 2026
b06bc3c
wip(perf): OP_PEARSON_CORR — single-HT finalize + extra out_type arms
ser-vasilich May 11, 2026
3c8f753
feat(perf): median per-group fast path — bucket-scatter + ray_median_…
ser-vasilich May 11, 2026
abd1816
wip(perf): median fast path — second eval-fallback site
ser-vasilich May 11, 2026
ea8c709
feat(perf): OP_MEDIAN — dump opcode name
ser-vasilich May 12, 2026
f149ac4
feat(perf): OP_MEDIAN — DAG-route integration in exec_group
ser-vasilich May 12, 2026
836731c
feat(perf): OP_TOP_N / OP_BOT_N — opcodes + planner integration
ser-vasilich May 13, 2026
fc0dcb0
feat(perf): OP_TOP_N / OP_BOT_N — per-group bounded-heap kernel
ser-vasilich May 13, 2026
c749ed2
feat(perf): OP_TOP_N / OP_BOT_N — exec_group post-radix wiring
ser-vasilich May 13, 2026
891f829
test(h2o): q8 — native (top col K) / (bot col K) coverage
ser-vasilich May 13, 2026
07921ba
perf(group): cap histscat tasks at worker count
ser-vasilich May 13, 2026
ffa12ed
fix(group): per-group dispatch survives n_groups > 65536
ser-vasilich May 14, 2026
9ea03d1
perf(raze): O(N) fast path for same-typed numeric vectors
ser-vasilich May 14, 2026
d482047
feat(perf): OP_GROUP_TOPK_ROWFORM — row-form per-group top/bot K
ser-vasilich May 14, 2026
c0e2605
perf(group_topk): radix-scatter Phase 1 — L2-hot partition HTs
ser-vasilich May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/lang/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -2499,11 +2499,13 @@ static void ray_register_builtins(void) {
* types. Per-group usage works through the eval-level scatter. */
register_binary("top", RAY_FN_NONE, ray_top_fn);
register_binary("bot", RAY_FN_NONE, ray_bot_fn);
/* pearson_corr: 2-input scalar reducer. Per-group usage routes
* through the eval-level scatter (head not in agg-opcode list,
* but expr_refs_row_column → row-aligned check → per-group eval
* fallback when full-table call collapses to a scalar). */
register_binary("pearson_corr", RAY_FN_NONE, ray_pearson_corr_fn);
/* pearson_corr: 2-input scalar reducer. Marked AGGR + LAZY_AWARE so
* the planner picks it up via is_streaming_aggr_binary_call and lowers
* a `(pearson_corr x y)` reference inside `(select ... by ...)` to an
* OP_PEARSON_CORR DAG node — single-pass vectorized hash-agg. The
* ray_pearson_corr_fn body remains the fallback for non-vectorizable
* shapes (LIST inputs, eval-level scatter on unsupported key types). */
register_binary("pearson_corr", RAY_FN_AGGR | RAY_FN_LAZY_AWARE, ray_pearson_corr_fn);

/* Special forms */
register_binary("set", RAY_FN_SPECIAL_FORM | RAY_FN_RESTRICTED, ray_set_fn);
Expand Down
5 changes: 5 additions & 0 deletions src/lang/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ ray_t* ray_top_fn(ray_t* v, ray_t* n_obj);
ray_t* ray_bot_fn(ray_t* v, ray_t* n_obj);
ray_t* ray_pearson_corr_fn(ray_t* x, ray_t* y);

/* In-place median (quickselect). Caller owns the buffer; we permute
* elements. Returns NaN if n <= 0. Used by aggr_med_per_group_buf in
* query.c for the fast per-group median path. */
double ray_median_dbl_inplace(double* a, int64_t n);

/* Collection helpers (formerly static in eval.c, now in collection.c) */
int atom_eq(ray_t* a, ray_t* b);
ray_t* list_to_typed_vec(ray_t* list, int8_t orig_vec_type);
Expand Down
20 changes: 20 additions & 0 deletions src/ops/agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,26 @@ ray_t* ray_med_fn(ray_t* x) {
static ray_t* var_stddev_core(ray_t* x, int sample, int take_sqrt);


/* In-place exact median over a flat double buffer. Caller owns the
* buffer; we permute its elements via nth_element_dbl. Returns NaN
* if n <= 0 (caller must filter that case if a typed-null is needed).
*
* Used by the per-group median fast path in query.c which avoids the
* full ray_med_fn slice-allocation cost — see aggr_med_per_group_buf. */
double ray_median_dbl_inplace(double* a, int64_t n) {
if (n <= 0) return 0.0;
if (n == 1) return a[0];
int64_t k = n / 2;
if (n % 2 == 1) {
nth_element_dbl(a, 0, n - 1, k);
return a[k];
}
nth_element_dbl(a, 0, n - 1, k - 1);
nth_element_dbl(a, k, n - 1, k);
return (a[k - 1] + a[k]) / 2.0;
}


ray_t* ray_dev_fn(ray_t* x) { return var_stddev_core(x, 0, 1); }

/* Shared core for variance / stddev in sample or population mode.
Expand Down
38 changes: 37 additions & 1 deletion src/ops/builtins.c
Original file line number Diff line number Diff line change
Expand Up @@ -2950,7 +2950,43 @@ ray_t* ray_raze_fn(ray_t* x) {
int64_t n = x->len;
if (n == 0) return ray_list_new(0);
ray_t** items = (ray_t**)ray_data(x);
/* Try to concat all items */

/* Fast path: all items are vectors of the same primitive type
* (numeric/temporal, fixed-width, no SYM/STR/GUID/LIST/null).
* Pre-size one output vector and memcpy each item's data — O(total)
* instead of the pairwise concat loop's O(N²). */
if (ray_is_vec(items[0])) {
int8_t t = items[0]->type;
bool fast = (t != RAY_LIST && t != RAY_STR && t != RAY_SYM && t != RAY_GUID);
int64_t total = 0;
if (fast) {
for (int64_t i = 0; i < n; i++) {
ray_t* it = items[i];
if (!ray_is_vec(it) || it->type != t
|| (it->attrs & RAY_ATTR_HAS_NULLS)) {
fast = false; break;
}
total += it->len;
}
}
if (fast) {
ray_t* out = ray_vec_new(t, total);
if (!out || RAY_IS_ERR(out)) return out ? out : ray_error("oom", NULL);
out->len = total;
uint8_t esz = ray_elem_size(t);
char* dst = (char*)ray_data(out);
int64_t pos = 0;
for (int64_t i = 0; i < n; i++) {
int64_t k = items[i]->len;
if (k > 0) memcpy(dst + pos * esz, ray_data(items[i]), (size_t)k * esz);
pos += k;
}
return out;
}
}

/* Slow path: pairwise concat — used for mixed types, null-bearing
* inputs, and non-fixed-width vectors (SYM/STR/GUID/LIST). */
ray_t* result = items[0];
ray_retain(result);
for (int64_t i = 1; i < n; i++) {
Expand Down
4 changes: 4 additions & 0 deletions src/ops/dump.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,13 @@ const char* ray_opcode_name(uint16_t op) {
case OP_STDDEV_POP: return "STDDEV_POP";
case OP_VAR: return "VAR";
case OP_VAR_POP: return "VAR_POP";
case OP_PEARSON_CORR: return "PEARSON_CORR";
case OP_MEDIAN: return "MEDIAN";
case OP_FILTER: return "FILTER";
case OP_SORT: return "SORT";
case OP_GROUP: return "GROUP";
case OP_GROUP_TOPK_ROWFORM: return "GROUP_TOPK_ROWFORM";
case OP_GROUP_BOTK_ROWFORM: return "GROUP_BOTK_ROWFORM";
case OP_FILTERED_GROUP:return "FILTERED_GROUP";
case OP_PIVOT: return "PIVOT";
case OP_ANTIJOIN: return "ANTIJOIN";
Expand Down
5 changes: 5 additions & 0 deletions src/ops/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ static ray_t* exec_in(ray_graph_t* g, ray_op_t* op, ray_t* col, ray_t* set) {
/* Is this opcode a "heavy" pipeline breaker worth profiling? */
static inline bool op_is_heavy(uint16_t opc) {
return opc == OP_FILTER || opc == OP_SORT || opc == OP_GROUP ||
opc == OP_GROUP_TOPK_ROWFORM || opc == OP_GROUP_BOTK_ROWFORM ||
opc == OP_JOIN || opc == OP_WINDOW_JOIN || opc == OP_SELECT ||
opc == OP_HEAD || opc == OP_TAIL || opc == OP_WINDOW ||
opc == OP_PIVOT ||
Expand Down Expand Up @@ -1235,6 +1236,10 @@ static ray_t* exec_node_inner(ray_graph_t* g, ray_op_t* op) {
case OP_FILTERED_GROUP:
return exec_filtered_group(g, op);

case OP_GROUP_TOPK_ROWFORM:
case OP_GROUP_BOTK_ROWFORM:
return exec_group_topk_rowform(g, op);

case OP_PIVOT: {
ray_t* tbl = g->table;
ray_t* owned_tbl = NULL;
Expand Down
132 changes: 125 additions & 7 deletions src/ops/graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ static void graph_fixup_ext_ptrs(ray_graph_t* g, ptrdiff_t delta) {
ext->keys[k] = graph_fix_ptr(ext->keys[k], delta);
for (uint8_t a = 0; a < ext->n_aggs; a++)
ext->agg_ins[a] = graph_fix_ptr(ext->agg_ins[a], delta);
if (ext->agg_ins2) {
for (uint8_t a = 0; a < ext->n_aggs; a++) {
if (ext->agg_ins2[a])
ext->agg_ins2[a] = graph_fix_ptr(ext->agg_ins2[a], delta);
}
}
break;
case OP_JOIN:
case OP_ANTIJOIN:
Expand Down Expand Up @@ -679,6 +685,18 @@ ray_op_t* ray_stddev(ray_graph_t* g, ray_op_t* a) { return make_unary(g, OP_
ray_op_t* ray_stddev_pop(ray_graph_t* g, ray_op_t* a) { return make_unary(g, OP_STDDEV_POP, a, RAY_F64); }
ray_op_t* ray_var(ray_graph_t* g, ray_op_t* a) { return make_unary(g, OP_VAR, a, RAY_F64); }
ray_op_t* ray_var_pop(ray_graph_t* g, ray_op_t* a) { return make_unary(g, OP_VAR_POP, a, RAY_F64); }
/* Pearson correlation is a 2-input aggregator; the node carries two
* input pointers (x and y) and lowers to OP_PEARSON_CORR. */
ray_op_t* ray_pearson_corr(ray_graph_t* g, ray_op_t* x, ray_op_t* y) {
return make_binary(g, OP_PEARSON_CORR, x, y, RAY_F64);
}

/* Exact median per group. Runtime forks to a separate bucket-scatter +
* quickselect path (see ray_median_per_group) — it can't fit the
* fixed-size row-layout HT because per-group buffer size is variable. */
ray_op_t* ray_median(ray_graph_t* g, ray_op_t* a) {
return make_unary(g, OP_MEDIAN, a, RAY_F64);
}

/* --------------------------------------------------------------------------
* Structural ops
Expand Down Expand Up @@ -747,22 +765,45 @@ ray_op_t* ray_sort_op(ray_graph_t* g, ray_op_t* table_node,
return &g->nodes[ext->base.id];
}

ray_op_t* ray_group(ray_graph_t* g, ray_op_t** keys, uint8_t n_keys,
uint16_t* agg_ops, ray_op_t** agg_ins, uint8_t n_aggs) {
/* Shared impl for ray_group / ray_group2 / ray_group3. agg_ins2 NULL →
* no binary aggs; otherwise must be the same length as agg_ins (NULL
* slots for unary aggs, non-NULL for OP_PEARSON_CORR slots). agg_k NULL
* → no scalar params; otherwise length n_aggs (0 in slots without). */
static ray_op_t* ray_group_impl(ray_graph_t* g, ray_op_t** keys, uint8_t n_keys,
uint16_t* agg_ops, ray_op_t** agg_ins,
ray_op_t** agg_ins2, const int64_t* agg_k,
uint8_t n_aggs) {
uint32_t key_ids[256];
uint32_t agg_ids[256];
uint32_t agg_ids2[256]; /* parallel to agg_ids; 0 when no second input */
bool has_ins2 = false;
bool has_k = false;
for (uint8_t i = 0; i < n_keys; i++) key_ids[i] = keys[i]->id;
for (uint8_t i = 0; i < n_aggs; i++) agg_ids[i] = agg_ins[i]->id;
for (uint8_t i = 0; i < n_aggs; i++) {
agg_ids[i] = agg_ins[i]->id;
agg_ids2[i] = 0;
if (agg_ins2 && agg_ins2[i]) {
agg_ids2[i] = agg_ins2[i]->id;
has_ins2 = true;
}
if (agg_k && agg_k[i] != 0) has_k = true;
}

size_t keys_sz = (size_t)n_keys * sizeof(ray_op_t*);
size_t ops_sz = (size_t)n_aggs * sizeof(uint16_t);
size_t ins_sz = (size_t)n_aggs * sizeof(ray_op_t*);
/* Align ops after keys (pointer-sized), ins after ops (needs ptr alignment) */
size_t ops_off = keys_sz;
size_t ins_off = ops_off + ops_sz;
size_t ins2_sz = has_ins2 ? ins_sz : 0;
size_t k_sz = has_k ? (size_t)n_aggs * sizeof(int64_t) : 0;
/* Align ops after keys (pointer-sized), ins after ops, ins2 after ins. */
size_t ops_off = keys_sz;
size_t ins_off = ops_off + ops_sz;
/* Round ins_off up to pointer alignment */
ins_off = (ins_off + sizeof(ray_op_t*) - 1) & ~(sizeof(ray_op_t*) - 1);
ray_op_ext_t* ext = graph_alloc_ext_node_ex(g, ins_off + ins_sz);
size_t ins2_off = ins_off + ins_sz;
size_t k_off = ins2_off + ins2_sz;
/* Round k_off up to int64 alignment */
k_off = (k_off + sizeof(int64_t) - 1) & ~(sizeof(int64_t) - 1);
ray_op_ext_t* ext = graph_alloc_ext_node_ex(g, k_off + k_sz);
if (!ext) return NULL;

ext->base.opcode = OP_GROUP;
Expand All @@ -782,17 +823,94 @@ ray_op_t* ray_group(ray_graph_t* g, ray_op_t** keys, uint8_t n_keys,
ext->agg_ins = (ray_op_t**)(trail + ins_off);
for (uint8_t i = 0; i < n_aggs; i++)
ext->agg_ins[i] = &g->nodes[agg_ids[i]];
if (has_ins2) {
ext->agg_ins2 = (ray_op_t**)(trail + ins2_off);
for (uint8_t i = 0; i < n_aggs; i++)
ext->agg_ins2[i] = agg_ids2[i] ? &g->nodes[agg_ids2[i]] : NULL;
} else {
ext->agg_ins2 = NULL;
}
if (has_k) {
ext->agg_k = (int64_t*)(trail + k_off);
for (uint8_t i = 0; i < n_aggs; i++)
ext->agg_k[i] = agg_k ? agg_k[i] : 0;
} else {
ext->agg_k = NULL;
}
ext->n_keys = n_keys;
ext->n_aggs = n_aggs;

g->nodes[ext->base.id] = ext->base;
return &g->nodes[ext->base.id];
}

ray_op_t* ray_group(ray_graph_t* g, ray_op_t** keys, uint8_t n_keys,
uint16_t* agg_ops, ray_op_t** agg_ins, uint8_t n_aggs) {
return ray_group_impl(g, keys, n_keys, agg_ops, agg_ins, NULL, NULL, n_aggs);
}

ray_op_t* ray_group2(ray_graph_t* g, ray_op_t** keys, uint8_t n_keys,
uint16_t* agg_ops, ray_op_t** agg_ins,
ray_op_t** agg_ins2, uint8_t n_aggs) {
return ray_group_impl(g, keys, n_keys, agg_ops, agg_ins, agg_ins2, NULL, n_aggs);
}

ray_op_t* ray_group3(ray_graph_t* g, ray_op_t** keys, uint8_t n_keys,
uint16_t* agg_ops, ray_op_t** agg_ins,
ray_op_t** agg_ins2, const int64_t* agg_k,
uint8_t n_aggs) {
return ray_group_impl(g, keys, n_keys, agg_ops, agg_ins, agg_ins2, agg_k, n_aggs);
}

ray_op_t* ray_distinct(ray_graph_t* g, ray_op_t** keys, uint8_t n_keys) {
return ray_group(g, keys, n_keys, NULL, NULL, 0);
}

/* Dedicated per-group top/bot-K with row-form emission. Mirrors the
* OP_GROUP ext-node layout (single key + single agg + agg_k slot) so
* downstream optimiser passes can introspect ext->keys / ext->agg_ins
* the same way they do for OP_GROUP, but with a distinct opcode that
* exec.c routes to exec_group_topk_rowform. */
ray_op_t* ray_group_topk_rowform(ray_graph_t* g, ray_op_t* key,
ray_op_t* val, int64_t k, uint8_t desc) {
if (!g || !key || !val || k < 1 || k > 1024) return NULL;

size_t keys_sz = sizeof(ray_op_t*);
size_t ops_sz = sizeof(uint16_t);
size_t ins_sz = sizeof(ray_op_t*);
size_t ops_off = keys_sz;
size_t ins_off = ops_off + ops_sz;
ins_off = (ins_off + sizeof(ray_op_t*) - 1) & ~(sizeof(ray_op_t*) - 1);
size_t k_off = ins_off + ins_sz;
k_off = (k_off + sizeof(int64_t) - 1) & ~(sizeof(int64_t) - 1);
size_t k_sz = sizeof(int64_t);

ray_op_ext_t* ext = graph_alloc_ext_node_ex(g, k_off + k_sz);
if (!ext) return NULL;

ext->base.opcode = desc ? OP_GROUP_TOPK_ROWFORM : OP_GROUP_BOTK_ROWFORM;
ext->base.arity = 0;
ext->base.out_type = RAY_TABLE;
ext->base.est_rows = key->est_rows;
ext->base.inputs[0] = key;

char* trail = EXT_TRAIL(ext);
ext->keys = (ray_op_t**)trail;
ext->keys[0] = key;
ext->agg_ops = (uint16_t*)(trail + ops_off);
ext->agg_ops[0] = desc ? OP_TOP_N : OP_BOT_N;
ext->agg_ins = (ray_op_t**)(trail + ins_off);
ext->agg_ins[0] = val;
ext->agg_ins2 = NULL;
ext->agg_k = (int64_t*)(trail + k_off);
ext->agg_k[0] = k;
ext->n_keys = 1;
ext->n_aggs = 1;

g->nodes[ext->base.id] = ext->base;
return &g->nodes[ext->base.id];
}

ray_op_t* ray_pivot_op(ray_graph_t* g,
ray_op_t** index_cols, uint8_t n_index,
ray_op_t* pivot_col,
Expand Down
Loading