Skip to content

Commit b04cd41

Browse files
committed
operation: merge sea-abstraction dep
2 parents f05f8a9 + 0085928 commit b04cd41

13 files changed

Lines changed: 1056 additions & 841 deletions

lib/DBSQLClient.ts

Lines changed: 46 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
import thrift from 'thrift';
2-
import Int64 from 'node-int64';
32

43
import { EventEmitter } from 'events';
54
import TCLIService from '../thrift/TCLIService';
6-
import { TProtocolVersion } from '../thrift/TCLIService_types';
75
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
86
import IDriver from './contracts/IDriver';
97
import IClientContext, { ClientConfig } from './contracts/IClientContext';
@@ -14,9 +12,11 @@ import IDBSQLSession from './contracts/IDBSQLSession';
1412
import IAuthentication from './connection/contracts/IAuthentication';
1513
import HttpConnection from './connection/connections/HttpConnection';
1614
import IConnectionOptions from './connection/contracts/IConnectionOptions';
17-
import Status from './dto/Status';
1815
import HiveDriverError from './errors/HiveDriverError';
19-
import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils';
16+
import { buildUserAgentString } from './utils';
17+
import IBackend from './contracts/IBackend';
18+
import ThriftBackend from './thrift-backend/ThriftBackend';
19+
import SeaBackend from './sea/SeaBackend';
2020
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
2121
import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth';
2222
import {
@@ -39,19 +39,6 @@ function prependSlash(str: string): string {
3939
return str;
4040
}
4141

42-
function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
43-
if (!catalogName && !schemaName) {
44-
return {};
45-
}
46-
47-
return {
48-
initialNamespace: {
49-
catalogName,
50-
schemaName,
51-
},
52-
};
53-
}
54-
5542
export type ThriftLibrary = Pick<typeof thrift, 'createClient'>;
5643

5744
export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext {
@@ -75,6 +62,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
7562

7663
private readonly sessions = new CloseableCollection<DBSQLSession>();
7764

65+
private backend?: IBackend;
66+
7867
private static getDefaultLogger(): IDBSQLLogger {
7968
if (!this.defaultLogger) {
8069
this.defaultLogger = new DBSQLLogger();
@@ -248,38 +237,45 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
248237

249238
this.connectionProvider = this.createConnectionProvider(options);
250239

251-
const thriftConnection = await this.connectionProvider.getThriftConnection();
252-
253-
thriftConnection.on('error', (error: Error) => {
254-
// Error.stack already contains error type and message, so log stack if available,
255-
// otherwise fall back to just error type + message
256-
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
257-
try {
258-
this.emit('error', error);
259-
} catch (e) {
260-
// EventEmitter will throw unhandled error when emitting 'error' event.
261-
// Since we already logged it few lines above, just suppress this behaviour
262-
}
263-
});
264-
265-
thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
266-
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
267-
this.emit('reconnecting', params);
268-
});
269-
270-
thriftConnection.on('close', () => {
271-
this.logger.log(LogLevel.debug, 'Closing connection.');
272-
this.emit('close');
273-
});
240+
this.backend = options.useSEA
241+
? new SeaBackend()
242+
: new ThriftBackend({
243+
context: this,
244+
onConnectionEvent: (event, payload) => this.forwardConnectionEvent(event, payload),
245+
});
274246

275-
thriftConnection.on('timeout', () => {
276-
this.logger.log(LogLevel.debug, 'Connection timed out.');
277-
this.emit('timeout');
278-
});
247+
await this.backend.connect(options);
279248

280249
return this;
281250
}
282251

252+
private forwardConnectionEvent(event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown): void {
253+
switch (event) {
254+
case 'error': {
255+
const error = payload as Error;
256+
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
257+
try {
258+
this.emit('error', error);
259+
} catch (e) {
260+
// EventEmitter throws when 'error' has no listeners; we've already logged it.
261+
}
262+
return;
263+
}
264+
case 'reconnecting':
265+
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(payload)}`);
266+
this.emit('reconnecting', payload);
267+
return;
268+
case 'close':
269+
this.logger.log(LogLevel.debug, 'Closing connection.');
270+
this.emit('close');
271+
return;
272+
case 'timeout':
273+
this.logger.log(LogLevel.debug, 'Connection timed out.');
274+
this.emit('timeout');
275+
// no default
276+
}
277+
}
278+
283279
/**
284280
* Starts new session
285281
* @public
@@ -290,44 +286,20 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
290286
* const session = await client.openSession();
291287
*/
292288
public async openSession(request: OpenSessionRequest = {}): Promise<IDBSQLSession> {
293-
// Prepare session configuration
294-
const configuration = request.configuration ? { ...request.configuration } : {};
295-
296-
// Add metric view metadata config if enabled
297-
if (this.config.enableMetricViewMetadata) {
298-
configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true';
299-
}
300-
301-
// Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS
302-
if (request.queryTags !== undefined) {
303-
const serialized = serializeQueryTags(request.queryTags);
304-
if (serialized) {
305-
configuration.QUERY_TAGS = serialized;
306-
} else {
307-
delete configuration.QUERY_TAGS;
308-
}
289+
if (!this.backend) {
290+
throw new HiveDriverError('DBSQLClient: not connected');
309291
}
310-
311-
const response = await this.driver.openSession({
312-
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8),
313-
...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema),
314-
configuration,
315-
canUseMultipleCatalogs: true,
316-
});
317-
318-
Status.assert(response.status);
319-
const session = new DBSQLSession({
320-
handle: definedOrError(response.sessionHandle),
321-
context: this,
322-
serverProtocolVersion: response.serverProtocolVersion,
323-
});
292+
const sessionBackend = await this.backend.openSession(request);
293+
const session = new DBSQLSession({ backend: sessionBackend, context: this });
324294
this.sessions.add(session);
325295
return session;
326296
}
327297

328298
public async close(): Promise<void> {
329299
await this.sessions.closeAll();
300+
await this.backend?.close();
330301

302+
this.backend = undefined;
331303
this.client = undefined;
332304
this.connectionProvider = undefined;
333305
this.authProvider = undefined;

0 commit comments

Comments
 (0)