1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
### Fixed
- (General) Automatic Credentials Discovery-based credential retrieval for
  Google Cloud Storage (GCS).
- (`sklearn`) Temporary directory race condition in estimators.

## 11.0.0.0 - 2025-12-19

21 changes: 0 additions & 21 deletions khiops/sklearn/estimators.py
@@ -362,8 +362,6 @@ def fit(self, X, y=None, **kwargs):

# Create temporary directory and tables
computation_dir = self._create_computation_dir("fit")
initial_runner_temp_dir = kh.get_runner().root_temp_dir
kh.get_runner().root_temp_dir = computation_dir

# Create the dataset, fit the model and reset in case of any failure
try:
@@ -377,7 +375,6 @@
# Cleanup and restore the runner's temporary dir
finally:
self._cleanup_computation_dir(computation_dir)
kh.get_runner().root_temp_dir = initial_runner_temp_dir

# If on "fitted" state then:
# - self.model_ must be a DictionaryDomain
@@ -963,15 +960,13 @@ def _simplify(
computation_dir = self._create_computation_dir("simplify")
output_dir = self._get_output_dir(computation_dir)
simplify_log_file_path = fs.get_child_path(output_dir, "khiops_simplify_cc.log")
initial_runner_temp_dir = kh.get_runner().root_temp_dir
full_coclustering_file_path = fs.get_child_path(
output_dir, "FullCoclustering.khcj"
)
simplified_coclustering_file_path = fs.get_child_path(
output_dir, "Coclustering.khcj"
)
self.model_report_.write_khiops_json_file(full_coclustering_file_path)
kh.get_runner().root_temp_dir = computation_dir
try:
# - simplify_coclustering, then
# - prepare_coclustering_deployment
@@ -1040,7 +1035,6 @@
)
finally:
self._cleanup_computation_dir(computation_dir)
kh.get_runner().root_temp_dir = initial_runner_temp_dir
return simplified_cc

def simplify(
@@ -1101,8 +1095,6 @@ def predict(self, X):
"""
# Create temporary directory
computation_dir = self._create_computation_dir("predict")
initial_runner_temp_dir = kh.get_runner().root_temp_dir
kh.get_runner().root_temp_dir = computation_dir

# Create the input dataset
ds = Dataset(X)
@@ -1119,7 +1111,6 @@
# Cleanup and restore the runner's temporary dir
finally:
self._cleanup_computation_dir(computation_dir)
kh.get_runner().root_temp_dir = initial_runner_temp_dir

# Transform to numpy.array
y_pred = y_pred.to_numpy()
@@ -1557,8 +1548,6 @@ def predict(self, X):
"""
# Create temporary directory
computation_dir = self._create_computation_dir("predict")
initial_runner_temp_dir = kh.get_runner().root_temp_dir
kh.get_runner().root_temp_dir = computation_dir

try:
# Create the input dataset
@@ -1575,10 +1564,6 @@
# Cleanup and restore the runner's temporary dir
finally:
self._cleanup_computation_dir(computation_dir)
kh.get_runner().root_temp_dir = initial_runner_temp_dir

# Restore the runner's temporary dir
kh.get_runner().root_temp_dir = initial_runner_temp_dir

# Return pd.Series in the monotable + pandas case
assert isinstance(y_pred, (str, pd.DataFrame)), "Expected str or DataFrame"
@@ -1997,8 +1982,6 @@ def predict_proba(self, X):
"""
# Create temporary directory and tables
computation_dir = self._create_computation_dir("predict_proba")
initial_runner_temp_dir = kh.get_runner().root_temp_dir
kh.get_runner().root_temp_dir = computation_dir

# Create the input dataset

@@ -2015,7 +1998,6 @@
# Cleanup and restore the runner's temporary dir
finally:
self._cleanup_computation_dir(computation_dir)
kh.get_runner().root_temp_dir = initial_runner_temp_dir

# - Reorder the columns to that of self.classes_
# - Transform to np.ndarray
@@ -2649,8 +2631,6 @@ def transform(self, X):
"""
# Create temporary directory
computation_dir = self._create_computation_dir("transform")
initial_runner_temp_dir = kh.get_runner().root_temp_dir
kh.get_runner().root_temp_dir = computation_dir

# Create and transform the dataset
try:
@@ -2665,7 +2645,6 @@
# Cleanup and restore the runner's temporary dir
finally:
self._cleanup_computation_dir(computation_dir)
kh.get_runner().root_temp_dir = initial_runner_temp_dir
return X_transformed.to_numpy(copy=False)

def _transform_prepare_deployment_for_transform(self, ds):
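The removed lines above all follow the same save-and-swap pattern on the shared runner's `root_temp_dir`: because `kh.get_runner()` returns an object shared by every thread, concurrent estimator calls could save and restore each other's directories. A minimal sketch of that pattern (invented names, no actual Khiops calls) illustrates the race the fix removes:

import threading

class Runner:
    """Stand-in for the shared runner object returned by kh.get_runner()"""

    def __init__(self):
        self.root_temp_dir = "/tmp/khiops"

runner = Runner()  # shared by every thread, like the real runner singleton

def run_task(computation_dir):
    # Save-and-swap on a shared attribute: not thread-safe
    initial = runner.root_temp_dir          # may already be another thread's dir
    runner.root_temp_dir = computation_dir
    try:
        pass  # ... the Khiops task would run here ...
    finally:
        runner.root_temp_dir = initial      # may restore the wrong value

threads = [
    threading.Thread(target=run_task, args=(f"/tmp/khiops_fit_{i}",))
    for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# With unlucky interleaving this prints one of the per-thread directories
# instead of "/tmp/khiops"
print(runner.root_temp_dir)

With the swap removed, each estimator call only creates and cleans up its own computation directory, so no shared runner state is mutated.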
113 changes: 113 additions & 0 deletions tests/test_sklearn.py
@@ -6,18 +6,24 @@
######################################################################################
"""Tests parameter transfer between Khiops sklearn and core APIs"""
import contextlib
import io
import os
import shutil
import unittest
import warnings
from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from itertools import product

import numpy as np
import pandas as pd
from sklearn.exceptions import NotFittedError
from sklearn.utils.estimator_checks import check_estimator
from sklearn.utils.validation import check_is_fitted

import khiops.core as kh
import khiops.core.internals.filesystems as fs
from khiops.core.internals.runner import KhiopsLocalRunner
from khiops.sklearn.estimators import (
KhiopsClassifier,
KhiopsCoclustering,
@@ -1773,6 +1779,27 @@ def test_sklearn_check_estimator(self):
print("Done")


def no_mpi(func):
"""Disable MPI (in setting proc number to 1) to save resources"""

def inner(self):
        # Set up a single-CPU runner
initial_runner = kh.get_runner()
initial_proc_number = os.environ["KHIOPS_PROC_NUMBER"]
os.environ["KHIOPS_PROC_NUMBER"] = "1"
single_cpu_runner = KhiopsLocalRunner()
kh.set_runner(single_cpu_runner)

        # Call the wrapped test function
func(self)

        # Restore the initial runner and environment variable
os.environ["KHIOPS_PROC_NUMBER"] = initial_proc_number
kh.set_runner(initial_runner)

return inner
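
Side note, not part of this diff: `inner` above does not use try/finally, so a failing test would leave the single-CPU runner and `KHIOPS_PROC_NUMBER=1` in place for subsequent tests, and it raises `KeyError` if the variable is unset. A more defensive variant (the name `no_mpi_safe` is hypothetical; the Khiops calls are the same ones used above) could look like this:

import os

import khiops.core as kh
from khiops.core.internals.runner import KhiopsLocalRunner

def no_mpi_safe(func):
    """Disable MPI (by setting the process number to 1), restoring state on failure"""

    def inner(self):
        # Save the current runner and environment, then switch to a single-CPU runner
        initial_runner = kh.get_runner()
        initial_proc_number = os.environ.get("KHIOPS_PROC_NUMBER")
        os.environ["KHIOPS_PROC_NUMBER"] = "1"
        kh.set_runner(KhiopsLocalRunner())
        try:
            func(self)
        finally:
            # Restore the environment variable and the runner even if the test fails
            if initial_proc_number is None:
                del os.environ["KHIOPS_PROC_NUMBER"]
            else:
                os.environ["KHIOPS_PROC_NUMBER"] = initial_proc_number
            kh.set_runner(initial_runner)

    return inner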


class KhiopsSklearnVariousTests(unittest.TestCase):
"""Miscelanous sklearn classes tests"""

@@ -1827,3 +1854,89 @@ def test_export_operations_raise_when_not_fitted(self):
with self.subTest(export_operation=export_operation, estimator=estimator):
with self.assertRaises(NotFittedError):
getattr(estimator, export_operation)("report.khj")

@no_mpi
def test_concurrency_safe_operations(self):
"""Ensure no race condition occurs when running concurrent operations"""

# Define all the function calls that will be submitted to the threads
def predict_func(clf, X):
return clf.predict(X)

def predict_proba_func(clf, X):
return clf.predict_proba(X)

def encoder_fit_transform_func(khe, X, y):
return khe.fit_transform(X, y)

def estimator_fit_func(khcc, X, id_column):
return khcc.fit(X, id_column=id_column)

def coclustering_simplify_func(khcc):
return khcc.simplify()

def coclustering_predict_func(khcc, X):
return khcc.predict(X)

clf = KhiopsClassifier(n_trees=0)
adult_df = pd.read_csv(
f"{kh.get_samples_dir()}/Adult/Adult.txt", sep="\t", header=0
)
X = adult_df.drop("class", axis=1)
clf.fit(X, adult_df["class"])

# Test `predict`, `predict_proba` of `KhiopsPredictor` and its children
# (`KhiopsClassifier` and `KhiopsRegressor`)
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(predict_func, clf, X): i for i in range(5)}
for future in as_completed(futures):
print(future.result())
futures = {executor.submit(predict_proba_func, clf, X): i for i in range(5)}
for future in as_completed(futures):
print(future.result())

# Test `transform` of `KhiopsEncoder`
khe = KhiopsEncoder()

y = adult_df["class"]
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(encoder_fit_transform_func, khe, X, y): i
for i in range(5)
}
for future in as_completed(futures):
print(future.result())

# Test `fit`, `simplify` and `predict` of
# `KhiopsCoclustering` and `KhiopsEstimator`
splice_data_dir = fs.get_child_path(
kh.get_runner().samples_dir, "SpliceJunction"
)
splice_data_file_path = fs.get_child_path(
splice_data_dir, "SpliceJunctionDNA.txt"
)

# Read the splice junction secondary datatable
with io.BytesIO(fs.read(splice_data_file_path)) as splice_data_file:
splice_df = pd.read_csv(splice_data_file, sep="\t")

khcc = KhiopsCoclustering()

with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(estimator_fit_func, khcc, splice_df, "SampleId"): i
for i in range(5)
}
for future in as_completed(futures):
print(future.result())
futures = {
executor.submit(coclustering_simplify_func, khcc): i for i in range(5)
}
for future in as_completed(futures):
print(future.result())
futures = {
executor.submit(coclustering_predict_func, khcc, splice_df): i
for i in range(5)
}
for future in as_completed(futures):
print(future.result())