Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 95 additions & 42 deletions flexml/_model_tuner.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import numpy as np
import pandas as pd
import optuna
from time import time
import joblib
from joblib.parallel import BatchCompletionCallBack
from contextlib import contextmanager
from tqdm import tqdm
from typing import Optional, Union
from time import time
import numpy as np
import pandas as pd
from sklearn.model_selection import ParameterGrid, GridSearchCV, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.base import clone
import optuna
from flexml.config import TUNING_METRIC_TRANSFORMATIONS
from flexml.structures.custom_score import CustomScore
from flexml.logger import get_logger
from flexml.helpers import evaluate_model_perf
from copy import deepcopy
from tqdm import tqdm


class TqdmBatchCompletionCallback(BatchCompletionCallBack):
Expand Down Expand Up @@ -233,8 +233,8 @@ def grid_search(
self,
pipeline: Pipeline,
param_grid: dict,
eval_metric: str,
cv: list,
eval_metric: Union[str, CustomScore],
cv: list,
n_jobs: int = -1,
verbose: int = 0
) -> Optional[dict]:
Expand All @@ -249,7 +249,7 @@ def grid_search(
param_grid : dict
The dictionary that contains the hyperparameters and their possible values

eval_metric : str
eval_metric : str or CustomScore
The evaluation metric that will be used to evaluate the model. It can be one of the following:

* 'R2' for R^2 score
Expand All @@ -270,6 +270,8 @@ def grid_search(

* 'F1 Score' for F1 score

* Or a CustomScore instance for a custom metric

cv : list of tuples
A list of (train_idx, test_idx) tuples where each tuple contains numpy arrays of indices
for the training and test sets for that fold. For example:
Expand Down Expand Up @@ -309,6 +311,18 @@ def grid_search(
model_stats = self._setup_tuning("GridSearchCV", pipeline, param_grid, n_iter=None, n_jobs=n_jobs)
param_grid = model_stats['tuning_param_grid']

# Handle custom metrics
is_custom_metric = isinstance(eval_metric, CustomScore)
if is_custom_metric:
eval_metric_name = eval_metric.name
custom_scorer = eval_metric.get_scorer()
scoring = {eval_metric_name: custom_scorer}
refit_metric = eval_metric_name
else:
eval_metric_name = eval_metric
scoring = self.eval_metrics_in_tuning_format
refit_metric = eval_metric

try:
t_start = time()

Expand All @@ -321,8 +335,8 @@ def grid_search(
search = GridSearchCV(
pipeline,
param_grid,
scoring=self.eval_metrics_in_tuning_format,
refit=eval_metric,
scoring=scoring,
refit=refit_metric,
cv=cv,
n_jobs=n_jobs,
verbose=verbose
Expand All @@ -339,21 +353,25 @@ def grid_search(
t_end = time()
time_taken = round(t_end - t_start, 2)

scores = {
metric: (
-search_result.cv_results_[f'mean_test_{metric}'][search_result.best_index_]
if metric in self.reverse_signed_eval_metrics else
search_result.cv_results_[f'mean_test_{metric}'][search_result.best_index_]
)
for metric in list(self.eval_metrics_in_tuning_format.keys())
}
if is_custom_metric:
mean_score = search_result.cv_results_[f'mean_test_{eval_metric_name}'][search_result.best_index_]
scores = {eval_metric_name: round(mean_score, 6)}
else:
scores = {
metric: (
-search_result.cv_results_[f'mean_test_{metric}'][search_result.best_index_]
if metric in self.reverse_signed_eval_metrics else
search_result.cv_results_[f'mean_test_{metric}'][search_result.best_index_]
)
for metric in list(self.eval_metrics_in_tuning_format.keys())
}
mean_score = search_result.cv_results_[f'mean_test_{eval_metric_name}'][search_result.best_index_]

model_stats['tuned_model'] = search_result.best_estimator_.named_steps['model']
mean_score = search_result.cv_results_[f'mean_test_{eval_metric}'][search_result.best_index_]
model_stats['tuned_model_score'] = round(mean_score, 6)
model_stats['model_perf'] = scores
model_stats['time_taken_sec'] = time_taken
model_stats['tuned_model_evaluation_metric'] = eval_metric
model_stats['tuned_model_evaluation_metric'] = eval_metric_name
return model_stats
except Exception as e:
self.logger.error(f"Error while tuning the model with GridSearchCV, Error: {e}")
Expand All @@ -363,7 +381,7 @@ def randomized_search(
self,
pipeline: Pipeline,
param_grid: dict,
eval_metric: str,
eval_metric: Union[str, CustomScore],
cv: list,
n_iter: int = 10,
n_jobs: int = -1,
Expand All @@ -380,7 +398,7 @@ def randomized_search(
param_grid : dict
The dictionary that contains the hyperparameters and their possible values

eval_metric : str
eval_metric : str or CustomScore
The evaluation metric that will be used to evaluate the model. It can be one of the following:

* 'R2' for R^2 score
Expand All @@ -401,6 +419,8 @@ def randomized_search(

* 'F1 Score' for F1 score

* Or a CustomScore instance for a custom metric

cv : list of tuples
A list of (train_idx, test_idx) tuples where each tuple contains numpy arrays of indices
for the training and test sets for that fold. For example:
Expand Down Expand Up @@ -432,6 +452,18 @@ def randomized_search(
model_stats = self._setup_tuning("randomized_search", pipeline, param_grid, n_iter=n_iter, n_jobs=n_jobs)
param_grid = model_stats['tuning_param_grid']

# Handle custom metrics
is_custom_metric = isinstance(eval_metric, CustomScore)
if is_custom_metric:
eval_metric_name = eval_metric.name
custom_scorer = eval_metric.get_scorer()
scoring = {eval_metric_name: custom_scorer}
refit_metric = eval_metric_name
else:
eval_metric_name = eval_metric
scoring = self.eval_metrics_in_tuning_format
refit_metric = eval_metric

t_start = time()

# Calculate total fits
Expand All @@ -443,8 +475,8 @@ def randomized_search(
estimator=pipeline,
param_distributions=param_grid,
n_iter=n_iter,
scoring=self.eval_metrics_in_tuning_format,
refit=eval_metric,
scoring=scoring,
refit=refit_metric,
cv=cv,
n_jobs=n_jobs,
verbose=verbose
Expand All @@ -461,29 +493,33 @@ def randomized_search(
t_end = time()
time_taken = round(t_end - t_start, 2)

scores = {
metric: (
-search_result.cv_results_[f'mean_test_{metric}'][search_result.best_index_]
if metric in self.reverse_signed_eval_metrics else
search_result.cv_results_[f'mean_test_{metric}'][search_result.best_index_]
)
for metric in list(self.eval_metrics_in_tuning_format.keys())
}
if is_custom_metric:
mean_score = search_result.cv_results_[f'mean_test_{eval_metric_name}'][search_result.best_index_]
scores = {eval_metric_name: round(mean_score, 6)}
else:
scores = {
metric: (
-search_result.cv_results_[f'mean_test_{metric}'][search_result.best_index_]
if metric in self.reverse_signed_eval_metrics else
search_result.cv_results_[f'mean_test_{metric}'][search_result.best_index_]
)
for metric in list(self.eval_metrics_in_tuning_format.keys())
}
mean_score = search_result.cv_results_[f'mean_test_{eval_metric_name}'][search_result.best_index_]

model_stats['tuned_model'] = search_result.best_estimator_.named_steps['model']
mean_score = search_result.cv_results_[f'mean_test_{eval_metric}'][search_result.best_index_]
model_stats['tuned_model_score'] = round(mean_score, 6)
model_stats['model_perf'] = scores
model_stats['time_taken_sec'] = time_taken
model_stats['tuned_model_evaluation_metric'] = eval_metric
model_stats['tuned_model_evaluation_metric'] = eval_metric_name
return model_stats


def optuna_search(
self,
pipeline: Pipeline,
param_grid: dict,
eval_metric: str,
eval_metric: Union[str, CustomScore],
cv: list,
n_iter: int = 10,
timeout: Optional[int] = None,
Expand All @@ -501,7 +537,7 @@ def optuna_search(
param_grid : dict
The dictionary that contains the hyperparameters and their possible values

eval_metric : str
eval_metric : str or CustomScore
The evaluation metric that will be used to evaluate the model. It can be one of the following:

* 'R2' for R^2 score
Expand All @@ -522,6 +558,8 @@ def optuna_search(

* 'F1 Score' for F1 score

* Or a CustomScore instance for a custom metric

cv : list of tuples
A list of (train_idx, test_idx) tuples where each tuple contains numpy arrays of indices
for the training and test sets for that fold. For example:
Expand Down Expand Up @@ -571,6 +609,15 @@ def optuna_search(
model_stats = self._setup_tuning("optuna", pipeline, param_grid, n_iter=n_iter, n_jobs=n_jobs, prefix_param_grid_flag=False)
param_grid = model_stats['tuning_param_grid']

# Handle custom metrics
is_custom_metric = isinstance(eval_metric, CustomScore)
if is_custom_metric:
eval_metric_name = eval_metric.name
study_direction = eval_metric.direction
else:
eval_metric_name = eval_metric
study_direction = "maximize" if eval_metric in ['R2', 'Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC-AUC'] else "minimize"

# Set verbosity levels
if verbose == 0:
optuna.logging.set_verbosity(optuna.logging.CRITICAL)
Expand All @@ -583,8 +630,6 @@ def optuna_search(
elif verbose == 4:
optuna.logging.set_verbosity(optuna.logging.DEBUG)

study_direction = "maximize" if eval_metric in ['R2', 'Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC-AUC'] else "minimize"

def objective(trial):
# Generate parameters for the trial
params = pipeline.named_steps['model'].get_params()
Expand Down Expand Up @@ -617,17 +662,25 @@ def objective(trial):

new_pipeline.fit(X_train, y_train)

# Get predictions based on whether we need probabilities or labels
if self.ml_problem_type == "Classification" and hasattr(new_pipeline, 'predict_proba'):
y_pred = new_pipeline.predict_proba(X_test)
else:
y_pred = new_pipeline.predict(X_test)

# Evaluate performance
scores.append(evaluate_model_perf(self.ml_problem_type, y_test, y_pred))
if is_custom_metric:
scores.append(evaluate_model_perf(
self.ml_problem_type,
y_test,
y_pred,
custom_score=eval_metric
))
else:
scores.append(evaluate_model_perf(self.ml_problem_type, y_test, y_pred))

# Calculate the mean score across all folds
avg_metrics = {k: np.mean([m[k] if m[k] is not None else -1 for m in scores]) for k in scores[0]}
mean_score = avg_metrics.get(eval_metric, float('inf'))
mean_score = avg_metrics.get(eval_metric_name, float('inf'))

# Update the best score and model
if model_stats['tuned_model_score'] is None or (study_direction == "maximize" and mean_score > model_stats['tuned_model_score']) or (study_direction == "minimize" and mean_score < model_stats['tuned_model_score']):
Expand Down
Loading