-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[wip] js2py to quickjs #38473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[wip] js2py to quickjs #38473
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| { | ||
| "comment": "Modify this file in a trivial way to cause this test suite to run", | ||
| "revision": 2 | ||
| "revision": 3 | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,7 +17,9 @@ | |||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| """This module defines the basic MapToFields operation.""" | ||||||||||||||||||||||||||
| import itertools | ||||||||||||||||||||||||||
| import json | ||||||||||||||||||||||||||
| import re | ||||||||||||||||||||||||||
| import threading | ||||||||||||||||||||||||||
| from collections import abc | ||||||||||||||||||||||||||
| from collections.abc import Callable | ||||||||||||||||||||||||||
| from collections.abc import Collection | ||||||||||||||||||||||||||
|
|
@@ -53,13 +55,11 @@ | |||||||||||||||||||||||||
| from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn | ||||||||||||||||||||||||||
| from apache_beam.yaml.yaml_provider import dicts_to_rows | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Import js2py package if it exists | ||||||||||||||||||||||||||
| # Import quickjs package if it exists | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| import js2py | ||||||||||||||||||||||||||
| from js2py.base import JsObjectWrapper | ||||||||||||||||||||||||||
| import quickjs | ||||||||||||||||||||||||||
| except ImportError: | ||||||||||||||||||||||||||
| js2py = None | ||||||||||||||||||||||||||
| JsObjectWrapper = object | ||||||||||||||||||||||||||
| quickjs = None | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| _str_expression_fields = { | ||||||||||||||||||||||||||
| 'AssignTimestamps': 'timestamp', | ||||||||||||||||||||||||||
|
|
@@ -178,18 +178,40 @@ def _check_mapping_arguments( | |||||||||||||||||||||||||
| raise ValueError(f'{transform_name} cannot specify "name" without "path"') | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # js2py's JsObjectWrapper object has a self-referencing __dict__ property | ||||||||||||||||||||||||||
| # that cannot be pickled without implementing the __getstate__ and | ||||||||||||||||||||||||||
| # __setstate__ methods. | ||||||||||||||||||||||||||
| class _CustomJsObjectWrapper(JsObjectWrapper): | ||||||||||||||||||||||||||
| def __init__(self, js_obj): | ||||||||||||||||||||||||||
| super().__init__(js_obj.__dict__['_obj']) | ||||||||||||||||||||||||||
| class _JsWrapper: | ||||||||||||||||||||||||||
| def __init__(self, source_code, entrypoint_name): | ||||||||||||||||||||||||||
| self.source_code = source_code | ||||||||||||||||||||||||||
| self.entrypoint_name = entrypoint_name | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def __getstate__(self): | ||||||||||||||||||||||||||
| return self.__dict__.copy() | ||||||||||||||||||||||||||
| def _get_wrapper_fn(self): | ||||||||||||||||||||||||||
| global _THREAD_LOCAL_JS_CACHE | ||||||||||||||||||||||||||
| if '_THREAD_LOCAL_JS_CACHE' not in globals(): | ||||||||||||||||||||||||||
| globals()['_THREAD_LOCAL_JS_CACHE'] = threading.local() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def __setstate__(self, state): | ||||||||||||||||||||||||||
| self.__dict__.update(state) | ||||||||||||||||||||||||||
| cache = globals()['_THREAD_LOCAL_JS_CACHE'] | ||||||||||||||||||||||||||
| if not hasattr(cache, 'functions'): | ||||||||||||||||||||||||||
| cache.functions = {} | ||||||||||||||||||||||||||
|
Comment on lines
+187
to
+193
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simplify the cache access by using the module-level variable defined above. This avoids the use of
Suggested change
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| cache_key = (self.source_code, self.entrypoint_name) | ||||||||||||||||||||||||||
| if cache_key not in cache.functions: | ||||||||||||||||||||||||||
| import quickjs | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||
| ctx = quickjs.Context() | ||||||||||||||||||||||||||
| ctx.eval(self.source_code) | ||||||||||||||||||||||||||
| cache.functions[cache_key] = ctx.get(self.entrypoint_name) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| return cache.functions[cache_key] | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def __call__(self, row): | ||||||||||||||||||||||||||
| wrapper_fn = self._get_wrapper_fn() | ||||||||||||||||||||||||||
| row_as_dict = py_value_to_js_dict(row) | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| js_result = wrapper_fn(json.dumps(row_as_dict)) | ||||||||||||||||||||||||||
| except Exception as exn: | ||||||||||||||||||||||||||
| raise RuntimeError( | ||||||||||||||||||||||||||
| f"Error evaluating javascript expression: {exn}") from exn | ||||||||||||||||||||||||||
| if hasattr(js_result, 'json'): | ||||||||||||||||||||||||||
| js_result = json.loads(js_result.json()) | ||||||||||||||||||||||||||
| return dicts_to_rows(js_result) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # TODO(yaml) Improve type inferencing for JS UDF's | ||||||||||||||||||||||||||
|
|
@@ -210,80 +232,54 @@ def py_value_to_js_dict(py_value): | |||||||||||||||||||||||||
| def _expand_javascript_mapping_func( | ||||||||||||||||||||||||||
| original_fields, expression=None, callable=None, path=None, name=None): | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Check for installed js2py package | ||||||||||||||||||||||||||
| if js2py is None: | ||||||||||||||||||||||||||
| if quickjs is None: | ||||||||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||||||||
| "Javascript mapping functions are not supported on" | ||||||||||||||||||||||||||
| " Python 3.12 or later.") | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # import remaining js2py objects | ||||||||||||||||||||||||||
| from js2py import base | ||||||||||||||||||||||||||
| from js2py.constructors import jsdate | ||||||||||||||||||||||||||
| from js2py.internals import simplex | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| js_array_type = ( | ||||||||||||||||||||||||||
| base.PyJsArray, | ||||||||||||||||||||||||||
| base.PyJsArrayBuffer, | ||||||||||||||||||||||||||
| base.PyJsInt8Array, | ||||||||||||||||||||||||||
| base.PyJsUint8Array, | ||||||||||||||||||||||||||
| base.PyJsUint8ClampedArray, | ||||||||||||||||||||||||||
| base.PyJsInt16Array, | ||||||||||||||||||||||||||
| base.PyJsUint16Array, | ||||||||||||||||||||||||||
| base.PyJsInt32Array, | ||||||||||||||||||||||||||
| base.PyJsUint32Array, | ||||||||||||||||||||||||||
| base.PyJsFloat32Array, | ||||||||||||||||||||||||||
| base.PyJsFloat64Array) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def _js_object_to_py_object(obj): | ||||||||||||||||||||||||||
| if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)): | ||||||||||||||||||||||||||
| return base.to_python(obj) | ||||||||||||||||||||||||||
| elif isinstance(obj, js_array_type): | ||||||||||||||||||||||||||
| return [_js_object_to_py_object(value) for value in obj.to_list()] | ||||||||||||||||||||||||||
| elif isinstance(obj, jsdate.PyJsDate): | ||||||||||||||||||||||||||
| return obj.to_utc_dt() | ||||||||||||||||||||||||||
| elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)): | ||||||||||||||||||||||||||
| return None | ||||||||||||||||||||||||||
| elif isinstance(obj, base.PyJsError): | ||||||||||||||||||||||||||
| raise RuntimeError(obj['message']) | ||||||||||||||||||||||||||
| elif isinstance(obj, base.PyJsObject): | ||||||||||||||||||||||||||
| return { | ||||||||||||||||||||||||||
| key: _js_object_to_py_object(value['value']) | ||||||||||||||||||||||||||
| for (key, value) in obj.own.items() | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| elif isinstance(obj, base.JsObjectWrapper): | ||||||||||||||||||||||||||
| return _js_object_to_py_object(obj._obj) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| return obj | ||||||||||||||||||||||||||
| "Javascript mapping functions are not supported because the " | ||||||||||||||||||||||||||
| "quickjs-ng library is not installed.") | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if expression: | ||||||||||||||||||||||||||
| source = '\n'.join(['function(__row__) {'] + [ | ||||||||||||||||||||||||||
| f' {name} = __row__.{name}' | ||||||||||||||||||||||||||
| for name in original_fields if name in expression | ||||||||||||||||||||||||||
| ] + [' return (' + expression + ')'] + ['}']) | ||||||||||||||||||||||||||
| js_func = _CustomJsObjectWrapper(js2py.eval_js(source)) | ||||||||||||||||||||||||||
| unpacking_code = '\n'.join([ | ||||||||||||||||||||||||||
| f' var {name} = __row__.{name};' for name in original_fields | ||||||||||||||||||||||||||
| if name in expression | ||||||||||||||||||||||||||
| ]) | ||||||||||||||||||||||||||
| source_code = f""" | ||||||||||||||||||||||||||
| const udf = function(__row__) {{ | ||||||||||||||||||||||||||
| {unpacking_code} | ||||||||||||||||||||||||||
| return ({expression}); | ||||||||||||||||||||||||||
| }}; | ||||||||||||||||||||||||||
| function wrapper(json_str) {{ | ||||||||||||||||||||||||||
| return udf(JSON.parse(json_str)); | ||||||||||||||||||||||||||
| }} | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| entrypoint = 'wrapper' | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| elif callable: | ||||||||||||||||||||||||||
| js_func = _CustomJsObjectWrapper(js2py.eval_js(callable)) | ||||||||||||||||||||||||||
| match = re.search(r'function\s+([a-zA-Z0-9_]+)\s*\(', callable) | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current regex for finding the function name is quite restrictive. It won't match
Suggested change
|
||||||||||||||||||||||||||
| if not match: | ||||||||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||||||||
| f"Could not find function declaration in callable: {callable}") | ||||||||||||||||||||||||||
| udf_name = match.group(1) | ||||||||||||||||||||||||||
| source_code = f""" | ||||||||||||||||||||||||||
| {callable} | ||||||||||||||||||||||||||
| function wrapper(json_str) {{ | ||||||||||||||||||||||||||
| return {udf_name}(JSON.parse(json_str)); | ||||||||||||||||||||||||||
| }} | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| entrypoint = 'wrapper' | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||
| if not path.endswith('.js'): | ||||||||||||||||||||||||||
| raise ValueError(f'File "{path}" is not a valid .js file.') | ||||||||||||||||||||||||||
| udf_code = FileSystems.open(path).read().decode() | ||||||||||||||||||||||||||
| js = js2py.EvalJs() | ||||||||||||||||||||||||||
| js.eval(udf_code) | ||||||||||||||||||||||||||
| js_func = _CustomJsObjectWrapper(getattr(js, name)) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def js_wrapper(row): | ||||||||||||||||||||||||||
| row_as_dict = py_value_to_js_dict(row) | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| js_result = js_func(row_as_dict) | ||||||||||||||||||||||||||
| except simplex.JsException as exn: | ||||||||||||||||||||||||||
| raise RuntimeError( | ||||||||||||||||||||||||||
| f"Error evaluating javascript expression: " | ||||||||||||||||||||||||||
| f"{exn.mes['message']}") from exn | ||||||||||||||||||||||||||
| return dicts_to_rows(_js_object_to_py_object(js_result)) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| return js_wrapper | ||||||||||||||||||||||||||
| source_code = f""" | ||||||||||||||||||||||||||
| {udf_code} | ||||||||||||||||||||||||||
| function wrapper(json_str) {{ | ||||||||||||||||||||||||||
| return {name}(JSON.parse(json_str)); | ||||||||||||||||||||||||||
| }} | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| entrypoint = 'wrapper' | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| return _JsWrapper(source_code, entrypoint) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def _expand_python_mapping_func( | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Define the thread-local cache at the module level to avoid redundant global lookups and non-idiomatic lazy initialization within the class method.