Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions .github/workflows/hamilton-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
uv run pytest plugin_tests/h_dask

- name: Test ray
if: ${{ matrix.python-version != '3.14' }}
env:
RAY_ENABLE_UV_RUN_RUNTIME_ENV: 0 # https://github.com/ray-project/ray/issues/53848
run: |
Expand All @@ -121,7 +122,7 @@ jobs:
run: |
sudo apt-get install --no-install-recommends --yes default-jre
uv sync --group test --extra pyspark
uv pip install 'numpy<2' 'pyspark[connect]' 'grpcio'
uv pip install 'pyspark[connect]' 'grpcio'
uv pip install --no-cache --reinstall --strict 'grpcio-status >= 1.48.1'
uv run pytest plugin_tests/h_spark

Expand All @@ -131,23 +132,23 @@ jobs:
PYSPARK_SUBMIT_ARGS: "--conf spark.sql.ansi.enabled=false pyspark-shell"
run: |
uv sync --group test --extra pyspark
uv pip install 'numpy<2' 'pyspark[connect]' 'grpcio'
uv pip install 'pyspark[connect]' 'grpcio'
uv pip install --no-cache --reinstall --strict 'grpcio-status >= 1.48.1'
uv run pytest plugin_tests/h_spark

# Vaex 4.19 supports py<=3.12 and numpy>2 (https://github.com/vaexio/vaex/pull/2449) but limited by dask<2024.9
# For now the test matrix is py3.10 and numpy<2
- name: Test vaex
# Vaex supports <= py3.10 and numpy<2
if: ${{ runner.os == 'Linux' && matrix.python-version == '3.10' }}
run: |
sudo apt-get install --no-install-recommends --yes libpcre3-dev cargo
uv sync --group test --extra vaex
uv pip install "numpy<2"
uv run pytest plugin_tests/h_vaex
uv run --no-sync pytest plugin_tests/h_vaex

- name: Test vaex
# Vaex supports <= py3.10 and numpy<2
if: ${{ runner.os != 'Linux' && matrix.python-version == '3.10' }}
run: |
uv sync --group test --extra vaex
uv pip install "numpy<2"
uv run pytest plugin_tests/h_vaex
uv run --no-sync pytest plugin_tests/h_vaex
1 change: 1 addition & 0 deletions .github/workflows/hamilton-sdk.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e ${{ github.workspace }}
pip install -r requirements.txt
pip install -r requirements-test.txt
pip install -e .
Expand Down
15 changes: 11 additions & 4 deletions hamilton/plugins/h_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,15 @@ def _inspect_kwargs(kwargs: dict[str, Any]) -> tuple[DataFrame, dict[str, Any]]:
def _format_pandas_udf(func_name: str, ordered_params: list[str]) -> str:
formatting_params = {
"name": func_name,
"return_type": "pd.Series",
"params": ", ".join([f"{param}: pd.Series" for param in ordered_params]),
"params": ", ".join(ordered_params),
"param_call": ", ".join([f"{param}={param}" for param in ordered_params]),
}
# NOTE: we intentionally omit type annotations here. The return type is passed
# explicitly to pyspark's pandas_udf(), and parameter annotations are not needed.
# On Python 3.14+, annotations in dynamically compiled code create __annotate__
# functions (PEP 749) that break PySpark's UDF serialization.
func_string = """
def {name}({params}) -> {return_type}:
def {name}({params}):
return partial_fn({param_call})
""".format(**formatting_params)
return func_string
Expand Down Expand Up @@ -380,7 +383,11 @@ def _fabricate_spark_function(
else:
func_string = _format_udf(func_name, ordered_params)
module_code = compile(func_string, "<string>", "exec")
func_code = [c for c in module_code.co_consts if isinstance(c, CodeType)][0]
# Filter by name to avoid picking up __annotate__ or other helper code objects
# that Python 3.14+ may generate alongside the function (PEP 749).
func_code = [
c for c in module_code.co_consts if isinstance(c, CodeType) and c.co_name == func_name
][0]
return FunctionType(func_code, {**globals(), **{"partial_fn": partial_fn}}, func_name)


Expand Down
16 changes: 8 additions & 8 deletions hamilton/plugins/numpy_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,10 @@ class NumpyNpyWriter(DataSaver):
fix_imports: bool | None = None

def save_data(self, data: np.ndarray) -> dict[str, Any]:
    """Save ``data`` to ``self.path`` via ``np.save`` and return file metadata.

    ``fix_imports`` is only forwarded when explicitly set AND numpy is older
    than 2.4: the parameter was deprecated and later removed, so passing it
    to newer numpy raises a TypeError.

    NOTE(review): the extracted diff had left the old unconditional ``np.save``
    call interleaved above the new guarded one, saving the file twice; this is
    the reconstructed post-change version.
    """
    kwargs = dict(file=self.path, arr=data, allow_pickle=self.allow_pickle)
    # Lexicographic version compare is adequate until a hypothetical numpy 10.x.
    if np.__version__ < "2.4" and self.fix_imports is not None:
        kwargs["fix_imports"] = self.fix_imports
    np.save(**kwargs)
    return utils.get_file_metadata(self.path)

@classmethod
Expand Down Expand Up @@ -77,13 +75,15 @@ def applicable_types(cls) -> Collection[type]:
return [np.ndarray]

def load_data(self, type_: type) -> tuple[np.ndarray, dict[str, Any]]:
    """Load the array stored at ``self.path`` and return it with file metadata.

    ``fix_imports`` is only forwarded when explicitly set AND numpy is older
    than 2.4, mirroring ``save_data``: the parameter was deprecated and later
    removed from ``np.load``.

    NOTE(review): the extracted diff had interleaved the old direct
    ``np.load(...)`` call with the new kwargs-building code; this is the
    reconstructed post-change version.
    """
    kwargs = dict(
        file=self.path,
        mmap_mode=self.mmap_mode,
        allow_pickle=self.allow_pickle,
        encoding=self.encoding,
    )
    # Lexicographic version compare is adequate until a hypothetical numpy 10.x.
    if np.__version__ < "2.4" and self.fix_imports is not None:
        kwargs["fix_imports"] = self.fix_imports
    array = np.load(**kwargs)
    metadata = utils.get_file_metadata(self.path)
    return array, metadata

Expand Down
6 changes: 3 additions & 3 deletions hamilton/plugins/pandas_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,13 @@ def _get_loading_kwargs(self) -> dict[str, Any]:
kwargs["keep_default_na"] = self.keep_default_na
if self.na_filter is not None:
kwargs["na_filter"] = self.na_filter
if self.verbose is not None:
if pd.__version__ < "3.0" and self.verbose is not None:
kwargs["verbose"] = self.verbose
if self.skip_blank_lines is not None:
kwargs["skip_blank_lines"] = self.skip_blank_lines
if self.parse_dates is not None:
kwargs["parse_dates"] = self.parse_dates
if self.keep_date_col is not None:
if pd.__version__ < "3.0" and self.keep_date_col is not None:
kwargs["keep_date_col"] = self.keep_date_col
if self.date_format is not None:
kwargs["date_format"] = self.date_format
Expand Down Expand Up @@ -242,7 +242,7 @@ def _get_loading_kwargs(self) -> dict[str, Any]:
kwargs["dialect"] = self.dialect
if self.on_bad_lines is not None:
kwargs["on_bad_lines"] = self.on_bad_lines
if self.delim_whitespace is not None:
if pd.__version__ < "3.0" and self.delim_whitespace is not None:
kwargs["delim_whitespace"] = self.delim_whitespace
if self.low_memory is not None:
kwargs["low_memory"] = self.low_memory
Expand Down
6 changes: 1 addition & 5 deletions hamilton/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,7 @@ def initialize():

try:
load_extension(extension_name)
except NotImplementedError as e:
logger.debug(f"Did not load {extension_name} extension because {str(e)}.")
except ModuleNotFoundError as e:
logger.debug(f"Did not load {extension_name} extension because {e.msg}.")
except ImportError as e:
except (NotImplementedError, ImportError, Warning) as e:
logger.debug(f"Did not load {extension_name} extension because {str(e)}.")

global INITIALIZED
Expand Down
4 changes: 2 additions & 2 deletions plugin_tests/h_dask/test_h_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def test_smoke_screen_module(client):
]


@pytest.mark.parametrize("outputs, expected", dd_test_cases, ids=dd_test_case_ids)
@pytest.mark.parametrize(("outputs", "expected"), dd_test_cases, ids=dd_test_case_ids)
def test_DDFR_build_result_pandas(client, outputs: dict[str, typing.Any], expected: dd.DataFrame):
"""Tests using pandas objects works"""
actual = h_dask.DaskDataFrameResult.build_result(**outputs)
Expand All @@ -215,7 +215,7 @@ def test_DDFR_build_result_pandas(client, outputs: dict[str, typing.Any], expect
pd.testing.assert_frame_equal(actual_pdf, expected_pdf)


@pytest.mark.parametrize("outputs, expected", dd_test_cases, ids=dd_test_case_ids)
@pytest.mark.parametrize(("outputs", "expected"), dd_test_cases, ids=dd_test_case_ids)
def test_DDFR_build_result_dask(client, outputs: dict[str, typing.Any], expected: dd.DataFrame):
"""Tests that using dask objects works."""
dask_outputs = {}
Expand Down
13 changes: 13 additions & 0 deletions plugin_tests/h_ray/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,20 @@
# specific language governing permissions and limitations
# under the License.

import sys

from hamilton import telemetry

# disable telemetry for all tests!
telemetry.disable_telemetry()

# Skip tests that require packages not yet available on Python 3.14.
# pytest reads this module-level name and ignores the listed files at
# collection time, so the unimportable modules are never touched.
collect_ignore = []
if sys.version_info >= (3, 14):
    # ray does not support Python 3.14 yet, so its test modules cannot
    # even be imported on that interpreter.
    collect_ignore += [
        "test_h_ray.py",
        "test_parse_ray_remote_options_from_tags.py",
    ]
2 changes: 1 addition & 1 deletion plugin_tests/h_spark/test_h_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def spark_session():


@pytest.mark.parametrize(
"spark_type,arrow_type",
("spark_type", "arrow_type"),
[
(pt.NullType(), pa.null()),
(pt.BooleanType(), pa.bool_()),
Expand Down
17 changes: 8 additions & 9 deletions plugin_tests/h_spark/test_h_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def test_smoke_screen_udf_graph_adapter(spark_session):

# Test cases for python_to_spark_type function
@pytest.mark.parametrize(
"python_type,expected_spark_type",
("python_type", "expected_spark_type"),
[
(int, types.IntegerType()),
(float, types.FloatType()),
Expand All @@ -341,7 +341,7 @@ def test_python_to_spark_type_invalid(invalid_python_type):
# Test cases for get_spark_type function
# 1. Basic Python types
@pytest.mark.parametrize(
"return_type,expected_spark_type",
("return_type", "expected_spark_type"),
[
(int, types.IntegerType()),
(float, types.FloatType()),
Expand All @@ -356,7 +356,7 @@ def test_get_spark_type_basic_types(return_type, expected_spark_type):

# 2. Lists of basic Python types
@pytest.mark.parametrize(
"return_type,expected_spark_type",
("return_type", "expected_spark_type"),
[
(int, types.ArrayType(types.IntegerType())),
(float, types.ArrayType(types.FloatType())),
Expand All @@ -372,7 +372,7 @@ def test_get_spark_type_list_types(return_type, expected_spark_type):

# 3. Numpy types (assuming you have a numpy_to_spark_type function that handles these)
@pytest.mark.parametrize(
"return_type,expected_spark_type",
("return_type", "expected_spark_type"),
[
(np.int64, types.IntegerType()),
(np.float64, types.FloatType()),
Expand Down Expand Up @@ -548,7 +548,7 @@ def _two_pyspark_dataframe_parameters(foo: DataFrame, bar: int, baz: DataFrame)


@pytest.mark.parametrize(
"fn,requested_parameter,expected",
("fn", "requested_parameter", "expected"),
[
(_only_pyspark_dataframe_parameter, "foo", "foo"),
(_one_pyspark_dataframe_parameter, "foo", "foo"),
Expand All @@ -564,7 +564,7 @@ def test_derive_dataframe_parameter_succeeds(fn, requested_parameter, expected):


@pytest.mark.parametrize(
"fn,requested_parameter",
("fn", "requested_parameter"),
[
(_no_pyspark_dataframe_parameter, "foo"),
(_no_pyspark_dataframe_parameter, None),
Expand Down Expand Up @@ -735,8 +735,7 @@ def df_as_pandas(df: DataFrame) -> pd.DataFrame:

def test__format_pandas_udf():
    """Generated pandas-UDF source carries no annotations (PEP 749 workaround).

    NOTE(review): the extracted diff had left the old annotated expectation and
    the new one adjacent (a syntax error); reconstructed post-change version.
    The 4-space indent of the second string piece follows the template's
    conventional indentation — confirm against h_spark._format_pandas_udf.
    """
    assert (
        h_spark._format_pandas_udf("foo", ["a", "b"]).strip()
        == "def foo(a, b):\n    return partial_fn(a=a, b=b)"
    )

Expand Down Expand Up @@ -871,7 +870,7 @@ def not_pyspark_fn(foo: DataFrame, bar: DataFrame) -> DataFrame:


@pytest.mark.parametrize(
"fn,expected", [(pyspark_fn_1, True), (pyspark_fn_2, True), (not_pyspark_fn, False)]
("fn", "expected"), [(pyspark_fn_1, True), (pyspark_fn_2, True), (not_pyspark_fn, False)]
)
def test_is_default_pyspark_node(fn, expected):
node_ = node.Node.from_fn(fn)
Expand Down
8 changes: 5 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pandera = ["pandera"]
pydantic = ["pydantic>=2.0"]
pyspark = [
# we have to run these dependencies because Spark does not check to ensure the right target was called
"pyspark[pandas_on_spark,sql]",
"pyspark[pandas_on_spark,sql] >= 4.0.0",
]
ray = ["ray>=2.0.0; python_version < '3.14'", "pyarrow"]
rich = ["rich"]
Expand Down Expand Up @@ -149,13 +149,13 @@ docs = [
"mock==1.0.1", # read the docs pins
"myst-nb",
"narwhals",
"numpy < 2.0.0",
"numpy",
"packaging",
"pandera",
"pillow",
"polars",
"pyarrow >= 1.0.0",
"pydantic >=2.0",
"pydantic >= 2.0",
"pyspark",
"openlineage-python",
"PyYAML",
Expand Down Expand Up @@ -223,6 +223,8 @@ extend-select = [
# "PERF",# Linting rules for performance
# "PIE", # flake8-pie rules
# "PT", # flake8-pytest-style rules
"PT006",
"PT007",
# "PYI", # Linting rules for type annotations.
"Q", # Linting rules for quites
# "RUF", # Unused noqa directive
Expand Down
2 changes: 1 addition & 1 deletion tests/caching/result_store/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def test_delete_all(file_store):


@pytest.mark.parametrize(
"format,value",
("format", "value"),
[
("json", {"key1": "value1", "key2": 2}),
("pickle", ("value1", "value2", "value3")),
Expand Down
4 changes: 2 additions & 2 deletions tests/caching/test_fingerprinting.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __init__(self, obj):


@pytest.mark.parametrize(
"obj,expected_hash",
("obj", "expected_hash"),
[
("hello-world", "IJUxIYl1PeatR9_iDL6X7A=="),
(17.31231, "vAYX8MD8yEHK6dwnIPVUaw=="),
Expand All @@ -142,7 +142,7 @@ def test_hash_primitive(obj, expected_hash):


@pytest.mark.parametrize(
"obj,expected_hash",
("obj", "expected_hash"),
[
([0, True, "hello-world"], "Pg9LP3Y-8yYsoWLXedPVKDwTAa7W8_fjJNTTUA=="),
((17.0, False, "world"), "wyuuKMuL8rp53_CdYAtyMmyetnTJ9LzmexhJrQ=="),
Expand Down
4 changes: 3 additions & 1 deletion tests/caching/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,9 @@ def foo() -> dict:


@pytest.mark.parametrize(
"executor,metadata_store,result_store", EXECUTORS_AND_STORES_CONFIGURATIONS, indirect=True
("executor", "metadata_store", "result_store"),
EXECUTORS_AND_STORES_CONFIGURATIONS,
indirect=True,
)
def test_parallel_synchronous_step_by_step(executor, metadata_store, result_store): # noqa: F811
dr = (
Expand Down
2 changes: 1 addition & 1 deletion tests/caching/test_result_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def test_delete_all(result_store):


@pytest.mark.parametrize(
"format,value",
("format", "value"),
[
("json", {"key1": "value1", "key2": 2}),
("pickle", ("value1", "value2", "value3")),
Expand Down
2 changes: 1 addition & 1 deletion tests/execution/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def create_dummy_task(task_purpose: NodeGroupPurpose):


@pytest.mark.parametrize(
"purpose, check",
("purpose", "check"),
[
(NodeGroupPurpose.EXECUTE_BLOCK, lambda x: isinstance(x, MultiProcessingExecutor)),
(NodeGroupPurpose.EXECUTE_SINGLE, lambda x: isinstance(x, SynchronousLocalTaskExecutor)),
Expand Down
4 changes: 2 additions & 2 deletions tests/execution/test_graph_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _assert_topologically_sorted(nodes, sorted_nodes):


@pytest.mark.parametrize(
"dag_input, expected_sorted_nodes",
("dag_input", "expected_sorted_nodes"),
[
({"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"], "e": ["d"]}, ["a", "b", "c", "d", "e"]),
({}, []),
Expand Down Expand Up @@ -109,7 +109,7 @@ def _inner(n: node.Node) -> bool:


@pytest.mark.parametrize(
"dag_repr, expected_nodes_in_between, start_node, end_node",
("dag_repr", "expected_nodes_in_between", "start_node", "end_node"),
[
(
{"a": [], "b": ["a"], "c": ["b"]},
Expand Down
2 changes: 1 addition & 1 deletion tests/function_modifiers/test_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def fn_bool_inject(injected_data: bool) -> bool:
# Note that this tests an internal API, but we would like to test this to ensure
# class selection is correct
@pytest.mark.parametrize(
"type_,classes,correct_class",
("type_", "classes", "correct_class"),
[
(str, [StringDataLoader, IntDataLoader, IntDataLoader2], StringDataLoader),
(int, [StringDataLoader, IntDataLoader, IntDataLoader2], IntDataLoader2),
Expand Down
Loading
Loading