Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
RenameColumns,
SkTransform,
TimeGradient,
TimeIntegrate,
ReplaceDuplicated,
STLFilter,
FillGapsAR,
Expand Down Expand Up @@ -279,6 +280,38 @@ def test_pd_time_gradient(self):
pd.testing.assert_frame_equal(ref, res, rtol=0.01)
check_feature_names_out(derivator, res)

def test_pd_time_integral(self):
test = pd.DataFrame(
{
"cpt1__W": [360.0, 360.0, 180.0, -5.68e-14, 180.0, 360.0],
"cpt2__W": [360.0, 360.0, 180.0, -5.68e-14, 180.0, 360.0],
},
index=pd.date_range("2009-01-01 00:00:00", freq="10s", periods=6, tz="UTC"),
)

ref = pd.DataFrame(
{
"cpt1__J": [0, 3600.0, 6300.0, 7200.0, 8100.0, 10800.0],
"cpt2__J": [0, 3600.0, 6300.0, 7200.0, 8100.0, 10800.0],
},
index=pd.date_range("2009-01-01 00:00:00", freq="10s", periods=6, tz="UTC"),
)

integrator = TimeIntegrate(new_unit="J")
res = integrator.fit_transform(test)
pd.testing.assert_frame_equal(ref, res, rtol=0.01)

integrator = TimeIntegrate()
res = integrator.fit_transform(test)
ref.columns = ["cpt1__W.s", "cpt2__W.s"]
pd.testing.assert_frame_equal(ref, res, rtol=0.01)

integrator = TimeIntegrate(drop_columns=False)
res = integrator.fit_transform(test)
pd.testing.assert_frame_equal(pd.concat([test, ref], axis=1), res, rtol=0.01)

check_feature_names_out(integrator, res)

def test_pd_ffill(self):
test = pd.DataFrame(
{
Expand Down
190 changes: 189 additions & 1 deletion tide/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,11 @@ class TimeGradient(BaseProcessing):
"""A transformer that calculates the time gradient (derivative) of a pandas DataFrame.

This transformer computes the rate of change of values with respect to time.
The gradient is calculated using the time difference between consecutive data points.
The gradient is calculated using np centered gradient method.

d(i) = (yi-1 - yi+1) / (ti-1 - ti+1)

Method assume linear variation at boundaries

Parameters
----------
Expand Down Expand Up @@ -772,6 +776,190 @@ def _transform_implementation(self, X: pd.Series | pd.DataFrame):
return derivative


class TimeIntegrate(BaseProcessing):
"""
A transformer that calculates the time integral (cumulative sum over time) of a
pandas DataFrame.

This transformer computes the cumulative integral of values with respect to time.
The integral is calculated using the trapezoidal rule between consecutive data points.

Int[i] = Int[i - 1] + dt * (Yi-1 + Yi) / 2

CAUTION given that TimeGradient is a centered derivative. This is not an exact
inverse transform.

Parameters
----------
new_unit : str, optional (default=None)
The new unit to apply to the column names after transformation.
If provided, the transformer will update the unit part of the column names
(the part after the second "__" in the Tide naming convention).
Example: If input columns are "power__W__building" and new_unit="J",
output columns will be "power__J__building".
If not provided and drop_columns=False, the unit will be "originalunit.s"

initial_value : float or dict, optional (default=0.0)
The initial value(s) for the integral at the first timestamp.
Can be:
- float: Same initial value for all columns
- dict: Mapping of column names to their initial values
Example: initial_value={"power__W__building": 1000.0}

drop_columns : bool, optional (default=True)
If True, only returns the integrated columns (replaces original data).
If False, keeps both the original columns and adds new integrated columns
with updated units.

Examples
--------
>>> import pandas as pd
>>> import numpy as np
>>> # Create DataFrame with DateTimeIndex
>>> dates = pd.date_range(
... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min"
... ).tz_localize("UTC")
>>> # Create power data (in Watts) - constant 6000W consumption
>>> df = pd.DataFrame(
... {"power__W__building": [6000, 6000, 6000, 6000, 6000]},
... index=dates,
... )
>>> # Calculate energy (J) from power (W) using time integration
>>> # Energy = integral of Power over time (in seconds)
>>> transformer = TimeIntegral(new_unit="J")
>>> result = transformer.fit_transform(df)
>>> print(result)
power__J__building
2024-01-01 00:00:00+00:00 0.0
2024-01-01 00:01:00+00:00 360000.0
2024-01-01 00:02:00+00:00 720000.0
2024-01-01 00:03:00+00:00 1080000.0
2024-01-01 00:04:00+00:00 1440000.0

>>> # Keep original columns alongside integrated ones
>>> transformer_keep = TimeIntegral(new_unit="J", drop_columns=False)
>>> result_keep = transformer_keep.fit_transform(df)
>>> print(result_keep)
power__W__building power__J__building
2024-01-01 00:00:00+00:00 6000 0.0
2024-01-01 00:01:00+00:00 6000 360000.0
2024-01-01 00:02:00+00:00 6000 720000.0
2024-01-01 00:03:00+00:00 6000 1080000.0
2024-01-01 00:04:00+00:00 6000 1440000.0

>>> # Without specifying new_unit, the unit becomes "W.s"
>>> transformer_auto = TimeIntegral(drop_columns=False)
>>> result_auto = transformer_auto.fit_transform(df)
>>> print(result_auto)
power__W__building power__W.s__building
2024-01-01 00:00:00+00:00 6000 0.0
2024-01-01 00:01:00+00:00 6000 360000.0
2024-01-01 00:02:00+00:00 6000 720000.0
2024-01-01 00:03:00+00:00 6000 1080000.0
2024-01-01 00:04:00+00:00 6000 1440000.0

Notes
-----
- The time integral is calculated using the trapezoidal rule:
integral += (time2 - time1) * (value1 + value2) / 2
- This simulates an energy meter that accumulates energy over time
- The first value is set to initial_value (default 0.0)
- Time differences are calculated in seconds
- When using new_unit, the transformer follows the Tide naming convention
of "name__unit__block" for column names
- For irregular time series, the integration automatically adapts to
the varying time steps
- When drop_columns=False, the original columns are preserved and new
integrated columns are added with the appropriate unit suffix

Returns
-------
pd.DataFrame
If drop_columns=True: DataFrame with cumulative time integrals for each column.
If drop_columns=False: DataFrame with both original and integrated columns.
The output maintains the same DateTimeIndex as the input.
If new_unit is specified, the integrated column names are updated accordingly.
"""

def __init__(
self,
new_unit: str = None,
initial_value: float | dict = 0.0,
drop_columns: bool = True,
):
super().__init__()
self.new_unit = new_unit
self.initial_value = initial_value
self.drop_columns = drop_columns

def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None):
# Determine the unit to use for integrated columns
if self.new_unit is not None:
unit_suffix = self.new_unit
else:
# Extract original unit and append ".s"
# Assuming Tide naming convention: "name__unit__block__sub_block"
unit_suffix = None
sample_col = X.columns[0] if len(X.columns) > 0 else None
if sample_col and "__" in sample_col:
parts = sample_col.split("__")
if len(parts) >= 2:
original_unit = parts[1]
unit_suffix = f"{original_unit}.s"

# Create feature names for integrated columns
self.integrated_feature_names_ = self.get_set_tags_values_columns(
X.copy(), 1, unit_suffix
)

# Determine final output feature names
if self.drop_columns:
self.feature_names_out_ = self.integrated_feature_names_
else:
self.feature_names_out_ = list(X.columns) + list(
self.integrated_feature_names_
)

def _transform_implementation(self, X: pd.Series | pd.DataFrame):
check_is_fitted(self, attributes=["feature_names_in_", "feature_names_out_"])

if len(X) == 0:
return X

time_diffs = np.diff(X.index.view("int64")) * 1e-9

# Create DataFrame for integrated values
integrated = pd.DataFrame(index=X.index, columns=X.columns, dtype=float)

for col in X.columns:
values = X[col].to_numpy()

if isinstance(self.initial_value, dict):
init_val = self.initial_value.get(col, 0.0)
else:
init_val = self.initial_value

integrals = np.zeros(len(values))
integrals[0] = init_val

for i in range(1, len(values)):
dt = time_diffs[i - 1]

# Trapezoidal rule: area = dt * (y1 + y2) / 2
increment = dt * (values[i - 1] + values[i]) / 2
integrals[i] = integrals[i - 1] + increment

integrated[col] = integrals

integrated.columns = self.integrated_feature_names_

if self.drop_columns:
return integrated
else:
result = pd.concat([X, integrated], axis=1)
return result


class Ffill(BaseFiller, BaseProcessing):
"""A transformer that forward-fills missing values in a pandas DataFrame.

Expand Down
Loading