Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,33 @@ const variables: Record<string, (...args: any) => any> = {
: undefined
),

/**
* Database min pool size.
*/
dbMinPoolSize: ({
dataSource,
}: {
dataSource: string,
}) => {
if (process.env[keyByDataSource('CUBEJS_DB_MIN_POOL', dataSource)]) {
const min = parseInt(
`${process.env[keyByDataSource('CUBEJS_DB_MIN_POOL', dataSource)]}`,
10,
);
if (min < 0) {
throw new Error(
`The ${
keyByDataSource('CUBEJS_DB_MIN_POOL', dataSource)
} must be a positive number or zero.`
);
}

return min;
}

return undefined;
},

/**
* Max polling interval. Currently used in BigQuery and Databricks.
* TODO: clarify this env.
Expand Down
16 changes: 13 additions & 3 deletions packages/cubejs-backend-shared/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import genericPool, { Pool as GenericPool, Factory, Options } from 'generic-pool';

export { Factory, Options as PoolOptions } from 'generic-pool';

export class PoolTimeoutError extends Error {
public readonly poolName: string;

Expand All @@ -18,6 +16,14 @@ export class PoolTimeoutError extends Error {
}
}

export type PoolFactory<T> = Factory<T>;
export type PoolOptions = Options;
// Allow passing specific options from Pool options in the specific driver
export type PoolUserOptions = Pick<
PoolOptions,
'acquireTimeoutMillis' | 'evictionRunIntervalMillis' | 'softIdleTimeoutMillis' | 'idleTimeoutMillis'
>;

/**
* Uses composition instead of inheritance because generic-pool doesn't export
* a Pool class, the Pool type is an interface, not an extendable class.
Expand All @@ -27,7 +33,7 @@ export class Pool<T> {

private readonly name: string;

public constructor(name: string, factory: Factory<T>, options?: Options) {
public constructor(name: string, factory: PoolFactory<T>, options?: Options) {
this.name = name;
this.pool = genericPool.createPool<T>(factory, options);
}
Expand Down Expand Up @@ -96,6 +102,10 @@ export class Pool<T> {
return this.pool.min;
}

public get options(): { max: number; min: number } {
return { max: this.pool.max, min: this.pool.min };
}

// Event handling
public on(event: 'factoryCreateError' | 'factoryDestroyError', listener: (err: Error) => void): this {
this.pool.on(event, listener);
Expand Down
3 changes: 1 addition & 2 deletions packages/cubejs-crate-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
},
"dependencies": {
"@cubejs-backend/postgres-driver": "1.6.11",
"@cubejs-backend/shared": "1.6.11",
"pg": "^8.7.1"
"@cubejs-backend/shared": "1.6.11"
},
"license": "Apache-2.0",
"devDependencies": {
Expand Down
5 changes: 2 additions & 3 deletions packages/cubejs-crate-driver/src/CrateDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
* @fileoverview The `CrateDriver` and related types declaration.
*/

import { PostgresDriver } from '@cubejs-backend/postgres-driver';
import { PoolClient } from 'pg';
import { PostgresDriver, PgClient } from '@cubejs-backend/postgres-driver';

export class CrateDriver extends PostgresDriver {
protected async prepareConnection(conn: PoolClient, _options: any) {
protected async prepareConnection(conn: PgClient, _options: any) {
// Not supported by Crate yet... https://github.com/crate/crate/issues/12356
// await conn.query(`SET TIME ZONE '${this.config.storeTimezone || 'UTC'}'`);
// await conn.query(`SET statement_timeout TO ${options.executionTimeout}`);
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-jdbc-driver/src/JDBCDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,13 @@ export class JDBCDriver extends BaseDriver {
public async testConnection() {
let err;
let connection;

try {
connection = await this.pool._factory.create();
} catch (e: any) {
err = e.message || e;
}

if (err) {
throw new Error(err.toString());
} else {
Expand Down
2 changes: 0 additions & 2 deletions packages/cubejs-materialize-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
"@cubejs-backend/base-driver": "1.6.11",
"@cubejs-backend/postgres-driver": "1.6.11",
"@cubejs-backend/shared": "1.6.11",
"@types/pg": "^8.6.0",
"pg": "^8.6.0",
"semver": "^7.6.3"
},
"license": "Apache-2.0",
Expand Down
15 changes: 7 additions & 8 deletions packages/cubejs-materialize-driver/src/MaterializeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* @fileoverview The `MaterializeDriver` and related types declaration.
*/

import { PostgresDriver, PostgresDriverConfiguration } from '@cubejs-backend/postgres-driver';
import { PostgresDriver, PostgresDriverConfiguration, PgClient, type PgQueryResult } from '@cubejs-backend/postgres-driver';
import {
BaseDriver,
DatabaseStructure,
Expand All @@ -15,7 +15,6 @@ import {
StreamTableDataWithTypes,
TableStructure
} from '@cubejs-backend/base-driver';
import { PoolClient, QueryResult } from 'pg';
import { Readable } from 'stream';
import semver from 'semver';

Expand Down Expand Up @@ -91,7 +90,7 @@ export class MaterializeDriver extends PostgresDriver {
}

protected async prepareConnection(
conn: PoolClient
conn: PgClient
) {
await conn.query(`SET TIME ZONE '${this.config.storeTimezone || 'UTC'}'`);
// Support for statement_timeout is still pending. https://github.com/MaterializeInc/materialize/issues/10390
Expand Down Expand Up @@ -183,7 +182,7 @@ export class MaterializeDriver extends PostgresDriver {
return sortedData.reduce<DatabaseStructure>(this.informationColumnsSchemaReducer, {});
}

protected async* asyncFetcher<R extends unknown>(conn: PoolClient, cursorId: string): AsyncGenerator<R> {
protected async* asyncFetcher<R extends unknown>(conn: PgClient, cursorId: string): AsyncGenerator<R> {
const timeout = `${this.config.executionTimeout ? <number>(this.config.executionTimeout) * 1000 : 600000} milliseconds`;
const queryParams = {
text: `FETCH 1000 ${cursorId} WITH (TIMEOUT='${timeout}');`,
Expand All @@ -193,7 +192,7 @@ export class MaterializeDriver extends PostgresDriver {
let finish = false;

while (!finish) {
const results: QueryResult<any> | undefined = await conn.query(queryParams);
const results: PgQueryResult<any> | undefined = await conn.query(queryParams);
const { rows, rowCount } = results;

if (rowCount === 0) {
Expand All @@ -206,11 +205,11 @@ export class MaterializeDriver extends PostgresDriver {
}
}

private async releaseStream(conn: PoolClient): Promise<void> {
private async releaseStream(conn: PgClient): Promise<void> {
try {
await conn.query('COMMIT;', []);
} finally {
await conn.release();
await this.pool.release(conn);
}
}

Expand All @@ -219,7 +218,7 @@ export class MaterializeDriver extends PostgresDriver {
values: unknown[],
{ highWaterMark }: StreamOptions
): Promise<ReadableStreamTableDataWithTypes> {
const conn = await this.pool.connect();
const conn = await this.pool.acquire();
try {
const cursorId = 'mz_cursor';
await this.prepareConnection(conn);
Expand Down
4 changes: 2 additions & 2 deletions packages/cubejs-postgres-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
"dependencies": {
"@cubejs-backend/base-driver": "1.6.11",
"@cubejs-backend/shared": "1.6.11",
"@types/pg": "^8.6.0",
"@types/pg": "^8.16.0",
"@types/pg-query-stream": "^1.0.3",
"moment": "^2.24.0",
"pg": "^8.6.0",
"pg": "^8.18.0",
"pg-query-stream": "^4.1.0"
},
"license": "Apache-2.0",
Expand Down
18 changes: 18 additions & 0 deletions packages/cubejs-postgres-driver/src/PgClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Client, QueryResult, ClientConfig, QueryResultRow } from 'pg';

export class PgClient extends Client {
public isEnding(): boolean {
return (this as any)._ending;
}

public isEnded(): boolean {
return (this as any)._ended;
}

public isQueryable(): boolean {
return (this as any)._queryable;
}
}

export type PgClientConfig = ClientConfig;
export type PgQueryResult<T extends QueryResultRow = any> = QueryResult<T>;
Loading
Loading