Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1393,8 +1393,8 @@ class ApiGateway {
});

await res(queryType === QueryTypeEnum.REGULAR_QUERY ?
{ sql: toQuery(sqlQueries[0]) } :
sqlQueries.map((sqlQuery) => ({ sql: toQuery(sqlQuery) })));
{ sql: toQuery(sqlQueries[0]), dataSource: sqlQueries[0].dataSource } :
sqlQueries.map((sqlQuery) => ({ sql: toQuery(sqlQuery), dataSource: sqlQuery.dataSource })));
} catch (e: any) {
this.handleError({
e, context, query, res, requestStarted
Expand Down
37 changes: 37 additions & 0 deletions packages/cubejs-api-gateway/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,43 @@ describe('API Gateway', () => {
});
});

describe('/v1/sql endpoint dataSource', () => {
test('returns dataSource for single query', async () => {
const { app } = await createApiGateway();
const query = JSON.stringify({ measures: ['Foo.bar'] });

const res = await request(app)
.get(`/cubejs-api/v1/sql?query=${encodeURIComponent(query)}`)
.set('Authorization', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.t-IDcSemACt8x4iTMCda8Yhe3iZaWbvV5XKSTbuAn0M')
.expect(200);

expect(res.body).toHaveProperty('sql');
expect(res.body).toHaveProperty('dataSource');
expect(res.body.dataSource).toBe('default');
});

test('returns dataSource for blending query', async () => {
const { app } = await createApiGateway();
const query = JSON.stringify([
{ measures: ['Foo.bar'], timeDimensions: [{ dimension: 'Foo.time', granularity: 'day' }] },
{ measures: ['Foo.bar'], timeDimensions: [{ dimension: 'Foo.time', granularity: 'day' }] }
]);

const res = await request(app)
.get(`/cubejs-api/v1/sql?query=${encodeURIComponent(query)}`)
.set('Authorization', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.t-IDcSemACt8x4iTMCda8Yhe3iZaWbvV5XKSTbuAn0M')
.expect(200);

expect(Array.isArray(res.body)).toBe(true);
expect(res.body.length).toBe(2);
res.body.forEach((item: any) => {
expect(item).toHaveProperty('sql');
expect(item).toHaveProperty('dataSource');
expect(item.dataSource).toBe('default');
});
});
});

describe('/cubejs-system/v1', () => {
const scheduledRefreshContextsFactory = () => ([
{ securityContext: { foo: 'bar' } },
Expand Down
3 changes: 2 additions & 1 deletion packages/cubejs-api-gateway/test/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ export const compilerApi = jest.fn().mockImplementation(async () => ({
foo__bar: 'Foo.bar',
foo__time: 'Foo.time',
},
order: [{ id: 'id', desc: true, }]
order: [{ id: 'id', desc: true, }],
dataSource: 'default'
};
},

Expand Down
48 changes: 35 additions & 13 deletions packages/cubejs-cubestore-driver/src/WebSocketConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,29 @@ import {
HttpTable
} from '../codegen';

interface SentMessage {
resolve: (value: any) => void;
reject: (reason?: any) => void;
buffer: Uint8Array;
}

interface CubeStoreWebSocket extends WebSocket {
readyPromise: Promise<CubeStoreWebSocket>;
lastHeartBeat: Date;
sentMessages: Record<number, SentMessage>;
sendAsync: (message: Uint8Array) => Promise<void>;
}

export class WebSocketConnection {
protected messageCounter: number;

protected maxConnectRetries: number;
protected readonly maxConnectRetries: number;

protected noHeartBeatTimeout: number;
protected readonly noHeartBeatTimeout: number;

protected currentConnectionTry: number;

protected webSocket: any;
protected webSocket: CubeStoreWebSocket | null = null;

private readonly url: string;

Expand All @@ -38,10 +51,10 @@ export class WebSocketConnection {
this.connectionId = uuidv4();
}

protected async initWebSocket() {
protected async initWebSocket(): Promise<CubeStoreWebSocket> {
if (!this.webSocket) {
const webSocket: any = new WebSocket(this.url);
webSocket.readyPromise = new Promise<WebSocket>((resolve, reject) => {
const webSocket = new WebSocket(this.url) as CubeStoreWebSocket;
webSocket.readyPromise = new Promise<CubeStoreWebSocket>((resolve, reject) => {
webSocket.lastHeartBeat = new Date();
const pingInterval = setInterval(() => {
if (webSocket.readyState === WebSocket.OPEN) {
Expand All @@ -53,12 +66,15 @@ export class WebSocketConnection {
}
}, 5000);

webSocket.sendAsync = async (message) => new Promise<void>((resolveSend, rejectSend) => {
webSocket.sendAsync = async (message: Uint8Array) => new Promise<void>((resolveSend, rejectSend) => {
// If socket is closing this message should be resent
if (webSocket.readyState === WebSocket.OPEN) {
webSocket.send(message, (err) => {
if (err) {
rejectSend(err);
rejectSend(new ConnectionError(
`CubeStore connection error: ${err.message}`,
err
));
} else {
resolveSend();
}
Expand All @@ -81,7 +97,7 @@ export class WebSocketConnection {
}

if (webSocket === this.webSocket) {
this.webSocket = undefined;
this.webSocket = null;
}
});
webSocket.on('pong', () => {
Expand Down Expand Up @@ -112,10 +128,10 @@ export class WebSocketConnection {
}

if (webSocket === this.webSocket) {
this.webSocket = undefined;
this.webSocket = null;
}
});
webSocket.on('message', async (msg) => {
webSocket.on('message', async (msg: Buffer) => {
const buf = new flatbuffers.ByteBuffer(msg);
const httpMessage = HttpMessage.getRootAsHttpMessage(buf);
const resolvers = webSocket.sentMessages[httpMessage.messageId()];
Expand Down Expand Up @@ -179,10 +195,12 @@ export class WebSocketConnection {
}
});
});

webSocket.sentMessages = {};
this.webSocket = webSocket;
}
return this.webSocket.readyPromise;

return this.webSocket!.readyPromise;
}

private retryWaitTime() {
Expand All @@ -196,10 +214,14 @@ export class WebSocketConnection {
socket.send(buffer, (err) => {
if (err) {
delete socket.sentMessages[messageId];
reject(err);
reject(new ConnectionError(
`CubeStore connection error: ${err.message}`,
err
));
}
});
}

socket.sentMessages[messageId] = {
resolve,
reject,
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-cubestore-driver/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export class ConnectionError extends CubeStoreError {

public constructor(message: string, cause?: Error) {
super(message);

this.name = 'ConnectionError';
this.cause = cause;
}
Expand All @@ -15,6 +16,7 @@ export class ConnectionError extends CubeStoreError {
export class QueryError extends CubeStoreError {
public constructor(message: string) {
super(message);

this.name = 'QueryError';
}
}
Loading