Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/python-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,4 @@ jobs:
python -m unittest discover tests/datamining/
python -m unittest discover tests/downloaders/
python -m unittest discover tests/interpolation/
python -m unittest discover tests/predictor/
python -m unittest discover tests/wrappers/
32 changes: 25 additions & 7 deletions bluemath_tk/core/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,25 +130,42 @@ def wrapper(
min_number_of_points: int = None,
max_number_of_iterations: int = 10,
normalize_data: bool = False,
regression_guided: Dict[str, List] = {},
):
if data is None:
raise ValueError("Data cannot be None")
raise ValueError("data cannot be None")
elif not isinstance(data, pd.DataFrame):
raise TypeError("Data must be a pandas DataFrame")
raise TypeError("data must be a pandas DataFrame")
if not isinstance(directional_variables, list):
raise TypeError("Directional variables must be a list")
raise TypeError("directional_variables must be a list")
if not isinstance(custom_scale_factor, dict):
raise TypeError("Custom scale factor must be a dict")
raise TypeError("custom_scale_factor must be a dict")
if min_number_of_points is not None:
if not isinstance(min_number_of_points, int) or min_number_of_points <= 0:
raise ValueError("Minimum number of points must be integer and > 0")
raise ValueError("min_number_of_points must be integer and > 0")
if (
not isinstance(max_number_of_iterations, int)
or max_number_of_iterations <= 0
):
raise ValueError("Maximum number of iterations must be integer and > 0")
raise ValueError("max_number_of_iterations must be integer and > 0")
if not isinstance(normalize_data, bool):
raise TypeError("Normalize data must be a boolean")
raise TypeError("normalize_data must be a boolean")
if not isinstance(regression_guided, dict):
raise TypeError("regression_guided must be a dictionary")
if not all(
isinstance(var, str) and var in data.columns
for var in regression_guided.get("vars", [])
):
raise TypeError(
"regression_guided vars must be a list of strings and must exist in data"
)
if not all(
isinstance(alpha, float) and alpha >= 0 and alpha <= 1
for alpha in regression_guided.get("alpha", [])
):
raise TypeError(
"regression_guided alpha must be a list of floats between 0 and 1"
)
return func(
self,
data,
Expand All @@ -157,6 +174,7 @@ def wrapper(
min_number_of_points,
max_number_of_iterations,
normalize_data,
regression_guided,
)

return wrapper
Expand Down
75 changes: 72 additions & 3 deletions bluemath_tk/datamining/kma.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import List, Tuple
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.linear_model import LinearRegression

from ..core.decorators import validate_data_kma
from ._base_datamining import BaseClustering
Expand Down Expand Up @@ -132,6 +133,7 @@ def __init__(
self.normalized_centroids: pd.DataFrame = pd.DataFrame()
self.centroid_real_indices: np.array = np.array([])
self.is_fitted: bool = False
self.regression_guided: dict = {}

@property
def kma(self) -> KMeans:
Expand Down Expand Up @@ -176,6 +178,57 @@ def data_to_fit(self) -> pd.DataFrame:

return self._data_to_fit

@staticmethod
def add_regression_guided(
    data: pd.DataFrame, vars: List[str], alpha: List[float]
) -> pd.DataFrame:
    """
    Blend the input features with regression predictions of guiding variables.

    The guiding variables in ``vars`` are regressed (ordinary least squares,
    explicit intercept column) on the remaining, standardized columns of
    ``data``. The returned frame contains the original features down-weighted
    by ``1 - sum(alpha)`` alongside the regression predictions of the guiding
    variables weighted column-wise by ``alpha``.

    Parameters
    ----------
    data : pd.DataFrame
        The data to fit the K-Means algorithm.
    vars : List[str]
        The variables to use for regression-guided clustering.
    alpha : List[float]
        The alpha values to use for regression-guided clustering.

    Returns
    -------
    pd.DataFrame
        The data with the regression-guided variables.
    """

    # Split the predictors from the guiding targets.
    predictors = data.drop(columns=vars)
    targets = np.stack([data[v].values for v in vars], axis=1)

    # Standardize predictors; constant columns keep a scale of 1 to avoid
    # division by zero.
    predictor_std = predictors.std().replace(0, 1)
    scaled_predictors = predictors / predictor_std

    # Design matrix with an explicit intercept column (model is fitted with
    # fit_intercept=False, so the intercept is part of the design).
    design = np.column_stack((np.ones(len(predictors)), scaled_predictors.values))

    # NaN-aware standardization of the targets, guarding zero variance.
    target_std = np.nanstd(targets, axis=0)
    target_std[target_std == 0] = 1.0

    # Least-squares fit of the scaled targets, then de-normalize predictions.
    regression = LinearRegression(fit_intercept=False)
    regression.fit(design, targets / target_std)
    predictions = regression.predict(design) * target_std

    # Blend: original (unscaled) predictors weighted by 1 - sum(alpha),
    # predicted guiding variables weighted column-wise by alpha.
    predictor_weight = 1.0 - np.sum(alpha)
    blended = np.hstack(
        [predictor_weight * predictors.values, predictions * alpha]
    )

    out = pd.DataFrame(blended, index=data.index)
    out.columns = list(predictors.columns) + vars

    return out

@validate_data_kma
def fit(
self,
Expand All @@ -185,6 +238,7 @@ def fit(
min_number_of_points: int = None,
max_number_of_iterations: int = 10,
normalize_data: bool = False,
regression_guided: Dict[str, List] = {},
) -> None:
"""
Fit the K-Means algorithm to the provided data.
Expand All @@ -193,8 +247,7 @@ def fit(
provided dataframe and custom scale factor.
It normalizes the data, and returns the calculated centroids.

TODO: Implement KMA regression guided with variable.
Add option to force KMA initialization with MDA centroids.
TODO: Add option to force KMA initialization with MDA centroids.

Parameters
----------
Expand All @@ -215,8 +268,18 @@ def fit(
Default is 10.
normalize_data : bool, optional
A flag to normalize the data. Default is False.
regression_guided: dict, optional
A dictionary specifying regression-guided clustering variables and relative weights.
Example: {"vars":["Fe"],"alpha":[0.6]}. Default is {}.
"""

if regression_guided:
data = self.add_regression_guided(
data=data,
vars=regression_guided.get("vars", None),
alpha=regression_guided.get("alpha", None),
)

super().fit(
data=data,
directional_variables=directional_variables,
Expand Down Expand Up @@ -255,6 +318,7 @@ def fit(
self.centroids = self.denormalize(
normalized_data=self.normalized_centroids, scale_factor=self.scale_factor
)

for directional_variable in self.directional_variables:
self.centroids[directional_variable] = self.get_degrees_from_uv(
xu=self.centroids[f"{directional_variable}_u"].values,
Expand Down Expand Up @@ -299,6 +363,7 @@ def fit_predict(
min_number_of_points: int = None,
max_number_of_iterations: int = 10,
normalize_data: bool = False,
regression_guided: Dict[str, List] = {},
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Fit the K-Means algorithm to the provided data and predict the nearest centroid
Expand All @@ -323,6 +388,9 @@ def fit_predict(
Default is 10.
normalize_data : bool, optional
A flag to normalize the data. Default is False.
regression_guided: dict, optional
A dictionary specifying regression-guided clustering variables and relative weights.
Example: {"vars":["Fe"],"alpha":[0.6]}. Default is {}.

Returns
-------
Expand All @@ -338,6 +406,7 @@ def fit_predict(
min_number_of_points=min_number_of_points,
max_number_of_iterations=max_number_of_iterations,
normalize_data=normalize_data,
regression_guided=regression_guided,
)

return self.predict(data=data)
20 changes: 17 additions & 3 deletions bluemath_tk/predictor/xwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ def fit(
------
XWTError
If the data is not PCA formatted.

TODO: Standardize PCs by first PC variance.
pca.pcs_df / pca.pcs.stds.isel(n_component=0).values ??
"""

# Make a copy of the data to avoid modifying the original dataset
Expand All @@ -344,10 +347,21 @@ def fit(

kma: KMA = self.steps.get("kma")
self.num_clusters = kma.num_clusters
# TODO: standardize PCs by first PC variance
# pca.pcs_df / pca.pcs.stds.isel(n_component=0).values

data_to_kma = pca.pcs_df.copy()

if "regression_guided" in fit_params.get("kma", {}):
guiding_vars = fit_params["kma"]["regression_guided"].get("vars", [])

if guiding_vars:
guiding_data = pd.DataFrame(
{var: data[var].values for var in guiding_vars},
index=data.time.values,
)
data_to_kma = pd.concat([data_to_kma, guiding_data], axis=1)

kma_bmus, _kma_bmus_df = kma.fit_predict(
data=pca.pcs_df,
data=data_to_kma,
**fit_params.get("kma", {}),
)
self.kma_bmus = kma_bmus + 1 # TODO: Check if this is necessary!!!
Expand Down
15 changes: 15 additions & 0 deletions tests/datamining/test_kma.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import unittest

import numpy as np
import pandas as pd

from bluemath_tk.datamining.kma import KMA


Expand Down Expand Up @@ -46,6 +48,19 @@ def test_fit_predict(self):
self.assertIsInstance(predicted_labels_df, pd.DataFrame)
self.assertEqual(predicted_labels_df.shape[0], 1000)

def test_add_regression_guided(self):
    # Derive a wave-energy-flux-like guiding variable from Hs and Tp, then
    # check that regression-guided fit_predict still labels every row.
    df = self.df.copy()
    df["Fe"] = df["Hs"] ** 2 * df["Tp"]
    labels, labels_df = self.kma.fit_predict(
        data=df,
        directional_variables=["Dir"],
        regression_guided={"vars": ["Fe"], "alpha": [0.6]},
    )
    self.assertIsInstance(labels, pd.DataFrame)
    self.assertEqual(len(labels), 1000)
    self.assertIsInstance(labels_df, pd.DataFrame)
    self.assertEqual(labels_df.shape[0], 1000)


if __name__ == "__main__":
unittest.main()