From f7bf4ed8d9afc692844abc64a530b733501a2a8f Mon Sep 17 00:00:00 2001 From: Rohan Shad Date: Tue, 26 Jan 2021 10:41:42 -0800 Subject: [PATCH 1/8] torch lightning example for MLP logistic hazard model --- examples/lightning_logistic_hazard.py | 187 ++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 examples/lightning_logistic_hazard.py diff --git a/examples/lightning_logistic_hazard.py b/examples/lightning_logistic_hazard.py new file mode 100644 index 0000000..10eb24a --- /dev/null +++ b/examples/lightning_logistic_hazard.py @@ -0,0 +1,187 @@ +"""A minimal example of how to fit a LogisticHazard model with pytorch lightning +The point of this example is to make it simple to use the LogisticHazard models in other frameworks +that are not based on torchtuples. + +""" +from typing import Tuple + +import numpy as np +import pandas as pd + +import torch + +import pytorch_lightning as pl +from torch import nn +from torch.utils.data import DataLoader, TensorDataset + +from pycox.datasets import metabric +from pycox.evaluation import EvalSurv +from pycox.models import logistic_hazard + +# For preprocessing +from sklearn.preprocessing import StandardScaler +from sklearn_pandas import DataFrameMapper + +# Lightning Dataset Module +class metabrick(pl.LightningDataModule): + def __init__(self, batch_size = 256, num_durations=10, num_workers=0, verbose = False): + super().__init__() + self.batch_size = batch_size + self.num_durations = num_durations + self.verbose = verbose + self.num_workers = num_workers + + def setup(self, stage=None): + ''' + Get the METABRICK dataset split into a training dataframe and a testing dataframe. + Preprocesses features and targets (duration and event), discretize time into 'num_duration' equidistant points. + ''' + + #Load and split dataset into train and test (if there's train and val this can be called within stage == 'fit') + df_train = metabric.read_df() + df_test = df_train.sample(frac=0.2) + df_train = df_train.drop(df_test.index) + + self.x_train, self.x_test = self.__preprocess_features(df_train, df_test) + self.labtrans = logistic_hazard.LabTransDiscreteTime(self.num_durations) + + if stage == 'fit' or stage is None: + #Pre-process features and targets + self.y_train = self.labtrans.fit_transform(df_train.duration.values, df_train.event.values) + self.y_train_duration = torch.from_numpy(self.y_train[0]) + self.y_train_event = torch.from_numpy(self.y_train[1]) + + #Create training dataset + self.train_set = TensorDataset(self.x_train, self.y_train_duration, self.y_train_event) + + #Input and output dimensions for building net + self.in_dims = self.x_train.shape[1] + self.out_dims = self.labtrans.out_features + + if stage == 'test' or stage is None: + #Return test dataframe + self.df_test = df_test + + def train_dataloader(self): + ''' + Build training dataloader + num_workers set to 0 by default because of some thread issue + ''' + train_loader = DataLoader( + dataset = self.train_set, + batch_size = self.batch_size, + shuffle = True, + num_workers = self.num_workers + ) + return train_loader + + @classmethod + def __preprocess_features(cls,df_train: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: + ''' + Preprocess the covariates of the training and test set and return a tensor for the + taining covariates and test covariates. + ''' + cols_standardize = ["x0", "x1", "x2", "x3", "x8"] + cols_leave = ["x4", "x5", "x6", "x7"] + + standardize = [([col], StandardScaler()) for col in cols_standardize] + leave = [(col, None) for col in cols_leave] + x_mapper = DataFrameMapper(standardize + leave) + + x_train = x_mapper.fit_transform(df_train).astype("float32") + x_test = x_mapper.transform(df_test).astype("float32") + return torch.from_numpy(x_train), torch.from_numpy(x_test) + +# Survival model class +class surv_model(pl.LightningModule): + def __init__(self, lr, in_features, out_features): + super().__init__() + ''' + Potentially allow for variable to specify appropriate loss function here? + ie: loss_func = logistic_hazard.NLLLogisticHazardLoss() + self.loss = loss_func + ''' + self.save_hyperparameters() + self.lr = lr + self.in_features = in_features + self.out_features = out_features + + #Define Model Here (in this case MLP) + self.net = nn.Sequential( + nn.Linear(self.in_features, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, self.out_features), + ) + + # Define loss function: + self.loss_func = logistic_hazard.NLLLogistiHazardLoss() + + def forward(self, x): + batch_size, data = x.size() + x = self.net(x) + return x + + #Training step and validation step usually defined, this dataset only had train + test so left out val. + def training_step(self, batch, batch_idx): + x, duration, event = batch + output = self.forward(x) + loss = self.loss_func(output,duration,event) + + # progress bar logging metrics (add custom metric definitions later if useful?) + self.log('loss', loss, on_step = True, on_epoch=True, prog_bar = True) + return loss + + # def test_step(self, batch, batch_idx): + # x, duration, event = batch + # output = self.forward(x) + # surv = logistic_hazard.output2surv(output) + + # return surv + # # surv_df = pd.DataFrame(surv.numpy().transpose(), labtrans.cuts) + # # ev = EvalSurv(surv_df, duration.numpy().transpose(), event.numpy().transpose()) + # # print(ev) + + def configure_optimizers(self): + optimizer = torch.optim.Adam( + self.parameters(), + lr = self.lr + ) + return optimizer + +def main(): + #Load Lightning DataModule + dat = metabrick(num_workers=0) + dat.setup('fit') #allows for input / output features to be configured in the model + + #Load Lightning Module + model = surv_model(lr=1e-3, in_features=dat.in_dims, out_features=dat.out_dims) + trainer = pl.Trainer(gpus = 0, num_sanity_val_steps = 0, max_epochs = 20, fast_dev_run = False) + + #Train model + trainer.fit(model,dat) + + #Load model from best checkpoint & freeze + print('Running in Evaluation Mode...') + model.freeze() + + #Setup test data (prepared from lightning module) + dat.setup('test') + + #Predict survival on testing dataset + output = model(dat.x_test) + surv = logistic_hazard.output2surv(output) + surv_df = pd.DataFrame(surv.numpy().transpose(), dat.labtrans.cuts) + ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values) + + #Print evaluation metrics + print(f"Concordance: {ev.concordance_td()}") + +if __name__ == '__main__': + main() + From a0fa547b94d2cd3af63d56487eb97ec77d6f5848 Mon Sep 17 00:00:00 2001 From: Rohan Shad Date: Sun, 31 Jan 2021 11:15:08 -0800 Subject: [PATCH 2/8] Suggested PEP8 style edits, cleanup for PR #66. Added line to requirements.txt --- examples/lightning_logistic_hazard.py | 315 +++++++++++++------------- requirements-dev.txt | 1 + 2 files changed, 158 insertions(+), 158 deletions(-) diff --git a/examples/lightning_logistic_hazard.py b/examples/lightning_logistic_hazard.py index 10eb24a..7f5be3f 100644 --- a/examples/lightning_logistic_hazard.py +++ b/examples/lightning_logistic_hazard.py @@ -1,8 +1,10 @@ -"""A minimal example of how to fit a LogisticHazard model with pytorch lightning +''' +A minimal example of how to fit a LogisticHazard model with pytorch lightning The point of this example is to make it simple to use the LogisticHazard models in other frameworks that are not based on torchtuples. -""" +Original author: Rohan Shad @rohanshad +''' from typing import Tuple import numpy as np @@ -22,166 +24,163 @@ from sklearn.preprocessing import StandardScaler from sklearn_pandas import DataFrameMapper -# Lightning Dataset Module -class metabrick(pl.LightningDataModule): - def __init__(self, batch_size = 256, num_durations=10, num_workers=0, verbose = False): - super().__init__() - self.batch_size = batch_size - self.num_durations = num_durations - self.verbose = verbose - self.num_workers = num_workers - - def setup(self, stage=None): - ''' - Get the METABRICK dataset split into a training dataframe and a testing dataframe. - Preprocesses features and targets (duration and event), discretize time into 'num_duration' equidistant points. - ''' - - #Load and split dataset into train and test (if there's train and val this can be called within stage == 'fit') - df_train = metabric.read_df() - df_test = df_train.sample(frac=0.2) - df_train = df_train.drop(df_test.index) - - self.x_train, self.x_test = self.__preprocess_features(df_train, df_test) - self.labtrans = logistic_hazard.LabTransDiscreteTime(self.num_durations) - - if stage == 'fit' or stage is None: - #Pre-process features and targets - self.y_train = self.labtrans.fit_transform(df_train.duration.values, df_train.event.values) - self.y_train_duration = torch.from_numpy(self.y_train[0]) - self.y_train_event = torch.from_numpy(self.y_train[1]) - - #Create training dataset - self.train_set = TensorDataset(self.x_train, self.y_train_duration, self.y_train_event) - - #Input and output dimensions for building net - self.in_dims = self.x_train.shape[1] - self.out_dims = self.labtrans.out_features - - if stage == 'test' or stage is None: - #Return test dataframe - self.df_test = df_test - - def train_dataloader(self): - ''' - Build training dataloader - num_workers set to 0 by default because of some thread issue - ''' - train_loader = DataLoader( - dataset = self.train_set, - batch_size = self.batch_size, - shuffle = True, - num_workers = self.num_workers - ) - return train_loader - - @classmethod - def __preprocess_features(cls,df_train: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: - ''' - Preprocess the covariates of the training and test set and return a tensor for the - taining covariates and test covariates. - ''' - cols_standardize = ["x0", "x1", "x2", "x3", "x8"] - cols_leave = ["x4", "x5", "x6", "x7"] - - standardize = [([col], StandardScaler()) for col in cols_standardize] - leave = [(col, None) for col in cols_leave] - x_mapper = DataFrameMapper(standardize + leave) - - x_train = x_mapper.fit_transform(df_train).astype("float32") - x_test = x_mapper.transform(df_test).astype("float32") - return torch.from_numpy(x_train), torch.from_numpy(x_test) +# Lightning Dataset Module + + +class MetaBrick(pl.LightningDataModule): + def __init__(self, batch_size: int = 256, num_durations: int = 10, num_workers: int = 0): + super().__init__() + self.batch_size = batch_size + self.num_durations = num_durations + self.num_workers = num_workers + + def setup(self, stage=None): + ''' + Get the METABRICK dataset split into a training dataframe and a testing dataframe. + Preprocesses features and targets (duration and event), discretize time into 'num_duration' equidistant points. + ''' + + # Load and split dataset into train and test (if there's train and val this can be called within stage == 'fit') + df_train = metabric.read_df() + df_test = df_train.sample(frac=0.2) + df_train = df_train.drop(df_test.index) + + self.x_train, self.x_test = self._preprocess_features(df_train, df_test) + self.labtrans = logistic_hazard.LabTransDiscreteTime(self.num_durations) + + if stage == 'fit' or stage is None: + # Pre-process features and targets + self.y_train = self.labtrans.fit_transform( + df_train.duration.values, df_train.event.values) + self.y_train_duration = torch.from_numpy(self.y_train[0]) + self.y_train_event = torch.from_numpy(self.y_train[1]) + + # Create training dataset + self.train_set = TensorDataset( + self.x_train, self.y_train_duration, self.y_train_event) + + # Input and output dimensions for building net + self.in_dims = self.x_train.shape[1] + self.out_dims = self.labtrans.out_features + + if stage == 'test' or stage is None: + # Return test dataframe + self.df_test = df_test + + def train_dataloader(self): + ''' + Build training dataloader + num_workers set to 0 by default because of some thread issue + ''' + train_loader = DataLoader( + dataset=self.train_set, + batch_size=self.batch_size, + shuffle=True, + num_workers=self.num_workers + ) + return train_loader + + @classmethod + def _preprocess_features(cls, df_train: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: + ''' + Preprocess the covariates of the training and test set and return a tensor for the + taining covariates and test covariates. + ''' + cols_standardize = ["x0", "x1", "x2", "x3", "x8"] + cols_leave = ["x4", "x5", "x6", "x7"] + + standardize = [([col], StandardScaler()) for col in cols_standardize] + leave = [(col, None) for col in cols_leave] + x_mapper = DataFrameMapper(standardize + leave) + + x_train = x_mapper.fit_transform(df_train).astype("float32") + x_test = x_mapper.transform(df_test).astype("float32") + return torch.from_numpy(x_train), torch.from_numpy(x_test) # Survival model class -class surv_model(pl.LightningModule): - def __init__(self, lr, in_features, out_features): - super().__init__() - ''' - Potentially allow for variable to specify appropriate loss function here? - ie: loss_func = logistic_hazard.NLLLogisticHazardLoss() - self.loss = loss_func - ''' - self.save_hyperparameters() - self.lr = lr - self.in_features = in_features - self.out_features = out_features - - #Define Model Here (in this case MLP) - self.net = nn.Sequential( - nn.Linear(self.in_features, 32), - nn.ReLU(), - nn.BatchNorm1d(32), - nn.Dropout(0.1), - nn.Linear(32, 32), - nn.ReLU(), - nn.BatchNorm1d(32), - nn.Dropout(0.1), - nn.Linear(32, self.out_features), - ) - - # Define loss function: - self.loss_func = logistic_hazard.NLLLogistiHazardLoss() - - def forward(self, x): - batch_size, data = x.size() - x = self.net(x) - return x - - #Training step and validation step usually defined, this dataset only had train + test so left out val. - def training_step(self, batch, batch_idx): - x, duration, event = batch - output = self.forward(x) - loss = self.loss_func(output,duration,event) - - # progress bar logging metrics (add custom metric definitions later if useful?) - self.log('loss', loss, on_step = True, on_epoch=True, prog_bar = True) - return loss - - # def test_step(self, batch, batch_idx): - # x, duration, event = batch - # output = self.forward(x) - # surv = logistic_hazard.output2surv(output) - - # return surv - # # surv_df = pd.DataFrame(surv.numpy().transpose(), labtrans.cuts) - # # ev = EvalSurv(surv_df, duration.numpy().transpose(), event.numpy().transpose()) - # # print(ev) - - def configure_optimizers(self): - optimizer = torch.optim.Adam( - self.parameters(), - lr = self.lr - ) - return optimizer + + +class SurvModel(pl.LightningModule): + ''' + Defines model, optimizers, forward step, and training step. + Define validation step as def validation_step if needed + Configured to use NLL logistic hazard loss from logistic_hazard.NLLLogisticHazardLoss() + ''' + + def __init__(self, lr, in_features, out_features): + super().__init__() + + self.save_hyperparameters() + self.lr = lr + self.in_features = in_features + self.out_features = out_features + + # Define Model Here (in this case MLP) + self.net = nn.Sequential( + nn.Linear(self.in_features, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, self.out_features), + ) + + # Define loss function: + self.loss_func = logistic_hazard.NLLLogistiHazardLoss() + + def forward(self, x): + batch_size, data = x.size() + x = self.net(x) + return x + + # Training step and validation step usually defined, this dataset only had train + test so left out val. + def training_step(self, batch, batch_idx): + x, duration, event = batch + output = self.forward(x) + loss = self.loss_func(output, duration, event) + + # progress bar logging metrics (add custom metric definitions later if useful?) + self.log('loss', loss, on_step=True, on_epoch=True, prog_bar=True) + return loss + + def configure_optimizers(self): + optimizer = torch.optim.Adam( + self.parameters(), + lr = self.lr + ) + return optimizer def main(): - #Load Lightning DataModule - dat = metabrick(num_workers=0) - dat.setup('fit') #allows for input / output features to be configured in the model - - #Load Lightning Module - model = surv_model(lr=1e-3, in_features=dat.in_dims, out_features=dat.out_dims) - trainer = pl.Trainer(gpus = 0, num_sanity_val_steps = 0, max_epochs = 20, fast_dev_run = False) - - #Train model - trainer.fit(model,dat) - - #Load model from best checkpoint & freeze - print('Running in Evaluation Mode...') - model.freeze() - - #Setup test data (prepared from lightning module) - dat.setup('test') - - #Predict survival on testing dataset - output = model(dat.x_test) - surv = logistic_hazard.output2surv(output) - surv_df = pd.DataFrame(surv.numpy().transpose(), dat.labtrans.cuts) - ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values) - - #Print evaluation metrics - print(f"Concordance: {ev.concordance_td()}") + # Load Lightning DataModule + dat = MetaBrick(num_workers=0) + dat.setup('fit') #allows for input / output features to be configured in the model + + # Load Lightning Module + model = SurvModel(lr=1e-3, in_features=dat.in_dims, out_features=dat.out_dims) + trainer = pl.Trainer(gpus=0, num_sanity_val_steps=0, max_epochs=20, fast_dev_run=False) + + # Train model + trainer.fit(model,dat) + + # Load final model & freeze + print('Running in Evaluation Mode...') + model.freeze() + + # Setup test data (prepared from lightning module) + dat.setup('test') + + # Predict survival on testing dataset + output = model(dat.x_test) + surv = logistic_hazard.output2surv(output) + surv_df = pd.DataFrame(surv.numpy().transpose(), dat.labtrans.cuts) + ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values) + + # Print evaluation metrics + print(f"Concordance: {ev.concordance_td()}") if __name__ == '__main__': - main() + main() diff --git a/requirements-dev.txt b/requirements-dev.txt index 1f53d7a..355a373 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,3 +1,4 @@ pytest>=4.0.2 lifelines>=0.22.8 sklearn-pandas>=1.8.0 +pytorch-lightning>=1.0.4 \ No newline at end of file From 64f1c93f0e0fda2510b74621cd7758c3a73ed05a Mon Sep 17 00:00:00 2001 From: Haavard Kvamme Date: Tue, 2 Feb 2021 14:33:58 +0100 Subject: [PATCH 3/8] Use py7zr for uncompressin kkbox data (#69) Co-authored-by: Haavard Kvamme --- pycox/datasets/from_kkbox.py | 116 ++++++++++++++++++----------------- setup.py | 1 + 2 files changed, 60 insertions(+), 57 deletions(-) diff --git a/pycox/datasets/from_kkbox.py b/pycox/datasets/from_kkbox.py index 7239f06..53d657e 100644 --- a/pycox/datasets/from_kkbox.py +++ b/pycox/datasets/from_kkbox.py @@ -10,7 +10,7 @@ class _DatasetKKBoxChurn(_DatasetLoader): This is the version of the data set presented by Kvamme et al. (2019) [1], but the preferred version is the `kkbox` version which included administrative censoring labels and and extra categorical variable. - Requires installation of the Kaggle API (https://github.com/Kaggle/kaggle-api), + Requires installation of the Kaggle API (https://github.com/Kaggle/kaggle-api), with credentials (https://github.com/Kaggle/kaggle-api). The data set contains churn information from KKBox, an Asian music streaming service. Churn is @@ -46,7 +46,7 @@ class _DatasetKKBoxChurn(_DatasetLoader): is_cancel: If the customer has canceled the subscription. Subscription cancellation does not imply the user has churned. A user may cancel service subscription due to change of service plans or - other reasons. + other reasons. city: City of customer. gender: @@ -87,7 +87,7 @@ def read_df(self, subset='train', log_trans=True): The survival data set contains no covariates, but can be useful for extending the dataset with more covariates from Kaggle. - + Keyword Arguments: subset {str} -- Which subset to use ('train', 'val', 'test'). Can also set 'survival' which will give df with survival information without @@ -106,13 +106,13 @@ def read_df(self, subset='train', log_trans=True): return pd.read_feather(path) else: raise ValueError(f"Need 'subset' to be 'train', 'val', or 'test'. Got {subset}") - + if not path.exists(): print(f""" The KKBox dataset not locally available. - If you want to download, call 'kkbox_v1.download_kkbox()', but note that + If you want to download, call 'kkbox_v1.download_kkbox()', but note that this might take around 10 min! - NOTE: You need Kaggle credentials! Follow instructions at + NOTE: You need Kaggle credentials! Follow instructions at https://github.com/Kaggle/kaggle-api#api-credentials """) return None @@ -130,9 +130,9 @@ def log_min_p(col, df): def download_kkbox(self): - """Download KKBox data set. + """Download KKBox data set. This is likely to take around 10 min!!! - NOTE: You need Kaggle credentials! Follow instructions at + NOTE: You need Kaggle credentials! Follow instructions at https://github.com/Kaggle/kaggle-api#api-credentials """ self._download() @@ -149,7 +149,7 @@ def _download(self): self._make_train_test_split() print('Cleaning up...') self._clean_up() - print("Done! You can now call `df = kkbox_v1.read_df()`.") + print("Done! You can now call `df = kkbox.read_df()`.") def _setup_download_dir(self): if self._path_dir.exists(): @@ -159,6 +159,7 @@ def _setup_download_dir(self): def _7z_from_kaggle(self): import subprocess + import py7zr try: import kaggle except OSError as e: @@ -175,8 +176,9 @@ def _7z_from_kaggle(self): path=self._path_dir, force=True) for file in files: print(f"Extracting '{file}'...") - subprocess.check_output(['7z', 'x', str(self._path_dir / (file + '.csv.7z')), - f"-o{self._path_dir}"]) + archive = py7zr.SevenZipFile(self._path_dir / f"{file}.csv.7z", mode="r") + archive.extractall(path=self._path_dir) + archive.close() print(f"Finished extracting '{file}'.") def _csv_to_feather_with_types(self): @@ -214,12 +216,12 @@ def _make_survival_data(self): def days_without_membership(df): diff = (df['next_trans_date'] - df['membership_expire_date']).dt.total_seconds() return diff / (60 * 60 * 24) - + trans = (trans .sort_values(['msno', 'transaction_date']) .assign(next_trans_date=(lambda x: x.groupby('msno')['transaction_date'].shift(-1))) .assign(churn30=lambda x: days_without_membership(x) > 30)) - + # Remove entries with membership_expire_date < transaction_date trans = trans.loc[lambda x: x['transaction_date'] <= x['membership_expire_date']] assert (trans.loc[lambda x: x['churn30']==True].groupby(['msno', 'transaction_date'])['msno'].count().max() == 1) @@ -229,10 +231,10 @@ def days_without_membership(df): .assign(max_trans_date=lambda x: x.groupby('msno')['transaction_date'].transform('max')) .assign(final_churn=(lambda x: (x['max_trans_date'] <= last_churn_date) & - (x['transaction_date'] == x['max_trans_date']) & + (x['transaction_date'] == x['max_trans_date']) & (x['membership_expire_date'] <= last_churn_date) ))) - + # Churn: From training set trans = (trans .merge(train, how='left', on='msno') @@ -240,14 +242,14 @@ def days_without_membership(df): .drop('is_churn', axis=1) .assign(train_churn=lambda x: (x['max_trans_date'] == x['transaction_date']) & x['train_churn']) .assign(churn=lambda x: x['train_churn'] | x['churn30'] | x['final_churn'])) - + # Split individuals on churn trans = (trans .join(trans .sort_values(['msno', 'transaction_date']) .groupby('msno')[['churn30', 'membership_expire_date']].shift(1) .rename(columns={'churn30': 'new_start', 'membership_expire_date': 'prev_mem_exp_date'}))) - + def number_of_new_starts(df): return (df .assign(new_start=lambda x: x['new_start'].astype('float')) @@ -255,7 +257,7 @@ def number_of_new_starts(df): .groupby('msno') ['new_start'].cumsum().fillna(0.) .astype('int')) - + def days_between_subs(df): diff = (df['transaction_date'] - df['prev_mem_exp_date']).dt diff = diff.total_seconds() / (60 * 60 * 24) @@ -266,22 +268,22 @@ def days_between_subs(df): trans = (trans .assign(n_prev_churns=lambda x: number_of_new_starts(x), days_between_subs=lambda x: days_between_subs(x))) - + # Set start times trans = (trans .assign(start_date=trans.groupby(['msno', 'n_prev_churns'])['transaction_date'].transform('min')) .assign(first_churn=lambda x: (x['n_prev_churns'] == 0) & (x['churn'] == True))) - + # Get only last transactions (per chrun) indivs = (trans .assign(censored=lambda x: x.groupby('msno')['churn'].transform('sum') == 0) - .assign(last_censored=(lambda x: + .assign(last_censored=(lambda x: (x['censored'] == True) & (x['transaction_date'] == x['max_trans_date']) )) .loc[lambda x: x['last_censored'] | x['churn']] .merge(members[['msno', 'registration_init_time']], how='left', on='msno')) - + def time_diff_days(df, last, first): return (df[last] - df[first]).dt.total_seconds() / (60 * 60 * 24) @@ -290,7 +292,7 @@ def time_diff_days(df, last, first): days_since_reg_init=lambda x: time_diff_days(x, 'start_date', 'registration_init_time'))) # When multiple transactions on last day, remove all but the last - indivs = indivs.loc[lambda x: x['transaction_date'] != x['next_trans_date']] + indivs = indivs.loc[lambda x: x['transaction_date'] != x['next_trans_date']] assert indivs.shape == indivs.drop_duplicates(['msno', 'transaction_date']).shape assert (indivs['churn'] != indivs['censored']).all() @@ -300,8 +302,8 @@ def time_diff_days(df, last, first): indivs = (indivs .assign(churn_type=lambda x: 1*x['churn30'] + 2*x['final_churn'] + 4*x['train_churn']) - .assign(churn_type=lambda x: - np.array(['censoring', '30days', 'final', '30days_and_final', 'train', 'train_and_30', + .assign(churn_type=lambda x: + np.array(['censoring', '30days', 'final', '30days_and_final', 'train', 'train_and_30', 'train_and_final', 'train_30_and_final'])[x['churn_type']]) .drop(dropcols, axis=1)) @@ -326,7 +328,7 @@ def _make_survival_covariates(self): individs = pd.read_feather(self.path_survival) members = pd.read_feather(self._path_dir / 'members.feather') trans = (individs - .merge(pd.read_feather(self._path_dir / 'transactions.feather'), + .merge(pd.read_feather(self._path_dir / 'transactions.feather'), how='left', left_on=['msno', 'start_date'], right_on=['msno', 'transaction_date']) .drop('transaction_date', axis=1) # same as start_date .drop_duplicates(['msno', 'start_date'], keep='last') # keep last transaction on start_date (by idx) @@ -338,7 +340,7 @@ def get_age_at_start(df): # Not important what the date is, though it is reasonalbe to use the last. age_diff = (fixed_date - df['start_date']).dt.total_seconds() / (60*60*24*365) return np.round(df['bd'] - age_diff) - + trans = (trans .merge(members.drop(['registration_init_time'], axis=1), how='left', on='msno') .assign(age_at_start=lambda x: get_age_at_start(x)) @@ -346,11 +348,11 @@ def get_age_at_start(df): .assign(strange_age=lambda x: (x['age_at_start'] <= 0) | (x['age_at_start'] >= 100), age_at_start=lambda x: x['age_at_start'].clip(lower=0, upper=100))) - # days_between_subs + # days_between_subs # There are None for (not new start), so we can just set them to zero, and we don't need to include another variable (as it allready exists). trans = trans.assign(days_between_subs=lambda x: x['days_between_subs'].fillna(0.)) - # days_since_reg_init + # days_since_reg_init # We remove negative entries, set Nans to -1, and add a categorical value for missing. pd.testing.assert_frame_equal(trans.loc[lambda x: x['days_since_reg_init'].isnull()], trans.loc[lambda x: x['age_at_start'].isnull()]) @@ -361,7 +363,7 @@ def get_age_at_start(df): .assign(nan_days_since_reg_init=lambda x: x['days_since_reg_init'].isnull()) .assign(days_since_reg_init=lambda x: x['days_since_reg_init'].fillna(-1))) - # age_at_start + # age_at_start # This is Nan when days_since_reg_init is nan. This is because registration_init_time is nan when bd is nan. # We have removed negative entries, so we set Nans to -1, but don't add dummy because its equal to days_since_reg_init dummy. trans = trans.assign(age_at_start=lambda x: x['age_at_start'].fillna(-1.)) @@ -373,9 +375,9 @@ def get_age_at_start(df): .drop('first_churn', axis=1) .assign(no_prev_churns=lambda x: x['n_prev_churns'] == 0)) - # Drop variables that are not useful + # Drop variables that are not useful trans = (trans - .drop(['start_date', 'registration_init_time', 'churn_type', + .drop(['start_date', 'registration_init_time', 'churn_type', 'membership_expire_date', 'new_start'], axis=1)) @@ -386,7 +388,7 @@ def get_age_at_start(df): # ### Log transform variables # log_cols = ['actual_amount_paid', 'days_between_subs', 'days_since_reg_init', 'payment_plan_days', # 'plan_list_price'] - + # log_min_p = lambda x: np.log(x - x.min() + 1) # trans_log = trans.assign(**{col: log_min_p(trans[col]) for col in self.log_cols}) # assert trans_log[self.log_cols].pipe(np.isfinite).all().all() @@ -396,7 +398,7 @@ def get_age_at_start(df): float_cols = ['n_prev_churns', 'days_between_subs', 'days_since_reg_init', 'payment_plan_days', 'plan_list_price', 'age_at_start', 'actual_amount_paid', 'is_auto_renew', 'is_cancel', 'strange_age', 'nan_days_since_reg_init', 'no_prev_churns', 'duration', 'event'] - trans_log = trans_log.assign(**{col: trans_log[col].astype('float32') for col in float_cols}) + trans_log = trans_log.assign(**{col: trans_log[col].astype('float32') for col in float_cols}) # cov_file = join(data_dir, 'covariates.feather') trans_log.reset_index(drop=True).to_feather(self._path_dir / 'covariates.feather') @@ -433,7 +435,7 @@ def _clean_up(self): file.unlink() except IsADirectoryError: warnings.warn(f"Encountered directory in {self._path_dir}") - + def delete_local_copy(self): for path in [self.path_train, self.path_test, self.path_val, self.path_survival]: path.unlink() @@ -448,7 +450,7 @@ class _DatasetKKBoxAdmin(_DatasetKKBoxChurn): This is the version of the data set presented by Kvamme and Borgan (2019) [1]. - Requires installation of the Kaggle API (https://github.com/Kaggle/kaggle-api), + Requires installation of the Kaggle API (https://github.com/Kaggle/kaggle-api), with credentials (https://github.com/Kaggle/kaggle-api). The data set contains churn information from KKBox, an Asian music streaming service. Churn is @@ -484,7 +486,7 @@ class _DatasetKKBoxAdmin(_DatasetKKBoxChurn): is_cancel: If the customer has canceled the subscription. Subscription cancellation does not imply the user has churned. A user may cancel service subscription due to change of service plans or - other reasons. + other reasons. city: City of customer. gender: @@ -529,7 +531,7 @@ def read_df(self, log_trans=True, no_covs=False): The survival data set contains no covariates, but can be useful for extending the dataset with more covariates from Kaggle. - + Keyword Arguments: log_trans {bool} -- If covariates in 'kkbox.log_cols' (from Kvamme paper) should be transformed with 'z = log(x - min(x) + 1)'. (default: {True}) @@ -547,9 +549,9 @@ def read_df(self, log_trans=True, no_covs=False): if not path.exists(): print(f""" The KKBox dataset not locally available. - If you want to download, call 'kkbox.download_kkbox()', but note that + If you want to download, call 'kkbox.download_kkbox()', but note that this might take around 10 min! - NOTE: You need Kaggle credentials! Follow instructions at + NOTE: You need Kaggle credentials! Follow instructions at https://github.com/Kaggle/kaggle-api#api-credentials """) return None @@ -570,7 +572,7 @@ def log_min_p(col, df): df = df.rename(columns=dict(duration_lcd='duration', churn_lcd='event', duration_censor_lcd='censor_duration')) return df - + def _make_train_test_split(self, seed=1234): pass @@ -611,7 +613,7 @@ def days_without_membership(df): .assign(max_trans_date=lambda x: x.groupby('msno')['transaction_date'].transform('max')) .assign(final_churn=(lambda x: (x['max_trans_date'] <= LAST_CHURN_DATE) & - (x['transaction_date'] == x['max_trans_date']) & + (x['transaction_date'] == x['max_trans_date']) & (x['membership_expire_date'] <= LAST_CHURN_DATE) ))) # Churn: From training set @@ -668,8 +670,8 @@ def time_diff_days(df, last, first): # Add administrative censoring durations. # We add to types: # - Censoring based on the fixed data `LAST_CHURN_DATE` (lcd). - # - Censoring based on member ship expiration, where churned - # are censored at `last_churn_date`. + # - Censoring based on member ship expiration, where churned + # are censored at `last_churn_date`. indivs = (indivs .assign(duration_censor=lambda x: x['duration'], last_churn_date=pd.to_datetime(LAST_CHURN_DATE), @@ -695,7 +697,7 @@ def time_diff_days(df, last, first): assert (tmp['duration_lcd'] == tmp['duration_censor_lcd']).all(), 'Need all censor durations to be equal' # When multiple transactions on last day, remove all but the last - indivs = indivs.loc[lambda x: x['transaction_date'] != x['next_trans_date']] + indivs = indivs.loc[lambda x: x['transaction_date'] != x['next_trans_date']] assert indivs.shape == indivs.drop_duplicates(['msno', 'transaction_date']).shape assert (indivs['churn'] != indivs['censored']).all() @@ -706,8 +708,8 @@ def time_diff_days(df, last, first): indivs = (indivs .assign(churn_type=lambda x: 1*x['churn30'] + 2*x['final_churn'] + 4*x['train_churn']) - .assign(churn_type=lambda x: - np.array(['censoring', '30days', 'final', '30days_and_final', 'train', 'train_and_30', + .assign(churn_type=lambda x: + np.array(['censoring', '30days', 'final', '30days_and_final', 'train', 'train_and_30', 'train_and_final', 'train_30_and_final'])[x['churn_type']]) .drop(dropcols, axis=1)) @@ -730,7 +732,7 @@ def _make_survival_covariates(self): members = pd.read_feather(self._path_dir / 'members.feather') trans = (individs - .merge(pd.read_feather(self._path_dir / 'transactions.feather'), + .merge(pd.read_feather(self._path_dir / 'transactions.feather'), how='left', left_on=['msno', 'start_date'], right_on=['msno', 'transaction_date']) .drop(['transaction_date'], axis=1) # same as start_date .drop_duplicates(['msno', 'start_date'], keep='last') # keep last transaction on start_date (by idx) @@ -749,10 +751,10 @@ def get_age_at_start(df): .drop(['bd'], axis=1) .assign(strange_age=lambda x: (x['age_at_start'] <= 0) | (x['age_at_start'] >= 100), age_at_start=lambda x: x['age_at_start'].clip(lower=0, upper=100))) - # days_between_subs + # days_between_subs # There are None for (not new start), so we can just set them to zero, and we don't need to include another variable (as it allready exists). trans = trans.assign(days_between_subs=lambda x: x['days_between_subs'].fillna(0.)) - # days_since_reg_init + # days_since_reg_init # We remove negative entries, set Nans to -1, and add a categorical value for missing. pd.testing.assert_frame_equal(trans.loc[lambda x: x['days_since_reg_init'].isnull()], trans.loc[lambda x: x['age_at_start'].isnull()]) @@ -762,25 +764,25 @@ def get_age_at_start(df): .loc[lambda x: (x['days_since_reg_init'] >= 0) | x['days_since_reg_init'].isnull()] .assign(nan_days_since_reg_init=lambda x: x['days_since_reg_init'].isnull()) .assign(days_since_reg_init=lambda x: x['days_since_reg_init'].fillna(-1))) - # age_at_start + # age_at_start # This is Nan when days_since_reg_init is nan. This is because registration_init_time is nan when bd is nan. # We have removed negative entries, so we set Nans to -1, but don't add dummy because its equal to days_since_reg_init dummy. trans = trans.assign(age_at_start=lambda x: x['age_at_start'].fillna(-1.)) - # We use n_prev_churns == 0 as an indicator that there are no previous churn + # We use n_prev_churns == 0 as an indicator that there are no previous churn trans = (trans .assign(no_prev_churns=lambda x: x['n_prev_churns'] == 0)) - # Drop variables that are not useful + # Drop variables that are not useful trans = (trans - .drop(['start_date', 'registration_init_time', 'churn_type', + .drop(['start_date', 'registration_init_time', 'churn_type', 'membership_expire_date'], axis=1)) bool_cols = ['is_auto_renew', 'is_cancel'] cat_cols = ['payment_method_id', 'city', 'gender', 'registered_via'] int_cols = ['days_between_subs', 'days_since_reg_init', 'age_at_start'] - trans = trans.assign(**{col: trans[col].astype('bool') for col in bool_cols}) - trans = trans.assign(**{col: trans[col].cat.remove_unused_categories() for col in cat_cols}) - trans = trans.assign(**{col: trans[col].round().astype('int') for col in int_cols}) + trans = trans.assign(**{col: trans[col].astype('bool') for col in bool_cols}) + trans = trans.assign(**{col: trans[col].cat.remove_unused_categories() for col in cat_cols}) + trans = trans.assign(**{col: trans[col].round().astype('int') for col in int_cols}) trans = trans.assign(index_survival=trans.index.values).reset_index(drop=True) last_cols = ['duration', 'churn', 'duration_censor', 'duration_lcd', 'churn_lcd', 'duration_censor_lcd'] diff --git a/setup.py b/setup.py index 040670f..2bce4ff 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ 'numba>=0.44', 'scikit-learn>=0.21.2', 'requests>=2.22.0', + 'py7zr>=0.11.3', ] setup( From 5483489d21f3441e53f78f9f8898ce607f41c632 Mon Sep 17 00:00:00 2001 From: Rohan Shad Date: Tue, 2 Feb 2021 09:23:54 -0800 Subject: [PATCH 4/8] tabs vs spaces --- examples/lightning_logistic_hazard.py | 292 +++++++++++++------------- 1 file changed, 146 insertions(+), 146 deletions(-) diff --git a/examples/lightning_logistic_hazard.py b/examples/lightning_logistic_hazard.py index 7f5be3f..26bc935 100644 --- a/examples/lightning_logistic_hazard.py +++ b/examples/lightning_logistic_hazard.py @@ -28,159 +28,159 @@ class MetaBrick(pl.LightningDataModule): - def __init__(self, batch_size: int = 256, num_durations: int = 10, num_workers: int = 0): - super().__init__() - self.batch_size = batch_size - self.num_durations = num_durations - self.num_workers = num_workers - - def setup(self, stage=None): - ''' - Get the METABRICK dataset split into a training dataframe and a testing dataframe. - Preprocesses features and targets (duration and event), discretize time into 'num_duration' equidistant points. - ''' - - # Load and split dataset into train and test (if there's train and val this can be called within stage == 'fit') - df_train = metabric.read_df() - df_test = df_train.sample(frac=0.2) - df_train = df_train.drop(df_test.index) - - self.x_train, self.x_test = self._preprocess_features(df_train, df_test) - self.labtrans = logistic_hazard.LabTransDiscreteTime(self.num_durations) - - if stage == 'fit' or stage is None: - # Pre-process features and targets - self.y_train = self.labtrans.fit_transform( - df_train.duration.values, df_train.event.values) - self.y_train_duration = torch.from_numpy(self.y_train[0]) - self.y_train_event = torch.from_numpy(self.y_train[1]) - - # Create training dataset - self.train_set = TensorDataset( - self.x_train, self.y_train_duration, self.y_train_event) - - # Input and output dimensions for building net - self.in_dims = self.x_train.shape[1] - self.out_dims = self.labtrans.out_features - - if stage == 'test' or stage is None: - # Return test dataframe - self.df_test = df_test - - def train_dataloader(self): - ''' - Build training dataloader - num_workers set to 0 by default because of some thread issue - ''' - train_loader = DataLoader( - dataset=self.train_set, - batch_size=self.batch_size, - shuffle=True, - num_workers=self.num_workers - ) - return train_loader - - @classmethod - def _preprocess_features(cls, df_train: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: - ''' - Preprocess the covariates of the training and test set and return a tensor for the - taining covariates and test covariates. - ''' - cols_standardize = ["x0", "x1", "x2", "x3", "x8"] - cols_leave = ["x4", "x5", "x6", "x7"] - - standardize = [([col], StandardScaler()) for col in cols_standardize] - leave = [(col, None) for col in cols_leave] - x_mapper = DataFrameMapper(standardize + leave) - - x_train = x_mapper.fit_transform(df_train).astype("float32") - x_test = x_mapper.transform(df_test).astype("float32") - return torch.from_numpy(x_train), torch.from_numpy(x_test) + def __init__(self, batch_size: int = 256, num_durations: int = 10, num_workers: int = 0): + super().__init__() + self.batch_size = batch_size + self.num_durations = num_durations + self.num_workers = num_workers + + def setup(self, stage=None): + ''' + Get the METABRICK dataset split into a training dataframe and a testing dataframe. + Preprocesses features and targets (duration and event), discretize time into 'num_duration' equidistant points. + ''' + + # Load and split dataset into train and test (if there's train and val this can be called within stage == 'fit') + df_train = metabric.read_df() + df_test = df_train.sample(frac=0.2) + df_train = df_train.drop(df_test.index) + + self.x_train, self.x_test = self._preprocess_features(df_train, df_test) + self.labtrans = logistic_hazard.LabTransDiscreteTime(self.num_durations) + + if stage == 'fit' or stage is None: + # Pre-process features and targets + self.y_train = self.labtrans.fit_transform( + df_train.duration.values, df_train.event.values) + self.y_train_duration = torch.from_numpy(self.y_train[0]) + self.y_train_event = torch.from_numpy(self.y_train[1]) + + # Create training dataset + self.train_set = TensorDataset( + self.x_train, self.y_train_duration, self.y_train_event) + + # Input and output dimensions for building net + self.in_dims = self.x_train.shape[1] + self.out_dims = self.labtrans.out_features + + if stage == 'test' or stage is None: + # Return test dataframe + self.df_test = df_test + + def train_dataloader(self): + ''' + Build training dataloader + num_workers set to 0 by default because of some thread issue + ''' + train_loader = DataLoader( + dataset=self.train_set, + batch_size=self.batch_size, + shuffle=True, + num_workers=self.num_workers + ) + return train_loader + + @classmethod + def _preprocess_features(cls, df_train: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: + ''' + Preprocess the covariates of the training and test set and return a tensor for the + taining covariates and test covariates. + ''' + cols_standardize = ["x0", "x1", "x2", "x3", "x8"] + cols_leave = ["x4", "x5", "x6", "x7"] + + standardize = [([col], StandardScaler()) for col in cols_standardize] + leave = [(col, None) for col in cols_leave] + x_mapper = DataFrameMapper(standardize + leave) + + x_train = x_mapper.fit_transform(df_train).astype("float32") + x_test = x_mapper.transform(df_test).astype("float32") + return torch.from_numpy(x_train), torch.from_numpy(x_test) # Survival model class class SurvModel(pl.LightningModule): - ''' - Defines model, optimizers, forward step, and training step. - Define validation step as def validation_step if needed - Configured to use NLL logistic hazard loss from logistic_hazard.NLLLogisticHazardLoss() - ''' - - def __init__(self, lr, in_features, out_features): - super().__init__() - - self.save_hyperparameters() - self.lr = lr - self.in_features = in_features - self.out_features = out_features - - # Define Model Here (in this case MLP) - self.net = nn.Sequential( - nn.Linear(self.in_features, 32), - nn.ReLU(), - nn.BatchNorm1d(32), - nn.Dropout(0.1), - nn.Linear(32, 32), - nn.ReLU(), - nn.BatchNorm1d(32), - nn.Dropout(0.1), - nn.Linear(32, self.out_features), - ) - - # Define loss function: - self.loss_func = logistic_hazard.NLLLogistiHazardLoss() - - def forward(self, x): - batch_size, data = x.size() - x = self.net(x) - return x - - # Training step and validation step usually defined, this dataset only had train + test so left out val. - def training_step(self, batch, batch_idx): - x, duration, event = batch - output = self.forward(x) - loss = self.loss_func(output, duration, event) - - # progress bar logging metrics (add custom metric definitions later if useful?) - self.log('loss', loss, on_step=True, on_epoch=True, prog_bar=True) - return loss - - def configure_optimizers(self): - optimizer = torch.optim.Adam( - self.parameters(), - lr = self.lr - ) - return optimizer + ''' + Defines model, optimizers, forward step, and training step. + Define validation step as def validation_step if needed + Configured to use NLL logistic hazard loss from logistic_hazard.NLLLogisticHazardLoss() + ''' + + def __init__(self, lr, in_features, out_features): + super().__init__() + + self.save_hyperparameters() + self.lr = lr + self.in_features = in_features + self.out_features = out_features + + # Define Model Here (in this case MLP) + self.net = nn.Sequential( + nn.Linear(self.in_features, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, self.out_features), + ) + + # Define loss function: + self.loss_func = logistic_hazard.NLLLogistiHazardLoss() + + def forward(self, x): + batch_size, data = x.size() + x = self.net(x) + return x + + # Training step and validation step usually defined, this dataset only had train + test so left out val. + def training_step(self, batch, batch_idx): + x, duration, event = batch + output = self.forward(x) + loss = self.loss_func(output, duration, event) + + # progress bar logging metrics (add custom metric definitions later if useful?) + self.log('loss', loss, on_step=True, on_epoch=True, prog_bar=True) + return loss + + def configure_optimizers(self): + optimizer = torch.optim.Adam( + self.parameters(), + lr = self.lr + ) + return optimizer def main(): - # Load Lightning DataModule - dat = MetaBrick(num_workers=0) - dat.setup('fit') #allows for input / output features to be configured in the model - - # Load Lightning Module - model = SurvModel(lr=1e-3, in_features=dat.in_dims, out_features=dat.out_dims) - trainer = pl.Trainer(gpus=0, num_sanity_val_steps=0, max_epochs=20, fast_dev_run=False) - - # Train model - trainer.fit(model,dat) - - # Load final model & freeze - print('Running in Evaluation Mode...') - model.freeze() - - # Setup test data (prepared from lightning module) - dat.setup('test') - - # Predict survival on testing dataset - output = model(dat.x_test) - surv = logistic_hazard.output2surv(output) - surv_df = pd.DataFrame(surv.numpy().transpose(), dat.labtrans.cuts) - ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values) - - # Print evaluation metrics - print(f"Concordance: {ev.concordance_td()}") + # Load Lightning DataModule + dat = MetaBrick(num_workers=0) + dat.setup('fit') #allows for input / output features to be configured in the model + + # Load Lightning Module + model = SurvModel(lr=1e-3, in_features=dat.in_dims, out_features=dat.out_dims) + trainer = pl.Trainer(gpus=0, num_sanity_val_steps=0, max_epochs=20, fast_dev_run=False) + + # Train model + trainer.fit(model,dat) + + # Load final model & freeze + print('Running in Evaluation Mode...') + model.freeze() + + # Setup test data (prepared from lightning module) + dat.setup('test') + + # Predict survival on testing dataset + output = model(dat.x_test) + surv = logistic_hazard.output2surv(output) + surv_df = pd.DataFrame(surv.numpy().transpose(), dat.labtrans.cuts) + ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values) + + # Print evaluation metrics + print(f"Concordance: {ev.concordance_td()}") if __name__ == '__main__': - main() + main() From 138b58637d35120f14b4a2fb523cbcac9572316a Mon Sep 17 00:00:00 2001 From: rohanshad Date: Sat, 6 Feb 2021 13:30:46 -0800 Subject: [PATCH 5/8] Initial attempts at cox_ph refactor --- examples/torch_cox.py | 168 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 examples/torch_cox.py diff --git a/examples/torch_cox.py b/examples/torch_cox.py new file mode 100644 index 0000000..b129f31 --- /dev/null +++ b/examples/torch_cox.py @@ -0,0 +1,168 @@ +''' +A minimal example of how to fit a LogisticHazard model with a vanilla torch training loop. +The point of this example is to make it simple to use the LogisticHazard models in other frameworks +that are not based on torchtuples. +''' +from typing import Tuple + +import numpy as np +import pandas as pd + +import torch +from torch import nn +from torch.utils.data import DataLoader, TensorDataset + +from pycox.datasets import metabric +from pycox.evaluation import EvalSurv +from pycox.models import cox + +# For preprocessing +from sklearn.preprocessing import StandardScaler +from sklearn_pandas import DataFrameMapper +import torchtuples as tt + +def get_metabrick_train_val_test() -> Tuple[pd.DataFrame]: + """Get the METABRICK dataset split into a trainin dataframe and a testing dataframe.""" + df_train = metabric.read_df() + df_test = df_train.sample(frac=0.2) + df_train = df_train.drop(df_test.index) + df_val = df_train.sample(frac=0.2) + df_train = df_train.drop(df_val.index) + return df_train, df_val, df_test + + +def preprocess_features(df_train: pd.DataFrame, df_val: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: + """Preprocess the covariates of the training and test set and return a tensor for the + taining covariates and test covariates. + """ + cols_standardize = ["x0", "x1", "x2", "x3", "x8"] + cols_leave = ["x4", "x5", "x6", "x7"] + + standardize = [([col], StandardScaler()) for col in cols_standardize] + leave = [(col, None) for col in cols_leave] + x_mapper = DataFrameMapper(standardize + leave) + + x_train = x_mapper.fit_transform(df_train).astype("float32") + x_val = x_mapper.transform(df_val).astype("float32") + x_test = x_mapper.transform(df_test).astype("float32") + + return torch.from_numpy(x_train), torch.from_numpy(x_val), torch.from_numpy(x_test) + + +def make_mlp(in_features: int, out_features: int) -> nn.Module: + """Make a simple torch net""" + net = nn.Sequential( + nn.Linear(in_features, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, out_features), + ) + return net + +def get_target(df : pd.DataFrame) -> np.ndarray: + ''' + Takes pandas datframe and converts the duration / event targets into np.array + ''' + duration = df['duration'].to_numpy().reshape(len(df['duration']),1) + event = df['event'].to_numpy().reshape(len(df['event']),1) + + return duration, event + +def compute_baseline_hazards(input,target): + if (input is None) and (target is None): + raise ValueError("Need to give a 'input' and 'target' to this function.") + input, target + + print(target.size()) + durations, events = target + df = pd.DataFrame({duration_col: durations, event_col: events}) + + if sample is not None: + if sample >= 1: + df = df.sample(n=sample) + else: + df = df.sample(frac=sample) + + input = tt.tuplefy(input).to_numpy().iloc[df.index.values] + base_haz = self._compute_baseline_hazards(input, df, max_duration, batch_size, + eval_=eval_, num_workers=num_workers) + if set_hazards: + self.compute_baseline_cumulative_hazards(set_hazards=True, baseline_hazards_=base_haz) + return base_haz + +def main() -> None: + # Get the metabrick dataset split in a train and test set + np.random.seed(1234) + torch.manual_seed(123) + df_train, df_val, df_test = get_metabrick_train_val_test() + + # Preprocess features + x_train, x_val, x_test = preprocess_features(df_train, df_val, df_test) + + y_train = torch.from_numpy(np.concatenate(get_target(df_train), axis=1)) + + + y_val = torch.from_numpy(np.array(get_target(df_val))) + + #Probably have to change this to something like test_target? + durations_test, events_test = get_target(df_test) + val = x_val, y_val + + # num_durations = 10 + # labtrans = logistic_hazard.LabTransDiscreteTime(num_durations) + # y_train = labtrans.fit_transform(df_train.duration.values, df_train.event.values) + # y_train_duration = torch.from_numpy(y_train[0]) + # y_train_event = torch.from_numpy(y_train[1]) + + # Make an MLP nerual network + in_features = x_train.shape[1] + out_features = 1 + net = make_mlp(in_features, out_features) + + batch_size = 256 + epochs = 20 + + train_dataset = TensorDataset(x_train, y_train) + train_dataloader = DataLoader(train_dataset, batch_size, shuffle=True) + + # if verbose: + # print('Durations and events in order:') + # print(y_train[:,0]) + # print(y_train[:,1] + + # Set optimizer and loss function (optimization criterion) + optimizer = torch.optim.Adam(net.parameters(), lr=0.01) + loss_func = cox.CoxPHLoss() + for epoch in range(epochs): + running_loss = 0.0 + for i, data in enumerate(train_dataloader): + x, target = data + optimizer.zero_grad() + output = net(x) + loss = loss_func(output, target[:,0], target[:,1]) # need x, durations, events + loss.backward() + optimizer.step() + running_loss += loss.item() + print(f"epoch: {epoch} -- loss: {running_loss / i}") + + # Predict survival for the test set + # Set net in evaluation mode and turn off gradients + net.eval() + with torch.set_grad_enabled(False): + output = net(x_test) + #Network trains but can't seem to calculate baseline hazards etc + surv = compute_baseline_hazards(input=x_train, target=np.concatenate(get_target(df_train)) + surv_df = pd.DataFrame(surv.numpy().transpose(), labtrans.cuts) + + # Pring the test set concordance index + ev = EvalSurv(surv_df, df_test.duration.values, df_test.event.values) + print(f"Concordance: {ev.concordance_td()}") + + +if __name__ == "__main__": + main() \ No newline at end of file From 49796ab15616ea03f880b51dc94170427448ba2d Mon Sep 17 00:00:00 2001 From: Rohan Shad Date: Tue, 9 Feb 2021 10:00:07 -0800 Subject: [PATCH 6/8] Added torch vanilla and torch lightning examples for cox_ph on metabric. --- examples/lightning_coxph.py | 4 ++++ examples/torch_cox.py | 23 +---------------------- 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/examples/lightning_coxph.py b/examples/lightning_coxph.py index 3bdca1b..c198192 100644 --- a/examples/lightning_coxph.py +++ b/examples/lightning_coxph.py @@ -212,6 +212,7 @@ def main(): # Predict survival on testing dataset output = model(dat.x_test) + print(dat.y_test[:,0].size()) cum_haz = coxph.compute_cumulative_baseline_hazards(output, durations=dat.y_test[:,0], events=dat.y_test[:,1]) surv = coxph.output2surv(output, cum_haz[0]) @@ -220,6 +221,9 @@ def main(): print(surv_df) # Calculate the test set concordance index + print(dat.df_test.event.values.shape) + print(type(dat.df_test.event.to_numpy()[0])) + ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values, censor_surv='km') time_grid = np.linspace(dat.df_test.duration.values.min(), dat.df_test.duration.values.max(), 100) diff --git a/examples/torch_cox.py b/examples/torch_cox.py index 7fd1e06..ce69113 100644 --- a/examples/torch_cox.py +++ b/examples/torch_cox.py @@ -90,12 +90,6 @@ def main() -> None: durations_test, events_test = get_target(df_test) val = x_val, y_val - # num_durations = 10 - # labtrans = logistic_hazard.LabTransDiscreteTime(num_durations) - # y_train = labtrans.fit_transform(df_train.duration.values, df_train.event.values) - # y_train_duration = torch.from_numpy(y_train[0]) - # y_train_event = torch.from_numpy(y_train[1]) - # Make an MLP nerual network in_features = x_train.shape[1] out_features = 1 @@ -134,30 +128,15 @@ def main() -> None: net.eval() with torch.set_grad_enabled(False): output = net(x_test) - #Network trains but can't seem to calculate baseline hazards etc - #print(output[1],y_test[1]) - print(y_test.size()) - - - #baseline_haz = coxph.compute_baseline_hazards(output=output, durations=y_test[:,0], events=y_test[:,1]) - #baseline_haz = torch.from_numpy(np.asarray(baseline_haz)) - # print(baseline_haz.size()) - # print(durations.size()) - #print(np.asarray(baseline_haz)) - #Cumulative Hazards Calculation - cum_haz = coxph.compute_cumulative_baseline_hazards(output, durations=y_test[:,0], events=y_test[:,1]) - #print(cum_haz[0]) surv = coxph.output2surv(output, cum_haz[0]) # The dataframe needs to be transposed to format: {rows}: duration, {cols}: each individual - surv_df = pd.DataFrame(surv.transpose(1,0).numpy()) - print(surv_df) - # # Pring the test set concordance index + # print the test set concordance index ev = EvalSurv(surv_df, df_test.duration.values, df_test.event.values, censor_surv='km') time_grid = np.linspace(df_test.duration.values.min(), df_test.duration.values.max(), 100) From ab1212877f007cf84262a731db22267df85797a5 Mon Sep 17 00:00:00 2001 From: rohanshad Date: Mon, 15 Mar 2021 21:58:21 -0700 Subject: [PATCH 7/8] Minimal example for coxph integration with torch lightning --- examples/lightning_coxph.py | 381 +++++++++++++++++------------------- 1 file changed, 180 insertions(+), 201 deletions(-) diff --git a/examples/lightning_coxph.py b/examples/lightning_coxph.py index c198192..0368b0e 100644 --- a/examples/lightning_coxph.py +++ b/examples/lightning_coxph.py @@ -1,6 +1,5 @@ ''' A minimal example of how to fit a cox_PH model with pytorch lightning independent of torchtuples - Original author: Rohan Shad @rohanshad ''' from typing import Tuple @@ -23,212 +22,192 @@ # Lightning Dataset Module class MetaBrick(pl.LightningDataModule): - ''' - Prepares metabric dataset for either discrete time or cox proportional models. - batch_size (int) - batch size, default = 256 - num_durations (int) - number of timepoints to discretize data into (for discrete time models only), default = 10 - num_workers (int) - number of cpu workers to load data, default = 0 - discretize (bool) - Whether or not to discretize data (set as True only for discrete time models), default = False - ''' - def __init__(self, batch_size: int = 256, num_durations: int = 10, num_workers: int = 0, discretize: bool = False): - super().__init__() - self.batch_size = batch_size - self.num_durations = num_durations - self.num_workers = num_workers - self.discretize = discretize - - def setup(self, stage=None): - ''' - Get the METABRICK dataset split into a training dataframe and a testing dataframe. - Preprocesses features and targets (duration and event), discretize time into 'num_duration' equidistant points. - ''' - - # Load and split dataset into train and test (if there's train and val this can be called within stage == 'fit') - df_train = metabric.read_df() - df_test = df_train.sample(frac=0.2) - df_train = df_train.drop(df_test.index) - df_val = df_train.sample(frac=0.2) - df_train = df_train.drop(df_val.index) - - self.x_train, self.x_val, self.x_test = self._preprocess_features(df_train, df_val, df_test) - - if stage == 'fit' or stage is None: - - if self.discretize: - print('Discretizing data...') - self.labtrans = logistic_hazard.LabTransDiscreteTime(self.num_durations) - self.y_train = self.labtrans.fit_transform(df_train.duration.values, df_train.event.values) - self.y_train_duration = torch.from_numpy(self.y_train[0]) - self.y_train_event = torch.from_numpy(self.y_train[1]) - - # Create training dataset - self.train_set = TensorDataset( - self.x_train, self.y_train_duration, self.y_train_event) - - # Input and output dimensions for building net - self.in_dims = self.x_train.shape[1] - self.out_dims = self.labtrans.out_features - - else: - # Setup targets (duration, event) - self.y_train = torch.from_numpy(np.concatenate(self._get_target(df_train), axis=1)) - self.y_val = torch.from_numpy(np.concatenate(self._get_target(df_val), axis=1)) - - # Create training and validation datasets - self.train_set = TensorDataset(self.x_train, self.y_train) - self.val_set = TensorDataset(self.x_val, self.y_val) - - # Input and output dimensions for building net - self.in_dims = self.x_train.shape[1] - self.out_dims = 1 - - if stage == 'test' or stage is None: - if self.discretize: - # Returns df_test {pd.DataFrame} for metric calculation - self.df_test = df_test - else: - # Returns correctly preprocessed target y_test {torch.Tensor} and entire df_test {pd.DataFrame} for metric calculations - self.y_test = torch.from_numpy(np.concatenate(self._get_target(df_test), axis=1)) - self.df_test = df_test - - - def train_dataloader(self): - ''' - Build training dataloader - num_workers set to 0 by default because of some thread issue - ''' - train_loader = DataLoader( - dataset=self.train_set, - batch_size=self.batch_size, - shuffle=True, - num_workers=self.num_workers - ) - return train_loader - - @classmethod - def _preprocess_features(cls, df_train: pd.DataFrame, df_val: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: - ''' - Preprocess the covariates of the training, validation, and test set and return a tensor for the - taining covariates and test covariates. - ''' - cols_standardize = ["x0", "x1", "x2", "x3", "x8"] - cols_leave = ["x4", "x5", "x6", "x7"] - - standardize = [([col], StandardScaler()) for col in cols_standardize] - leave = [(col, None) for col in cols_leave] - x_mapper = DataFrameMapper(standardize + leave) - - x_train = x_mapper.fit_transform(df_train).astype("float32") - x_val = x_mapper.transform(df_val).astype("float32") - x_test = x_mapper.transform(df_test).astype("float32") - - return torch.from_numpy(x_train), torch.from_numpy(x_val), torch.from_numpy(x_test) - - def _get_target(cls, df : pd.DataFrame) -> np.ndarray: - ''' - Takes pandas datframe and converts the duration, event targets into np.arrays - ''' - duration = df['duration'].to_numpy().reshape(len(df['duration']),1) - event = df['event'].to_numpy().reshape(len(df['event']),1) - - return duration, event + ''' + Prepares metabric dataset for either discrete time or cox proportional models. + batch_size (int) - batch size, default = 256 + num_durations (int) - number of timepoints to discretize data into (for discrete time models only), default = 10 + num_workers (int) - number of cpu workers to load data, default = 0 + discretize (bool) - Whether or not to discretize data (set as True only for discrete time models), default = False + ''' + def __init__(self, batch_size: int = 256, num_durations: int = 10, num_workers: int = 0, discretize: bool = False): + super().__init__() + self.batch_size = batch_size + self.num_durations = num_durations + self.num_workers = num_workers + self.discretize = discretize + + def setup(self, stage=None): + ''' + Get the METABRICK dataset split into a training dataframe and a testing dataframe. + Preprocesses features and targets (duration and event), discretize time into 'num_duration' equidistant points. + ''' + + # Load and split dataset into train and test (if there's train and val this can be called within stage == 'fit') + df_train = metabric.read_df() + df_test = df_train.sample(frac=0.2) + df_train = df_train.drop(df_test.index) + df_val = df_train.sample(frac=0.2) + df_train = df_train.drop(df_val.index) + + self.x_train, self.x_val, self.x_test = self._preprocess_features(df_train, df_val, df_test) + + if stage == 'fit' or stage is None: + # Setup targets (duration, event) + self.y_train = torch.from_numpy(np.concatenate(self._get_target(df_train), axis=1)) + self.y_val = torch.from_numpy(np.concatenate(self._get_target(df_val), axis=1)) + + # Create training and validation datasets + self.train_set = TensorDataset(self.x_train, self.y_train) + self.val_set = TensorDataset(self.x_val, self.y_val) + + # Input and output dimensions for building net + self.in_dims = self.x_train.shape[1] + self.out_dims = 1 + + if stage == 'test' or stage is None: + # Returns correctly preprocessed target y_test {torch.Tensor} and entire df_test {pd.DataFrame} for metric calculations + self.y_test = torch.from_numpy(np.concatenate(self._get_target(df_test), axis=1)) + self.df_test = df_test + + + def train_dataloader(self): + ''' + Build training dataloader + num_workers set to 0 by default because of some thread issue + ''' + train_loader = DataLoader( + dataset=self.train_set, + batch_size=self.batch_size, + shuffle=True, + num_workers=self.num_workers + ) + return train_loader + + @classmethod + def _preprocess_features(cls, df_train: pd.DataFrame, df_val: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: + ''' + Preprocess the covariates of the training, validation, and test set and return a tensor for the + taining covariates and test covariates. + ''' + cols_standardize = ["x0", "x1", "x2", "x3", "x8"] + cols_leave = ["x4", "x5", "x6", "x7"] + + standardize = [([col], StandardScaler()) for col in cols_standardize] + leave = [(col, None) for col in cols_leave] + x_mapper = DataFrameMapper(standardize + leave) + + x_train = x_mapper.fit_transform(df_train).astype("float32") + x_val = x_mapper.transform(df_val).astype("float32") + x_test = x_mapper.transform(df_test).astype("float32") + + return torch.from_numpy(x_train), torch.from_numpy(x_val), torch.from_numpy(x_test) + + def _get_target(cls, df : pd.DataFrame) -> np.ndarray: + ''' + Takes pandas datframe and converts the duration, event targets into np.arrays + ''' + duration = df['duration'].to_numpy().reshape(len(df['duration']),1) + event = df['event'].to_numpy().reshape(len(df['event']),1) + + return duration, event # Survival model class class SurvModel(pl.LightningModule): - ''' - Defines model, optimizers, forward step, and training step. - Define validation step as def validation_step if needed - Configured to use NLL logistic hazard loss from logistic_hazard.NLLLogisticHazardLoss() - ''' - - def __init__(self, lr, in_features, out_features): - super().__init__() - - self.save_hyperparameters() - self.lr = lr - self.in_features = in_features - self.out_features = out_features - - # Define Model Here (in this case MLP) - self.net = nn.Sequential( - nn.Linear(self.in_features, 32), - nn.ReLU(), - nn.BatchNorm1d(32), - nn.Dropout(0.1), - nn.Linear(32, 32), - nn.ReLU(), - nn.BatchNorm1d(32), - nn.Dropout(0.1), - nn.Linear(32, self.out_features), - ) - - # Define loss function: - self.loss_func = coxph.CoxPHLoss() - - def forward(self, x): - batch_size, data = x.size() - x = self.net(x) - return x - - # Training step and validation step usually defined, this dataset only had train + test so left out val. - def training_step(self, batch, batch_idx): - x, target = batch - output = self.forward(x) - - # target variable contains duration and event as a concatenated tensor - loss = self.loss_func(output, target[:,0], target[:,1]) - - # progress bar logging metrics (add custom metric definitions later if useful?) - self.log('loss', loss, on_step=True, on_epoch=True, prog_bar=True) - return loss - - def configure_optimizers(self): - optimizer = torch.optim.Adam( - self.parameters(), - lr = self.lr - ) - return optimizer + ''' + Defines model, optimizers, forward step, and training step. + Define validation step as def validation_step if needed + Configured to use CoxPH loss from coxph.CoxPHLoss() + ''' + + def __init__(self, lr, in_features, out_features): + super().__init__() + + self.save_hyperparameters() + self.lr = lr + self.in_features = in_features + self.out_features = out_features + + # Define Model Here (in this case MLP) + self.net = nn.Sequential( + nn.Linear(self.in_features, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, self.out_features), + ) + + # Define loss function: + self.loss_func = coxph.CoxPHLoss() + + def forward(self, x): + batch_size, data = x.size() + x = self.net(x) + return x + + # Training step and validation step usually defined, this dataset only had train + test so left out val. + def training_step(self, batch, batch_idx): + x, target = batch + output = self.forward(x) + + # target variable contains duration and event as a concatenated tensor + loss = self.loss_func(output, target[:,0], target[:,1]) + + # progress bar logging metrics (add custom metric definitions later if useful?) + self.log('loss', loss, on_step=True, on_epoch=True, prog_bar=True) + return loss + + def configure_optimizers(self): + optimizer = torch.optim.Adam( + self.parameters(), + lr = self.lr + ) + return optimizer def main(): - # Load Lightning DataModule - dat = MetaBrick(num_workers=0, discretize=False) - dat.setup('fit') #allows for input / output features to be configured in the model - - # Load Lightning Module - model = SurvModel(lr=1e-2, in_features=dat.in_dims, out_features=dat.out_dims) - trainer = pl.Trainer(gpus=0, num_sanity_val_steps=0, max_epochs=20, fast_dev_run=False) - - # Train model - trainer.fit(model,dat) - - # Load final model & freeze - print('Running in Evaluation Mode...') - model.freeze() - - # Setup test data (prepared from lightning module) - dat.setup('test') - - # Predict survival on testing dataset - output = model(dat.x_test) - print(dat.y_test[:,0].size()) - cum_haz = coxph.compute_cumulative_baseline_hazards(output, durations=dat.y_test[:,0], events=dat.y_test[:,1]) - surv = coxph.output2surv(output, cum_haz[0]) - - # The surv_df dataframe needs to be transposed to format: {rows}: duration, {cols}: each individual - surv_df = pd.DataFrame(surv.transpose(1,0).numpy()) - print(surv_df) - - # Calculate the test set concordance index - print(dat.df_test.event.values.shape) - print(type(dat.df_test.event.to_numpy()[0])) - - ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values, censor_surv='km') - time_grid = np.linspace(dat.df_test.duration.values.min(), dat.df_test.duration.values.max(), 100) - - print(f"Concordance: {ev.concordance_td()}") - print(f"Brier Score: {ev.integrated_brier_score(time_grid)}") + # Load Lightning DataModule + dat = MetaBrick(num_workers=0, discretize=False) + dat.setup('fit') #allows for input / output features to be configured in the model + + # Load Lightning Module + model = SurvModel(lr=1e-2, in_features=dat.in_dims, out_features=dat.out_dims) + trainer = pl.Trainer(gpus=0, num_sanity_val_steps=0, max_epochs=20, fast_dev_run=False) + + # Train model + trainer.fit(model,dat) + + + # Test model + dat = MetaBrick(num_workers=0, discretize=False) + trainer.test(model,datamodule=dat) + + # Load final model & freeze + print('Running in Evaluation Mode...') + model.freeze() + + # Setup test data (prepared from lightning module) + dat.setup('test') + + # Predict survival on testing dataset + output = model(dat.x_test) + + cum_haz = coxph.compute_cumulative_baseline_hazards(output, durations=dat.y_test[:,0], events=dat.y_test[:,1]) + surv = coxph.output2surv(output, cum_haz[0]) + + # The surv_df dataframe needs to be transposed to format: {rows}: duration, {cols}: each individual + surv_df = pd.DataFrame(surv.transpose(1,0).numpy()) + print(surv_df) + + ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values, censor_surv='km') + time_grid = np.linspace(dat.df_test.duration.values.min(), dat.df_test.duration.values.max(), 100) + + print(f"Concordance: {ev.concordance_td()}") + print(f"Brier Score: {ev.integrated_brier_score(time_grid)}") if __name__ == '__main__': - main() \ No newline at end of file + main() \ No newline at end of file From b7061a693f1f18344f757145572018184c92c48d Mon Sep 17 00:00:00 2001 From: rohanshad Date: Mon, 15 Mar 2021 22:05:59 -0700 Subject: [PATCH 8/8] tabs vs spaces for lightning_logistic_hazard.py --- examples/lightning_logistic_hazard.py | 293 +++++++++++++------------- 1 file changed, 146 insertions(+), 147 deletions(-) diff --git a/examples/lightning_logistic_hazard.py b/examples/lightning_logistic_hazard.py index e3cc2b7..a1d4270 100644 --- a/examples/lightning_logistic_hazard.py +++ b/examples/lightning_logistic_hazard.py @@ -28,158 +28,157 @@ class MetaBrick(pl.LightningDataModule): - def __init__(self, batch_size: int = 256, num_durations: int = 10, num_workers: int = 0): - super().__init__() - self.batch_size = batch_size - self.num_durations = num_durations - self.num_workers = num_workers - - def setup(self, stage=None): - ''' - Get the METABRICK dataset split into a training dataframe and a testing dataframe. - Preprocesses features and targets (duration and event), discretize time into 'num_duration' equidistant points. - ''' - - # Load and split dataset into train and test (if there's train and val this can be called within stage == 'fit') - df_train = metabric.read_df() - df_test = df_train.sample(frac=0.2) - df_train = df_train.drop(df_test.index) - - self.x_train, self.x_test = self._preprocess_features(df_train, df_test) - self.labtrans = logistic_hazard.LabTransDiscreteTime(self.num_durations) - - if stage == 'fit' or stage is None: - # Pre-process features and targets - self.y_train = self.labtrans.fit_transform( - df_train.duration.values, df_train.event.values) - self.y_train_duration = torch.from_numpy(self.y_train[0]) - self.y_train_event = torch.from_numpy(self.y_train[1]) - - # Create training dataset - self.train_set = TensorDataset( - self.x_train, self.y_train_duration, self.y_train_event) - - # Input and output dimensions for building net - self.in_dims = self.x_train.shape[1] - self.out_dims = self.labtrans.out_features - - if stage == 'test' or stage is None: - # Return test dataframe - self.df_test = df_test - - def train_dataloader(self): - ''' - Build training dataloader - num_workers set to 0 by default because of some thread issue - ''' - train_loader = DataLoader( - dataset=self.train_set, - batch_size=self.batch_size, - shuffle=True, - num_workers=self.num_workers - ) - return train_loader - - @classmethod - def _preprocess_features(cls, df_train: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: - ''' - Preprocess the covariates of the training and test set and return a tensor for the - taining covariates and test covariates. - ''' - cols_standardize = ["x0", "x1", "x2", "x3", "x8"] - cols_leave = ["x4", "x5", "x6", "x7"] - - standardize = [([col], StandardScaler()) for col in cols_standardize] - leave = [(col, None) for col in cols_leave] - x_mapper = DataFrameMapper(standardize + leave) - - x_train = x_mapper.fit_transform(df_train).astype("float32") - x_test = x_mapper.transform(df_test).astype("float32") - return torch.from_numpy(x_train), torch.from_numpy(x_test) + def __init__(self, batch_size: int = 256, num_durations: int = 10, num_workers: int = 0): + super().__init__() + self.batch_size = batch_size + self.num_durations = num_durations + self.num_workers = num_workers + + def setup(self, stage=None): + ''' + Get the METABRICK dataset split into a training dataframe and a testing dataframe. + Preprocesses features and targets (duration and event), discretize time into 'num_duration' equidistant points. + ''' + + # Load and split dataset into train and test (if there's train and val this can be called within stage == 'fit') + df_train = metabric.read_df() + df_test = df_train.sample(frac=0.2) + df_train = df_train.drop(df_test.index) + + self.x_train, self.x_test = self._preprocess_features(df_train, df_test) + self.labtrans = logistic_hazard.LabTransDiscreteTime(self.num_durations) + + if stage == 'fit' or stage is None: + # Pre-process features and targets + self.y_train = self.labtrans.fit_transform( + df_train.duration.values, df_train.event.values) + self.y_train_duration = torch.from_numpy(self.y_train[0]) + self.y_train_event = torch.from_numpy(self.y_train[1]) + + # Create training dataset + self.train_set = TensorDataset( + self.x_train, self.y_train_duration, self.y_train_event) + + # Input and output dimensions for building net + self.in_dims = self.x_train.shape[1] + self.out_dims = self.labtrans.out_features + + if stage == 'test' or stage is None: + # Return test dataframe + self.df_test = df_test + + def train_dataloader(self): + ''' + Build training dataloader + num_workers set to 0 by default because of some thread issue + ''' + train_loader = DataLoader( + dataset=self.train_set, + batch_size=self.batch_size, + shuffle=True, + num_workers=self.num_workers + ) + return train_loader + + @classmethod + def _preprocess_features(cls, df_train: pd.DataFrame, df_test: pd.DataFrame) -> Tuple[torch.Tensor]: + ''' + Preprocess the covariates of the training and test set and return a tensor for the + taining covariates and test covariates. + ''' + cols_standardize = ["x0", "x1", "x2", "x3", "x8"] + cols_leave = ["x4", "x5", "x6", "x7"] + + standardize = [([col], StandardScaler()) for col in cols_standardize] + leave = [(col, None) for col in cols_leave] + x_mapper = DataFrameMapper(standardize + leave) + + x_train = x_mapper.fit_transform(df_train).astype("float32") + x_test = x_mapper.transform(df_test).astype("float32") + return torch.from_numpy(x_train), torch.from_numpy(x_test) # Survival model class - class SurvModel(pl.LightningModule): - ''' - Defines model, optimizers, forward step, and training step. - Define validation step as def validation_step if needed - Configured to use NLL logistic hazard loss from logistic_hazard.NLLLogisticHazardLoss() - ''' - - def __init__(self, lr, in_features, out_features): - super().__init__() - - self.save_hyperparameters() - self.lr = lr - self.in_features = in_features - self.out_features = out_features - - # Define Model Here (in this case MLP) - self.net = nn.Sequential( - nn.Linear(self.in_features, 32), - nn.ReLU(), - nn.BatchNorm1d(32), - nn.Dropout(0.1), - nn.Linear(32, 32), - nn.ReLU(), - nn.BatchNorm1d(32), - nn.Dropout(0.1), - nn.Linear(32, self.out_features), - ) - - # Define loss function: - self.loss_func = logistic_hazard.NLLLogistiHazardLoss() - - def forward(self, x): - batch_size, data = x.size() - x = self.net(x) - return x - - # Training step and validation step usually defined, this dataset only had train + test so left out val. - def training_step(self, batch, batch_idx): - x, duration, event = batch - output = self.forward(x) - loss = self.loss_func(output, duration, event) - - # progress bar logging metrics (add custom metric definitions later if useful?) - self.log('loss', loss, on_step=True, on_epoch=True, prog_bar=True) - return loss - - def configure_optimizers(self): - optimizer = torch.optim.Adam( - self.parameters(), - lr = self.lr - ) - return optimizer + ''' + Defines model, optimizers, forward step, and training step. + Define validation step as def validation_step if needed + Configured to use NLL logistic hazard loss from logistic_hazard.NLLLogisticHazardLoss() + ''' + + def __init__(self, lr, in_features, out_features): + super().__init__() + + self.save_hyperparameters() + self.lr = lr + self.in_features = in_features + self.out_features = out_features + + # Define Model Here (in this case MLP) + self.net = nn.Sequential( + nn.Linear(self.in_features, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, 32), + nn.ReLU(), + nn.BatchNorm1d(32), + nn.Dropout(0.1), + nn.Linear(32, self.out_features), + ) + + # Define loss function: + self.loss_func = logistic_hazard.NLLLogistiHazardLoss() + + def forward(self, x): + batch_size, data = x.size() + x = self.net(x) + return x + + # Training step and validation step usually defined, this dataset only had train + test so left out val. + def training_step(self, batch, batch_idx): + x, duration, event = batch + output = self.forward(x) + loss = self.loss_func(output, duration, event) + + # progress bar logging metrics (add custom metric definitions later if useful?) + self.log('loss', loss, on_step=True, on_epoch=True, prog_bar=True) + return loss + + def configure_optimizers(self): + optimizer = torch.optim.Adam( + self.parameters(), + lr = self.lr + ) + return optimizer def main(): - # Load Lightning DataModule - dat = MetaBrick(num_workers=0) - dat.setup('fit') #allows for input / output features to be configured in the model - - # Load Lightning Module - model = SurvModel(lr=1e-3, in_features=dat.in_dims, out_features=dat.out_dims) - trainer = pl.Trainer(gpus=0, num_sanity_val_steps=0, max_epochs=20, fast_dev_run=False) - - # Train model - trainer.fit(model,dat) - - # Load final model & freeze - print('Running in Evaluation Mode...') - model.freeze() - - # Setup test data (prepared from lightning module) - dat.setup('test') - - # Predict survival on testing dataset - output = model(dat.x_test) - surv = logistic_hazard.output2surv(output) - surv_df = pd.DataFrame(surv.numpy().transpose(), dat.labtrans.cuts) - ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values) - - # Print evaluation metrics - print(f"Concordance: {ev.concordance_td()}") + # Load Lightning DataModule + dat = MetaBrick(num_workers=0) + dat.setup('fit') #allows for input / output features to be configured in the model + + # Load Lightning Module + model = SurvModel(lr=1e-3, in_features=dat.in_dims, out_features=dat.out_dims) + trainer = pl.Trainer(gpus=0, num_sanity_val_steps=0, max_epochs=20, fast_dev_run=False) + + # Train model + trainer.fit(model,dat) + + # Load final model & freeze + print('Running in Evaluation Mode...') + model.freeze() + + # Setup test data (prepared from lightning module) + dat.setup('test') + + # Predict survival on testing dataset + output = model(dat.x_test) + surv = logistic_hazard.output2surv(output) + surv_df = pd.DataFrame(surv.numpy().transpose(), dat.labtrans.cuts) + ev = EvalSurv(surv_df, dat.df_test.duration.values, dat.df_test.event.values) + + # Print evaluation metrics + print(f"Concordance: {ev.concordance_td()}") if __name__ == '__main__': - main() \ No newline at end of file + main() \ No newline at end of file