Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 2
"revision": 3
}
16 changes: 9 additions & 7 deletions sdks/python/apache_beam/yaml/readme_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,17 @@ def expand(self, pcoll):
lambda _: 1, sum, 'count')


class _Fakes:
fn = str
class SomeTransform(beam.PTransform):
def __init__(self, *args, **kwargs):
super().__init__()

class SomeTransform(beam.PTransform):
def __init__(*args, **kwargs):
pass
def expand(self, pcoll):
return pcoll

def expand(self, pcoll):
return pcoll

class _Fakes:
fn = str
SomeTransform = SomeTransform


RENDER_DIR = None
Expand Down
154 changes: 75 additions & 79 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

"""This module defines the basic MapToFields operation."""
import itertools
import json
import re
import threading
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Define the thread-local cache once at module level so that the `_get_wrapper_fn` instance method does not need the `global` statement, repeated `globals()` lookups, or non-idiomatic lazy initialization on every call.

Suggested change
import threading
import threading
_THREAD_LOCAL_JS_CACHE = threading.local()

from collections import abc
from collections.abc import Callable
from collections.abc import Collection
Expand Down Expand Up @@ -53,13 +55,11 @@
from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn
from apache_beam.yaml.yaml_provider import dicts_to_rows

# Import js2py package if it exists
# Import quickjs package if it exists
try:
import js2py
from js2py.base import JsObjectWrapper
import quickjs
except ImportError:
js2py = None
JsObjectWrapper = object
quickjs = None

_str_expression_fields = {
'AssignTimestamps': 'timestamp',
Expand Down Expand Up @@ -178,18 +178,40 @@ def _check_mapping_arguments(
raise ValueError(f'{transform_name} cannot specify "name" without "path"')


# js2py's JsObjectWrapper object has a self-referencing __dict__ property
# that cannot be pickled without implementing the __getstate__ and
# __setstate__ methods.
class _CustomJsObjectWrapper(JsObjectWrapper):
def __init__(self, js_obj):
super().__init__(js_obj.__dict__['_obj'])
class _JsWrapper:
def __init__(self, source_code, entrypoint_name):
self.source_code = source_code
self.entrypoint_name = entrypoint_name

def __getstate__(self):
return self.__dict__.copy()
def _get_wrapper_fn(self):
global _THREAD_LOCAL_JS_CACHE
if '_THREAD_LOCAL_JS_CACHE' not in globals():
globals()['_THREAD_LOCAL_JS_CACHE'] = threading.local()

def __setstate__(self, state):
self.__dict__.update(state)
cache = globals()['_THREAD_LOCAL_JS_CACHE']
if not hasattr(cache, 'functions'):
cache.functions = {}
Comment on lines +187 to +193
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Simplify the cache access by using the module-level variable defined above. This avoids the use of globals() and the global keyword, which is cleaner and more idiomatic.

Suggested change
global _THREAD_LOCAL_JS_CACHE
if '_THREAD_LOCAL_JS_CACHE' not in globals():
globals()['_THREAD_LOCAL_JS_CACHE'] = threading.local()
def __setstate__(self, state):
self.__dict__.update(state)
cache = globals()['_THREAD_LOCAL_JS_CACHE']
if not hasattr(cache, 'functions'):
cache.functions = {}
if not hasattr(_THREAD_LOCAL_JS_CACHE, 'functions'):
_THREAD_LOCAL_JS_CACHE.functions = {}
cache = _THREAD_LOCAL_JS_CACHE


cache_key = (self.source_code, self.entrypoint_name)
if cache_key not in cache.functions:
import quickjs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The quickjs module is already imported at the top of the file (line 60). This local import is redundant.

ctx = quickjs.Context()
ctx.eval(self.source_code)
cache.functions[cache_key] = ctx.get(self.entrypoint_name)

return cache.functions[cache_key]

def __call__(self, row):
wrapper_fn = self._get_wrapper_fn()
row_as_dict = py_value_to_js_dict(row)
try:
js_result = wrapper_fn(json.dumps(row_as_dict))
except Exception as exn:
raise RuntimeError(
f"Error evaluating javascript expression: {exn}") from exn
if hasattr(js_result, 'json'):
js_result = json.loads(js_result.json())
return dicts_to_rows(js_result)


# TODO(yaml) Improve type inferencing for JS UDF's
Expand All @@ -210,80 +232,54 @@ def py_value_to_js_dict(py_value):
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):

# Check for installed js2py package
if js2py is None:
if quickjs is None:
raise ValueError(
"Javascript mapping functions are not supported on"
" Python 3.12 or later.")

# import remaining js2py objects
from js2py import base
from js2py.constructors import jsdate
from js2py.internals import simplex

js_array_type = (
base.PyJsArray,
base.PyJsArrayBuffer,
base.PyJsInt8Array,
base.PyJsUint8Array,
base.PyJsUint8ClampedArray,
base.PyJsInt16Array,
base.PyJsUint16Array,
base.PyJsInt32Array,
base.PyJsUint32Array,
base.PyJsFloat32Array,
base.PyJsFloat64Array)

def _js_object_to_py_object(obj):
if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
return base.to_python(obj)
elif isinstance(obj, js_array_type):
return [_js_object_to_py_object(value) for value in obj.to_list()]
elif isinstance(obj, jsdate.PyJsDate):
return obj.to_utc_dt()
elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
return None
elif isinstance(obj, base.PyJsError):
raise RuntimeError(obj['message'])
elif isinstance(obj, base.PyJsObject):
return {
key: _js_object_to_py_object(value['value'])
for (key, value) in obj.own.items()
}
elif isinstance(obj, base.JsObjectWrapper):
return _js_object_to_py_object(obj._obj)

return obj
"Javascript mapping functions are not supported because the "
"quickjs-ng library is not installed.")

if expression:
source = '\n'.join(['function(__row__) {'] + [
f' {name} = __row__.{name}'
for name in original_fields if name in expression
] + [' return (' + expression + ')'] + ['}'])
js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
unpacking_code = '\n'.join([
f' var {name} = __row__.{name};' for name in original_fields
if name in expression
])
source_code = f"""
const udf = function(__row__) {{
{unpacking_code}
return ({expression});
}};
function wrapper(json_str) {{
return udf(JSON.parse(json_str));
}}
"""
entrypoint = 'wrapper'

elif callable:
js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))
match = re.search(r'function\s+([a-zA-Z0-9_]+)\s*\(', callable)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

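The current regex for finding the function name is quite restrictive. Because it is applied with `re.search`, it already matches `async function name(` and tolerates variable whitespace; what it cannot handle are anonymous function expressions, arrow functions, and identifiers containing `$`, all of which fall through to the `ValueError` below. A more robust regex would improve compatibility with various JavaScript function declaration styles.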

Suggested change
match = re.search(r'function\s+([a-zA-Z0-9_]+)\s*\(', callable)
match = re.search(r'(?:async\s+)?function\s+([a-zA-Z0-9_]+)', callable)

if not match:
raise ValueError(
f"Could not find function declaration in callable: {callable}")
udf_name = match.group(1)
source_code = f"""
{callable}
function wrapper(json_str) {{
return {udf_name}(JSON.parse(json_str));
}}
"""
entrypoint = 'wrapper'

else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
js = js2py.EvalJs()
js.eval(udf_code)
js_func = _CustomJsObjectWrapper(getattr(js, name))

def js_wrapper(row):
row_as_dict = py_value_to_js_dict(row)
try:
js_result = js_func(row_as_dict)
except simplex.JsException as exn:
raise RuntimeError(
f"Error evaluating javascript expression: "
f"{exn.mes['message']}") from exn
return dicts_to_rows(_js_object_to_py_object(js_result))

return js_wrapper
source_code = f"""
{udf_code}
function wrapper(json_str) {{
return {name}(JSON.parse(json_str));
}}
"""
entrypoint = 'wrapper'

return _JsWrapper(source_code, entrypoint)


def _expand_python_mapping_func(
Expand Down
14 changes: 7 additions & 7 deletions sdks/python/apache_beam/yaml/yaml_udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
from apache_beam.yaml.yaml_transform import YamlTransform

try:
import js2py
import quickjs
except ImportError:
js2py = None
logging.warning('js2py is not installed; some tests will be skipped.')
quickjs = None
logging.warning('quickjs-ng is not installed; some tests will be skipped.')


def as_rows():
Expand Down Expand Up @@ -63,7 +63,7 @@ def setUp(self):
def tearDown(self):
shutil.rmtree(self.tmpdir)

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(quickjs is None, 'quickjs-ng not installed.')
def test_map_to_fields_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
Expand Down Expand Up @@ -197,7 +197,7 @@ def test_map_to_fields_sql_reserved_keyword_append():
beam.Row(label='389a', timestamp=2, label_copy="389a"),
]))

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(quickjs is None, 'quickjs-ng not installed.')
def test_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
Expand Down Expand Up @@ -252,7 +252,7 @@ def test_filter_inline_py(self):
row=beam.Row(rank=2, values=[7, 8, 9])),
]))

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(quickjs is None, 'quickjs-ng not installed.')
def test_filter_expression_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
Expand Down Expand Up @@ -296,7 +296,7 @@ def test_filter_expression_py(self):
row=beam.Row(rank=0, values=[1, 2, 3])),
]))

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(quickjs is None, 'quickjs-ng not installed.')
def test_filter_inline_js_file(self):
data = '''
function f(x) {
Expand Down
3 changes: 1 addition & 2 deletions sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,7 @@ def get_portability_package_data():
'docstring-parser>=0.15,<1.0',
'jinja2>=3.0,<3.2',
'virtualenv-clone>=0.5,<1.0',
# https://github.com/PiotrDabkowski/Js2Py/issues/317
'js2py>=0.74,<1; python_version<"3.12"',
'quickjs-ng>=0.14.0,<1.0.0',
'jsonschema>=4.0.0,<5.0.0',
] + dataframe_dependency,
# Keep the following dependencies in line with what we test against
Expand Down
Loading