-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprediction_handler.py
More file actions
182 lines (117 loc) · 5.5 KB
/
prediction_handler.py
File metadata and controls
182 lines (117 loc) · 5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""Utility Script for various predictive methods of TabPFN """
# Setup Imports
import pandas as pd
import numpy as np
from MultiLabelPFN.src.tabicl.prior.dataset import PriorDataset
import scipy
from sklearn.metrics import (
precision_recall_curve,
auc,
average_precision_score,
roc_curve
)
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
def remove_empty_columns(arr):
    """
    Drop every column of ``arr`` that contains no non-zero value.

    Zeros are temporarily turned into NaN so that columns consisting only of
    zeros (and/or NaN) can be removed with ``dropna``; NaN values left in the
    surviving columns are written back as 0.0.

    The input is never modified: all pandas operations return new objects.
    (In-place operations on a DataFrame built from a 2D ndarray may write
    through to the caller's array, because pandas can wrap such input
    without copying it.)

    :param arr: 2D array-like accepted by ``pd.DataFrame``
    :return: numpy array holding only the columns with at least one non-zero entry
    """
    frame = pd.DataFrame(arr)
    # Non-inplace replace copies the data, leaving ``arr`` untouched.
    frame = frame.replace(0, np.nan)
    frame = frame.replace(0.0, np.nan)
    frame = frame.dropna(how='all', axis=1)
    frame = frame.replace(np.nan, 0.0)
    return np.array(frame)
def calc_threshold(classifier, prior_config, test_size=0.33, random_state=42):
    """
    Estimate a global decision threshold from one batch of prior datasets.

    For every dataset in the batch the classifier is fitted on a train split,
    and for every label column the ROC threshold maximising ``tpr - fpr``
    (Youden's J) on the test split is collected. Thresholds are averaged per
    dataset and the per-dataset means are averaged again.

    :param classifier: estimator whose ``fit`` returns an object offering ``predict_proba``
    :param prior_config: keyword arguments forwarded to ``PriorDataset``
    :param test_size: fraction of each dataset held out for threshold selection
    :param random_state: seed passed to ``train_test_split``
    :return: mean of the per-dataset mean thresholds, or NaN if no dataset
             produced a usable threshold
    """
    dataset = PriorDataset(**prior_config)
    batch_X, batch_y, _, _, _ = dataset.get_batch()

    batch_thresholds = []
    for d in range(batch_X.shape[0]):
        ds_X = remove_empty_columns(batch_X[d])
        ds_y = remove_empty_columns(batch_y[d])
        X_train, X_test, y_train, y_test = train_test_split(
            ds_X, ds_y, test_size=test_size, random_state=random_state
        )
        trained_model_pfn = classifier.fit(X_train, y_train)
        y_pred_proba = np.array(trained_model_pfn.predict_proba(X_test))
        if y_pred_proba.ndim == 3:
            # Keep index 1 of the last axis and transpose so that rows are
            # samples and columns are labels, as indexed below.
            y_pred_proba = y_pred_proba[..., 1].T

        thresholds = []
        for r in range(y_test.shape[1]):
            # A ROC curve is undefined when the test split holds one class only.
            if np.unique(y_test[:, r]).size < 2:
                continue
            fpr, tpr, thrs = roc_curve(y_test[:, r], y_pred_proba[:, r])
            best_threshold = thrs[np.argmax(tpr - fpr)]
            # Ignore the non-finite sentinel threshold roc_curve may report.
            if np.isfinite(best_threshold):
                thresholds.append(best_threshold)

        # Skip datasets without any usable threshold: averaging an empty list
        # yields NaN, which would turn the overall mean into NaN as well.
        if thresholds:
            batch_thresholds.append(np.mean(thresholds))

    if not batch_thresholds:
        return np.nan
    return np.mean(batch_thresholds)
"""
Cross validation methods
"""
def cv_predict(model, X, Y, cv, mode="single", method="predict"):
    """
    Cross validation returning the out-of-fold predictions of all k folds.

    With T = number of examples and L = number of labels, the returned arrays
    have these shapes:

    * mode "single",   method "predict":       y_pred (T, L)
    * mode "single",   method "predict_proba": y_pred (T, L, 2)
    * mode "ensemble", method "predict":       y_pred (n_jobs, T, L)
    * mode "ensemble", method "predict_proba": y_pred (n_jobs, T, L, 2)

    ``y_true`` is (T, L) in single mode and (n_jobs, T, L) in ensemble mode,
    where ``n_jobs`` is read from ``model.n_jobs``.

    :param model: model used for the cross validation
    :param X: feature dataframe
    :param Y: labels dataframe
    :param cv: cross validator providing ``split(X)``
    :param mode: "single" for cross validation of a single model or "ensemble" for cross validation of an ensemble method
    :param method: "predict" for return of labels or "predict_proba" for the return of prediction probabilities
    :return: y_pred: predicted probabilities or labels of the examples in X
             y_true: true labels of the examples in X
    :raises ValueError: if ``mode`` or ``method`` is not one of the supported values
    """
    from scipy.sparse import issparse

    if mode not in ["single", "ensemble"]:
        raise ValueError("Mode not valid. Please Select 'single' for normal estimators or 'ensemble' for ensembles")
    if method not in ["predict", "predict_proba"]:
        raise ValueError("Method not valid. Please Select 'predict' for label prediction or 'predict_proba' for prediction probabilities")

    # Allocate the output buffers; ensemble mode adds a leading member axis and
    # predict_proba adds a trailing axis of size 2.
    base_shape = (X.shape[0], Y.shape[1])
    member_axis = (model.n_jobs,) if mode == "ensemble" else ()
    class_axis = (2,) if method == "predict_proba" else ()
    y_pred = np.zeros(member_axis + base_shape + class_axis)
    y_true = np.zeros(member_axis + base_shape)

    X_arr = np.array(X)
    Y_arr = np.array(Y)

    for fold, (train_idx, test_idx) in enumerate(cv.split(X), start=1):
        print("CV {}".format(fold))
        model.fit(pd.DataFrame(X_arr[train_idx]), pd.DataFrame(Y_arr[train_idx]))

        X_test = X_arr[test_idx]
        if method == "predict":
            fold_pred = model.predict(X_test)
        else:
            fold_pred = model.predict_proba(X_test)

        if mode == "single":
            if issparse(fold_pred):
                fold_pred = fold_pred.toarray()
            fold_pred = np.array(fold_pred)
            if method == "predict_proba" and fold_pred.ndim == 2:
                # A 2D result is treated as one probability per label and is
                # stored at index 1 of the last axis; index 0 stays zero.
                expanded = np.zeros(fold_pred.shape + (2,))
                expanded[..., 1] = fold_pred
                fold_pred = expanded
            target = test_idx
        else:
            fold_pred = np.array(fold_pred)
            # Ensemble outputs carry the member axis first; samples are axis 1.
            target = (slice(None), test_idx)

        # A result whose shape differs from the target slice is re-stacked so
        # that its leading axis moves to position 1.
        # NOTE: this comparison cannot tell the two layouts apart when their
        # shapes happen to coincide (e.g. fold size equal to the label count).
        if y_pred[target].shape != fold_pred.shape:
            fold_pred = np.stack(fold_pred, axis=1)

        # Store the prediction computed above (the matching-shape ensemble
        # path previously stored a fresh predict_proba result regardless of
        # the requested method).
        y_pred[target] = fold_pred
        y_true[target] = Y_arr[test_idx]

    return y_pred, y_true