Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cli/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ ts_test_suite(
"index_project_test.ts",
"index_compile_test.ts",
"index_run_e2e_test.ts",
"util_test.ts"
"index_jit_main_test.ts",
"index_jit_advanced_test.ts",
"index_jit_dependency_test.ts",
"index_jit_runtime_test.ts",
"util_test.ts",
"jit_build_test.ts",
"jit_run_test.ts",
"bigquery_test.ts"
],
data = [
":node_modules",
Expand Down
6 changes: 4 additions & 2 deletions cli/api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ node_modules(
ts_test_suite(
name = "tests",
srcs = [
"utils_test.ts",
"commands/jit/rpc_test.ts",
"dbadapters/bigquery_test.ts",
],
data = [
":node_modules",
Expand All @@ -68,10 +69,11 @@ ts_test_suite(
"@nodejs//:npm",
],
deps = [
"//cli/api",
":api",
"//core",
"//protos:ts",
"//testing",
"@npm//@google-cloud/bigquery",
"@npm//@types/chai",
"@npm//@types/fs-extra",
"@npm//@types/js-yaml",
Expand Down
74 changes: 74 additions & 0 deletions cli/api/commands/base_worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { ChildProcess, fork } from "child_process";

/**
 * Base class for commands that delegate work to a forked Node.js worker process
 * and wait for exactly one outcome (a response or an error) from it.
 *
 * Subclasses pass the path of the worker loader script to the constructor and
 * call `runWorker` with callbacks describing how to start the work and how to
 * interpret the worker's IPC messages.
 */
export abstract class BaseWorker<TResponse, TMessage = any> {
  /**
   * @param loaderPath Module path of the worker entry point. It is tried first
   *   when resolving the script to fork; "./worker_bundle.js" is the fallback.
   */
  protected constructor(private readonly loaderPath: string) {}

  /**
   * Forks the worker script and settles with the first outcome reported.
   *
   * The worker is killed and the timeout cleared as soon as the returned promise
   * settles, whatever the cause (response, error, timeout, or worker exit).
   *
   * @param timeoutMillis Maximum time to wait for an outcome before rejecting.
   * @param onBoot Invoked once, when the worker first sends a message whose
   *   `type` is "worker_booted". Typically used to send the initial request.
   * @param onMessage Invoked for every other message from the worker. It settles
   *   the run by calling `resolve` or `reject`; it may also return without
   *   settling (e.g. for intermediate messages). It may be asynchronous.
   * @returns The value passed to `resolve` by `onMessage`.
   * @throws Error if the worker script cannot be resolved, the worker emits an
   *   error, exits before an outcome is reported, times out, or if a callback
   *   throws or rejects.
   */
  protected async runWorker(
    timeoutMillis: number,
    onBoot: (child: ChildProcess) => void,
    onMessage: (
      message: TMessage,
      child: ChildProcess,
      resolve: (res: TResponse) => void,
      reject: (err: Error) => void
    ) => void
  ): Promise<TResponse> {
    const forkScript = this.resolveScript();
    const child = fork(forkScript, [], {
      stdio: [0, 1, 2, "ipc", "pipe"]
    });

    return new Promise<TResponse>((resolve, reject) => {
      let completed = false;
      let booted = false;

      // Single exit point: guarantees the promise settles at most once and that
      // the timer and the worker are always cleaned up. `timeout` is declared
      // below; that is safe because `terminate` only runs from asynchronous
      // callbacks, after the declaration has executed.
      const terminate = (fn: () => void) => {
        if (completed) {
          return;
        }
        completed = true;
        clearTimeout(timeout);
        // SIGKILL rather than the default SIGTERM, so that a worker which ignores
        // or blocks SIGTERM (e.g. stuck in a busy loop) cannot outlive the request.
        child.kill("SIGKILL");
        fn();
      };

      const fail = (err: unknown) =>
        terminate(() => reject(err instanceof Error ? err : new Error(String(err))));

      // Runs a caller-supplied callback so that neither a synchronous throw nor a
      // rejected promise (callbacks may be async) escapes the event handler. Without
      // this, such failures would surface as uncaught exceptions or unhandled
      // rejections and the caller would only hear about them via the timeout.
      const guard = (invoke: () => unknown) => {
        try {
          Promise.resolve(invoke()).catch(fail);
        } catch (err) {
          fail(err);
        }
      };

      const timeout = setTimeout(() => {
        terminate(() =>
          reject(new Error(`Worker timed out after ${timeoutMillis / 1000} seconds`))
        );
      }, timeoutMillis);

      child.on("message", (message: any) => {
        // Guard against null/undefined payloads before reading `type`.
        if (message && message.type === "worker_booted") {
          if (!booted) {
            booted = true;
            guard(() => onBoot(child));
          }
          return;
        }
        guard(() =>
          onMessage(
            message,
            child,
            res => terminate(() => resolve(res)),
            err => terminate(() => reject(err))
          )
        );
      });

      child.on("error", err => {
        terminate(() => reject(err));
      });

      child.on("exit", (code, signal) => {
        if (!completed) {
          const errorMsg =
            code !== 0 && code !== null
              ? `Worker exited with code ${code} and signal ${signal}`
              : "Worker exited without sending a response message";
          terminate(() => reject(new Error(errorMsg)));
        }
      });
    });
  }

  /**
   * Resolves the script to fork: the loader path given to the constructor if it
   * can be resolved, otherwise "./worker_bundle.js".
   *
   * @throws Error if neither candidate can be resolved.
   */
  private resolveScript() {
    const pathsToTry = [this.loaderPath, "./worker_bundle.js"];
    for (const p of pathsToTry) {
      try {
        return require.resolve(p);
      } catch (e) {
        // Continue to next path.
      }
    }
    throw new Error(`Could not resolve worker script. Tried: ${pathsToTry.join(", ")}`);
  }
}
13 changes: 10 additions & 3 deletions cli/api/commands/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ export class Builder {
runConfig: this.runConfig,
warehouseState: this.warehouseState,
declarationTargets: this.prunedGraph.declarations.map(declaration => declaration.target),
actions
actions,
jitData: this.prunedGraph.jitData
});
}

Expand Down Expand Up @@ -114,11 +115,17 @@ export class Builder {
private toPartialExecutionAction(
action: dataform.ITable | dataform.IOperation | dataform.IAssertion
) {
return dataform.ExecutionAction.create({
const jitCode = (action as any).jitCode;
const executionAction = dataform.ExecutionAction.create({
target: action.target,
fileName: action.fileName,
dependencyTargets: action.dependencyTargets,
actionDescriptor: action.actionDescriptor
actionDescriptor: action.actionDescriptor,
disabled: action.disabled
});
if (jitCode) {
executionAction.jitCode = jitCode;
}
return executionAction;
}
}
76 changes: 17 additions & 59 deletions cli/api/commands/compile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import * as path from "path";
import * as tmp from "tmp";
import { promisify } from "util";

import { BaseWorker } from "df/cli/api/commands/base_worker";
import { MISSING_CORE_VERSION_ERROR } from "df/cli/api/commands/install";
import { readConfigFromWorkflowSettings } from "df/cli/api/utils";
import { DEFAULT_COMPILATION_TIMEOUT_MILLIS } from "df/cli/api/utils/constants";
import { coerceAsError } from "df/common/errors/errors";
import { decode64 } from "df/common/protos";
import { dataform } from "df/protos/ts";
Expand Down Expand Up @@ -86,7 +88,7 @@ export async function compile(
compileConfig.projectDir = temporaryProjectPath;
}

const result = await CompileChildProcess.forkProcess().compile(compileConfig);
const result = await new CompileChildProcess().compile(compileConfig);

const decodedResult = decode64(dataform.CoreExecutionResponse, result);
compiledGraph = dataform.CompiledGraph.create(decodedResult.compile.compiledGraph);
Expand All @@ -98,68 +100,24 @@ export async function compile(
return compiledGraph;
}

export class CompileChildProcess {
public static forkProcess() {
// Runs the worker_bundle script we generate for the package (see packages/@dataform/cli/BUILD)
// if it exists, otherwise run the bazel compile loader target.
const findForkScript = () => {
try {
const workerBundlePath = require.resolve("./worker_bundle.js");
return workerBundlePath;
} catch (e) {
return require.resolve("../../vm/compile_loader");
}
};
const forkScript = findForkScript();
return new CompileChildProcess(
fork(require.resolve(forkScript), [], { stdio: [0, 1, 2, "ipc", "pipe"] })
);
}
private readonly childProcess: ChildProcess;

constructor(childProcess: ChildProcess) {
this.childProcess = childProcess;
export class CompileChildProcess extends BaseWorker<string, string | Error> {
constructor() {
super(path.resolve(__dirname, "../../vm/compile_loader"));
}

public async compile(compileConfig: dataform.ICompileConfig) {
const compileInChildProcess = new Promise<string>(async (resolve, reject) => {
this.childProcess.on("error", (e: Error) => reject(coerceAsError(e)));

this.childProcess.on("message", (messageOrError: string | Error) => {
if (typeof messageOrError === "string") {
resolve(messageOrError);
return;
}
reject(coerceAsError(messageOrError));
});

this.childProcess.on("close", exitCode => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify: this code listens for "close", while the base worker hooks "exit" instead. Will the two be equivalent?

if (exitCode !== 0) {
reject(new Error(`Compilation child process exited with exit code ${exitCode}.`));
const timeoutValue = compileConfig.timeoutMillis || DEFAULT_COMPILATION_TIMEOUT_MILLIS;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the default timeout is smaller currently?


return await this.runWorker(
timeoutValue,
child => child.send(compileConfig),
(message, child, resolve, reject) => {
if (typeof message === "string") {
resolve(message);
} else {
reject(coerceAsError(message));
}
});

// Trigger the child process to start compiling.
this.childProcess.send(compileConfig);
});
let timer;
const timeout = new Promise(
(resolve, reject) =>
(timer = setTimeout(
() => reject(new CompilationTimeoutError("Compilation timed out")),
compileConfig.timeoutMillis || 5000
))
);
try {
await Promise.race([timeout, compileInChildProcess]);
return await compileInChildProcess;
} finally {
if (!this.childProcess.killed) {
this.childProcess.kill("SIGKILL");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the base worker you don't send SIGKILL. I'd preserve this logic, so that the compiler process is still terminated if it ignores SIGTERM for some reason.

}
if (timer) {
clearTimeout(timer);
}
}
);
}
}
103 changes: 103 additions & 0 deletions cli/api/commands/jit/compiler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { ChildProcess } from "child_process";
import * as path from "path";

import { BaseWorker } from "df/cli/api/commands/base_worker";
import { handleDbRequest } from "df/cli/api/commands/jit/rpc";
import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters";
import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery";
import { DEFAULT_COMPILATION_TIMEOUT_MILLIS } from "df/cli/api/utils/constants";
import { dataform } from "df/protos/ts";

/**
 * Message received from the JIT worker process by JitCompileChildProcess.
 * Which optional fields are read depends on `type`.
 */
export interface IJitWorkerMessage {
// Discriminator: "rpc_request" is answered with an "rpc_response" message,
// "jit_response" resolves the compilation, "jit_error" rejects it.
type: "rpc_request" | "jit_response" | "jit_error";
// For "rpc_request": forwarded to handleDbRequest as the method argument.
method?: string;
// For "rpc_request": forwarded to handleDbRequest as the request argument.
request?: Uint8Array;
// For "rpc_request": echoed back unchanged on the matching "rpc_response".
correlationId?: string;
// For "jit_response": passed to dataform.JitCompilationResponse.fromObject.
// NOTE(review): declared as Uint8Array but consumed via fromObject — confirm the intended shape.
response?: Uint8Array;
// For "jit_error": used as the message of the Error the compilation is rejected with.
error?: string;
}

/**
 * Runs a JIT compilation request in a forked worker process, answering the
 * worker's "rpc_request" messages via `handleDbRequest` while it runs.
 */
export class JitCompileChildProcess extends BaseWorker<
  dataform.IJitCompilationResponse,
  IJitWorkerMessage
> {
  /**
   * Convenience entry point: creates a worker and runs a single JIT compilation.
   *
   * @param request The JIT compilation request sent to the worker.
   * @param projectDir Project directory sent to the worker alongside the request.
   * @param dbadapter Passed to `handleDbRequest` when serving worker RPCs.
   * @param dbclient Passed to `handleDbRequest` when serving worker RPCs.
   * @param timeoutMillis Maximum time to wait for the worker's outcome.
   * @param options Optional execution options passed to `handleDbRequest`.
   * @returns The compilation response reported by the worker.
   */
  public static async compile(
    request: dataform.IJitCompilationRequest,
    projectDir: string,
    dbadapter: IDbAdapter,
    dbclient: IDbClient,
    timeoutMillis: number = DEFAULT_COMPILATION_TIMEOUT_MILLIS,
    options?: IBigQueryExecutionOptions
  ): Promise<dataform.IJitCompilationResponse> {
    return await new JitCompileChildProcess().run(
      request,
      projectDir,
      dbadapter,
      dbclient,
      timeoutMillis,
      options
    );
  }

  constructor() {
    super(path.resolve(__dirname, "../../../vm/jit_loader"));
  }

  /**
   * Sends a "jit_compile" message once the worker has booted, then dispatches on
   * the type of each message the worker sends back until it reports
   * "jit_response" (resolve) or "jit_error" (reject).
   */
  private async run(
    request: dataform.IJitCompilationRequest,
    projectDir: string,
    dbadapter: IDbAdapter,
    dbclient: IDbClient,
    timeoutMillis: number,
    options?: IBigQueryExecutionOptions
  ): Promise<dataform.IJitCompilationResponse> {
    return await this.runWorker(
      timeoutMillis,
      child => {
        child.send({
          type: "jit_compile",
          request,
          projectDir
        });
      },
      async (message, child, resolve, reject) => {
        // This handler is async, so anything thrown here (e.g. by fromObject or by
        // child.send on a closed channel) would otherwise become an unhandled
        // promise rejection and leave the compilation pending. Convert it into a
        // rejection of the compilation instead.
        try {
          if (message.type === "rpc_request") {
            await this.handleRpcRequest(message, child, dbadapter, dbclient, options);
          } else if (message.type === "jit_response") {
            resolve(dataform.JitCompilationResponse.fromObject(message.response));
          } else if (message.type === "jit_error") {
            // Fall back to a generic message so the error is never blank.
            reject(new Error(message.error || "JIT compilation failed"));
          }
        } catch (e) {
          reject(e instanceof Error ? e : new Error(String(e)));
        }
      }
    );
  }

  /**
   * Serves one "rpc_request" from the worker and replies with an "rpc_response"
   * carrying the same correlationId, containing either the response or, if
   * `handleDbRequest` (or sending the response) fails, the error text.
   */
  private async handleRpcRequest(
    message: IJitWorkerMessage,
    child: ChildProcess,
    dbadapter: IDbAdapter,
    dbclient: IDbClient,
    options?: IBigQueryExecutionOptions
  ) {
    try {
      const response = await handleDbRequest(
        dbadapter,
        dbclient,
        message.method,
        message.request,
        options
      );
      child.send({
        type: "rpc_response",
        correlationId: message.correlationId,
        response
      });
    } catch (e) {
      child.send({
        type: "rpc_response",
        correlationId: message.correlationId,
        // Thrown values are not guaranteed to be Error instances.
        error: e instanceof Error ? e.message : String(e)
      });
    }
  }
}
Loading
Loading