Skip to content
Merged
28 changes: 15 additions & 13 deletions bench/adapters/datafusion_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,25 @@ def _ensure_ctx(self) -> SessionContext:
def load_data(self, path: Path, table_name: str = "data") -> None:
ctx = self._ensure_ctx()
df_table = f"bench_{table_name}"
# register_csv reads lazily; force materialization by collecting
# into an in-memory table so subsequent timed queries don't pay
# for disk reads.
# Read CSV once and register the materialized RecordBatches as
# an in-memory table. Without this, register_csv produces a
# listing table that re-parses CSV on every query — page cache
# avoids disk I/O but the parse cost remains, tilting timing
# against DataFusion vs adapters that hold native columnar
# storage (duckdb/chdb/polars/pandas/rayforce all do).
try:
ctx.deregister_table(df_table)
except Exception:
pass
ctx.register_csv(df_table, str(path))
# Materialize: read all batches, register as memtable.
batches = ctx.sql(f"SELECT * FROM {df_table}").collect()
ctx.deregister_table(df_table)
from datafusion import RecordBatchStream # noqa: F401
# The simplest robust path: keep CSV registered but warm OS page
# cache by collecting once. Re-register to drop any stream state.
ctx.register_csv(df_table, str(path))
# Warm by collecting once (already done above).
del batches
csv_tmp = f"{df_table}__csv"
try:
ctx.deregister_table(csv_tmp)
except Exception:
pass
ctx.register_csv(csv_tmp, str(path))
batches = ctx.sql(f"SELECT * FROM {csv_tmp}").collect()
ctx.deregister_table(csv_tmp)
ctx.register_record_batches(df_table, [batches])
self._table_names[table_name] = df_table

def _get_table(self, name: str = "data") -> str:
Expand Down
39 changes: 28 additions & 11 deletions bench/adapters/questdb_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ def load_data(self, path: Path, table_name: str = "data") -> None:
float_cols = [name for name, dtype in df.schema.items() if dtype == pl.Float64]
str_cols = [name for name, dtype in df.schema.items() if dtype in (pl.Utf8, pl.String)]

# Use ILP for fast ingestion
# Use ILP for ingestion. Flush every FLUSH_EVERY rows so the
# server can start committing before send finishes — without
# periodic flush, all rows sit in the sender's local buffer until
# the with-block exits, making first-commit visibility race
# against the polling deadline below.
FLUSH_EVERY = 100_000
conf = f"tcp::addr={self._host}:{self._ilp_port};"
with self._Sender.from_conf(conf) as sender:
for row in df.iter_rows(named=True):
for i, row in enumerate(df.iter_rows(named=True)):
sender.row(
sql_table_name,
symbols={col: row[col] for col in str_cols},
Expand All @@ -109,16 +114,19 @@ def load_data(self, path: Path, table_name: str = "data") -> None:
},
at=self._ServerTimestamp,
)
if (i + 1) % FLUSH_EVERY == 0:
sender.flush()
sender.flush()

# ILP is async — the rows (and even the table itself, on a first
# write) aren't queryable until QuestDB commits them (default
# cadence ~1s). Block until count() matches the load, treating any
# error from the SELECT as "not visible yet".
# cadence ~1s). For multi-million-row loads the commit chases
# the send tail by tens of seconds; 5-minute deadline covers the
# canonical-join `big` (N=10M) without false ILP timeouts.
import time as _time
expected = df.height
actual = 0
deadline = _time.time() + 30
deadline = _time.time() + 300
while _time.time() < deadline:
try:
with self._conn.cursor() as cur:
Expand Down Expand Up @@ -253,9 +261,12 @@ def _load_right(self, path: Path) -> str:
float_cols = [n for n, d in df.schema.items() if d == pl.Float64]
str_cols = [n for n, d in df.schema.items() if d in (pl.Utf8, pl.String)]

# Periodic flush so server commits in parallel with send — same
# rationale as load_data().
FLUSH_EVERY = 100_000
conf = f"tcp::addr={self._host}:{self._ilp_port};"
with self._Sender.from_conf(conf) as sender:
for row in df.iter_rows(named=True):
for i, row in enumerate(df.iter_rows(named=True)):
sender.row(
right_table,
symbols={c: row[c] for c in str_cols},
Expand All @@ -265,11 +276,13 @@ def _load_right(self, path: Path) -> str:
},
at=self._ServerTimestamp,
)
if (i + 1) % FLUSH_EVERY == 0:
sender.flush()
sender.flush()

# Wait for ILP commit visibility (same dance as load_data).
expected = df.height
deadline = _time.time() + 30
deadline = _time.time() + 300
while _time.time() < deadline:
try:
with self._conn.cursor() as cur:
Expand Down Expand Up @@ -374,20 +387,24 @@ def run_sort_typed_full(self, csv_path: Path, dtype: str,
# Random N-character strings are high-cardinality — push them as
# STRING via ILP `columns`, not SYMBOL via `symbols` (Symbol is
# a dictionary type and chokes on N=1M unique values).
# Periodic flush so server commits in parallel with send.
FLUSH_EVERY = 100_000
conf = f"tcp::addr={self._host}:{self._ilp_port};"
with self._Sender.from_conf(conf) as sender:
for row in df.iter_rows(named=True):
for i, row in enumerate(df.iter_rows(named=True)):
v = row["v"]
sender.row(sort_table, columns={"v": v},
at=self._ServerTimestamp)
if (i + 1) % FLUSH_EVERY == 0:
sender.flush()
sender.flush()

# Wait for ILP commit visibility. 1M-row ILP commits can take a
# while; raise loudly on timeout so we don't sort an empty table
# Wait for ILP commit visibility. 10M-row ILP commits can take
# minutes; raise loudly on timeout so we don't sort an empty table
# and report fake 0.36ms / 0 rows.
expected = df.height
actual = 0
deadline = _time.time() + 120
deadline = _time.time() + 300
while _time.time() < deadline:
try:
with self._conn.cursor() as cur:
Expand Down
Loading