Skip to content

Commit 26175f6

Browse files
committed
feat: implement aggregation functionality across data connectors and operational resource
1 parent f351678 commit 26175f6

6 files changed

Lines changed: 479 additions & 13 deletions

File tree

adminforth/dataConnectors/baseConnector.ts

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
import {
2-
AdminForthResource, IAdminForthDataSourceConnectorBase,
3-
AdminForthResourceColumn,
4-
IAdminForthSort, IAdminForthSingleFilter, IAdminForthAndOrFilter,
5-
AdminForthConfig
1+
import {
2+
AdminForthResource, IAdminForthDataSourceConnectorBase,
3+
AdminForthResourceColumn,
4+
IAdminForthSort, IAdminForthSingleFilter, IAdminForthAndOrFilter,
5+
AdminForthConfig,
6+
IAggregationRule, IGroupByRule, IGroupByDateTrunc,
67
} from "../types/Back.js";
78

89

@@ -252,16 +253,94 @@ export default class AdminForthBaseConnector implements IAdminForthDataSourceCon
252253
}
253254
}
254255

255-
getDataWithOriginalTypes({ resource, limit, offset, sort, filters }: {
256-
resource: AdminForthResource,
257-
limit: number,
258-
offset: number,
259-
sort: IAdminForthSort[],
256+
getDataWithOriginalTypes({ resource, limit, offset, sort, filters }: {
257+
resource: AdminForthResource,
258+
limit: number,
259+
offset: number,
260+
sort: IAdminForthSort[],
260261
filters: IAdminForthAndOrFilter,
261262
}): Promise<any[]> {
262263
throw new Error('Method not implemented.');
263264
}
264265

266+
getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
267+
resource: AdminForthResource,
268+
filters: IAdminForthAndOrFilter,
269+
aggregations: { [alias: string]: IAggregationRule },
270+
groupBy?: IGroupByRule,
271+
}): Promise<Array<{ group?: string, [key: string]: any }>> {
272+
throw new Error('getAggregateWithOriginalTypes() not implemented for this connector.');
273+
}
274+
275+
private validateAggregateParams(
276+
resource: AdminForthResource,
277+
aggregations: { [alias: string]: IAggregationRule },
278+
groupBy?: IGroupByRule,
279+
): void {
280+
const VALID_ALIAS = /^[a-zA-Z_][a-zA-Z0-9_]*$/;
281+
const VALID_OPERATIONS = ['sum', 'count', 'avg', 'min', 'max', 'median'];
282+
const VALID_TRUNCATIONS = ['day', 'week', 'month', 'year'];
283+
const VALID_TIMEZONE = /^[a-zA-Z_\/\-\+0-9]+$/;
284+
const columnNames = new Set(resource.dataSourceColumns.map(c => c.name));
285+
286+
const assertColumn = (field: string, context: string) => {
287+
if (!columnNames.has(field)) {
288+
throw new Error(`${context}: unknown column "${field}". Available: ${[...columnNames].join(', ')}`);
289+
}
290+
};
291+
292+
for (const [alias, rule] of Object.entries(aggregations)) {
293+
if (!VALID_ALIAS.test(alias)) {
294+
throw new Error(`Invalid aggregation alias "${alias}". Must match ${VALID_ALIAS}`);
295+
}
296+
if (!VALID_OPERATIONS.includes(rule.operation)) {
297+
throw new Error(`Invalid aggregation operation "${rule.operation}". Must be one of: ${VALID_OPERATIONS.join(', ')}`);
298+
}
299+
if (rule.operation !== 'count') {
300+
if (!rule.field) {
301+
throw new Error(`Aggregation "${alias}" with operation "${rule.operation}" requires a field`);
302+
}
303+
assertColumn(rule.field, `Aggregation "${alias}"`);
304+
}
305+
}
306+
307+
if (groupBy) {
308+
if (groupBy.type === 'field') {
309+
assertColumn(groupBy.field, 'GroupBy.Field');
310+
} else if (groupBy.type === 'date_trunc') {
311+
const g = groupBy as IGroupByDateTrunc;
312+
assertColumn(g.field, 'GroupBy.DateTrunc');
313+
if (!VALID_TRUNCATIONS.includes(g.truncation)) {
314+
throw new Error(`Invalid truncation "${g.truncation}". Must be one of: ${VALID_TRUNCATIONS.join(', ')}`);
315+
}
316+
if (g.timezone && !VALID_TIMEZONE.test(g.timezone)) {
317+
throw new Error(`Invalid timezone "${g.timezone}". Must be a valid IANA timezone name`);
318+
}
319+
} else {
320+
throw new Error(`Unknown groupBy type "${(groupBy as any).type}"`);
321+
}
322+
}
323+
}
324+
325+
async aggregate({ resource, filters, aggregations, groupBy }: {
326+
resource: AdminForthResource,
327+
filters: IAdminForthAndOrFilter,
328+
aggregations: { [alias: string]: IAggregationRule },
329+
groupBy?: IGroupByRule,
330+
}): Promise<Array<{ group?: string, [key: string]: any }>> {
331+
this.validateAggregateParams(resource, aggregations, groupBy);
332+
333+
let normalizedFilters = filters;
334+
if (filters) {
335+
const filterValidation = this.validateAndNormalizeFilters(filters, resource);
336+
if (!filterValidation.ok) {
337+
throw new Error(filterValidation.error);
338+
}
339+
normalizedFilters = filterValidation.normalizedFilters as IAdminForthAndOrFilter;
340+
}
341+
return this.getAggregateWithOriginalTypes({ resource, filters: normalizedFilters, aggregations, groupBy });
342+
}
343+
265344
getCount({ resource, filters }: { resource: AdminForthResource; filters: IAdminForthAndOrFilter; }): Promise<number> {
266345
throw new Error('Method not implemented.');
267346
}

adminforth/dataConnectors/postgres.ts

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import dayjs from 'dayjs';
2-
import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig } from '../types/Back.js';
2+
import { AdminForthResource, IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthDataSourceConnector, AdminForthConfig, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js';
33
import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections, } from '../types/Common.js';
44
import AdminForthBaseConnector from './baseConnector.js';
55
import pkg from 'pg';
@@ -372,6 +372,56 @@ class PostgresConnector extends AdminForthBaseConnector implements IAdminForthDa
372372
}
373373

374374

375+
async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
376+
resource: AdminForthResource,
377+
filters: IAdminForthAndOrFilter,
378+
aggregations: { [alias: string]: IAggregationRule },
379+
groupBy?: IGroupByRule,
380+
}): Promise<Array<{ group?: string, [key: string]: any }>> {
381+
const tableName = resource.table;
382+
const selectParts: string[] = [];
383+
let groupExpr: string | null = null;
384+
385+
if (groupBy?.type === 'date_trunc') {
386+
const g = groupBy as IGroupByDateTrunc;
387+
const tz = g.timezone ?? 'UTC';
388+
const col = resource.dataSourceColumns.find(c => c.name === g.field);
389+
const hasTZ = (col as any)?._baseTypeDebug?.includes('with time zone');
390+
const innerExpr = hasTZ
391+
? `"${g.field}" AT TIME ZONE '${tz}'`
392+
: `"${g.field}" AT TIME ZONE 'UTC' AT TIME ZONE '${tz}'`;
393+
const fieldExpr = `DATE_TRUNC('${g.truncation}', ${innerExpr})`;
394+
groupExpr = `TO_CHAR(${fieldExpr}, 'YYYY-MM-DD')`;
395+
selectParts.push(`${groupExpr} AS "group"`);
396+
} else if (groupBy?.type === 'field') {
397+
const g = groupBy as IGroupByField;
398+
groupExpr = `"${g.field}"`;
399+
selectParts.push(`${groupExpr} AS "group"`);
400+
}
401+
402+
for (const [alias, rule] of Object.entries(aggregations)) {
403+
switch (rule.operation) {
404+
case 'sum': selectParts.push(`SUM("${rule.field}") AS "${alias}"`); break;
405+
case 'count': selectParts.push(`COUNT(*) AS "${alias}"`); break;
406+
case 'avg': selectParts.push(`AVG("${rule.field}") AS "${alias}"`); break;
407+
case 'min': selectParts.push(`MIN("${rule.field}") AS "${alias}"`); break;
408+
case 'max': selectParts.push(`MAX("${rule.field}") AS "${alias}"`); break;
409+
case 'median': selectParts.push(`PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "${rule.field}") AS "${alias}"`); break;
410+
}
411+
}
412+
413+
const { sql: where, values: filterValues } = this.whereClauseAndValues(resource, filters);
414+
let query = `SELECT ${selectParts.join(', ')} FROM "${tableName}" ${where}`;
415+
416+
if (groupExpr) {
417+
query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`;
418+
}
419+
420+
dbLogger.trace(`🪲📜 PG AGG Q: ${query}, params: ${JSON.stringify(filterValues)}`);
421+
const stmt = await this.client.query(query, filterValues);
422+
return stmt.rows;
423+
}
424+
375425
async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }): Promise<any[]> {
376426
const columns = resource.dataSourceColumns.map((col) => `"${col.name}"`).join(', ');
377427
const tableName = resource.table;

adminforth/dataConnectors/sqlite.ts

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import betterSqlite3 from 'better-sqlite3';
2-
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn, AdminForthConfig } from '../types/Back.js';
2+
import { IAdminForthDataSourceConnector, IAdminForthSingleFilter, IAdminForthAndOrFilter, AdminForthResource, AdminForthResourceColumn, AdminForthConfig, IAggregationRule, IGroupByRule, IGroupByDateTrunc, IGroupByField } from '../types/Back.js';
33
import AdminForthBaseConnector from './baseConnector.js';
44
import dayjs from 'dayjs';
55
import { AdminForthDataTypes, AdminForthFilterOperators, AdminForthSortDirections } from '../types/Common.js';
@@ -299,6 +299,123 @@ class SQLiteConnector extends AdminForthBaseConnector implements IAdminForthData
299299
return filter.subFilters.length ? `WHERE ${this.getFilterString(filter)}` : '';
300300
}
301301

302+
private _dateGroupKey(rawValue: any, underlineType: string, truncation: string, timezone: string): string {
303+
const date = (underlineType === 'timestamp' || underlineType === 'int')
304+
? new Date(Number(rawValue) * 1000)
305+
: new Date(rawValue);
306+
307+
const fmt = (opts: Intl.DateTimeFormatOptions) =>
308+
new Intl.DateTimeFormat('en', { timeZone: timezone, ...opts }).formatToParts(date);
309+
310+
const get = (parts: Intl.DateTimeFormatPart[], type: string) =>
311+
parts.find(p => p.type === type)?.value ?? '';
312+
313+
const dateParts = fmt({ year: 'numeric', month: '2-digit', day: '2-digit', weekday: 'short' });
314+
const year = get(dateParts, 'year');
315+
const month = get(dateParts, 'month');
316+
const day = get(dateParts, 'day');
317+
const dateStr = `${year}-${month}-${day}`;
318+
319+
switch (truncation) {
320+
case 'day': return dateStr;
321+
case 'week': {
322+
const dowMap: Record<string, number> = { Sun: 0, Mon: 1, Tue: 2, Wed: 3, Thu: 4, Fri: 5, Sat: 6 };
323+
const dow = dowMap[get(dateParts, 'weekday')] ?? 0;
324+
const daysBack = dow === 0 ? 6 : dow - 1; // rewind to Monday (ISO)
325+
const [y, m, d] = dateStr.split('-').map(Number);
326+
return new Date(Date.UTC(y, m - 1, d - daysBack)).toISOString().split('T')[0];
327+
}
328+
case 'month': return `${year}-${month}-01`;
329+
case 'year': return `${year}-01-01`;
330+
default: return dateStr;
331+
}
332+
}
333+
334+
async getAggregateWithOriginalTypes({ resource, filters, aggregations, groupBy }: {
335+
resource: AdminForthResource,
336+
filters: IAdminForthAndOrFilter,
337+
aggregations: { [alias: string]: IAggregationRule },
338+
groupBy?: IGroupByRule,
339+
}): Promise<Array<{ group?: string, [key: string]: any }>> {
340+
const tableName = resource.table;
341+
const where = this.whereClause(filters);
342+
const filterValues = this.getFilterParams(filters);
343+
344+
if (!groupBy || groupBy.type === 'field') {
345+
const selectParts: string[] = [];
346+
let groupExpr: string | null = null;
347+
348+
if (groupBy?.type === 'field') {
349+
const g = groupBy as IGroupByField;
350+
groupExpr = `"${g.field}"`;
351+
selectParts.push(`${groupExpr} AS "group"`);
352+
}
353+
354+
for (const [alias, rule] of Object.entries(aggregations)) {
355+
switch (rule.operation) {
356+
case 'sum': selectParts.push(`SUM("${rule.field}") AS "${alias}"`); break;
357+
case 'count': selectParts.push(`COUNT(*) AS "${alias}"`); break;
358+
case 'avg': selectParts.push(`AVG("${rule.field}") AS "${alias}"`); break;
359+
case 'min': selectParts.push(`MIN("${rule.field}") AS "${alias}"`); break;
360+
case 'max': selectParts.push(`MAX("${rule.field}") AS "${alias}"`); break;
361+
case 'median': throw new Error('Aggregates.median() with GroupBy.Field is not supported in SQLite.');
362+
}
363+
}
364+
365+
let query = `SELECT ${selectParts.join(', ')} FROM ${tableName} ${where}`;
366+
if (groupExpr) query += ` GROUP BY ${groupExpr} ORDER BY ${groupExpr} ASC`;
367+
dbLogger.trace(`🪲📜 SQLITE AGG Q: ${query}, params: ${JSON.stringify(filterValues)}`);
368+
return this.client.prepare(query).all([...filterValues]);
369+
}
370+
371+
const g = groupBy as IGroupByDateTrunc;
372+
const timezone = g.timezone ?? 'UTC';
373+
const col = resource.dataSourceColumns.find(c => c.name === g.field);
374+
const underlineType = col?._underlineType ?? 'varchar';
375+
376+
const neededFields = new Set<string>([g.field]);
377+
for (const rule of Object.values(aggregations)) {
378+
if (rule.field) neededFields.add(rule.field);
379+
}
380+
const selectCols = [...neededFields].map(f => `"${f}"`).join(', ');
381+
const rawQuery = `SELECT ${selectCols} FROM ${tableName} ${where}`;
382+
dbLogger.trace(`🪲📜 SQLITE AGG RAW Q: ${rawQuery}, params: ${JSON.stringify(filterValues)}`);
383+
const rawRows: any[] = this.client.prepare(rawQuery).all([...filterValues]);
384+
385+
const groups = new Map<string, any[]>();
386+
for (const row of rawRows) {
387+
const key = this._dateGroupKey(row[g.field], underlineType, g.truncation, timezone);
388+
if (!groups.has(key)) groups.set(key, []);
389+
groups.get(key)!.push(row);
390+
}
391+
392+
const results: Array<{ group: string, [key: string]: any }> = [];
393+
for (const [groupKey, rows] of groups) {
394+
const result: { group: string, [key: string]: any } = { group: groupKey };
395+
for (const [alias, rule] of Object.entries(aggregations)) {
396+
const nums = rule.field ? rows.map(r => Number(r[rule.field!] ?? 0)) : [];
397+
switch (rule.operation) {
398+
case 'count': result[alias] = rows.length; break;
399+
case 'sum': result[alias] = nums.reduce((s, v) => s + v, 0); break;
400+
case 'avg': result[alias] = nums.reduce((s, x) => s + x, 0) / nums.length; break;
401+
case 'min': result[alias] = Math.min(...nums); break;
402+
case 'max': result[alias] = Math.max(...nums); break;
403+
case 'median': {
404+
const sorted = nums.slice().sort((a, b) => a - b);
405+
const mid = Math.floor(sorted.length / 2);
406+
result[alias] = sorted.length % 2 === 0
407+
? (sorted[mid - 1] + sorted[mid]) / 2
408+
: sorted[mid];
409+
break;
410+
}
411+
}
412+
}
413+
results.push(result);
414+
}
415+
416+
return results.sort((a, b) => a.group.localeCompare(b.group));
417+
}
418+
302419
async getDataWithOriginalTypes({ resource, limit, offset, sort, filters }): Promise<any[]> {
303420
const columns = resource.dataSourceColumns.map((col) => col.name).join(', ');
304421
const tableName = resource.table;

adminforth/modules/operationalResource.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthSort, IOperationalResource, IAdminForthDataSourceConnectorBase, AdminForthResource } from '../types/Back.js';
1+
import { IAdminForthSingleFilter, IAdminForthAndOrFilter, IAdminForthSort, IOperationalResource, IAdminForthDataSourceConnectorBase, AdminForthResource, IAggregationRule, IGroupByRule } from '../types/Back.js';
22
import { AdminForthFilterOperators } from '../types/Common.js';
33

44
function sortsIfSort(sort: IAdminForthSort | IAdminForthSort[]): IAdminForthSort[] {
@@ -61,6 +61,19 @@ export default class OperationalResource implements IOperationalResource {
6161
}
6262

6363

64+
async aggregate(
65+
filter: IAdminForthSingleFilter | IAdminForthAndOrFilter | Array<IAdminForthSingleFilter | IAdminForthAndOrFilter>,
66+
aggregations: { [alias: string]: IAggregationRule },
67+
groupBy?: IGroupByRule
68+
): Promise<Array<{ group?: string, [key: string]: any }>> {
69+
return this.dataConnector.aggregate({
70+
resource: this.resourceConfig,
71+
filters: this.dataConnector.validateAndNormalizeInputFilters(filter),
72+
aggregations,
73+
groupBy,
74+
});
75+
}
76+
6477
async count(filter?: IAdminForthSingleFilter | IAdminForthAndOrFilter | Array<IAdminForthSingleFilter | IAdminForthAndOrFilter> | undefined): Promise<number> {
6578
return await this.dataConnector.getCount({
6679
resource: this.resourceConfig,

0 commit comments

Comments
 (0)