Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/io/csv.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "core/pool.h"
#include "lang/format.h"
#include "ops/hash.h"
#include "ops/idxop.h" /* attach per-chunk zone index after load */
#include "store/col.h"
#include "store/fileio.h"
#include "store/splay.h"
Expand Down Expand Up @@ -1410,6 +1411,20 @@ static ray_t* csv_materialize_rows(const char* buf, size_t file_size,
col_data[c] = dst;
}

/* Per-chunk min/max + null bit on every column big enough to be worth
* indexing — gives the reduce min/max and the filter chunk-skip paths
* an O(n_chunks) scan instead of O(n_rows). Attach is best-effort:
* unsupported types (RAY_STR/RAY_SYM/RAY_GUID in v1) just stay
* unindexed and the consumer falls back to a row scan. */
for (int c = 0; c < ncols; c++) {
ray_t* v = col_vecs[c];
if (!v || RAY_IS_ERR(v)) continue;
if (v->len < (1 << 16)) continue; /* < one chunk, skip */
ray_t* r = ray_index_attach_chunk_zone(&v, 16);
if (r && !RAY_IS_ERR(r)) col_vecs[c] = v; /* attach succeeded */
/* On failure the original column stays in col_vecs[c]; ignore. */
}

ray_t* tbl = ray_table_new(ncols);
if (!tbl || RAY_IS_ERR(tbl)) {
for (int c = 0; c < ncols; c++) ray_release(col_vecs[c]);
Expand Down Expand Up @@ -1788,6 +1803,17 @@ ray_t* ray_read_csv_named_opts(const char* path, char delimiter, bool header,

/* ---- 11. Build table ---- */
{
/* Best-effort per-chunk zone index attach (see comment on the
* matching loop in build_table_from_cols) — unsupported types
* fall through to the unindexed path inside the consumer. */
for (int c = 0; c < ncols; c++) {
ray_t* v = col_vecs[c];
if (!v || RAY_IS_ERR(v)) continue;
if (v->len < (1 << 16)) continue;
ray_t* r = ray_index_attach_chunk_zone(&v, 16);
if (r && !RAY_IS_ERR(r)) col_vecs[c] = v;
}

ray_t* tbl = ray_table_new(ncols);
if (!tbl || RAY_IS_ERR(tbl)) {
for (int c = 0; c < ncols; c++) ray_release(col_vecs[c]);
Expand Down
14 changes: 0 additions & 14 deletions src/lang/env.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,6 @@
#include <stdlib.h>
#include <string.h>

static _Atomic uint64_t g_env_generation = 1;

uint64_t ray_env_generation(void) {
return atomic_load_explicit(&g_env_generation, memory_order_relaxed);
}

static void env_bump_generation_if_user(int is_user) {
if (is_user)
atomic_fetch_add_explicit(&g_env_generation, 1, memory_order_relaxed);
}

/* ---- Function constructors ---- */

/* Builtin name stored inline in nullmap[2..15] (max 13 chars + null).
Expand Down Expand Up @@ -311,7 +300,6 @@ static ray_err_t env_bind_global_impl(int64_t sym_id, ray_t* val, int is_user) {
g_env.user[j] = g_env.user[j + 1];
}
g_env.count--;
env_bump_generation_if_user(is_user);
env_unlock();
return RAY_OK;
}
Expand All @@ -324,7 +312,6 @@ static ray_err_t env_bind_global_impl(int64_t sym_id, ray_t* val, int is_user) {
* flag alone — once user, always user, until the slot is
* deleted. */
if (is_user) g_env.user[i] = 1;
env_bump_generation_if_user(is_user);
env_unlock();
return RAY_OK;
}
Expand All @@ -342,7 +329,6 @@ static ray_err_t env_bind_global_impl(int64_t sym_id, ray_t* val, int is_user) {
g_env.vals[g_env.count] = val;
g_env.user[g_env.count] = is_user ? 1 : 0;
g_env.count++;
env_bump_generation_if_user(is_user);
env_unlock();
return RAY_OK;
}
Expand Down
1 change: 0 additions & 1 deletion src/lang/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ static inline const char* ray_fn_name(const ray_t* fn) {
ray_err_t ray_env_init(void);
void ray_env_destroy(void);
ray_t* ray_env_get(int64_t sym_id);
uint64_t ray_env_generation(void);

/* User-facing binder. Refuses any name starting with `.` — that root is
* reserved for system namespaces (.sys, .os, .io, .ipc, …) populated by
Expand Down
109 changes: 0 additions & 109 deletions src/lang/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -1487,116 +1487,9 @@ ray_t* ray_cond_fn(ray_t** args, int64_t n) {
return make_i64(0);
}

static uint64_t do_cache_mix(uint64_t h, uint64_t v) {
h ^= v + 0x9e3779b97f4a7c15ull + (h << 6) + (h >> 2);
return h ? h : 0x9e3779b97f4a7c15ull;
}

static uint64_t do_cache_hash(ray_t* x) {
if (!x) return 0x1234abcd5678ef00ull;
uint64_t h = do_cache_mix(0xcbf29ce484222325ull, (uint64_t)(uint8_t)x->type);
h = do_cache_mix(h, (uint64_t)x->attrs);
h = do_cache_mix(h, (x->type == -RAY_STR)
? (uint64_t)ray_str_len(x)
: (uint64_t)x->len);
if (x->type == RAY_LIST) {
ray_t** elems = (ray_t**)ray_data(x);
for (int64_t i = 0; i < x->len; i++)
h = do_cache_mix(h, do_cache_hash(elems[i]));
} else if (x->type == RAY_DICT) {
h = do_cache_mix(h, do_cache_hash(ray_dict_keys(x)));
h = do_cache_mix(h, do_cache_hash(ray_dict_vals(x)));
} else if (x->type == RAY_STR) {
for (int64_t i = 0; i < x->len; i++) {
size_t n = 0;
const char* s = ray_str_vec_get(x, i, &n);
for (size_t j = 0; s && j < n; j++)
h = do_cache_mix(h, (unsigned char)s[j]);
}
} else if (x->type == -RAY_STR) {
const char* s = ray_str_ptr(x);
size_t n = ray_str_len(x);
for (size_t i = 0; s && i < n; i++)
h = do_cache_mix(h, (unsigned char)s[i]);
} else if (x->type == RAY_SYM || x->type == -RAY_SYM ||
x->type == RAY_I64 || x->type == -RAY_I64 ||
x->type == RAY_TIMESTAMP || x->type == -RAY_TIMESTAMP) {
h = do_cache_mix(h, (uint64_t)x->i64);
} else if (x->type == RAY_I32 || x->type == -RAY_I32 ||
x->type == RAY_DATE || x->type == -RAY_DATE ||
x->type == RAY_TIME || x->type == -RAY_TIME) {
h = do_cache_mix(h, (uint64_t)(uint32_t)x->i32);
} else if (x->type == RAY_I16 || x->type == -RAY_I16) {
h = do_cache_mix(h, (uint64_t)(uint16_t)x->i16);
} else if (x->type == RAY_U8 || x->type == -RAY_U8 ||
x->type == RAY_BOOL || x->type == -RAY_BOOL) {
h = do_cache_mix(h, (uint64_t)x->u8);
} else if (x->type == RAY_F64 || x->type == -RAY_F64) {
uint64_t bits = 0;
memcpy(&bits, &x->f64, sizeof(bits));
h = do_cache_mix(h, bits);
}
return h;
}

static bool do_cache_contains_set(ray_t* x) {
if (!x || x->type != RAY_LIST) return false;
ray_t** elems = (ray_t**)ray_data(x);
if (x->len > 0 && elems[0] && elems[0]->type == -RAY_SYM) {
ray_t* s = ray_sym_str(elems[0]->i64);
bool is_set = s && ray_str_len(s) == 3 &&
memcmp(ray_str_ptr(s), "set", 3) == 0;
if (s) ray_release(s);
if (is_set) return true;
}
for (int64_t i = 0; i < x->len; i++)
if (do_cache_contains_set(elems[i]))
return true;
return false;
}

static bool do_cache_is_null_name(ray_t* x) {
if (!x || x->type != -RAY_SYM || !(x->attrs & RAY_ATTR_NAME)) return false;
ray_t* s = ray_sym_str(x->i64);
bool ok = s && ray_str_len(s) == 4 && memcmp(ray_str_ptr(s), "null", 4) == 0;
if (s) ray_release(s);
return ok;
}

#define DO_NULL_CACHE_N 2048
static uint64_t g_do_null_cache[DO_NULL_CACHE_N];
static uint64_t g_do_null_cache_env_gen[DO_NULL_CACHE_N];
static uint16_t g_do_null_cache_next = 0;

static bool do_null_cache_get(uint64_t hash) {
if (!hash) return false;
uint64_t env_gen = ray_env_generation();
for (uint16_t i = 0; i < DO_NULL_CACHE_N; i++)
if (g_do_null_cache[i] == hash &&
g_do_null_cache_env_gen[i] == env_gen)
return true;
return false;
}

static void do_null_cache_put(uint64_t hash) {
if (hash) {
uint16_t slot = g_do_null_cache_next++ % DO_NULL_CACHE_N;
g_do_null_cache[slot] = hash;
g_do_null_cache_env_gen[slot] = ray_env_generation();
}
}

/* (do expr1 expr2 ...) — evaluate in sequence, return last. Pushes local scope. */
ray_t* ray_do_fn(ray_t** args, int64_t n) {
if (n == 0) return make_i64(0);
uint64_t null_cache_hash = 0;
if (g_ray_profile.active &&
n == 2 && do_cache_is_null_name(args[1]) &&
!do_cache_contains_set(args[0])) {
null_cache_hash = do_cache_hash(args[0]);
if (do_null_cache_get(null_cache_hash))
return NULL;
}
if (ray_env_push_scope() != RAY_OK) return ray_error("oom", NULL);
ray_t* result = NULL;
for (int64_t i = 0; i < n; i++) {
Expand All @@ -1610,8 +1503,6 @@ ray_t* ray_do_fn(ray_t** args, int64_t n) {
}
}
ray_env_pop_scope();
if (null_cache_hash && result == NULL)
do_null_cache_put(null_cache_hash);
return result;
}

Expand Down
41 changes: 27 additions & 14 deletions src/mem/heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -1471,20 +1471,33 @@ void ray_heap_gc(void) {
}

/* Pass 5: Release physical pages from free blocks in every
* idle heap. Pass 2 may have returned blocks to worker-owned
* freelists; releasing only the caller heap leaves those worker
* pages resident across large query repetitions. */
for (int hid = 0; hid < RAY_HEAP_REGISTRY_SIZE; hid++) {
ray_heap_t* gh = ray_heap_registry[hid];
if (!gh) continue;
for (int i = 13; i < RAY_HEAP_FL_SIZE; i++) {
ray_fl_head_t* head = &gh->freelist[i];
ray_t* blk = head->fl_next;
while (blk != (ray_t*)head) {
size_t bsize = BSIZEOF(i);
if (bsize > 4096)
ray_vm_release((char*)blk + 4096, bsize - 4096);
blk = blk->fl_next;
* idle heap, throttled to once every PASS5_PERIOD GCs.
*
* The original unthrottled walk issued one madvise(MADV_DONTNEED)
* per free block > 4 KB on every GC. For repeated-query
* workloads (any bench / OLAP loop) the freed blocks would be
* reused on the very next query — but the madvise tears down
* page tables and forces a re-fault, paying the cost twice.
*
* Period 16 keeps the long-running-process invariant (free
* blocks eventually return physical pages to the OS) while
* removing the per-query madvise cost. Explicit callers
* needing prompt release should use ray_heap_release_pages. */
static uint32_t pass5_counter = 0;
enum { PASS5_PERIOD = 16 };
if ((++pass5_counter % PASS5_PERIOD) == 0) {
for (int hid = 0; hid < RAY_HEAP_REGISTRY_SIZE; hid++) {
ray_heap_t* gh = ray_heap_registry[hid];
if (!gh) continue;
for (int i = 13; i < RAY_HEAP_FL_SIZE; i++) {
ray_fl_head_t* head = &gh->freelist[i];
ray_t* blk = head->fl_next;
while (blk != (ray_t*)head) {
size_t bsize = BSIZEOF(i);
if (bsize > 4096)
ray_vm_release((char*)blk + 4096, bsize - 4096);
blk = blk->fl_next;
}
}
}
}
Expand Down
73 changes: 71 additions & 2 deletions src/ops/agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "lang/internal.h"
#include "ops/ops.h"
#include "ops/idxop.h" /* RAY_IDX_CHUNK_ZONE fast path for min/max */
#include "mem/heap.h"

#include <stdlib.h> /* qsort (introselect fallback) */
Expand Down Expand Up @@ -328,7 +329,43 @@ ray_t* ray_min_fn(ray_t* x) {
if (ray_is_lazy(x)) return ray_lazy_append(x, OP_MIN);
if (RAY_IS_PARTED(x->type)) return agg_parted_minmax(x, 0);
if (ray_is_atom(x)) { ray_retain(x); return x; }
if (ray_is_vec(x)) AGG_VEC_VIA_DAG(x, ray_min_op);
if (ray_is_vec(x)) {
/* Per-chunk zone index fast path: O(n_chunks) instead of O(n_rows).
* Only valid when the index was built for the column's current len
* (mutation paths call ray_index_drop). */
if (ray_index_kind(x) == RAY_IDX_CHUNK_ZONE) {
ray_index_t* ix = ray_index_payload(x->index);
if (ix->built_for_len == x->len) {
uint32_t n_chunks = ix->u.chunk_zone.n_chunks;
if (ix->u.chunk_zone.is_f64) {
const double* mins = (const double*)ray_data(ix->u.chunk_zone.mins);
double mn = INFINITY;
for (uint32_t g = 0; g < n_chunks; g++)
if (mins[g] < mn) mn = mins[g];
if (mn == INFINITY) return ray_typed_null(-RAY_F64);
return make_f64(mn);
} else {
const int64_t* mins = (const int64_t*)ray_data(ix->u.chunk_zone.mins);
int64_t mn = INT64_MAX;
for (uint32_t g = 0; g < n_chunks; g++)
if (mins[g] < mn) mn = mins[g];
if (mn == INT64_MAX) return ray_typed_null(-x->type);
/* Preserve the column's storage width on the result. */
switch (x->type) {
case RAY_BOOL: return ray_bool((bool)mn);
case RAY_U8: return ray_u8((uint8_t)mn);
case RAY_I16: return ray_i16((int16_t)mn);
case RAY_I32: return ray_i32((int32_t)mn);
case RAY_DATE: return ray_date((int32_t)mn);
case RAY_TIME: return ray_time(mn);
case RAY_TIMESTAMP: return ray_timestamp(mn);
default: return ray_i64(mn);
}
}
}
}
AGG_VEC_VIA_DAG(x, ray_min_op);
}
if (!is_list(x)) return ray_error("type", NULL);
int64_t len = ray_len(x);
if (len == 0) return ray_error("domain", NULL);
Expand All @@ -350,7 +387,39 @@ ray_t* ray_max_fn(ray_t* x) {
if (ray_is_lazy(x)) return ray_lazy_append(x, OP_MAX);
if (RAY_IS_PARTED(x->type)) return agg_parted_minmax(x, 1);
if (ray_is_atom(x)) { ray_retain(x); return x; }
if (ray_is_vec(x)) AGG_VEC_VIA_DAG(x, ray_max_op);
if (ray_is_vec(x)) {
if (ray_index_kind(x) == RAY_IDX_CHUNK_ZONE) {
ray_index_t* ix = ray_index_payload(x->index);
if (ix->built_for_len == x->len) {
uint32_t n_chunks = ix->u.chunk_zone.n_chunks;
if (ix->u.chunk_zone.is_f64) {
const double* maxs = (const double*)ray_data(ix->u.chunk_zone.maxs);
double mx = -INFINITY;
for (uint32_t g = 0; g < n_chunks; g++)
if (maxs[g] > mx) mx = maxs[g];
if (mx == -INFINITY) return ray_typed_null(-RAY_F64);
return make_f64(mx);
} else {
const int64_t* maxs = (const int64_t*)ray_data(ix->u.chunk_zone.maxs);
int64_t mx = INT64_MIN;
for (uint32_t g = 0; g < n_chunks; g++)
if (maxs[g] > mx) mx = maxs[g];
if (mx == INT64_MIN) return ray_typed_null(-x->type);
switch (x->type) {
case RAY_BOOL: return ray_bool((bool)mx);
case RAY_U8: return ray_u8((uint8_t)mx);
case RAY_I16: return ray_i16((int16_t)mx);
case RAY_I32: return ray_i32((int32_t)mx);
case RAY_DATE: return ray_date((int32_t)mx);
case RAY_TIME: return ray_time(mx);
case RAY_TIMESTAMP: return ray_timestamp(mx);
default: return ray_i64(mx);
}
}
}
}
AGG_VEC_VIA_DAG(x, ray_max_op);
}
if (!is_list(x)) return ray_error("type", NULL);
int64_t len = ray_len(x);
if (len == 0) return ray_error("domain", NULL);
Expand Down
Loading
Loading