28 changes: 21 additions & 7 deletions bigframes/functions/_function_client.py
@@ -247,7 +247,7 @@ def provision_bq_managed_function(
    # Augment user package requirements with any internal package
    # requirements.
    packages = _utils._get_updated_package_requirements(
-       packages, is_row_processor, capture_references
+       packages, is_row_processor, capture_references, ignore_numpy_version=True
    )
    if packages:
        managed_function_options["packages"] = packages
@@ -277,7 +277,7 @@ def provision_bq_managed_function(
    import cloudpickle

    pickled = cloudpickle.dumps(func)
-   udf_code = textwrap.dedent(
+   func_code = textwrap.dedent(
        f"""
        import cloudpickle
        {udf_name} = cloudpickle.loads({pickled})
@@ -287,11 +287,26 @@ def provision_bq_managed_function(
    # This code path ensures that if the udf body is self contained,
    # i.e. there are no references to variables or imports outside the
    # body.
-   udf_code = textwrap.dedent(inspect.getsource(func))
-   match = re.search(r"^def ", udf_code, flags=re.MULTILINE)
+   func_code = textwrap.dedent(inspect.getsource(func))
+   match = re.search(r"^def ", func_code, flags=re.MULTILINE)
    if match is None:
        raise ValueError("The UDF is not defined correctly.")
-   udf_code = udf_code[match.start() :]
+   func_code = func_code[match.start() :]

+   if is_row_processor:
+       udf_code = textwrap.dedent(inspect.getsource(bff_template.get_pd_series))
+       udf_code = udf_code[udf_code.index("def") :]
+       bigframes_handler_code = textwrap.dedent(
+           f"""def bigframes_handler(str_arg):
+               return {udf_name}({bff_template.get_pd_series.__name__}(str_arg))"""
+       )
+   else:
+       udf_code = ""
+       bigframes_handler_code = textwrap.dedent(
+           f"""def bigframes_handler(*args):
+               return {udf_name}(*args)"""
+       )
+   udf_code = f"{udf_code}\n{func_code}"

with_connection_clause = (
(
@@ -311,8 +326,7 @@ def provision_bq_managed_function(
    OPTIONS ({managed_function_options_str})
    AS r'''
    __UDF_PLACE_HOLDER__
-   def bigframes_handler(*args):
-       return {udf_name}(*args)
+   {bigframes_handler_code}
    '''
    """
)
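For orientation, here is a hypothetical sketch (not part of this change) of roughly what the assembled UDF body looks like for a row-processor function once __UDF_PLACE_HOLDER__ is substituted with udf_code plus func_code; the name my_func and the helper body are illustrative assumptions only.

# Illustrative sketch only: the overall shape of the generated Python UDF body
# for a row-processor function.

def get_pd_series(str_arg):
    # udf_code: source of bff_template.get_pd_series, which rebuilds a pandas
    # Series from the serialized row argument (details elided here).
    ...

def my_func(row):
    # func_code: the user's function, either unpickled via cloudpickle or its
    # self-contained source (my_func is a placeholder name).
    return str(row.to_json())

def bigframes_handler(str_arg):
    # bigframes_handler_code: the entry point BigQuery invokes; it rebuilds the
    # row and forwards it to the user function.
    return my_func(get_pd_series(str_arg))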
4 changes: 2 additions & 2 deletions bigframes/functions/_function_session.py
@@ -847,15 +847,15 @@ def wrapper(func):
    if output_type:
        py_sig = py_sig.replace(return_annotation=output_type)

-   udf_sig = udf_def.UdfSignature.from_py_signature(py_sig)

    # The function will actually be receiving a pandas Series, but allow
    # both BigQuery DataFrames and pandas object types for compatibility.
    is_row_processor = False
    if new_sig := _convert_row_processor_sig(py_sig):
        py_sig = new_sig
        is_row_processor = True

+   udf_sig = udf_def.UdfSignature.from_py_signature(py_sig)

managed_function_client = _function_client.FunctionClient(
dataset_ref.project,
bq_location,
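A note on this reordering (my reading, not stated in the diff): UdfSignature is now derived from the signature after _convert_row_processor_sig has run, so a row-processor UDF is registered with the wire signature that the generated bigframes_handler(str_arg) above actually receives, i.e. a single serialized-row string rather than a Series. A minimal sketch, with the conversion's effect assumed from that handler:

import inspect

import bigframes.series


def serialize_row(row: bigframes.series.Series) -> str:
    return row.to_json()


py_sig = inspect.signature(serialize_row)

# Assumed effect of _convert_row_processor_sig: the single Series parameter is
# rewritten to the string argument the handler receives at runtime.
converted_sig = py_sig.replace(
    parameters=[
        inspect.Parameter(
            "str_arg", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=str
        )
    ]
)

# After this change, UdfSignature.from_py_signature is built from
# converted_sig (str -> str) rather than from the original (Series -> str).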
14 changes: 12 additions & 2 deletions bigframes/functions/_utils.py
@@ -61,7 +61,10 @@ def get_remote_function_locations(bq_location):


def _get_updated_package_requirements(
-   package_requirements=None, is_row_processor=False, capture_references=True
+   package_requirements=None,
+   is_row_processor=False,
+   capture_references=True,
+   ignore_numpy_version=False,
):
requirements = []
if capture_references:
@@ -72,9 +75,16 @@ def _get_updated_package_requirements(
    # would be converted to a pandas series and processed Ensure numpy
    # versions match to avoid unpickling problems. See internal issue
    # b/347934471.
-   requirements.append(f"numpy=={numpy.__version__}")
    requirements.append(f"pandas=={pandas.__version__}")
    requirements.append(f"pyarrow=={pyarrow.__version__}")
+   # TODO(jialuo): Add back the version after b/410924784 is resolved.
+   # Due to current limitations on the numpy version in Python UDFs, we use
+   # `ignore_numpy_version` to optionally omit the version for managed
+   # functions only.
+   numpy_package = (
+       "numpy" if ignore_numpy_version else f"numpy=={numpy.__version__}"
+   )
+   requirements.append(numpy_package)

    if package_requirements:
        requirements.extend(package_requirements)
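As a rough usage illustration (not from the diff; the versions are placeholders, and capture_references is disabled so only the branch shown in this hunk applies), the new flag changes only how numpy is pinned:

from bigframes.functions import _utils

# Hypothetical call: pandas/pyarrow stay pinned to the local versions while
# numpy is left unpinned for managed functions.
reqs = _utils._get_updated_package_requirements(
    package_requirements=["scipy==1.14.1"],
    is_row_processor=True,
    capture_references=False,
    ignore_numpy_version=True,
)
# reqs would then include entries along the lines of:
#   "pandas==2.2.2", "pyarrow==17.0.0", "numpy", "scipy==1.14.1"
# Exact ordering and any other entries depend on code outside this hunk.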
223 changes: 223 additions & 0 deletions tests/system/large/functions/test_managed_function.py
@@ -647,3 +647,226 @@ def foo(x: int) -> int:
container_cpu=2,
container_memory="64Mi",
)(foo)


def test_managed_function_df_apply_axis_1(session, dataset_id, scalars_dfs):
columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]
scalars_df, scalars_pandas_df = scalars_dfs
try:

def serialize_row(row):
# Explicitly casting types ensures consistent behavior between
# BigFrames and pandas. Without it, BigFrames return plain Python
# types, e.g. 0, while pandas return NumPy types, e.g. np.int64(0),
# which could lead to mismatches and requires further investigation.
custom = {
"name": int(row.name),
"index": [idx for idx in row.index],
"values": [
val.item() if hasattr(val, "item") else val for val in row.values
],
}

return str(
{
"default": row.to_json(),
"split": row.to_json(orient="split"),
"records": row.to_json(orient="records"),
"index": row.to_json(orient="index"),
"table": row.to_json(orient="table"),
"custom": custom,
}
)

serialize_row_mf = session.udf(
input_types=bigframes.series.Series,
output_type=str,
dataset=dataset_id,
name=prefixer.create_prefix(),
)(serialize_row)

assert getattr(serialize_row_mf, "is_row_processor")

bf_result = scalars_df[columns].apply(serialize_row_mf, axis=1).to_pandas()
pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1)

# bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
# , ignore this mismatch by using check_dtype=False.
pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)

# Let's make sure the read_gbq_function path works for this function.
serialize_row_reuse = session.read_gbq_function(
serialize_row_mf.bigframes_bigquery_function, is_row_processor=True
)
bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas()
pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)

finally:
# clean up the gcp assets created for the managed function.
cleanup_function_assets(
serialize_row_mf, session.bqclient, session.cloudfunctionsclient
)


def test_managed_function_df_apply_axis_1_aggregates(session, dataset_id, scalars_dfs):
columns = ["int64_col", "int64_too", "float64_col"]
scalars_df, scalars_pandas_df = scalars_dfs

try:

def analyze(row):
# Explicitly casting types ensures consistent behavior between
# BigFrames and pandas. Without it, BigFrames return plain Python
# types, e.g. 0, while pandas return NumPy types, e.g. np.int64(0),
# which could lead to mismatches and requires further investigation.
return str(
{
"dtype": row.dtype,
"count": int(row.count()),
"min": int(row.min()),
"max": int(row.max()),
"mean": float(row.mean()),
"std": float(row.std()),
"var": float(row.var()),
}
)

analyze_mf = session.udf(
input_types=bigframes.series.Series,
output_type=str,
dataset=dataset_id,
name=prefixer.create_prefix(),
)(analyze)

assert getattr(analyze_mf, "is_row_processor")

bf_result = scalars_df[columns].dropna().apply(analyze_mf, axis=1).to_pandas()
pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1)

# bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
# , ignore this mismatch by using check_dtype=False.
pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)

finally:
# clean up the gcp assets created for the managed function.
cleanup_function_assets(
analyze_mf, session.bqclient, session.cloudfunctionsclient
)


@pytest.mark.parametrize(
("pd_df",),
[
pytest.param(
pandas.DataFrame(
{
"2": [1, 2, 3],
2: [1.5, 3.75, 5],
"name, [with. special'- chars\")/\\": [10, 20, 30],
(3, 4): ["pq", "rs", "tu"],
(5.0, "six", 7): [8, 9, 10],
'raise Exception("hacked!")': [11, 12, 13],
},
# Default pandas index has non-numpy type, whereas bigframes is
# always numpy-based type, so let's use the index compatible
# with bigframes. See more details in b/369689696.
index=pandas.Index([0, 1, 2], dtype=pandas.Int64Dtype()),
),
id="all-kinds-of-column-names",
),
pytest.param(
pandas.DataFrame(
{
"x": [1, 2, 3],
"y": [1.5, 3.75, 5],
"z": ["pq", "rs", "tu"],
},
index=pandas.MultiIndex.from_frame(
pandas.DataFrame(
{
"idx0": pandas.Series(
["a", "a", "b"], dtype=pandas.StringDtype()
),
"idx1": pandas.Series(
[100, 200, 300], dtype=pandas.Int64Dtype()
),
}
)
),
),
id="multiindex",
marks=pytest.mark.skip(
reason="TODO: revert this skip after this pandas bug is fixed: https://github.com/pandas-dev/pandas/issues/59908"
),
),
pytest.param(
pandas.DataFrame(
[
[10, 1.5, "pq"],
[20, 3.75, "rs"],
[30, 8.0, "tu"],
],
# Default pandas index has non-numpy type, whereas bigframes is
# always numpy-based type, so let's use the index compatible
# with bigframes. See more details in b/369689696.
index=pandas.Index([0, 1, 2], dtype=pandas.Int64Dtype()),
columns=pandas.MultiIndex.from_arrays(
[
["first", "last_two", "last_two"],
[1, 2, 3],
]
),
),
id="column-multiindex",
),
],
)
def test_managed_function_df_apply_axis_1_complex(session, dataset_id, pd_df):
bf_df = session.read_pandas(pd_df)

try:

def serialize_row(row):
# Explicitly casting types ensures consistent behavior between
# BigFrames and pandas. Without it, BigFrames return plain Python
# types, e.g. 0, while pandas return NumPy types, e.g. np.int64(0),
# which could lead to mismatches and requires further investigation.
custom = {
"name": int(row.name),
"index": [idx for idx in row.index],
"values": [
val.item() if hasattr(val, "item") else val for val in row.values
],
}
return str(
{
"default": row.to_json(),
"split": row.to_json(orient="split"),
"records": row.to_json(orient="records"),
"index": row.to_json(orient="index"),
"custom": custom,
}
)

serialize_row_mf = session.udf(
input_types=bigframes.series.Series,
output_type=str,
dataset=dataset_id,
name=prefixer.create_prefix(),
)(serialize_row)

assert getattr(serialize_row_mf, "is_row_processor")

bf_result = bf_df.apply(serialize_row_mf, axis=1).to_pandas()
pd_result = pd_df.apply(serialize_row, axis=1)

# ignore known dtype difference between pandas and bigframes.
pandas.testing.assert_series_equal(
pd_result, bf_result, check_dtype=False, check_index_type=False
)

finally:
# clean up the gcp assets created for the managed function.
cleanup_function_assets(
serialize_row_mf, session.bqclient, session.cloudfunctionsclient
)