Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion magicparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Builder,
builtins as builtins_composite_processors,
)
from .transform import ParsingTransform, Transform
from .transform import ParsingTransform, Transform, TransformError
from .type_converters import TypeConverter, builtins as builtins_type_converters
from typing import Any
from .validators import Validator, builtins as builtins_validators
Expand All @@ -31,6 +31,7 @@
"RowSkipped",
"RowFailed",
"Transform",
"TransformError",
"Validator",
]

Expand Down
135 changes: 131 additions & 4 deletions magicparse/transform.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from abc import ABC, abstractmethod
from collections.abc import Callable, Collection, Sequence
from dataclasses import dataclass
from decimal import Decimal
from enum import StrEnum
from typing import Any, Self
from typing import Any, NoReturn, Self
from jsonata import Jsonata # pyright: ignore[reportMissingTypeStubs]


Expand Down Expand Up @@ -48,7 +50,132 @@ def register(cls, transform: type[Self]) -> None:
cls.registry[transform.key()] = transform


class TransformError(Exception):
def __init__(self, message: str, params: Sequence[Any] | dict[str, Any]) -> None:
super().__init__(message)
match params:
case dict():
for param, value in params.items():
self.add_note(f"param({param}) = {value}")
case _:
for i, value in enumerate(params):
self.add_note(f"param({i}) = {value}")


def coalesce_numbers[T: int | float | Decimal | None](*args: T | None) -> T:
for arg in args:
if arg:
return arg

raise TransformError("No non-zero value to coalesce into", params=args)


def divide[T: int | Decimal](numerator: T, denominator: T) -> float | Decimal:
"JSONata native x / y operator can only divide int and float, but not Decimal"
try:
return numerator / denominator
except Exception as error:
raise TransformError("Cannot divide", params={"numerator": numerator, "denominator": denominator}) from error


def is_positive[T: int | float | Decimal](value: T) -> T:
if value <= 0:
raise TransformError("Value is not positive", params={"value": value})
return value


def left_pad_zeroes(value: str, width: int) -> str:
    """Left-pad ``value`` with ``"0"`` characters using ``str.zfill``.

    Raises:
        TransformError: If ``value.zfill(width)`` raises any exception.
    """
    try:
        padded = value.zfill(width)
    except Exception as error:
        arguments = {"value": value, "width": width}
        raise TransformError("Cannot left pad zeroes", params=arguments) from error
    return padded


def length(value: Collection[Any]) -> int:
    """Return ``len(value)``.

    Raises:
        TransformError: If ``len`` raises any exception for ``value``.
    """
    try:
        size = len(value)
    except Exception as error:
        raise TransformError("Cannot get length", params={"value": value}) from error
    return size


def map_to[K, V](key: K, mapping: dict[K, V]) -> V:
try:
return mapping[key]
except Exception as error:
raise TransformError("Cannot map to", params={"key": key, "mapping": mapping}) from error


class SkippedRow(Exception):
    """Exception raised by ``skip_row``; adds nothing to ``Exception``."""


def skip_row(reason: str | None) -> NoReturn:
    """Always raise ``SkippedRow``.

    Args:
        reason: Message for the exception; ``None`` or an empty string
            results in an empty message.

    Raises:
        SkippedRow: Unconditionally.
    """
    message = reason if reason else ""
    raise SkippedRow(message)
Copy link
Contributor Author

@ducdetronquito ducdetronquito Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useful to emulate an early return: this construct does not exist in JSONata, because it is an expression-based language in which only the last expression is returned.



def strip_whitespaces(value: str) -> str:
    """Return ``value`` with leading and trailing whitespace removed.

    Raises:
        TransformError: If ``value.strip()`` raises any exception.
    """
    try:
        stripped = value.strip()
    except Exception as error:
        raise TransformError("Cannot strip whitespaces", params={"value": value}) from error
    return stripped


def to_decimal(value: str | float | int) -> Decimal:
try:
if isinstance(value, str):
return Decimal(value.strip().replace(",", "."))
else:
return Decimal(value)
except Exception as error:
raise TransformError("Cannot convert to decimal", params={"value": value}) from error


def to_int(value: str) -> int:
    """Parse ``value`` as an ``int`` after stripping surrounding whitespace.

    Raises:
        TransformError: If stripping or parsing raises any exception.
    """
    try:
        parsed = int(value.strip())
    except Exception as error:
        raise TransformError("Cannot convert to int", params={"value": value}) from error
    return parsed


def type_of(value: Any) -> str:
    """Return a short name for the type of ``value``.

    ``int``, ``float``, ``Decimal`` and ``str`` instances map to ``"int"``,
    ``"float"``, ``"decimal"`` and ``"string"``. Anything else yields
    ``str(type(value))``, e.g. ``"<class 'dict'>"``. ``bool`` is a subclass
    of ``int`` and is therefore reported as ``"int"``.
    """
    # Checked in order; the first matching type wins.
    known_types = (
        (int, "int"),
        (float, "float"),
        (Decimal, "decimal"),
        (str, "string"),
    )
    for candidate, label in known_types:
        if isinstance(value, candidate):
            return label
    return str(type(value))  # pyright: ignore[reportUnknownArgumentType]


# Transform subclasses the third-party Jsonata class; an instance is built from a JSONata expression string.
class Transform(Jsonata):
# Alternate constructor that forwards `expression` to Transform as the `expr` keyword.
# NOTE(review): `build` passes `expr=` while `__init__` below names its parameter `expression`;
# these look like the removed and added sides of a diff — confirm which one is current.
@classmethod
def build(cls, expression: str) -> "Transform":
return Transform(expr=expression)
# Hand `expression` to the parent constructor, then switch off `validate_input` on this instance.
def __init__(self, expression: str) -> None:
super().__init__(expression)
self.validate_input = False
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, JSONata validates that the input it evaluates (the data, not the expression) is composed only of JSON values such as int, float, map, list, and null.

But it does not support Python `Decimal` objects, which we use a lot.

To allow that, and to avoid unnecessary input validation that is costly and likely redundant, I simply disable it by default.


@staticmethod
def get_builtin_functions() -> dict[str, Callable[..., Any]]:
    """Return the builtin transform functions keyed by name.

    Each function is keyed by its own Python name, in the order listed.
    """
    functions = (
        coalesce_numbers,
        divide,
        is_positive,
        left_pad_zeroes,
        length,
        map_to,
        skip_row,
        strip_whitespaces,
        to_decimal,
        to_int,
        type_of,
    )
    return {function.__name__: function for function in functions}


def _register_builtin_functions():
    """Bind every builtin transform function on ``Jsonata.static_frame``.

    Each function is wrapped in ``Jsonata.JLambda`` and bound under the name
    given by ``Transform.get_builtin_functions``.
    """
    frame = Jsonata.static_frame
    for name, builtin in Transform.get_builtin_functions().items():
        frame.bind(name, Jsonata.JLambda(builtin))


# Runs once at import time.
_register_builtin_functions()
109 changes: 109 additions & 0 deletions tests/test_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from decimal import Decimal
from magicparse import Transform
import pytest

from magicparse.transform import SkippedRow, TransformError


def test_coalesce_numbers():
    """``$coalesce_numbers`` yields the first non-zero argument or fails."""
    expectations = {
        "$coalesce_numbers(1, 2, 3)": 1,
        "$coalesce_numbers(0, 2, 3)": 2,
        "$coalesce_numbers(0, 0, 3)": 3,
    }
    for expression, expected in expectations.items():
        assert Transform(expression).evaluate({}) == expected

    with pytest.raises(TransformError, match="No non-zero value to coalesce into"):
        Transform("$coalesce_numbers(0, 0, 0)").evaluate({})


def test_divide():
    """``$divide`` handles ints and Decimals and rejects division by zero."""
    int_quotient = Transform("$divide(1, 2)").evaluate({})
    assert int_quotient == 0.5

    operands = {"a": Decimal(1), "b": Decimal(2)}
    decimal_quotient = Transform("$divide(a, b)").evaluate(operands)
    assert decimal_quotient == Decimal("0.5")

    with pytest.raises(TransformError, match="Cannot divide"):
        Transform("$divide(1, 0)").evaluate({})


def test_is_positive():
    """``$is_positive`` passes a positive value and rejects zero and negatives."""
    assert Transform("$is_positive(1)").evaluate({})

    for rejected in ("$is_positive(0)", "$is_positive(-1)"):
        with pytest.raises(TransformError, match="Value is not positive"):
            Transform(rejected).evaluate({})


def test_left_pad_zeroes():
    """``$left_pad_zeroes`` pads up to the width and never truncates."""
    cases = [
        ('$left_pad_zeroes("", 5)', "00000"),
        ('$left_pad_zeroes("111", 5)', "00111"),
        ('$left_pad_zeroes("11111", 5)', "11111"),
        ('$left_pad_zeroes("11111111", 5)', "11111111"),
    ]
    for expression, expected in cases:
        assert Transform(expression).evaluate({}) == expected

    with pytest.raises(TransformError, match="Cannot left pad zeroes"):
        Transform("$left_pad_zeroes(-1, 5)").evaluate({})


def test_length():
    """``$length`` measures arrays, strings and objects and rejects numbers."""
    cases = [
        ('$length(["A", "B", "C"])', {}, 3),
        ('$length("ABCD")', {}, 4),
        ("$length($)", {"a": "a", "b": "b"}, 2),
    ]
    for expression, data, expected in cases:
        assert Transform(expression).evaluate(data) == expected

    with pytest.raises(TransformError, match="Cannot get length"):
        Transform("$length(5)").evaluate({})


def test_map_to():
    """``$map_to`` looks a key up in a mapping and fails on unknown keys."""
    expression = """
(
$values := {
"A": 1,
"B": 2
};
input ~> $map_to($values)
)
"""
    for key, expected in (("A", 1), ("B", 2)):
        assert Transform(expression).evaluate({"input": key}) == expected

    with pytest.raises(TransformError, match="Cannot map to"):
        Transform(expression).evaluate({"input": "C"})


def test_skip_row():
    """``$skip_row`` raises ``SkippedRow`` carrying the given reason."""
    expression = '$skip_row("some reason")'
    with pytest.raises(SkippedRow, match="some reason"):
        Transform(expression).evaluate({})


def test_strip_whitespaces():
    """``$strip_whitespaces`` trims both ends and rejects non-strings."""
    for expression in ('$strip_whitespaces("ABC")', '$strip_whitespaces(" ABC ")'):
        assert Transform(expression).evaluate({}) == "ABC"

    with pytest.raises(TransformError, match="Cannot strip whitespaces"):
        Transform("$strip_whitespaces(5)").evaluate({})


def test_to_decimal():
    """``$to_decimal`` accepts numbers and numeric strings and rejects text."""
    cases = [
        ("$to_decimal(1)", Decimal(1)),
        ("$to_decimal(1.5)", Decimal("1.5")),
        ('$to_decimal("1.5")', Decimal("1.5")),
        ('$to_decimal("1,5")', Decimal("1.5")),
        ('$to_decimal(" 1.5 ")', Decimal("1.5")),
    ]
    for expression, expected in cases:
        assert Transform(expression).evaluate({}) == expected

    with pytest.raises(TransformError, match="Cannot convert to decimal"):
        Transform('$to_decimal("abc")').evaluate({})


def test_to_int():
    """``$to_int`` parses padded integer strings and rejects text."""
    for expression in ('$to_int("15")', '$to_int(" 15 ")'):
        assert Transform(expression).evaluate({}) == 15

    with pytest.raises(TransformError, match="Cannot convert to int"):
        Transform('$to_int("abc")').evaluate({})


def test_type_of():
    """``$type_of`` names the known types and falls back to ``str(type(...))``."""
    cases = [
        ('$type_of("abc")', {}, "string"),
        ("$type_of(1)", {}, "int"),
        ("$type_of(1.5)", {}, "float"),
        ("$type_of(input)", {"input": Decimal("1.5")}, "decimal"),
        ("$type_of({})", {}, "<class 'dict'>"),
    ]
    for expression, data, expected in cases:
        assert Transform(expression).evaluate(data) == expected
15 changes: 15 additions & 0 deletions typings/jsonata/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from collections.abc import Callable
from typing import Any, ClassVar

# Type stub for the frame object exposed as `Jsonata.static_frame`; only `bind` is declared here.
class Frame:
# Presumably registers `val` under `name` in the frame — confirm against the jsonata package.
def bind(self, name: str, val: Any) -> None: ...

# Type stub for the third-party `Jsonata` class, limited to the members declared below.
class Jsonata:
# Class-level frame shared by all instances (declared as a ClassVar).
static_frame: ClassVar[Frame]
# Instance flag; presumably toggles validation of the data passed to `evaluate` — confirm against the jsonata package.
validate_input: bool

# Construct from a JSONata expression string.
def __init__(self, expr: str) -> None: ...
# Evaluate against `input`; both the argument and the result are typed as Any.
def evaluate(self, input: Any) -> Any: ...

# Wrapper around a Python callable; nested in Jsonata (referenced as `Jsonata.JLambda`).
class JLambda:
def __init__(self, function: Callable[..., Any]) -> None: ...