Skip to content

Commit a1f7db1

Browse files
committed
feat: refactor median calculation in MySQL connector to use window functions and improve performance
1 parent ed5ae7e commit a1f7db1

1 file changed

Lines changed: 83 additions & 62 deletions

File tree

adminforth/dataConnectors/mysql.ts

Lines changed: 83 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -338,15 +338,6 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
338338
} : { sql: '', values: [] };
339339
}
340340

341-
private calculateMedian(values: number[]): number | null {
342-
if (!values.length) return null;
343-
const sorted = values.sort((a, b) => a - b);
344-
const mid = Math.floor(sorted.length / 2);
345-
return sorted.length % 2 === 0
346-
? (sorted[mid - 1] + sorted[mid]) / 2
347-
: sorted[mid];
348-
}
349-
350341
async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
351342
resource: AdminForthResource;
352343
filters: IAdminForthAndOrFilter;
@@ -355,7 +346,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
355346
}): Promise<Array<{ group?: string, [key: string]: any }>> {
356347
const tableName = resource.table;
357348
const selectParts: string[] = [];
358-
const medianAliases: string[] = [];
349+
const medianFields: { alias: string; field: string }[] = [];
359350
let groupExpr: string | null = null;
360351

361352
if (groupBy?.type === 'field') {
@@ -364,81 +355,111 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
364355
} else if (groupBy?.type === 'date_trunc') {
365356
const g = groupBy as IGroupByDateTrunc;
366357
const tz = g.timezone ?? 'UTC';
367-
358+
if (!/^[A-Za-z0-9/_+\-]+$/.test(tz)) {
359+
throw new Error(`Invalid timezone value: ${tz}`);
360+
}
368361
const innerExpr = `COALESCE(CONVERT_TZ(\`${g.field}\`, 'UTC', '${tz}'), \`${g.field}\`)`;
369-
370362
switch (g.truncation) {
371-
case 'day': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-%d')`; break;
363+
case 'day': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-%d')`; break;
372364
case 'month': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-%m-01')`; break;
373-
case 'year': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-01-01')`; break;
374-
case 'week': groupExpr = `DATE_FORMAT(DATE_SUB(${innerExpr}, INTERVAL WEEKDAY(${innerExpr}) DAY), '%Y-%m-%d')`; break;
365+
case 'year': groupExpr = `DATE_FORMAT(${innerExpr}, '%Y-01-01')`; break;
366+
case 'week': groupExpr = `DATE_FORMAT(DATE_SUB(${innerExpr}, INTERVAL WEEKDAY(${innerExpr}) DAY), '%Y-%m-%d')`; break;
375367
}
376-
377368
selectParts.push(`${groupExpr} AS \`group\``);
378369
}
379370

380371
for (const [alias, rule] of Object.entries(aggregations)) {
381372
const f = `\`${rule.field}\``;
382373
switch (rule.operation) {
383-
case 'sum': selectParts.push(`SUM(${f}) AS \`${alias}\``); break;
384-
case 'count': selectParts.push(`COUNT(*) AS \`${alias}\``); break;
385-
case 'avg': selectParts.push(`AVG(${f}) AS \`${alias}\``); break;
386-
case 'min': selectParts.push(`MIN(${f}) AS \`${alias}\``); break;
387-
case 'max': selectParts.push(`MAX(${f}) AS \`${alias}\``); break;
388-
case 'median':
389-
selectParts.push(`GROUP_CONCAT(${f} ORDER BY ${f} ASC SEPARATOR ',') AS \`${alias}\``);
390-
medianAliases.push(alias);
391-
break;
374+
case 'sum': selectParts.push(`SUM(${f}) AS \`${alias}\``); break;
375+
case 'count': selectParts.push(`COUNT(*) AS \`${alias}\``); break;
376+
case 'avg': selectParts.push(`AVG(${f}) AS \`${alias}\``); break;
377+
case 'min': selectParts.push(`MIN(${f}) AS \`${alias}\``); break;
378+
case 'max': selectParts.push(`MAX(${f}) AS \`${alias}\``); break;
379+
case 'median': medianFields.push({ alias, field: rule.field }); break;
392380
}
393381
}
394382

395383
const { sql: where, values: filterValues } = this.whereClauseAndValues(filters);
396-
let query = `SELECT ${selectParts.join(', ')} FROM \`${tableName}\` ${where}`;
397-
if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`;
398-
399-
if (medianAliases.length > 0) {
400-
let connection;
401-
let originalMaxLen = null;
402-
try {
403-
connection = await this.client.getConnection();
404-
405-
const [originalLenRows]: any = await connection.execute("SELECT @@SESSION.group_concat_max_len as original_len");
406-
originalMaxLen = originalLenRows[0].original_len;
407-
408-
await connection.execute("SET SESSION group_concat_max_len = 1000000");
409-
410-
const [rows]: any = await connection.execute(query, filterValues);
411-
412-
return rows.map((row: any) => {
413-
medianAliases.forEach(alias => {
414-
const raw = row[alias];
415-
const nums = raw ? raw.split(',').map(Number).filter((n: number) => !isNaN(n)) : [];
416-
row[alias] = this.calculateMedian(nums);
417-
});
418-
return row;
419-
});
420-
421-
} finally {
422-
if (connection) {
423-
if (originalMaxLen !== null) {
424-
await connection.execute(`SET SESSION group_concat_max_len = ${originalMaxLen}`);
384+
385+
type AggRow = { group?: string } & Record<string, number | string | null>;
386+
387+
// Run non-median aggregations
388+
let rows: AggRow[] = [];
389+
const hasNonMedian = selectParts.length > (groupExpr ? 1 : 0);
390+
if (hasNonMedian) {
391+
let query = `SELECT ${selectParts.join(', ')} FROM \`${tableName}\` ${where}`;
392+
if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`;
393+
dbLogger.trace(`🪲📜 MySQL AGG Q: ${query} values: ${JSON.stringify(filterValues)}`);
394+
const [result] = await this.client.execute(query, filterValues);
395+
rows = result as AggRow[];
396+
}
397+
398+
// Run each median via window functions (MySQL 8+) — no session variables, no memory pressure
399+
for (const { alias, field } of medianFields) {
400+
const f = `\`${field}\``;
401+
const nullGuard = where ? `${where} AND ${f} IS NOT NULL` : `WHERE ${f} IS NOT NULL`;
402+
403+
let medianQuery: string;
404+
if (groupExpr) {
405+
medianQuery = `
406+
SELECT ${groupExpr} AS \`group\`, AVG(${f}) AS \`${alias}\`
407+
FROM (
408+
SELECT ${groupExpr}, ${f},
409+
ROW_NUMBER() OVER (PARTITION BY ${groupExpr} ORDER BY ${f}) AS rn,
410+
COUNT(*) OVER (PARTITION BY ${groupExpr}) AS cnt
411+
FROM \`${tableName}\` ${nullGuard}
412+
) t
413+
WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0))
414+
GROUP BY ${groupExpr}
415+
ORDER BY ${groupExpr} ASC
416+
`;
417+
} else {
418+
medianQuery = `
419+
SELECT AVG(${f}) AS \`${alias}\`
420+
FROM (
421+
SELECT ${f},
422+
ROW_NUMBER() OVER (ORDER BY ${f}) AS rn,
423+
COUNT(*) OVER () AS cnt
424+
FROM \`${tableName}\` ${nullGuard}
425+
) t
426+
WHERE rn IN (FLOOR((cnt + 1) / 2.0), CEIL((cnt + 1) / 2.0))
427+
`;
428+
}
429+
430+
dbLogger.trace(`🪲📜 MySQL MEDIAN Q: ${medianQuery} values: ${JSON.stringify(filterValues)}`);
431+
const [medianResult] = await this.client.execute(medianQuery, filterValues);
432+
const medianRows = medianResult as AggRow[];
433+
434+
if (groupExpr) {
435+
if (rows.length === 0) {
436+
rows = medianRows.map((r) => ({ group: r.group, [alias]: r[alias] }));
437+
} else {
438+
const byGroup = new Map(medianRows.map((r) => [String(r.group), r[alias]]));
439+
for (const row of rows) {
440+
row[alias] = byGroup.get(String(row.group)) ?? null;
425441
}
426-
connection.release();
442+
}
443+
} else {
444+
const medianVal = medianRows[0]?.[alias] ?? null;
445+
if (rows.length === 0) {
446+
rows = [{ [alias]: medianVal }];
447+
} else {
448+
rows[0][alias] = medianVal;
427449
}
428450
}
429-
} else {
430-
const [rows]: any = await this.client.execute(query, filterValues);
431-
return rows;
432451
}
452+
453+
return rows;
433454
}
434455

435456
async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }): Promise<any[]> {
436-
const columns = resource.dataSourceColumns.map((col) => `${col.name}`).join(', ');
457+
const columns = resource.dataSourceColumns.map((col: { name: string }) => `${col.name}`).join(', ');
437458
const tableName = resource.table;
438459

439460
const { sql: where, values: filterValues } = this.whereClauseAndValues(filters);
440461

441-
const orderBy = sort.length ? `ORDER BY ${sort.map((s) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : '';
462+
const orderBy = sort.length ? `ORDER BY ${sort.map((s: { field: string; direction: AdminForthSortDirections }) => `${s.field} ${this.SortDirectionsMap[s.direction]}`).join(', ')}` : '';
442463
let selectQuery = `SELECT ${columns} FROM ${tableName}`;
443464
if (where) selectQuery += ` ${where}`;
444465
if (orderBy) selectQuery += ` ${orderBy}`;
@@ -479,7 +500,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
479500
async getMinMaxForColumnsWithOriginalTypes({ resource, columns }) {
480501
const tableName = resource.table;
481502
const result = {};
482-
await Promise.all(columns.map(async (col) => {
503+
await Promise.all(columns.map(async (col: { name: string }) => {
483504
const q = `SELECT MIN(${col.name}) as min, MAX(${col.name}) as max FROM ${tableName}`;
484505
dbLogger.trace(`🪲📜 MySQL Q: ${q}`);
485506
const [results] = await this.client.execute(q);
@@ -504,7 +525,7 @@ class MysqlConnector extends AdminForthBaseConnector implements IAdminForthDataS
504525

505526
async updateRecordOriginalValues({ resource, recordId, newValues }) {
506527
const values = [...Object.values(newValues), recordId];
507-
const columnsWithPlaceholders = Object.keys(newValues).map((col, i) => `${col} = ?`).join(', ');
528+
const columnsWithPlaceholders = Object.keys(newValues).map((col) => `${col} = ?`).join(', ');
508529
const q = `UPDATE ${resource.table} SET ${columnsWithPlaceholders} WHERE ${this.getPrimaryKey(resource)} = ?`;
509530
dbLogger.trace(`🪲📜 MySQL Q: ${q} values: ${JSON.stringify(values)}`);
510531
await this.client.execute(q, values);

0 commit comments

Comments
 (0)