Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 197 additions & 52 deletions backend/src/config/database.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
/**
* database.ts — Issue #214
* database.ts
*
* Database query optimisation configuration and connection pool tuning.
* Provides:
* - Connection pool sizing based on environment
* - Slow query detection thresholds and alerting hooks
* - Query execution plan helpers (EXPLAIN wrapper)
* - Recommended index definitions for common query patterns
* - Prepared statement registry for high-frequency queries
* Database configuration, query profiling, connection pool tuning,
* and recommended composite indexes for AgenticPay.
*/

import { featureFlags } from './featureFlags.js';

// ── Pool configuration ─────────────────────────────────────────────────────────

export interface PoolConfig {
/** Maximum concurrent connections in the pool. */
max: number;
/** Minimum idle connections to keep warm. */
min: number;
/** ms to wait for a free connection before throwing (acquire timeout). */
acquireTimeoutMs: number;
/** ms an idle connection may sit before being closed. */
idleTimeoutMs: number;
/** ms the pool will try to create a connection before failing. */
createTimeoutMs: number;
/** Reap connections older than this (ms), regardless of idle state. */
maxConnectionAgeMs: number;
}

Expand All @@ -39,9 +30,9 @@ export function buildPoolConfig(env = process.env.NODE_ENV): PoolConfig {
max: envInt('DB_POOL_MAX', 50),
min: envInt('DB_POOL_MIN', 5),
acquireTimeoutMs: envInt('DB_ACQUIRE_TIMEOUT_MS', 10_000),
idleTimeoutMs: envInt('DB_IDLE_TIMEOUT_MS', 300_000), // 5 min
idleTimeoutMs: envInt('DB_IDLE_TIMEOUT_MS', 300_000),
createTimeoutMs: envInt('DB_CREATE_TIMEOUT_MS', 10_000),
maxConnectionAgeMs: envInt('DB_MAX_AGE_MS', 1_800_000), // 30 min
maxConnectionAgeMs: envInt('DB_MAX_AGE_MS', 1_800_000),
};
case 'staging':
return {
Expand Down Expand Up @@ -87,16 +78,6 @@ export function onSlowQuery(handler: SlowQueryHandler): void {
slowQueryHandlers.push(handler);
}

/**
* Wrap a database query function to track execution time and fire slow-query
* alerts when thresholds are exceeded.
*
* ```ts
* const rows = await withQueryTimer('SELECT * FROM payments WHERE id = $1', [id], () =>
* db.query('SELECT * FROM payments WHERE id = $1', [id])
* );
* ```
*/
export async function withQueryTimer<T>(
sql: string,
params: unknown[],
Expand All @@ -118,71 +99,116 @@ export async function withQueryTimer<T>(
timestamp: new Date(),
};
for (const handler of slowQueryHandlers) {
try { handler(event); } catch { /* never let alerting break the query path */ }
try { handler(event); } catch { }
}
}
}
}

// ── Default slow-query handler (console logging) ──────────────────────────────

onSlowQuery((event) => {
const label = event.severity === 'critical' ? '🔴 CRITICAL' : '🟡 SLOW';
console.warn(
`[db] ${label} query ${event.durationMs}ms: ${event.sql.slice(0, 120)}…`
);
const label = event.severity === 'critical' ? 'CRITICAL' : 'SLOW';
console.warn(`[db] ${label} query ${event.durationMs}ms: ${event.sql.slice(0, 120)}`);
});

// ── Recommended index definitions ─────────────────────────────────────────────
// ── Composite index definitions ────────────────────────────────────────────────

export interface IndexDefinition {
export interface CompositeIndex {
name: string;
table: string;
columns: string[];
description: string;
targetQuery: string;
unique?: boolean;
partial?: string; // WHERE clause
reason: string;
partial?: string;
}

/**
* Indexes that should exist for common AgenticPay query patterns.
* Use these definitions to generate migration scripts.
*/
export const RECOMMENDED_INDEXES: IndexDefinition[] = [
export const RECOMMENDED_INDEXES: CompositeIndex[] = [
{
name: 'idx_invoices_project_created',
table: 'invoices',
columns: ['project_id', 'created_at'],
description: 'Optimizes listing invoices by project ordered by date',
targetQuery: 'SELECT * FROM invoices WHERE project_id = ? ORDER BY created_at DESC',
},
{
name: 'idx_verifications_status_type',
table: 'verifications',
columns: ['status', 'verification_type'],
description: 'Filters verifications by status and type',
targetQuery: 'SELECT * FROM verifications WHERE status = ? AND verification_type = ?',
},
{
name: 'idx_transactions_account_ledger',
table: 'transactions',
columns: ['account_id', 'ledger_seq'],
description: 'Looks up transactions for an account sorted by ledger sequence',
targetQuery: 'SELECT * FROM transactions WHERE account_id = ? ORDER BY ledger_seq DESC',
},
{
name: 'idx_payments_recipient_status',
table: 'payments',
columns: ['tenant_id', 'created_at'],
reason: 'Hot path: list payments by tenant ordered by date',
columns: ['recipient', 'status'],
description: 'Finds pending payments for a recipient',
targetQuery: 'SELECT * FROM payments WHERE recipient = ? AND status = ?',
},
{
name: 'idx_payments_created_status',
table: 'payments',
columns: ['status'],
partial: "WHERE status IN ('pending', 'processing')",
reason: 'Background job: poll for non-terminal payments',
columns: ['created_at', 'status'],
description: 'Oldest pending payments for processing',
targetQuery: 'SELECT * FROM payments WHERE status = ? ORDER BY created_at ASC LIMIT ?',
},
{
name: 'idx_payments_tx_hash',
table: 'payments',
columns: ['tx_hash'],
unique: true,
reason: 'Idempotency and on-chain lookup by transaction hash',
description: 'Idempotency and on-chain lookup by transaction hash',
targetQuery: 'SELECT * FROM payments WHERE tx_hash = ?',
},
{
name: 'idx_sessions_user_expires',
table: 'sessions',
columns: ['user_id', 'expires_at'],
description: 'Finds active sessions for a user',
targetQuery: 'SELECT * FROM sessions WHERE user_id = ? AND expires_at > ?',
},
{
name: 'idx_refunds_invoice_created',
table: 'refunds',
columns: ['invoice_id', 'created_at'],
description: 'Lists refunds for an invoice ordered by date',
targetQuery: 'SELECT * FROM refunds WHERE invoice_id = ? ORDER BY created_at DESC',
},
{
name: 'idx_users_tenant_email',
table: 'users',
columns: ['tenant_id', 'email'],
unique: true,
reason: 'Login and uniqueness constraint per tenant',
description: 'Login and uniqueness constraint per tenant',
targetQuery: 'SELECT * FROM users WHERE tenant_id = ? AND email = ?',
},
{
name: 'idx_audit_logs_entity_created',
table: 'audit_logs',
columns: ['entity_id', 'created_at'],
reason: 'Audit trail queries per resource ordered by time',
description: 'Audit trail queries per resource ordered by time',
targetQuery: 'SELECT * FROM audit_logs WHERE entity_id = ? ORDER BY created_at DESC',
},
{
name: 'idx_gas_estimates_network_recorded',
table: 'gas_estimates',
columns: ['network', 'recorded_at'],
reason: 'Gas analytics aggregation by network and time window',
description: 'Gas analytics aggregation by network and time window',
targetQuery: 'SELECT * FROM gas_estimates WHERE network = ? ORDER BY recorded_at DESC',
},
];

export function getRecommendedIndexes(): CompositeIndex[] {
if (!featureFlags.evaluate('db-composite-indexes')) return [];
return RECOMMENDED_INDEXES;
}

// ── Prepared statement registry ───────────────────────────────────────────────

export const PREPARED_STATEMENTS = {
Expand Down Expand Up @@ -229,7 +255,126 @@ export function buildReplicaConfigs(): ReplicaConfig[] {
});
}

/** Returns true if the query is safe to route to a read replica. */
export function isReadQuery(sql: string): boolean {
return /^\s*(SELECT|WITH\s)/i.test(sql);
}

// ── Query Profiler ────────────────────────────────────────────────────────────

export interface QueryProfile {
query: string;
durationMs: number;
timestamp: string;
source: string;
rowsExamined?: number;
rowsReturned?: number;
}

export interface NPlusOneDetection {
source: string;
parentQuery: string;
childQueries: number;
threshold: number;
detectedAt: string;
}

class QueryProfiler {
private slowQueries: QueryProfile[] = [];
private allQueries: QueryProfile[] = [];
private maxSlowQueries = 100;
private maxAllQueries = 1000;
private readonly slowThresholdMs: number;

constructor(slowThresholdMs = 100) {
this.slowThresholdMs = slowThresholdMs;
}

isEnabled(): boolean {
return featureFlags.evaluate('db-query-profiling');
}

profile<T>(query: string, source: string, fn: () => Promise<T>): Promise<T> {
if (!this.isEnabled()) return fn();

const start = Date.now();
return fn().then((result) => {
const durationMs = Date.now() - start;
const profile: QueryProfile = { query, durationMs, timestamp: new Date().toISOString(), source };

this.allQueries.push(profile);
if (this.allQueries.length > this.maxAllQueries) this.allQueries.shift();

if (durationMs > this.slowThresholdMs) {
console.warn(`[QueryProfiler] SLOW QUERY (${durationMs.toFixed(0)}ms) [${source}]: ${query.substring(0, 200)}`);
this.slowQueries.push(profile);
if (this.slowQueries.length > this.maxSlowQueries) this.slowQueries.shift();
}

return result;
});
}

detectNPlusOne(source: string, parentFn: () => Promise<unknown[]>): Promise<unknown[]> {
if (!this.isEnabled()) return parentFn();
const originalQuery = this.allQueries[this.allQueries.length - 1]?.query || 'unknown';

return parentFn().then((results) => {
const total = this.allQueries.length;
if (total > 10 && results.length > 1) {
console.warn(`[QueryProfiler] N+1 DETECTED [${source}]: ${total} queries for ${results.length} results`);
console.warn(` Parent: ${originalQuery.substring(0, 150)}`);
}
return results;
});
}

getSlowQueries(): QueryProfile[] { return [...this.slowQueries]; }

getTopSlowQueries(n = 10): QueryProfile[] {
return [...this.slowQueries].sort((a, b) => b.durationMs - a.durationMs).slice(0, n);
}

getAllQueries(): QueryProfile[] { return [...this.allQueries]; }

getStats() {
const total = this.allQueries.length;
const slow = this.slowQueries.length;
const avgDuration = total > 0 ? this.allQueries.reduce((sum, q) => sum + q.durationMs, 0) / total : 0;
return {
totalQueries: total,
slowQueries: slow,
slowPercentage: total > 0 ? (slow / total) * 100 : 0,
avgDurationMs: avgDuration.toFixed(2),
p95DurationMs: this.calculatePercentile(95),
slowThresholdMs: this.slowThresholdMs,
};
}

private calculatePercentile(pct: number): number {
if (this.allQueries.length === 0) return 0;
const sorted = [...this.allQueries].sort((a, b) => a.durationMs - b.durationMs);
const idx = Math.ceil((pct / 100) * sorted.length) - 1;
return sorted[Math.max(0, idx)].durationMs;
}

reset(): void {
this.slowQueries = [];
this.allQueries = [];
}
}

export const queryProfiler = new QueryProfiler(
Number(process.env.DB_SLOW_QUERY_THRESHOLD_MS) || 100,
);

export async function withQueryProfiling<T>(
query: string,
source: string,
fn: () => Promise<T>,
): Promise<T> {
return queryProfiler.profile(query, source, fn);
}

export function getQueryProfiler(): QueryProfiler {
return queryProfiler;
}
5 changes: 5 additions & 0 deletions backend/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const envSchema = z.object({
STRIPE_SECRET_KEY: z.string().default(''),
STRIPE_WEBHOOK_SECRET: z.string().default(''),
STRIPE_PUBLISHABLE_KEY: z.string().default(''),
REDIS_URL: z.string().default(''),
REDIS_ENABLED: z.coerce.string().transform((val) => val === 'true').default('false'),
CACHE_WARMING_ENABLED: z.coerce.string().transform((val) => val === 'true').default('false'),
DB_QUERY_LOGGING_ENABLED: z.coerce.string().transform((val) => val === 'true').default('false'),
DB_SLOW_QUERY_THRESHOLD_MS: z.coerce.number().default(100),
});

export type Env = z.infer<typeof envSchema>;
Expand Down
38 changes: 37 additions & 1 deletion backend/src/config/featureFlags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ export type FeatureFlagName =
| 'message-queue'
| 'rate-limit-tiering'
| 'sla-tracking'
| 'response-caching';
| 'response-caching'
| 'multi-level-cache'
| 'single-flight'
| 'cache-warming'
| 'db-query-profiling'
| 'db-composite-indexes';

// ─── Runtime state ────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -162,6 +167,37 @@ const FLAG_DEFINITIONS: FeatureFlagDefinition[] = [
strategy: 'percentage',
rolloutPercentage: 100,
},
{
name: 'multi-level-cache',
description: 'In-memory + Redis multi-level caching with TTL',
defaultEnabled: true,
strategy: 'percentage',
rolloutPercentage: 100,
},
{
name: 'single-flight',
description: 'Single-flight pattern to prevent cache stampede on hot keys',
defaultEnabled: true,
strategy: 'all',
},
{
name: 'cache-warming',
description: 'Pre-warm cache on application startup for known endpoints',
defaultEnabled: process.env.CACHE_WARMING_ENABLED === 'true',
strategy: 'all',
},
{
name: 'db-query-profiling',
description: 'Database query profiling and slow query logging',
defaultEnabled: process.env.DB_QUERY_LOGGING_ENABLED === 'true',
strategy: 'all',
},
{
name: 'db-composite-indexes',
description: 'Composite index management for optimized query patterns',
defaultEnabled: true,
strategy: 'all',
},
];

// ─── Consistent-hash helper ───────────────────────────────────────────────────
Expand Down
Loading
Loading