-
Notifications
You must be signed in to change notification settings - Fork 198
Integrate JiT compilation into CLI #2110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e687190
d2f544f
92736be
a684ab1
e51e27c
552097f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| import { ChildProcess, fork } from "child_process"; | ||
|
|
||
| export abstract class BaseWorker<TResponse, TMessage = any> { | ||
| protected constructor(private readonly loaderPath: string) {} | ||
|
|
||
| protected async runWorker( | ||
| timeoutMillis: number, | ||
| onBoot: (child: ChildProcess) => void, | ||
| onMessage: (message: TMessage, child: ChildProcess, resolve: (res: TResponse) => void, reject: (err: Error) => void) => void | ||
| ): Promise<TResponse> { | ||
| const forkScript = this.resolveScript(); | ||
| const child = fork(forkScript, [], { | ||
| stdio: [0, 1, 2, "ipc", "pipe"] | ||
| }); | ||
|
|
||
| return new Promise((resolve, reject) => { | ||
| let completed = false; | ||
| let booted = false; | ||
|
|
||
| const terminate = (fn: () => void) => { | ||
| if (completed) { | ||
| return; | ||
| } | ||
| completed = true; | ||
| clearTimeout(timeout); | ||
| child.kill(); | ||
| fn(); | ||
| }; | ||
|
|
||
| const timeout = setTimeout(() => { | ||
| terminate(() => | ||
| reject(new Error(`Worker timed out after ${timeoutMillis / 1000} seconds`)) | ||
| ); | ||
| }, timeoutMillis); | ||
|
|
||
| child.on("message", (message: any) => { | ||
| if (message.type === "worker_booted") { | ||
| if (!booted) { | ||
| booted = true; | ||
| onBoot(child); | ||
| } | ||
| return; | ||
| } | ||
| onMessage(message, child, (res) => terminate(() => resolve(res)), (err) => terminate(() => reject(err))); | ||
| }); | ||
|
|
||
| child.on("error", err => { | ||
| terminate(() => reject(err)); | ||
| }); | ||
|
|
||
| child.on("exit", (code, signal) => { | ||
| if (!completed) { | ||
| const errorMsg = | ||
| code !== 0 && code !== null | ||
| ? `Worker exited with code ${code} and signal ${signal}` | ||
| : "Worker exited without sending a response message"; | ||
| terminate(() => reject(new Error(errorMsg))); | ||
| } | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| private resolveScript() { | ||
| const pathsToTry = [this.loaderPath, "./worker_bundle.js"]; | ||
| for (const p of pathsToTry) { | ||
| try { | ||
| return require.resolve(p); | ||
| } catch (e) { | ||
| // Continue to next path. | ||
| } | ||
| } | ||
| throw new Error(`Could not resolve worker script. Tried: ${pathsToTry.join(", ")}`); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,8 +4,10 @@ import * as path from "path"; | |
| import * as tmp from "tmp"; | ||
| import { promisify } from "util"; | ||
|
|
||
| import { BaseWorker } from "df/cli/api/commands/base_worker"; | ||
| import { MISSING_CORE_VERSION_ERROR } from "df/cli/api/commands/install"; | ||
| import { readConfigFromWorkflowSettings } from "df/cli/api/utils"; | ||
| import { DEFAULT_COMPILATION_TIMEOUT_MILLIS } from "df/cli/api/utils/constants"; | ||
| import { coerceAsError } from "df/common/errors/errors"; | ||
| import { decode64 } from "df/common/protos"; | ||
| import { dataform } from "df/protos/ts"; | ||
|
|
@@ -86,7 +88,7 @@ export async function compile( | |
| compileConfig.projectDir = temporaryProjectPath; | ||
| } | ||
|
|
||
| const result = await CompileChildProcess.forkProcess().compile(compileConfig); | ||
| const result = await new CompileChildProcess().compile(compileConfig); | ||
|
|
||
| const decodedResult = decode64(dataform.CoreExecutionResponse, result); | ||
| compiledGraph = dataform.CompiledGraph.create(decodedResult.compile.compiledGraph); | ||
|
|
@@ -98,68 +100,24 @@ export async function compile( | |
| return compiledGraph; | ||
| } | ||
|
|
||
| export class CompileChildProcess { | ||
| public static forkProcess() { | ||
| // Runs the worker_bundle script we generate for the package (see packages/@dataform/cli/BUILD) | ||
| // if it exists, otherwise run the bazel compile loader target. | ||
| const findForkScript = () => { | ||
| try { | ||
| const workerBundlePath = require.resolve("./worker_bundle.js"); | ||
| return workerBundlePath; | ||
| } catch (e) { | ||
| return require.resolve("../../vm/compile_loader"); | ||
| } | ||
| }; | ||
| const forkScript = findForkScript(); | ||
| return new CompileChildProcess( | ||
| fork(require.resolve(forkScript), [], { stdio: [0, 1, 2, "ipc", "pipe"] }) | ||
| ); | ||
| } | ||
| private readonly childProcess: ChildProcess; | ||
|
|
||
| constructor(childProcess: ChildProcess) { | ||
| this.childProcess = childProcess; | ||
| export class CompileChildProcess extends BaseWorker<string, string | Error> { | ||
| constructor() { | ||
| super(path.resolve(__dirname, "../../vm/compile_loader")); | ||
| } | ||
|
|
||
| public async compile(compileConfig: dataform.ICompileConfig) { | ||
| const compileInChildProcess = new Promise<string>(async (resolve, reject) => { | ||
| this.childProcess.on("error", (e: Error) => reject(coerceAsError(e))); | ||
|
|
||
| this.childProcess.on("message", (messageOrError: string | Error) => { | ||
| if (typeof messageOrError === "string") { | ||
| resolve(messageOrError); | ||
| return; | ||
| } | ||
| reject(coerceAsError(messageOrError)); | ||
| }); | ||
|
|
||
| this.childProcess.on("close", exitCode => { | ||
| if (exitCode !== 0) { | ||
| reject(new Error(`Compilation child process exited with exit code ${exitCode}.`)); | ||
| const timeoutValue = compileConfig.timeoutMillis || DEFAULT_COMPILATION_TIMEOUT_MILLIS; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that the default timeout is smaller currently? |
||
|
|
||
| return await this.runWorker( | ||
| timeoutValue, | ||
| child => child.send(compileConfig), | ||
| (message, child, resolve, reject) => { | ||
| if (typeof message === "string") { | ||
| resolve(message); | ||
| } else { | ||
| reject(coerceAsError(message)); | ||
| } | ||
| }); | ||
|
|
||
| // Trigger the child process to start compiling. | ||
| this.childProcess.send(compileConfig); | ||
| }); | ||
| let timer; | ||
| const timeout = new Promise( | ||
| (resolve, reject) => | ||
| (timer = setTimeout( | ||
| () => reject(new CompilationTimeoutError("Compilation timed out")), | ||
| compileConfig.timeoutMillis || 5000 | ||
| )) | ||
| ); | ||
| try { | ||
| await Promise.race([timeout, compileInChildProcess]); | ||
| return await compileInChildProcess; | ||
| } finally { | ||
| if (!this.childProcess.killed) { | ||
| this.childProcess.kill("SIGKILL"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the base worker you don't send SIGKILL, I'd preserve this logic to avoid compiler ignoring SIGTERM for some reason |
||
| } | ||
| if (timer) { | ||
| clearTimeout(timer); | ||
| } | ||
| } | ||
| ); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| import { ChildProcess } from "child_process"; | ||
| import * as path from "path"; | ||
|
|
||
| import { BaseWorker } from "df/cli/api/commands/base_worker"; | ||
| import { handleDbRequest } from "df/cli/api/commands/jit/rpc"; | ||
| import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters"; | ||
| import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery"; | ||
| import { DEFAULT_COMPILATION_TIMEOUT_MILLIS } from "df/cli/api/utils/constants"; | ||
| import { dataform } from "df/protos/ts"; | ||
|
|
||
| export interface IJitWorkerMessage { | ||
| type: "rpc_request" | "jit_response" | "jit_error"; | ||
| method?: string; | ||
| request?: Uint8Array; | ||
| correlationId?: string; | ||
| response?: Uint8Array; | ||
| error?: string; | ||
| } | ||
|
|
||
| export class JitCompileChildProcess extends BaseWorker< | ||
| dataform.IJitCompilationResponse, | ||
| IJitWorkerMessage | ||
| > { | ||
| public static async compile( | ||
| request: dataform.IJitCompilationRequest, | ||
| projectDir: string, | ||
| dbadapter: IDbAdapter, | ||
| dbclient: IDbClient, | ||
| timeoutMillis: number = DEFAULT_COMPILATION_TIMEOUT_MILLIS, | ||
| options?: IBigQueryExecutionOptions | ||
| ): Promise<dataform.IJitCompilationResponse> { | ||
| return await new JitCompileChildProcess().run( | ||
| request, | ||
| projectDir, | ||
| dbadapter, | ||
| dbclient, | ||
| timeoutMillis, | ||
| options | ||
| ); | ||
| } | ||
|
|
||
| constructor() { | ||
| super(path.resolve(__dirname, "../../../vm/jit_loader")); | ||
| } | ||
|
|
||
| private async run( | ||
| request: dataform.IJitCompilationRequest, | ||
| projectDir: string, | ||
| dbadapter: IDbAdapter, | ||
| dbclient: IDbClient, | ||
| timeoutMillis: number, | ||
| options?: IBigQueryExecutionOptions | ||
| ): Promise<dataform.IJitCompilationResponse> { | ||
| return await this.runWorker( | ||
| timeoutMillis, | ||
| child => { | ||
| child.send({ | ||
| type: "jit_compile", | ||
| request, | ||
| projectDir | ||
| }); | ||
| }, | ||
| async (message, child, resolve, reject) => { | ||
| if (message.type === "rpc_request") { | ||
| await this.handleRpcRequest(message, child, dbadapter, dbclient, options); | ||
| } else if (message.type === "jit_response") { | ||
| resolve(dataform.JitCompilationResponse.fromObject(message.response)); | ||
| } else if (message.type === "jit_error") { | ||
| reject(new Error(message.error)); | ||
| } | ||
| } | ||
| ); | ||
| } | ||
|
|
||
| private async handleRpcRequest( | ||
| message: IJitWorkerMessage, | ||
| child: ChildProcess, | ||
| dbadapter: IDbAdapter, | ||
| dbclient: IDbClient, | ||
| options?: IBigQueryExecutionOptions | ||
| ) { | ||
| try { | ||
| const response = await handleDbRequest( | ||
| dbadapter, | ||
| dbclient, | ||
| message.method, | ||
| message.request, | ||
| options | ||
| ); | ||
| child.send({ | ||
| type: "rpc_response", | ||
| correlationId: message.correlationId, | ||
| response | ||
| }); | ||
| } catch (e) { | ||
| child.send({ | ||
| type: "rpc_response", | ||
| correlationId: message.correlationId, | ||
| error: e.message | ||
| }); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to clarify: in the base worker you have a hook for "exit", they'll be equivalent?