From 0634066ecd55d1d1ee6ddc212d73a23c3282f59a Mon Sep 17 00:00:00 2001 From: Asim Waheed Date: Tue, 21 Apr 2026 18:31:36 -0400 Subject: [PATCH] refactor(examples): standardize all pipelines to use load_or_train All five pipelines replace manual if-exists/torch.load/torch.save blocks with load_or_train(), fixing the torch.device() type bug in --device defaults and expanding --dataset help strings to include utkface. run_adversarial_training adds post-hoc adversarial accuracy comparison. run_dp_sgd logs the accuracy trade-off. get_started.py rewritten to cover the full attack lifecycle with AmuletDataset and the new APIs. --- examples/attack_pipelines/run_evasion.py | 33 ++-- .../run_membership_inference.py | 35 ++--- .../run_adversarial_training.py | 74 +++++---- examples/defense_pipelines/run_dp_sgd.py | 67 ++++---- .../defense_pipelines/run_outlier_removal.py | 60 +++---- examples/extending_amulet/custom_metric.md | 2 +- examples/extending_amulet/custom_risk.md | 9 +- examples/get_started.py | 147 ++++++++++-------- 8 files changed, 231 insertions(+), 196 deletions(-) diff --git a/examples/attack_pipelines/run_evasion.py b/examples/attack_pipelines/run_evasion.py index 4df6f78..233682c 100644 --- a/examples/attack_pipelines/run_evasion.py +++ b/examples/attack_pipelines/run_evasion.py @@ -1,6 +1,7 @@ import sys sys.path.append("../../") + import argparse import logging from pathlib import Path @@ -14,6 +15,7 @@ get_accuracy, initialize_model, load_data, + load_or_train, train_classifier, ) @@ -31,7 +33,7 @@ def parse_args() -> argparse.Namespace: "--dataset", type=str, default="celeba", - help="Options: cifar10, fmnist, lfw, census, celeba, cifar100.", + help="Options: cifar10, cifar100, fmnist, mnist, lfw, census, celeba, utkface.", ) parser.add_argument( "--model", type=str, default="vgg", help="Options: resnet, vgg, cnn, linearnet." 
@@ -54,7 +56,7 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--device", type=str, - default=torch.device(f"cuda:{0}" if torch.cuda.is_available() else "cpu"), + default="cuda:0" if torch.cuda.is_available() else "cpu", help="Device on which to run PyTorch", ) parser.add_argument( @@ -93,30 +95,25 @@ def main(args: argparse.Namespace) -> None: # Set up filename and directories to save/load models models_path = root_dir / "saved_models" filename = f"{args.dataset}_{args.model}_{args.model_capacity}_{args.training_size * 100}_{args.batch_size}_{args.epochs}_{args.exp_id}.pt" - target_model_path = models_path / "target" - target_model_filename = target_model_path / filename + target_model_filename = models_path / "target" / filename # Train or Load Target Model criterion = torch.nn.CrossEntropyLoss() - if target_model_filename.exists(): - log.info("Target model loaded from %s", target_model_filename) - target_model = torch.load(target_model_filename).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - else: - log.info("Training target model") - target_model = initialize_model( + def _init_target(): + return initialize_model( args.model, args.model_capacity, data.num_features, data.num_classes, log ).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - target_model = train_classifier( - target_model, train_loader, criterion, optimizer, args.epochs, args.device + + def _train_target(model): + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + return train_classifier( + model, train_loader, criterion, optimizer, args.epochs, args.device ) - log.info("Target model trained") - # Save model - create_dir(target_model_path, log) - torch.save(target_model, target_model_filename) + target_model = load_or_train( + target_model_filename, _init_target, _train_target, log, "target model" + ) test_accuracy_target = get_accuracy(target_model, test_loader, args.device) log.info("Test accuracy 
of target model: %s", test_accuracy_target) diff --git a/examples/attack_pipelines/run_membership_inference.py b/examples/attack_pipelines/run_membership_inference.py index 2ed3732..5b58d3f 100644 --- a/examples/attack_pipelines/run_membership_inference.py +++ b/examples/attack_pipelines/run_membership_inference.py @@ -1,6 +1,7 @@ import sys sys.path.append("../../") + import argparse import logging from pathlib import Path @@ -16,6 +17,7 @@ get_accuracy, initialize_model, load_data, + load_or_train, train_classifier, ) @@ -56,7 +58,7 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--device", type=str, - default=torch.device(f"cuda:{0}" if torch.cuda.is_available() else "cpu"), + default="cuda:0" if torch.cuda.is_available() else "cpu", help="Device on which to run PyTorch", ) parser.add_argument( @@ -64,7 +66,7 @@ def parse_args() -> argparse.Namespace: ) parser.add_argument( "--pkeep", - type=int, + type=float, default=0.5, help="Proportion of data to keep for training.", ) @@ -81,7 +83,7 @@ def main(args: argparse.Namespace) -> None: log_dir = root_dir / "logs" create_dir(log_dir) logging.basicConfig( - level=logging.INFO, filename=log_dir / "evasion.log", filemode="w" + level=logging.INFO, filename=log_dir / "membership_inference.log", filemode="w" ) log = logging.getLogger("All") log.addHandler(logging.StreamHandler()) @@ -113,30 +115,25 @@ def main(args: argparse.Namespace) -> None: # Set up filename and directories to save/load models models_path = root_dir / "saved_models" filename = f"{args.dataset}_{args.model}_{args.model_capacity}_{args.training_size * 100}_{args.batch_size}_{args.epochs}_{args.exp_id}.pt" - target_model_path = models_path / "target" - target_model_filename = target_model_path / filename + target_model_filename = models_path / "target" / filename # Train or Load Target Model criterion = torch.nn.CrossEntropyLoss() - if target_model_filename.exists(): - log.info("Target model loaded from %s", target_model_filename) - 
target_model = torch.load(target_model_filename).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - else: - log.info("Training target model") - target_model = initialize_model( + def _init_target(): + return initialize_model( args.model, args.model_capacity, data.num_features, data.num_classes, log ).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - target_model = train_classifier( - target_model, train_loader, criterion, optimizer, args.epochs, args.device + + def _train_target(model): + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + return train_classifier( + model, train_loader, criterion, optimizer, args.epochs, args.device ) - log.info("Target model trained") - # Save model - create_dir(target_model_path, log) - torch.save(target_model, target_model_filename) + target_model = load_or_train( + target_model_filename, _init_target, _train_target, log, "target model" + ) test_accuracy_target = get_accuracy(target_model, test_loader, args.device) log.info("Test accuracy of target model: %s", test_accuracy_target) diff --git a/examples/defense_pipelines/run_adversarial_training.py b/examples/defense_pipelines/run_adversarial_training.py index bf66822..dea045f 100644 --- a/examples/defense_pipelines/run_adversarial_training.py +++ b/examples/defense_pipelines/run_adversarial_training.py @@ -1,6 +1,7 @@ import sys sys.path.append("../../") + import argparse import logging from pathlib import Path @@ -8,12 +9,14 @@ import torch from torch.utils.data import DataLoader +from amulet.evasion.attacks import EvasionPGD from amulet.evasion.defenses import AdversarialTrainingPGD from amulet.utils import ( create_dir, get_accuracy, initialize_model, load_data, + load_or_train, train_classifier, ) @@ -31,7 +34,7 @@ def parse_args() -> argparse.Namespace: "--dataset", type=str, default="celeba", - help="Options: cifar10, fmnist, lfw, census, celeba.", + help="Options: cifar10, cifar100, fmnist, 
mnist, lfw, census, celeba, utkface.", ) parser.add_argument( "--model", type=str, default="vgg", help="Options: vgg, linearnet." @@ -54,7 +57,7 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--device", type=str, - default=torch.device(f"cuda:{0}" if torch.cuda.is_available() else "cpu"), + default="cuda:0" if torch.cuda.is_available() else "cpu", help="Device on which to run PyTorch", ) parser.add_argument( @@ -93,45 +96,41 @@ def main(args: argparse.Namespace) -> None: # Set up filename and directories to save/load models models_path = root_dir / "saved_models" filename = f"{args.dataset}_{args.model}_{args.model_capacity}_{args.training_size * 100}_{args.batch_size}_{args.epochs}_{args.exp_id}.pt" - target_model_path = models_path / "target" - target_model_filename = target_model_path / filename + target_model_filename = models_path / "target" / filename # Train or Load Target Model criterion = torch.nn.CrossEntropyLoss() - if target_model_filename.exists(): - log.info("Target model loaded from %s", target_model_filename) - target_model = torch.load(target_model_filename).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - else: - log.info("Training target model") - target_model = initialize_model( + def _init_target(): + return initialize_model( args.model, args.model_capacity, data.num_features, data.num_classes, log ).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - target_model = train_classifier( - target_model, train_loader, criterion, optimizer, args.epochs, args.device + + def _train_target(model): + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + return train_classifier( + model, train_loader, criterion, optimizer, args.epochs, args.device ) - log.info("Target model trained") - # Save model - create_dir(target_model_path, log) - torch.save(target_model, target_model_filename) + target_model = load_or_train( + target_model_filename, _init_target, 
_train_target, log, "target model" + ) test_accuracy_target = get_accuracy(target_model, test_loader, args.device) log.info("Test accuracy of target model: %s", test_accuracy_target) # Train or load model with Adversarial Training - defended_model_path = ( - models_path / "adversarial_training" / f"epsilon_{args.epsilon}" + defended_model_filename = ( + models_path / "adversarial_training" / f"epsilon_{args.epsilon}" / filename ) - defended_model_filename = defended_model_path / filename - if defended_model_filename.exists(): - log.info("Defended model loaded from %s", defended_model_filename) - defended_model = torch.load(defended_model_filename) - else: - log.info("Retraining Model with Adversarial Training") + def _init_defended(): + return initialize_model( + args.model, args.model_capacity, data.num_features, data.num_classes, log + ).to(args.device) + + def _train_defended(_model): + optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) adv_training = AdversarialTrainingPGD( target_model, criterion, @@ -141,15 +140,30 @@ def main(args: argparse.Namespace) -> None: args.epochs, args.epsilon, ) - defended_model = adv_training.train_robust() + return adv_training.train_robust() - # Save model - create_dir(defended_model_path, log) - torch.save(defended_model, defended_model_filename) + defended_model = load_or_train( + defended_model_filename, _init_defended, _train_defended, log, "defended model" + ) test_accuracy_defended = get_accuracy(defended_model, test_loader, args.device) log.info("Test accuracy of defended model: %s", test_accuracy_defended) + # Show the defense benefit: compare adversarial accuracy before and after + evasion = EvasionPGD( + target_model, test_loader, args.device, args.batch_size, args.epsilon + ) + adv_loader = evasion.attack() + adv_accuracy_target = get_accuracy(target_model, adv_loader, args.device) + log.info("Adversarial accuracy (undefended): %s", adv_accuracy_target) + + evasion_def = EvasionPGD( + defended_model, 
test_loader, args.device, args.batch_size, args.epsilon + ) + adv_loader_def = evasion_def.attack() + adv_accuracy_defended = get_accuracy(defended_model, adv_loader_def, args.device) + log.info("Adversarial accuracy (defended): %s", adv_accuracy_defended) + if __name__ == "__main__": args = parse_args() diff --git a/examples/defense_pipelines/run_dp_sgd.py b/examples/defense_pipelines/run_dp_sgd.py index 07a4ec2..188a1f6 100644 --- a/examples/defense_pipelines/run_dp_sgd.py +++ b/examples/defense_pipelines/run_dp_sgd.py @@ -1,6 +1,7 @@ import sys sys.path.append("../../") + import argparse import logging from pathlib import Path @@ -14,6 +15,7 @@ get_accuracy, initialize_model, load_data, + load_or_train, train_classifier, ) @@ -31,7 +33,7 @@ def parse_args() -> argparse.Namespace: "--dataset", type=str, default="celeba", - help="Options: cifar10, fmnist, lfw, census, celeba.", + help="Options: cifar10, cifar100, fmnist, mnist, lfw, census, celeba, utkface.", ) parser.add_argument( "--model", type=str, default="vgg", help="Options: vgg, linearnet." 
@@ -54,7 +56,7 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--device", type=str, - default=torch.device(f"cuda:{0}" if torch.cuda.is_available() else "cpu"), + default="cuda:0" if torch.cuda.is_available() else "cpu", help="Device on which to run PyTorch", ) parser.add_argument( @@ -110,44 +112,35 @@ def main(args: argparse.Namespace) -> None: # Set up filename and directories to save/load models models_path = root_dir / "saved_models" filename = f"{args.dataset}_{args.model}_{args.model_capacity}_{args.training_size * 100}_{args.batch_size}_{args.epochs}_{args.exp_id}.pt" - target_model_path = models_path / "target" - target_model_filename = target_model_path / filename + target_model_filename = models_path / "target" / filename # Train or Load Target Model criterion = torch.nn.CrossEntropyLoss() - if target_model_filename.exists(): - log.info("Target model loaded from %s", target_model_filename) - target_model = torch.load(target_model_filename).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - else: - log.info("Training target model") - target_model = initialize_model( + def _init_target(): + return initialize_model( args.model, args.model_capacity, data.num_features, data.num_classes, log ).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - target_model = train_classifier( - target_model, train_loader, criterion, optimizer, args.epochs, args.device + + def _train_target(model): + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + return train_classifier( + model, train_loader, criterion, optimizer, args.epochs, args.device ) - log.info("Target model trained") - # Save model - create_dir(target_model_path, log) - torch.save(target_model, target_model_filename) + target_model = load_or_train( + target_model_filename, _init_target, _train_target, log, "target model" + ) test_accuracy_target = get_accuracy(target_model, test_loader, args.device) log.info("Test accuracy 
of target model: %s", test_accuracy_target) - # Train or load model with DP Training - defended_model_path = models_path / "dp_sgd" / f"delta_{args.delta}" - defended_model_filename = defended_model_path / filename + # Train or load model with DP Training. Opacus cannot handle batch norm so + # the DP model is initialised without BN layers. + defended_model_filename = models_path / "dp_sgd" / f"delta_{args.delta}" / filename - if defended_model_filename.exists(): - log.info("Defended model loaded from %s", defended_model_filename) - defended_model = torch.load(defended_model_filename) - else: - log.info("Retraining Model with dp Training") - defended_model = initialize_model( + def _init_defended(): + return initialize_model( args.model, args.model_capacity, data.num_features, @@ -155,9 +148,11 @@ def main(args: argparse.Namespace) -> None: log, batch_norm=False, ).to(args.device) - optimizer = torch.optim.Adam(defended_model.parameters(), lr=1e-3) + + def _train_defended(model): + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) dp_training = DPSGD( - defended_model, + model, criterion, optimizer, train_loader, @@ -168,14 +163,20 @@ def main(args: argparse.Namespace) -> None: args.secure_rng, args.epochs, ) - defended_model = dp_training.train_private() + return dp_training.train_private() - # Save model - create_dir(defended_model_path, log) - torch.save(defended_model, defended_model_filename) + defended_model = load_or_train( + defended_model_filename, _init_defended, _train_defended, log, "defended model" + ) test_accuracy_defended = get_accuracy(defended_model, test_loader, args.device) log.info("Test accuracy of defended model: %s", test_accuracy_defended) + log.info( + "Accuracy trade-off: baseline %.4f -> DP-SGD (delta=%s) %.4f", + test_accuracy_target, + args.delta, + test_accuracy_defended, + ) if __name__ == "__main__": diff --git a/examples/defense_pipelines/run_outlier_removal.py b/examples/defense_pipelines/run_outlier_removal.py index 
a474089..46287cb 100644 --- a/examples/defense_pipelines/run_outlier_removal.py +++ b/examples/defense_pipelines/run_outlier_removal.py @@ -1,6 +1,7 @@ import sys sys.path.append("../../") + import argparse import logging from pathlib import Path @@ -14,6 +15,7 @@ get_accuracy, initialize_model, load_data, + load_or_train, train_classifier, ) @@ -31,7 +33,7 @@ def parse_args() -> argparse.Namespace: "--dataset", type=str, default="celeba", - help="Options: cifar10, fmnist, lfw, census, celeba.", + help="Options: cifar10, cifar100, fmnist, mnist, lfw, census, celeba, utkface.", ) parser.add_argument( "--model", type=str, default="vgg", help="Options: vgg, linearnet." @@ -54,7 +56,7 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--device", type=str, - default=torch.device(f"cuda:{0}" if torch.cuda.is_available() else "cpu"), + default="cuda:0" if torch.cuda.is_available() else "cpu", help="Device on which to run PyTorch", ) parser.add_argument( @@ -95,41 +97,39 @@ def main(args: argparse.Namespace) -> None: filename = f"{args.dataset}_{args.model}_{args.model_capacity}_{args.training_size * 100}_{args.batch_size}_{args.epochs}_{args.exp_id}.pt" # Train or Load Target Model - target_model_path = models_path / "target" - target_model_filename = target_model_path / filename + target_model_filename = models_path / "target" / filename criterion = torch.nn.CrossEntropyLoss() - if target_model_filename.exists(): - log.info("Target model loaded from %s", target_model_filename) - target_model = torch.load(target_model_filename).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - else: - log.info("Training target model") - target_model = initialize_model( + def _init_target(): + return initialize_model( args.model, args.model_capacity, data.num_features, data.num_classes, log ).to(args.device) - optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - target_model = train_classifier( - target_model, train_loader, 
criterion, optimizer, args.epochs, args.device + + def _train_target(model): + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + return train_classifier( + model, train_loader, criterion, optimizer, args.epochs, args.device ) - log.info("Target model trained") - # Save model - create_dir(target_model_path, log) - torch.save(target_model, target_model_filename) + target_model = load_or_train( + target_model_filename, _init_target, _train_target, log, "target model" + ) test_accuracy_target = get_accuracy(target_model, test_loader, args.device) log.info("Test accuracy of target model: %s", test_accuracy_target) # Train or Load model with Outlier Removal - defended_model_path = models_path / "outlier_removal" / f"percent_{args.percent}" - defended_model_filename = defended_model_path / filename - - if defended_model_filename.exists(): - log.info("Defended model loaded from %s", defended_model_filename) - defended_model = torch.load(defended_model_filename) - else: - log.info("Retraining Model after Outlier Removal") + defended_model_filename = ( + models_path / "outlier_removal" / f"percent_{args.percent}" / filename + ) + + def _init_defended(): + return initialize_model( + args.model, args.model_capacity, data.num_features, data.num_classes, log + ).to(args.device) + + def _train_defended(_model): + optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) outlier_removal = OutlierRemoval( target_model, criterion, @@ -141,11 +141,11 @@ def main(args: argparse.Namespace) -> None: batch_size=args.batch_size, percent=args.percent, ) - defended_model = outlier_removal.train_robust() + return outlier_removal.train_robust() - # Save model - create_dir(defended_model_path, log) - torch.save(defended_model, defended_model_filename) + defended_model = load_or_train( + defended_model_filename, _init_defended, _train_defended, log, "defended model" + ) test_accuracy_outlier_removed = get_accuracy( defended_model, test_loader, args.device diff --git 
a/examples/extending_amulet/custom_metric.md b/examples/extending_amulet/custom_metric.md index ba9d090..14160d3 100644 --- a/examples/extending_amulet/custom_metric.md +++ b/examples/extending_amulet/custom_metric.md @@ -8,7 +8,7 @@ Metrics in Amulet are implemented as standalone functions and grouped by risk. Create a new metrics directory under the corresponding risk: -``` +```text amulet/test_time_adaptation/metrics/ ``` diff --git a/examples/extending_amulet/custom_risk.md b/examples/extending_amulet/custom_risk.md index 99b6122..dbc406c 100644 --- a/examples/extending_amulet/custom_risk.md +++ b/examples/extending_amulet/custom_risk.md @@ -7,7 +7,7 @@ A risk encapsulates one or more attacks and defines how a model can be evaluated Create a new directory under `amulet/` for the risk: -``` +```text amulet/test_time_adaptation/ amulet/test_time_adaptation/attacks/ ``` @@ -15,18 +15,19 @@ amulet/test_time_adaptation/attacks/ ## Step 2: Implement the Attack Add a new attack file under the `attacks` subdirectory. +All attacks should inherit from the `RiskAttack` base class (or a risk-specific base class if one exists). **File:** `amulet/test_time_adaptation/attacks/test_time_data_poisoning.py` ```python import torch import torch.nn as nn +from .test_time_adaptation_attack import TestTimeAdaptationAttack -class TestTimeDataPoisoning: +class TestTimeDataPoisoning(TestTimeAdaptationAttack): def __init__(self, target_model: nn.Module, test_loader, device): - self.model = target_model + super().__init__(target_model, device) self.test_loader = test_loader - self.device = device def attack(self) -> torch.utils.data.TensorDataset: # Adapt model parameters using adversarial test-time inputs diff --git a/examples/get_started.py b/examples/get_started.py index 1468c26..7020fcb 100644 --- a/examples/get_started.py +++ b/examples/get_started.py @@ -1,10 +1,14 @@ """ Simple end-to-end guide to evaluate your model with Amulet, with and without a defense. 
-""" -import sys +Trains a classifier on CIFAR-10, runs a PGD evasion attack, then defends with +adversarial training and reruns the attack to show the accuracy trade-off. + +Run from the repo root: + uv run python examples/get_started.py +""" -sys.path.append("../") +import logging from pathlib import Path import torch @@ -19,62 +23,83 @@ train_classifier, ) -# Set up constants -root_dir = Path("../") -random_seed = 123 -device = f"cuda:{0}" if torch.cuda.is_available() else "cpu" -epochs = 10 - -# Set random seeds for reproducibility -torch.manual_seed(random_seed) - -# Load dataset and create data loaders -dataset = "celeba" # Possible options: ['cifar10', 'fmnist', 'lfw', 'census', 'celeba'] - -data = load_data(root_dir, dataset) -train_loader = DataLoader(dataset=data.train_set, batch_size=256, shuffle=False) -test_loader = DataLoader(dataset=data.test_set, batch_size=256, shuffle=False) - -# Train Target Model -criterion = torch.nn.CrossEntropyLoss() -target_model = initialize_model( - model_arch="vgg", - model_capacity="m1", - num_features=data.num_features, - num_classes=data.num_classes, -).to(device) -optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) - -target_model = train_classifier( - target_model, train_loader, criterion, optimizer, epochs, device -) -# Test Model -test_accuracy_target = get_accuracy(target_model, test_loader, device) -print("Test accuracy of target model: %s", test_accuracy_target) - -# Run Evasion -evasion = EvasionPGD(target_model, test_loader, device, batch_size=256, epsilon=32) -adversarial_test_loader = evasion.attack() -adv_accuracy = get_accuracy(target_model, adversarial_test_loader, device) -print("Adversarial accuracy of target model: %s", adv_accuracy) - -# Defend model with Adversarial Training -adv_training = AdversarialTrainingPGD( - target_model, - criterion, - optimizer, - train_loader, - device, - epochs, - epsilon=32, -) -defended_model = adv_training.train_robust() -test_accuracy_defended = 
get_accuracy(defended_model, test_loader, device) -print("Test accuracy of defended model: %s", test_accuracy_defended) - -# Run Evasion against defended model -evasion = EvasionPGD(defended_model, test_loader, device, batch_size=256, epsilon=32) -adversarial_test_loader = evasion.attack() -adv_accuracy = get_accuracy(defended_model, adversarial_test_loader, device) -print("Adversarial accuracy of defended model: %s", adv_accuracy) +def main() -> None: + logging.basicConfig(level=logging.INFO, format="%(message)s") + log = logging.getLogger(__name__) + + root_dir = Path(__file__).parent.parent + device = "cuda:0" if torch.cuda.is_available() else "cpu" + random_seed = 123 + epochs = 10 + batch_size = 256 + epsilon = 0.1 + + torch.manual_seed(random_seed) + + # cifar10 and fmnist download automatically; no external data needed. + dataset = "cifar10" # Options: cifar10, fmnist, lfw, census, celeba + data = load_data(root_dir, dataset) + train_loader = DataLoader( + dataset=data.train_set, batch_size=batch_size, shuffle=False + ) + test_loader = DataLoader( + dataset=data.test_set, batch_size=batch_size, shuffle=False + ) + + criterion = torch.nn.CrossEntropyLoss() + target_model = initialize_model( + model_arch="vgg", + model_capacity="m1", + num_features=data.num_features, + num_classes=data.num_classes, + ).to(device) + optimizer = torch.optim.Adam(target_model.parameters(), lr=1e-3) + target_model = train_classifier( + target_model, train_loader, criterion, optimizer, epochs, device + ) + + test_acc = get_accuracy(target_model, test_loader, device) + log.info("Clean test accuracy: %.4f", test_acc) + + # Attack undefended model + evasion = EvasionPGD( + target_model, test_loader, device, batch_size=batch_size, epsilon=epsilon + ) + adv_loader = evasion.attack() + adv_acc = get_accuracy(target_model, adv_loader, device) + log.info("Adversarial accuracy (undefended): %.4f", adv_acc) + + # Defend with adversarial training, then rerun attack + adv_training = 
AdversarialTrainingPGD( + target_model, + criterion, + optimizer, + train_loader, + device, + epochs, + epsilon=epsilon, + ) + defended_model = adv_training.train_robust() + + test_acc_defended = get_accuracy(defended_model, test_loader, device) + log.info("Clean test accuracy (defended): %.4f", test_acc_defended) + + evasion_def = EvasionPGD( + defended_model, test_loader, device, batch_size=batch_size, epsilon=epsilon + ) + adv_loader_def = evasion_def.attack() + adv_acc_defended = get_accuracy(defended_model, adv_loader_def, device) + log.info("Adversarial accuracy (defended): %.4f", adv_acc_defended) + + log.info( + "Summary: clean %.4f -> %.4f | adversarial %.4f -> %.4f", + test_acc, + test_acc_defended, + adv_acc, + adv_acc_defended, + ) + + +if __name__ == "__main__": + main()