Skip to content

Commit 0fe90aa

Browse files
feat: implement JiT worker and RPC infrastructure
* feat: add worker process management for JiT compilation - Introduce base worker and JiT-specific child process logic - Implement RPC bridge for database access during JiT - Add VM scripts and loader for isolated execution - Add unit tests for the RPC mechanism
1 parent f6996e8 commit 0fe90aa

16 files changed

Lines changed: 1161 additions & 65 deletions

File tree

cli/api/BUILD

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ node_modules(
5858
ts_test_suite(
5959
name = "tests",
6060
srcs = [
61-
"utils_test.ts",
61+
"commands/jit/rpc_test.ts",
62+
"dbadapters/bigquery_test.ts",
6263
],
6364
data = [
6465
":node_modules",
@@ -68,10 +69,11 @@ ts_test_suite(
6869
"@nodejs//:npm",
6970
],
7071
deps = [
71-
"//cli/api",
72+
":api",
7273
"//core",
7374
"//protos:ts",
7475
"//testing",
76+
"@npm//@google-cloud/bigquery",
7577
"@npm//@types/chai",
7678
"@npm//@types/fs-extra",
7779
"@npm//@types/js-yaml",

cli/api/commands/base_worker.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { ChildProcess, fork } from "child_process";
2+
3+
export abstract class BaseWorker<TResponse, TMessage = any> {
4+
protected constructor(private readonly loaderPath: string) {}
5+
6+
protected async runWorker(
7+
timeoutMillis: number,
8+
onBoot: (child: ChildProcess) => void,
9+
onMessage: (message: TMessage, child: ChildProcess, resolve: (res: TResponse) => void, reject: (err: Error) => void) => void
10+
): Promise<TResponse> {
11+
const forkScript = this.resolveScript();
12+
const child = fork(forkScript, [], {
13+
stdio: [0, 1, 2, "ipc", "pipe"]
14+
});
15+
16+
return new Promise((resolve, reject) => {
17+
let completed = false;
18+
19+
const terminate = (fn: () => void) => {
20+
if (completed) {
21+
return;
22+
}
23+
completed = true;
24+
clearTimeout(timeout);
25+
child.kill();
26+
fn();
27+
};
28+
29+
const timeout = setTimeout(() => {
30+
terminate(() =>
31+
reject(new Error(`Worker timed out after ${timeoutMillis / 1000} seconds`))
32+
);
33+
}, timeoutMillis);
34+
35+
child.on("message", (message: any) => {
36+
if (message.type === "worker_booted") {
37+
onBoot(child);
38+
return;
39+
}
40+
onMessage(message, child, (res) => terminate(() => resolve(res)), (err) => terminate(() => reject(err)));
41+
});
42+
43+
child.on("error", err => {
44+
terminate(() => reject(err));
45+
});
46+
47+
child.on("exit", (code, signal) => {
48+
if (!completed) {
49+
const errorMsg =
50+
code !== 0 && code !== null
51+
? `Worker exited with code ${code} and signal ${signal}`
52+
: "Worker exited without sending a response message";
53+
terminate(() => reject(new Error(errorMsg)));
54+
}
55+
});
56+
});
57+
}
58+
59+
private resolveScript() {
60+
const pathsToTry = [this.loaderPath, "./worker_bundle.js"];
61+
for (const p of pathsToTry) {
62+
try {
63+
return require.resolve(p);
64+
} catch (e) {
65+
// Continue to next path.
66+
}
67+
}
68+
throw new Error(`Could not resolve worker script. Tried: ${pathsToTry.join(", ")}`);
69+
}
70+
}

cli/api/commands/jit/compiler.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import { ChildProcess } from "child_process";
2+
3+
import { BaseWorker } from "df/cli/api/commands/base_worker";
4+
import { handleDbRequest } from "df/cli/api/commands/jit/rpc";
5+
import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters";
6+
import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery";
7+
import { DEFAULT_COMPILATION_TIMEOUT_MILLIS } from "df/cli/api/utils/constants";
8+
import { dataform } from "df/protos/ts";
9+
10+
export interface IJitWorkerMessage {
11+
type: "rpc_request" | "jit_response" | "jit_error";
12+
method?: string;
13+
request?: string;
14+
correlationId?: string;
15+
response?: any;
16+
error?: string;
17+
}
18+
19+
export class JitCompileChildProcess extends BaseWorker<
20+
dataform.IJitCompilationResponse,
21+
IJitWorkerMessage
22+
> {
23+
public static async compile(
24+
request: dataform.IJitCompilationRequest,
25+
projectDir: string,
26+
dbadapter: IDbAdapter,
27+
dbclient: IDbClient,
28+
timeoutMillis: number = DEFAULT_COMPILATION_TIMEOUT_MILLIS,
29+
options?: IBigQueryExecutionOptions
30+
): Promise<dataform.IJitCompilationResponse> {
31+
return await new JitCompileChildProcess().run(
32+
request,
33+
projectDir,
34+
dbadapter,
35+
dbclient,
36+
timeoutMillis,
37+
options
38+
);
39+
}
40+
41+
constructor() {
42+
super("../../../vm/jit_loader");
43+
}
44+
45+
private async run(
46+
request: dataform.IJitCompilationRequest,
47+
projectDir: string,
48+
dbadapter: IDbAdapter,
49+
dbclient: IDbClient,
50+
timeoutMillis: number,
51+
options?: IBigQueryExecutionOptions
52+
): Promise<dataform.IJitCompilationResponse> {
53+
return await this.runWorker(
54+
timeoutMillis,
55+
child => {
56+
child.send({
57+
type: "jit_compile",
58+
request,
59+
projectDir
60+
});
61+
},
62+
async (message, child, resolve, reject) => {
63+
if (message.type === "rpc_request") {
64+
await this.handleRpcRequest(message, child, dbadapter, dbclient, options);
65+
} else if (message.type === "jit_response") {
66+
resolve(dataform.JitCompilationResponse.fromObject(message.response));
67+
} else if (message.type === "jit_error") {
68+
reject(new Error(message.error));
69+
}
70+
}
71+
);
72+
}
73+
74+
private async handleRpcRequest(
75+
message: IJitWorkerMessage,
76+
child: ChildProcess,
77+
dbadapter: IDbAdapter,
78+
dbclient: IDbClient,
79+
options?: IBigQueryExecutionOptions
80+
) {
81+
try {
82+
const response = await handleDbRequest(
83+
dbadapter,
84+
dbclient,
85+
message.method,
86+
Buffer.from(message.request, "base64"),
87+
options
88+
);
89+
child.send({
90+
type: "rpc_response",
91+
correlationId: message.correlationId,
92+
response: Buffer.from(response).toString("base64")
93+
});
94+
} catch (e) {
95+
child.send({
96+
type: "rpc_response",
97+
correlationId: message.correlationId,
98+
error: e.message
99+
});
100+
}
101+
}
102+
}

cli/api/commands/jit/rpc.ts

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import Long from "long";
2+
3+
import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters";
4+
import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery";
5+
import { Structs } from "df/common/protos/structs";
6+
import { dataform, google } from "df/protos/ts";
7+
8+
export async function handleDbRequest(
9+
dbadapter: IDbAdapter,
10+
dbclient: IDbClient,
11+
method: string,
12+
request: Uint8Array,
13+
options?: IBigQueryExecutionOptions
14+
): Promise<Uint8Array> {
15+
switch (method) {
16+
case "Execute":
17+
return await handleExecute(dbclient, request, options);
18+
case "ListTables":
19+
return await handleListTables(dbadapter, request);
20+
case "GetTable":
21+
return await handleGetTable(dbadapter, request);
22+
case "DeleteTable":
23+
return await handleDeleteTable(dbadapter, request, options?.dryRun);
24+
default:
25+
throw new Error(`Unrecognized RPC method: ${method}`);
26+
}
27+
}
28+
29+
async function handleExecute(
30+
dbclient: IDbClient,
31+
request: Uint8Array,
32+
options?: IBigQueryExecutionOptions
33+
): Promise<Uint8Array> {
34+
const executeRequest = dataform.ExecuteRequest.decode(request);
35+
const executeRequestObj = dataform.ExecuteRequest.toObject(executeRequest, {
36+
defaults: false
37+
});
38+
const requestOptions = executeRequestObj.bigQueryOptions;
39+
40+
const results = await dbclient.executeRaw(executeRequest.statement, {
41+
rowLimit: executeRequest.rowLimit ? (executeRequest.rowLimit as Long).toNumber() : undefined,
42+
params: Structs.toObject(executeRequest.params),
43+
bigquery: {
44+
...options,
45+
...requestOptions,
46+
labels: {
47+
...options?.labels,
48+
...requestOptions?.labels
49+
},
50+
jobPrefix: [options?.jobPrefix, requestOptions?.jobPrefix].filter(Boolean).join("-") || undefined
51+
}
52+
});
53+
54+
return dataform.ExecuteResponse.encode({
55+
rows: (results.rows || []).map(row => Structs.fromObject(row)),
56+
schemaFields: results.schema || []
57+
} as any).finish();
58+
}
59+
60+
async function handleListTables(dbadapter: IDbAdapter, request: Uint8Array): Promise<Uint8Array> {
61+
const listTablesRequest = dataform.ListTablesRequest.decode(request);
62+
const targets = await dbadapter.tables(listTablesRequest.database, listTablesRequest.schema);
63+
const tablesMetadata = await Promise.all(targets.map(target => dbadapter.table(target)));
64+
const listTablesResponse = dataform.ListTablesResponse.create({
65+
tables: tablesMetadata
66+
});
67+
return dataform.ListTablesResponse.encode(listTablesResponse).finish();
68+
}
69+
70+
async function handleGetTable(dbadapter: IDbAdapter, request: Uint8Array): Promise<Uint8Array> {
71+
const getTableRequest = dataform.GetTableRequest.decode(request);
72+
const tableMetadata = await dbadapter.table(getTableRequest.target);
73+
if (!tableMetadata) {
74+
throw new Error(`Table not found: ${JSON.stringify(getTableRequest.target)}`);
75+
}
76+
return dataform.TableMetadata.encode(tableMetadata).finish();
77+
}
78+
79+
async function handleDeleteTable(
80+
dbadapter: IDbAdapter,
81+
request: Uint8Array,
82+
dryRun?: boolean
83+
): Promise<Uint8Array> {
84+
const deleteTableRequest = dataform.DeleteTableRequest.decode(request);
85+
if (dryRun) {
86+
return new Uint8Array();
87+
}
88+
await dbadapter.deleteTable(deleteTableRequest.target);
89+
return new Uint8Array();
90+
}

0 commit comments

Comments
 (0)