diff --git a/examples/lightning_coxph.py b/examples/lightning_coxph.py
new file mode 100644
index 0000000..0368b0e
--- /dev/null
+++ b/examples/lightning_coxph.py
@@ -0,0 +1,213 @@
+'''
+A minimal example of how to fit a CoxPH model with PyTorch Lightning, independent of torchtuples.
+Original author: Rohan Shad @rohanshad
+'''
+from typing import Tuple
+import numpy as np
+import pandas as pd
+import torch
+
+import pytorch_lightning as pl
+from torch import nn
+from torch.utils.data import DataLoader, TensorDataset
+
+from pycox.datasets import metabric
+from pycox.evaluation import EvalSurv
+from pycox.models import coxph
+
+# For preprocessing
+from sklearn.preprocessing import StandardScaler
+from sklearn_pandas import DataFrameMapper
+
+# Lightning Dataset Module
+
+class MetaBrick(pl.LightningDataModule):
+    '''
+    Prepares the METABRIC dataset for either discrete-time or Cox proportional hazards models.
+    batch_size (int) - batch size, default = 256
+    num_durations (int) - number of timepoints to discretize data into (for discrete-time models only), default = 10
+    num_workers (int) - number of cpu workers to load data, default = 0
+    discretize (bool) - whether to discretize data (set to True only for discrete-time models), default = False
+    '''
+    def __init__(self, batch_size: int = 256, num_durations: int = 10, num_workers: int = 0, discretize: bool = False):
+        super().__init__()
+        self.batch_size = batch_size
+        self.num_durations = num_durations
+        self.num_workers = num_workers
+        self.discretize = discretize
+
+    def setup(self, stage=None):
+        '''
+        Get the METABRIC dataset split into training, validation, and testing dataframes.
+        Preprocesses features and targets (duration and event); for discrete-time models,
+        time would be discretized into 'num_durations' equidistant points.
+        '''
+
+        # Load and split dataset into train, val, and test (the val split could also be created under stage == 'fit')
+        df_train = metabric.read_df()
+        df_test = df_train.sample(frac=0.2)
+        df_train = df_train.drop(df_test.index)
+        df_val = df_train.sample(frac=0.2)
+        df_train = df_train.drop(df_val.index)
+
+        self.x_train, self.x_val, self.x_test = self._preprocess_features(df_train, df_val, df_test)
+
+        if stage == 'fit' or stage is None:
+            # Setup targets (duration, event)
+            self.y_train = torch.from_numpy(np.concatenate(self._get_target(df_train), axis=1))
+            self.y_val = torch.from_numpy(np.concatenate(self._get_target(df_val), axis=1))
+
+            # Create training and validation datasets
+            self.train_set = TensorDataset(self.x_train, self.y_train)
+            self.val_set = TensorDataset(self.x_val, self.y_val)
+
+            # Input and output dimensions for building the net
+            self.in_dims = self.x_train.shape[1]
+            self.out_dims = 1
+
+        if stage == 'test' or stage is None:
+            # Store the preprocessed target y_test {torch.Tensor} and the entire df_test {pd.DataFrame} for metric calculations
+            self.y_test = torch.from_numpy(np.concatenate(self._get_target(df_test), axis=1))
+            self.df_test = df_test
+
+
+    def train_dataloader(self):
+        '''
+        Build training dataloader.
+        num_workers is set to 0 by default because of a threading issue.
+        '''
+        train_loader = DataLoader(
+            dataset=self.train_set,
+            batch_size=self.batch_size,
+            shuffle=True,
+            num_workers=self.num_workers
+        )
+        return train_loader
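+
+    # Hedged addition (not in the original example): the val_set built in
+    # setup('fit') is never exposed, so this sketch mirrors train_dataloader
+    # without shuffling. Lightning only runs validation if the model also
+    # defines a validation_step (a sketch of that is added below as well).
+    def val_dataloader(self):
+        val_loader = DataLoader(
+            dataset=self.val_set,
+            batch_size=self.batch_size,
+            shuffle=False,
+            num_workers=self.num_workers
+        )
+        return val_loader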
+
+    @classmethod
+    def _preprocess_features(cls, df_train: pd.DataFrame, df_val: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
+        '''
+        Preprocess the covariates of the training, validation, and test sets and return
+        a tensor of covariates for each.
+        '''
+        cols_standardize = ["x0", "x1", "x2", "x3", "x8"]
+        cols_leave = ["x4", "x5", "x6", "x7"]
+
+        standardize = [([col], StandardScaler()) for col in cols_standardize]
+        leave = [(col, None) for col in cols_leave]
+        x_mapper = DataFrameMapper(standardize + leave)
+
+        x_train = x_mapper.fit_transform(df_train).astype("float32")
+        x_val = x_mapper.transform(df_val).astype("float32")
+        x_test = x_mapper.transform(df_test).astype("float32")
+
+        return torch.from_numpy(x_train), torch.from_numpy(x_val), torch.from_numpy(x_test)
+
+    @classmethod
+    def _get_target(cls, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
+        '''
+        Takes a pandas DataFrame and converts the duration and event targets into np.arrays.
+        '''
+        duration = df['duration'].to_numpy().reshape(-1, 1)
+        event = df['event'].to_numpy().reshape(-1, 1)
+
+        return duration, event
+
+# Survival model class
+
+class SurvModel(pl.LightningModule):
+    '''
+    Defines model, optimizers, forward step, and training step.
+    Define a validation step as def validation_step if needed.
+    Configured to use the CoxPH loss from coxph.CoxPHLoss().
+    '''
+
+    def __init__(self, lr, in_features, out_features):
+        super().__init__()
+
+        self.save_hyperparameters()
+        self.lr = lr
+        self.in_features = in_features
+        self.out_features = out_features
+
+        # Define model here (in this case an MLP)
+        self.net = nn.Sequential(
+            nn.Linear(self.in_features, 32),
+            nn.ReLU(),
+            nn.BatchNorm1d(32),
+            nn.Dropout(0.1),
+            nn.Linear(32, 32),
+            nn.ReLU(),
+            nn.BatchNorm1d(32),
+            nn.Dropout(0.1),
+            nn.Linear(32, self.out_features),
+        )
+
+        # Define loss function:
+        self.loss_func = coxph.CoxPHLoss()
+
+    def forward(self, x):
+        return self.net(x)
+
+    # A validation step is usually defined as well; a hedged sketch follows below.
+    def training_step(self, batch, batch_idx):
+        x, target = batch
+        output = self.forward(x)
+
+        # the target tensor contains duration and event concatenated along dim 1
+        loss = self.loss_func(output, target[:,0], target[:,1])
+
+        # progress bar logging metrics (add custom metric definitions later if useful?)
+        self.log('loss', loss, on_step=True, on_epoch=True, prog_bar=True)
+        return loss
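+
+    # Hedged sketch (not in the original example): a validation_step that
+    # mirrors training_step. Note that CoxPHLoss needs at least one event in
+    # the batch, so validation batches should not be too small.
+    def validation_step(self, batch, batch_idx):
+        x, target = batch
+        output = self.forward(x)
+        loss = self.loss_func(output, target[:,0], target[:,1])
+        self.log('val_loss', loss, on_epoch=True, prog_bar=True)
+        return loss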
+
+    def configure_optimizers(self):
+        optimizer = torch.optim.Adam(
+            self.parameters(),
+            lr=self.lr
+        )
+        return optimizer
+
+
+def main():
+    # Load Lightning DataModule
+    dat = MetaBrick(num_workers=0, discretize=False)
+    dat.setup('fit')  # allows input / output dimensions to be read off for the model
+
+    # Load Lightning Module
+    model = SurvModel(lr=1e-2, in_features=dat.in_dims, out_features=dat.out_dims)
+    trainer = pl.Trainer(gpus=0, num_sanity_val_steps=0, max_epochs=20, fast_dev_run=False)
+
+    # Train model
+    trainer.fit(model, dat)
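+
+    # Hedged addition (not in the original example): optionally persist the
+    # trained weights for later reuse; the checkpoint filename is illustrative.
+    trainer.save_checkpoint('lightning_coxph.ckpt')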
+
+    # The DataModule defines no test_dataloader and the model no test_step, so
+    # instead of calling trainer.test we evaluate manually below.
+
+    # Load final model & freeze
+    print('Running in Evaluation Mode...')
+    model.freeze()
+
+    # Setup test data (prepared from the lightning module)
+    dat.setup('test')
+
+    # Predict survival on the testing dataset
+    output = model(dat.x_test)
+
+    cum_haz = coxph.compute_cumulative_baseline_hazards(output, durations=dat.y_test[:,0], events=dat.y_test[:,1])
+    surv = coxph.output2surv(output, cum_haz[0])
+
+    # The surv_df dataframe needs to be transposed to the format {rows}: duration, {cols}: each individual
+    surv_df = pd.DataFrame(surv.transpose(1,0).numpy())
+    print(surv_df)
+
+    ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values, censor_surv='km')
+    time_grid = np.linspace(dat.df_test.duration.values.min(), dat.df_test.duration.values.max(), 100)
+
+    print(f"Concordance: {ev.concordance_td()}")
+    print(f"Brier Score: {ev.integrated_brier_score(time_grid)}")
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/examples/lightning_logistic_hazard.py b/examples/lightning_logistic_hazard.py
index 7f5be3f..a1d4270 100644
--- a/examples/lightning_logistic_hazard.py
+++ b/examples/lightning_logistic_hazard.py
@@ -99,7 +99,6 @@ def _preprocess_features(cls, df_train: pd.DataFrame, df_test: pd.DataFrame) ->
 
 # Survival model class
 
-
 class SurvModel(pl.LightningModule):
     '''
     Defines model, optimizers, forward step, and training step.
@@ -182,5 +181,4 @@ def main():
     print(f"Concordance: {ev.concordance_td()}")
 
 if __name__ == '__main__':
-    main()
-
+    main()
\ No newline at end of file
diff --git a/examples/torch_cox.py b/examples/torch_cox.py
new file mode 100644
index 0000000..ce69113
--- /dev/null
+++ b/examples/torch_cox.py
@@ -0,0 +1,148 @@
+'''
+A minimal example of how to fit a CoxPH model with a vanilla torch training loop.
+The point of this example is to make it simple to use the CoxPH models in other frameworks
+that are not based on torchtuples.
+'''
+from typing import Tuple
+
+import numpy as np
+import pandas as pd
+
+import torch
+from torch import nn
+from torch.utils.data import DataLoader, TensorDataset
+
+from pycox.datasets import metabric
+from pycox.evaluation import EvalSurv
+from pycox.models import coxph
+
+# For preprocessing
+from sklearn.preprocessing import StandardScaler
+from sklearn_pandas import DataFrameMapper
+
+def get_metabrick_train_val_test() -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
+    """Get the METABRIC dataset split into training, validation, and testing dataframes."""
+    df_train = metabric.read_df()
+    df_test = df_train.sample(frac=0.2)
+    df_train = df_train.drop(df_test.index)
+    df_val = df_train.sample(frac=0.2)
+    df_train = df_train.drop(df_val.index)
+    return df_train, df_val, df_test
+
+
+def preprocess_features(df_train: pd.DataFrame, df_val: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
+    """Preprocess the covariates of the training, validation, and test sets and return
+    a tensor of covariates for each.
+    """
+    cols_standardize = ["x0", "x1", "x2", "x3", "x8"]
+    cols_leave = ["x4", "x5", "x6", "x7"]
+
+    standardize = [([col], StandardScaler()) for col in cols_standardize]
+    leave = [(col, None) for col in cols_leave]
+    x_mapper = DataFrameMapper(standardize + leave)
+
+    x_train = x_mapper.fit_transform(df_train).astype("float32")
+    x_val = x_mapper.transform(df_val).astype("float32")
+    x_test = x_mapper.transform(df_test).astype("float32")
+
+    return torch.from_numpy(x_train), torch.from_numpy(x_val), torch.from_numpy(x_test)
+
+
+def make_mlp(in_features: int, out_features: int) -> nn.Module:
+    """Make a simple torch net."""
+    net = nn.Sequential(
+        nn.Linear(in_features, 32),
+        nn.ReLU(),
+        nn.BatchNorm1d(32),
+        nn.Dropout(0.1),
+        nn.Linear(32, 32),
+        nn.ReLU(),
+        nn.BatchNorm1d(32),
+        nn.Dropout(0.1),
+        nn.Linear(32, out_features),
+    )
+    return net
+
+def get_target(df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
+    '''
+    Takes a pandas DataFrame and converts the duration and event targets into np.arrays.
+    '''
+    duration = df['duration'].to_numpy().reshape(-1, 1)
+    event = df['event'].to_numpy().reshape(-1, 1)
+
+    return duration, event
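+
+# Hedged helper (not in the original example): computes the CoxPH loss on a
+# held-out set without gradients. main() below packs (x_val, y_val) but never
+# trains with them; this sketch shows one way they could be used.
+def evaluate_loss(net: nn.Module, x: torch.Tensor, y: torch.Tensor, loss_func: nn.Module) -> float:
+    net.eval()
+    with torch.no_grad():
+        loss = loss_func(net(x), y[:, 0], y[:, 1])
+    net.train()
+    return loss.item()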
+
+def main() -> None:
+    # Get the METABRIC dataset split into train, val, and test sets
+    # np.random.seed(1234)
+    # torch.manual_seed(123)
+    df_train, df_val, df_test = get_metabrick_train_val_test()
+
+    # Preprocess features
+    x_train, x_val, x_test = preprocess_features(df_train, df_val, df_test)
+    y_train = torch.from_numpy(np.concatenate(get_target(df_train), axis=1))
+    y_test = torch.from_numpy(np.concatenate(get_target(df_test), axis=1))
+
+    y_val = torch.from_numpy(np.concatenate(get_target(df_val), axis=1))
+
+    durations_test, events_test = get_target(df_test)
+    val = x_val, y_val  # packed like the training data; see evaluate_loss above
+
+    # Make an MLP neural network
+    in_features = x_train.shape[1]
+    out_features = 1
+    net = make_mlp(in_features, out_features)
+
+    batch_size = 256
+    epochs = 20
+
+    train_dataset = TensorDataset(x_train, y_train)
+    train_dataloader = DataLoader(train_dataset, batch_size, shuffle=True)
+
+    # Set optimizer and loss function (optimization criterion)
+    optimizer = torch.optim.Adam(net.parameters(), lr=0.01)
+    loss_func = coxph.CoxPHLoss()
+    for epoch in range(epochs):
+        running_loss = 0.0
+        for i, data in enumerate(train_dataloader):
+            x, target = data
+            optimizer.zero_grad()
+            output = net(x)
+            loss = loss_func(output, target[:,0], target[:,1])  # needs output, durations, events
+            loss.backward()
+            optimizer.step()
+            running_loss += loss.item()
+        print(f"epoch: {epoch} -- loss: {running_loss / len(train_dataloader)}")
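+
+    # Hedged usage of the evaluate_loss sketch above on the held-out set:
+    print(f"validation loss: {evaluate_loss(net, x_val, y_val, loss_func)}")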
Got {subset}") - + if not path.exists(): print(f""" The KKBox dataset not locally available. - If you want to download, call 'kkbox_v1.download_kkbox()', but note that + If you want to download, call 'kkbox_v1.download_kkbox()', but note that this might take around 10 min! - NOTE: You need Kaggle credentials! Follow instructions at + NOTE: You need Kaggle credentials! Follow instructions at https://github.com/Kaggle/kaggle-api#api-credentials """) return None @@ -130,9 +130,9 @@ def log_min_p(col, df): def download_kkbox(self): - """Download KKBox data set. + """Download KKBox data set. This is likely to take around 10 min!!! - NOTE: You need Kaggle credentials! Follow instructions at + NOTE: You need Kaggle credentials! Follow instructions at https://github.com/Kaggle/kaggle-api#api-credentials """ self._download() @@ -149,7 +149,7 @@ def _download(self): self._make_train_test_split() print('Cleaning up...') self._clean_up() - print("Done! You can now call `df = kkbox_v1.read_df()`.") + print("Done! You can now call `df = kkbox.read_df()`.") def _setup_download_dir(self): if self._path_dir.exists(): @@ -159,6 +159,7 @@ def _setup_download_dir(self): def _7z_from_kaggle(self): import subprocess + import py7zr try: import kaggle except OSError as e: @@ -175,8 +176,9 @@ def _7z_from_kaggle(self): path=self._path_dir, force=True) for file in files: print(f"Extracting '{file}'...") - subprocess.check_output(['7z', 'x', str(self._path_dir / (file + '.csv.7z')), - f"-o{self._path_dir}"]) + archive = py7zr.SevenZipFile(self._path_dir / f"{file}.csv.7z", mode="r") + archive.extractall(path=self._path_dir) + archive.close() print(f"Finished extracting '{file}'.") def _csv_to_feather_with_types(self): @@ -214,12 +216,12 @@ def _make_survival_data(self): def days_without_membership(df): diff = (df['next_trans_date'] - df['membership_expire_date']).dt.total_seconds() return diff / (60 * 60 * 24) - + trans = (trans .sort_values(['msno', 'transaction_date']) .assign(next_trans_date=(lambda x: x.groupby('msno')['transaction_date'].shift(-1))) .assign(churn30=lambda x: days_without_membership(x) > 30)) - + # Remove entries with membership_expire_date < transaction_date trans = trans.loc[lambda x: x['transaction_date'] <= x['membership_expire_date']] assert (trans.loc[lambda x: x['churn30']==True].groupby(['msno', 'transaction_date'])['msno'].count().max() == 1) @@ -229,10 +231,10 @@ def days_without_membership(df): .assign(max_trans_date=lambda x: x.groupby('msno')['transaction_date'].transform('max')) .assign(final_churn=(lambda x: (x['max_trans_date'] <= last_churn_date) & - (x['transaction_date'] == x['max_trans_date']) & + (x['transaction_date'] == x['max_trans_date']) & (x['membership_expire_date'] <= last_churn_date) ))) - + # Churn: From training set trans = (trans .merge(train, how='left', on='msno') @@ -240,14 +242,14 @@ def days_without_membership(df): .drop('is_churn', axis=1) .assign(train_churn=lambda x: (x['max_trans_date'] == x['transaction_date']) & x['train_churn']) .assign(churn=lambda x: x['train_churn'] | x['churn30'] | x['final_churn'])) - + # Split individuals on churn trans = (trans .join(trans .sort_values(['msno', 'transaction_date']) .groupby('msno')[['churn30', 'membership_expire_date']].shift(1) .rename(columns={'churn30': 'new_start', 'membership_expire_date': 'prev_mem_exp_date'}))) - + def number_of_new_starts(df): return (df .assign(new_start=lambda x: x['new_start'].astype('float')) @@ -255,7 +257,7 @@ def number_of_new_starts(df): .groupby('msno') 
                    ['new_start'].cumsum().fillna(0.)
                    .astype('int'))
-    
+
         def days_between_subs(df):
             diff = (df['transaction_date'] - df['prev_mem_exp_date']).dt
             diff = diff.total_seconds() / (60 * 60 * 24)
@@ -266,22 +268,22 @@ def days_between_subs(df):
         trans = (trans
                  .assign(n_prev_churns=lambda x: number_of_new_starts(x),
                          days_between_subs=lambda x: days_between_subs(x)))
-    
+
         # Set start times
         trans = (trans
                  .assign(start_date=trans.groupby(['msno', 'n_prev_churns'])['transaction_date'].transform('min'))
                  .assign(first_churn=lambda x: (x['n_prev_churns'] == 0) & (x['churn'] == True)))
-    
+
         # Get only last transactions (per churn)
         indivs = (trans
                   .assign(censored=lambda x: x.groupby('msno')['churn'].transform('sum') == 0)
-                  .assign(last_censored=(lambda x: 
+                  .assign(last_censored=(lambda x:
                          (x['censored'] == True) &
                          (x['transaction_date'] == x['max_trans_date'])
                   ))
                   .loc[lambda x: x['last_censored'] | x['churn']]
                   .merge(members[['msno', 'registration_init_time']], how='left', on='msno'))
-    
+
         def time_diff_days(df, last, first):
             return (df[last] - df[first]).dt.total_seconds() / (60 * 60 * 24)
 
@@ -290,7 +292,7 @@ def time_diff_days(df, last, first):
                           days_since_reg_init=lambda x: time_diff_days(x, 'start_date', 'registration_init_time')))
 
         # When multiple transactions on last day, remove all but the last
-        indivs = indivs.loc[lambda x: x['transaction_date'] != x['next_trans_date']] 
+        indivs = indivs.loc[lambda x: x['transaction_date'] != x['next_trans_date']]
         assert indivs.shape == indivs.drop_duplicates(['msno', 'transaction_date']).shape
         assert (indivs['churn'] != indivs['censored']).all()
@@ -300,8 +302,8 @@ def time_diff_days(df, last, first):
         indivs = (indivs
                   .assign(churn_type=lambda x: 1*x['churn30'] + 2*x['final_churn'] + 4*x['train_churn'])
-                  .assign(churn_type=lambda x: 
-                          np.array(['censoring', '30days', 'final', '30days_and_final', 'train', 'train_and_30', 
+                  .assign(churn_type=lambda x:
+                          np.array(['censoring', '30days', 'final', '30days_and_final', 'train', 'train_and_30',
                                     'train_and_final', 'train_30_and_final'])[x['churn_type']])
                   .drop(dropcols, axis=1))
@@ -326,7 +328,7 @@ def _make_survival_covariates(self):
         individs = pd.read_feather(self.path_survival)
         members = pd.read_feather(self._path_dir / 'members.feather')
         trans = (individs
-                 .merge(pd.read_feather(self._path_dir / 'transactions.feather'), 
+                 .merge(pd.read_feather(self._path_dir / 'transactions.feather'),
                         how='left', left_on=['msno', 'start_date'], right_on=['msno', 'transaction_date'])
                  .drop('transaction_date', axis=1) # same as start_date
                  .drop_duplicates(['msno', 'start_date'], keep='last') # keep last transaction on start_date (by idx)
@@ -338,7 +340,7 @@ def get_age_at_start(df):
             # Not important what the date is, though it is reasonable to use the last.
             age_diff = (fixed_date - df['start_date']).dt.total_seconds() / (60*60*24*365)
             return np.round(df['bd'] - age_diff)
-    
+
         trans = (trans
                  .merge(members.drop(['registration_init_time'], axis=1), how='left', on='msno')
                  .assign(age_at_start=lambda x: get_age_at_start(x))
@@ -346,11 +348,11 @@ def get_age_at_start(df):
                  .assign(strange_age=lambda x: (x['age_at_start'] <= 0) | (x['age_at_start'] >= 100),
                          age_at_start=lambda x: x['age_at_start'].clip(lower=0, upper=100)))
 
-        # days_between_subs 
+        # days_between_subs
         # There are None for (not new start), so we can just set them to zero, and we don't need to include another variable (as it already exists).
         trans = trans.assign(days_between_subs=lambda x: x['days_between_subs'].fillna(0.))
 
-        # days_since_reg_init 
+        # days_since_reg_init
         # We remove negative entries, set Nans to -1, and add a categorical value for missing.
         pd.testing.assert_frame_equal(trans.loc[lambda x: x['days_since_reg_init'].isnull()],
                                       trans.loc[lambda x: x['age_at_start'].isnull()])
@@ -361,7 +363,7 @@ def get_age_at_start(df):
                  .assign(nan_days_since_reg_init=lambda x: x['days_since_reg_init'].isnull())
                  .assign(days_since_reg_init=lambda x: x['days_since_reg_init'].fillna(-1)))
 
-        # age_at_start 
+        # age_at_start
         # This is Nan when days_since_reg_init is nan. This is because registration_init_time is nan when bd is nan.
         # We have removed negative entries, so we set Nans to -1, but don't add dummy because it's equal to the days_since_reg_init dummy.
         trans = trans.assign(age_at_start=lambda x: x['age_at_start'].fillna(-1.))
@@ -373,9 +375,9 @@ def get_age_at_start(df):
                  .drop('first_churn', axis=1)
                  .assign(no_prev_churns=lambda x: x['n_prev_churns'] == 0))
 
-        # Drop variables that are not useful 
+        # Drop variables that are not useful
         trans = (trans
-                 .drop(['start_date', 'registration_init_time', 'churn_type', 
+                 .drop(['start_date', 'registration_init_time', 'churn_type',
                         'membership_expire_date', 'new_start'],
                        axis=1))
@@ -386,7 +388,7 @@ def get_age_at_start(df):
         # ### Log transform variables
         # log_cols = ['actual_amount_paid', 'days_between_subs', 'days_since_reg_init', 'payment_plan_days',
         #             'plan_list_price']
-    
+
         # log_min_p = lambda x: np.log(x - x.min() + 1)
         # trans_log = trans.assign(**{col: log_min_p(trans[col]) for col in self.log_cols})
         # assert trans_log[self.log_cols].pipe(np.isfinite).all().all()
@@ -396,7 +398,7 @@
         float_cols = ['n_prev_churns', 'days_between_subs', 'days_since_reg_init', 'payment_plan_days',
                       'plan_list_price', 'age_at_start', 'actual_amount_paid', 'is_auto_renew', 'is_cancel',
                       'strange_age', 'nan_days_since_reg_init', 'no_prev_churns', 'duration', 'event']
-        trans_log = trans_log.assign(**{col: trans_log[col].astype('float32') for col in float_cols}) 
+        trans_log = trans_log.assign(**{col: trans_log[col].astype('float32') for col in float_cols})
         # cov_file = join(data_dir, 'covariates.feather')
         trans_log.reset_index(drop=True).to_feather(self._path_dir / 'covariates.feather')
@@ -433,7 +435,7 @@ def _clean_up(self):
                 file.unlink()
             except IsADirectoryError:
                 warnings.warn(f"Encountered directory in {self._path_dir}")
-    
+
     def delete_local_copy(self):
         for path in [self.path_train, self.path_test, self.path_val, self.path_survival]:
             path.unlink()
@@ -448,7 +450,7 @@ class _DatasetKKBoxAdmin(_DatasetKKBoxChurn):
 
     This is the version of the data set presented by Kvamme and Borgan (2019) [1].
 
-    Requires installation of the Kaggle API (https://github.com/Kaggle/kaggle-api), 
+    Requires installation of the Kaggle API (https://github.com/Kaggle/kaggle-api),
     with credentials (https://github.com/Kaggle/kaggle-api).
 
     The data set contains churn information from KKBox, an Asian music streaming service. Churn is
@@ -484,7 +486,7 @@ class _DatasetKKBoxAdmin(_DatasetKKBoxChurn):
     is_cancel:
         If the customer has canceled the subscription. Subscription cancellation does not imply the
         user has churned. A user may cancel service subscription due to change of service plans or
-        other reasons. 
+        other reasons.
     city:
         City of customer.
     gender:
@@ -529,7 +531,7 @@ def read_df(self, log_trans=True, no_covs=False):
         The survival data set contains no covariates, but can be useful for extending
         the dataset with more covariates from Kaggle.
-    
+
         Keyword Arguments:
             log_trans {bool} -- If covariates in 'kkbox.log_cols' (from Kvamme paper) should be
                 transformed with 'z = log(x - min(x) + 1)'. (default: {True})
@@ -547,9 +549,9 @@ def read_df(self, log_trans=True, no_covs=False):
         if not path.exists():
             print(f"""
             The KKBox dataset is not locally available.
-            If you want to download, call 'kkbox.download_kkbox()', but note that 
+            If you want to download, call 'kkbox.download_kkbox()', but note that
             this might take around 10 min!
-            NOTE: You need Kaggle credentials! Follow instructions at 
+            NOTE: You need Kaggle credentials! Follow instructions at
             https://github.com/Kaggle/kaggle-api#api-credentials
             """)
             return None
@@ -570,7 +572,7 @@ def log_min_p(col, df):
             df = df.rename(columns=dict(duration_lcd='duration', churn_lcd='event',
                                         duration_censor_lcd='censor_duration'))
         return df
-    
+
     def _make_train_test_split(self, seed=1234):
         pass
@@ -611,7 +613,7 @@ def days_without_membership(df):
                  .assign(max_trans_date=lambda x: x.groupby('msno')['transaction_date'].transform('max'))
                  .assign(final_churn=(lambda x:
                         (x['max_trans_date'] <= LAST_CHURN_DATE) &
-                        (x['transaction_date'] == x['max_trans_date']) & 
+                        (x['transaction_date'] == x['max_trans_date']) &
                         (x['membership_expire_date'] <= LAST_CHURN_DATE)
                  )))
 
         # Churn: From training set
@@ -668,8 +670,8 @@ def time_diff_days(df, last, first):
         # Add administrative censoring durations.
         # We add two types:
         #  - Censoring based on the fixed date `LAST_CHURN_DATE` (lcd).
-        #  - Censoring based on member ship expiration, where churned 
-        #    are censored at `last_churn_date`. 
+        #  - Censoring based on membership expiration, where churned
+        #    are censored at `last_churn_date`.
         indivs = (indivs
                   .assign(duration_censor=lambda x: x['duration'],
                           last_churn_date=pd.to_datetime(LAST_CHURN_DATE),
@@ -695,7 +697,7 @@ def time_diff_days(df, last, first):
         assert (tmp['duration_lcd'] == tmp['duration_censor_lcd']).all(), 'Need all censor durations to be equal'
 
         # When multiple transactions on last day, remove all but the last
-        indivs = indivs.loc[lambda x: x['transaction_date'] != x['next_trans_date']] 
+        indivs = indivs.loc[lambda x: x['transaction_date'] != x['next_trans_date']]
         assert indivs.shape == indivs.drop_duplicates(['msno', 'transaction_date']).shape
         assert (indivs['churn'] != indivs['censored']).all()
@@ -706,8 +708,8 @@ def time_diff_days(df, last, first):
         indivs = (indivs
                   .assign(churn_type=lambda x: 1*x['churn30'] + 2*x['final_churn'] + 4*x['train_churn'])
-                  .assign(churn_type=lambda x: 
-                          np.array(['censoring', '30days', 'final', '30days_and_final', 'train', 'train_and_30', 
+                  .assign(churn_type=lambda x:
+                          np.array(['censoring', '30days', 'final', '30days_and_final', 'train', 'train_and_30',
                                     'train_and_final', 'train_30_and_final'])[x['churn_type']])
                   .drop(dropcols, axis=1))
@@ -730,7 +732,7 @@ def _make_survival_covariates(self):
         members = pd.read_feather(self._path_dir / 'members.feather')
         trans = (individs
-                 .merge(pd.read_feather(self._path_dir / 'transactions.feather'), 
+                 .merge(pd.read_feather(self._path_dir / 'transactions.feather'),
                         how='left', left_on=['msno', 'start_date'], right_on=['msno', 'transaction_date'])
                  .drop(['transaction_date'], axis=1) # same as start_date
                  .drop_duplicates(['msno', 'start_date'], keep='last') # keep last transaction on start_date (by idx)
@@ -749,10 +751,10 @@ def get_age_at_start(df):
                  .drop(['bd'], axis=1)
                  .assign(strange_age=lambda x: (x['age_at_start'] <= 0) | (x['age_at_start'] >= 100),
                          age_at_start=lambda x: x['age_at_start'].clip(lower=0, upper=100)))
-        # days_between_subs 
+        # days_between_subs
         # There are None for (not new start), so we can just set them to zero, and we don't need to include another variable (as it already exists).
         trans = trans.assign(days_between_subs=lambda x: x['days_between_subs'].fillna(0.))
-        # days_since_reg_init 
+        # days_since_reg_init
         # We remove negative entries, set Nans to -1, and add a categorical value for missing.
         pd.testing.assert_frame_equal(trans.loc[lambda x: x['days_since_reg_init'].isnull()],
                                       trans.loc[lambda x: x['age_at_start'].isnull()])
@@ -762,25 +764,25 @@ def get_age_at_start(df):
                  .loc[lambda x: (x['days_since_reg_init'] >= 0) | x['days_since_reg_init'].isnull()]
                  .assign(nan_days_since_reg_init=lambda x: x['days_since_reg_init'].isnull())
                  .assign(days_since_reg_init=lambda x: x['days_since_reg_init'].fillna(-1)))
-        # age_at_start 
+        # age_at_start
         # This is Nan when days_since_reg_init is nan. This is because registration_init_time is nan when bd is nan.
         # We have removed negative entries, so we set Nans to -1, but don't add dummy because it's equal to the days_since_reg_init dummy.
         trans = trans.assign(age_at_start=lambda x: x['age_at_start'].fillna(-1.))
-        # We use n_prev_churns == 0 as an indicator that there are no previous churn 
+        # We use n_prev_churns == 0 as an indicator that there are no previous churns
         trans = (trans
                  .assign(no_prev_churns=lambda x: x['n_prev_churns'] == 0))
-        # Drop variables that are not useful 
+        # Drop variables that are not useful
         trans = (trans
-                 .drop(['start_date', 'registration_init_time', 'churn_type', 
+                 .drop(['start_date', 'registration_init_time', 'churn_type',
                         'membership_expire_date'], axis=1))
 
         bool_cols = ['is_auto_renew', 'is_cancel']
         cat_cols = ['payment_method_id', 'city', 'gender', 'registered_via']
         int_cols = ['days_between_subs', 'days_since_reg_init', 'age_at_start']
-        trans = trans.assign(**{col: trans[col].astype('bool') for col in bool_cols}) 
-        trans = trans.assign(**{col: trans[col].cat.remove_unused_categories() for col in cat_cols}) 
-        trans = trans.assign(**{col: trans[col].round().astype('int') for col in int_cols}) 
+        trans = trans.assign(**{col: trans[col].astype('bool') for col in bool_cols})
+        trans = trans.assign(**{col: trans[col].cat.remove_unused_categories() for col in cat_cols})
+        trans = trans.assign(**{col: trans[col].round().astype('int') for col in int_cols})
         trans = trans.assign(index_survival=trans.index.values).reset_index(drop=True)
 
         last_cols = ['duration', 'churn', 'duration_censor', 'duration_lcd', 'churn_lcd',
                      'duration_censor_lcd']
diff --git a/pycox/evaluation/concordance.py b/pycox/evaluation/concordance.py
index 3ef002c..bc8173e 100644
--- a/pycox/evaluation/concordance.py
+++ b/pycox/evaluation/concordance.py
@@ -77,7 +77,7 @@ def concordance_td(durations, events, surv, surv_idx, method='adj_antolini'):
     Arguments:
         durations {np.array[n]} -- Event times (or censoring times.)
        events {np.array[n]} -- Event indicators (0 is censoring).
-        surv {np.array[n_times, n]} -- Survival function (each row is a duraratoin, and each col
+        surv {np.array[n_times, n]} -- Survival function (each row is a duration, and each col
             is an individual).
         surv_idx {np.array[n_test]} -- Mapping of survival_func s.t. 'surv_idx[i]' gives index in
             'surv' corresponding to the event time of individual 'i'.
diff --git a/setup.py b/setup.py
index 040670f..2bce4ff 100644
--- a/setup.py
+++ b/setup.py
@@ -28,6 +28,7 @@
     'numba>=0.44',
     'scikit-learn>=0.21.2',
     'requests>=2.22.0',
+    'py7zr>=0.11.3',
 ]
 
 setup(