Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions drizzle-kit/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,53 @@ export type DrizzlePgDB = DB & {
proxy: Proxy;
migrate: (config: string | MigrationConfig) => Promise<void>;
};
export type PreparePgDBOptions = {
queryConcurrency?: number;
};
export type DrizzlePgDBIntrospectSchema = Omit<
PgSchemaKit,
'internal'
>;

function createConcurrencyLimiter(concurrency?: number) {
if (concurrency === undefined) {
return <T>(fn: () => Promise<T>) => fn();
}

if (!Number.isInteger(concurrency) || concurrency < 1) {
throw new RangeError('queryConcurrency must be a positive integer');
}

let activeCount = 0;
const queue: Array<() => void> = [];

const runNext = () => {
if (activeCount >= concurrency) return;

const next = queue.shift();
if (!next) return;

activeCount += 1;
next();
};

return <T>(fn: () => Promise<T>) => {
return new Promise<T>((resolve, reject) => {
queue.push(() => {
Promise.resolve()
.then(fn)
.then(resolve, reject)
.finally(() => {
activeCount -= 1;
runNext();
});
});

runNext();
});
};
}

export const introspectPgDB = async (
db: DrizzlePgDB,
filters: string[],
Expand Down Expand Up @@ -86,6 +128,7 @@ export const introspectPgDB = async (

export const preparePgDB = async (
pool: import('pg').Pool | import('pg').PoolClient,
options: PreparePgDBOptions = {},
): Promise<
DrizzlePgDB
> => {
Expand Down Expand Up @@ -119,22 +162,27 @@ export const preparePgDB = async (
const migrateFn = async (config: MigrationConfig) => {
return migrate(db, config);
};
const limitQuery = createConcurrencyLimiter(options.queryConcurrency);

const query = async (sql: string, params?: any[]) => {
const result = await pool.query({
text: sql,
values: params ?? [],
types,
const result = await limitQuery(() => {
return pool.query({
text: sql,
values: params ?? [],
types,
});
});
return result.rows;
};

const proxy: Proxy = async (params: ProxyParams) => {
const result = await pool.query({
text: params.sql,
values: params.params,
...(params.mode === 'array' && { rowMode: 'array' }),
types,
const result = await limitQuery(() => {
return pool.query({
text: params.sql,
values: params.params,
...(params.mode === 'array' && { rowMode: 'array' }),
types,
});
});
return result.rows;
};
Expand Down
94 changes: 94 additions & 0 deletions drizzle-kit/tests/api.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { describe, expect, test, vi } from 'vitest';
import { preparePgDB } from '../src/api';

vi.mock('pg', () => ({
default: {
types: {
builtins: {
DATE: 1082,
INTERVAL: 1186,
TIMESTAMP: 1114,
TIMESTAMPTZ: 1184,
},
getTypeParser: vi.fn(() => (value: unknown) => value),
},
},
}));

vi.mock('drizzle-orm/node-postgres', () => ({
drizzle: vi.fn().mockReturnValue({}),
}));

vi.mock('drizzle-orm/node-postgres/migrator', () => ({
migrate: vi.fn(),
}));

function createObservedPool() {
let activeQueries = 0;
let maxActiveQueries = 0;

const query = vi.fn(async (input: { text: string }) => {
activeQueries += 1;
maxActiveQueries = Math.max(maxActiveQueries, activeQueries);

await new Promise((resolve) => setTimeout(resolve, 10));

activeQueries -= 1;
return { rows: [input.text] };
});

return {
pool: { query },
query,
getMaxActiveQueries: () => maxActiveQueries,
};
}

describe('preparePgDB', () => {
test('does not limit query concurrency by default', async () => {
const observed = createObservedPool();
const db = await preparePgDB(observed.pool as any);

await Promise.all([
db.query('select 1'),
db.query('select 2'),
db.query('select 3'),
db.query('select 4'),
]);

expect(observed.query).toHaveBeenCalledTimes(4);
expect(observed.getMaxActiveQueries()).toBe(4);
});

test('limits query and proxy calls with queryConcurrency', async () => {
const observed = createObservedPool();
const db = await preparePgDB(observed.pool as any, {
queryConcurrency: 2,
});

await Promise.all([
db.query('select 1'),
db.query('select 2'),
db.query('select 3'),
db.proxy({ mode: 'array', params: [], sql: 'select 4' }),
db.proxy({ mode: 'object', params: [], sql: 'select 5' }),
]);

expect(observed.query).toHaveBeenCalledTimes(5);
expect(observed.getMaxActiveQueries()).toBe(2);
});

test('rejects invalid queryConcurrency values', async () => {
const observed = createObservedPool();

await expect(
preparePgDB(observed.pool as any, { queryConcurrency: 0 }),
).rejects.toThrow('queryConcurrency must be a positive integer');
await expect(
preparePgDB(observed.pool as any, { queryConcurrency: -1 }),
).rejects.toThrow('queryConcurrency must be a positive integer');
await expect(
preparePgDB(observed.pool as any, { queryConcurrency: 1.5 }),
).rejects.toThrow('queryConcurrency must be a positive integer');
});
});