From 4f6a5661523c3d89357389934bbf6276ae9a2c51 Mon Sep 17 00:00:00 2001 From: Iden Kalemaj Date: Thu, 19 Mar 2026 12:48:59 -0700 Subject: [PATCH] LIA Attack with custom y1 generation (#115) Summary: To support extending LIA to other models besides binary classification models we add support for: - custom generation of synthetic `y1` labels - custom score computation functions Reviewed By: lucamelis Differential Revision: D92169628 --- .../analysis/lia/lia_analysis_node.py | 50 ++++++++-- .../analysis/tests/test_lia_analysis_node.py | 95 +++++++++++++++++++ privacy_guard/attacks/lia_attack.py | 46 ++++++++- .../attacks/tests/test_lia_attack.py | 93 ++++++++++++++++++ 4 files changed, 271 insertions(+), 13 deletions(-) diff --git a/privacy_guard/analysis/lia/lia_analysis_node.py b/privacy_guard/analysis/lia/lia_analysis_node.py index fa2748b..711f256 100644 --- a/privacy_guard/analysis/lia/lia_analysis_node.py +++ b/privacy_guard/analysis/lia/lia_analysis_node.py @@ -16,7 +16,7 @@ import logging from dataclasses import dataclass -from typing import List, Tuple +from typing import Callable, List, Tuple import numpy as np import torch @@ -72,7 +72,34 @@ def __init__( with_timer: bool = False, power: float = 0.0, use_fnr_and_tnr: bool = False, + score_computation_function: Callable[ + [np.ndarray, np.ndarray, np.ndarray], + np.ndarray, + ] + | None = None, ) -> None: + """ + Args: + analysis_input: LIA analysis input data + delta: privacy parameter delta for epsilon computation + num_bootstrap_resampling_times: number of bootstrap samples for CI estimation + cap_eps: whether to cap epsilon at a finite upper bound + show_progress: whether to show a progress bar + with_timer: whether to record timing statistics + power: exponent applied to prob_diff_label in the score function + use_fnr_and_tnr: whether to use FNR/TNR in addition to FPR/TPR thresholds + score_computation_function: optional function to compute per-sample scores. + Signature: (received_labels, y1_probs, predictions) -> scores + received_labels: labels received by the adversary, + np.ndarray of shape (num_samples,) + y1_probs: predictions used for generating synthetic labels y1, + np.ndarray of shape (num_samples,) + predictions: target model predictions, + np.ndarray of shape (num_samples,) + Returns np.ndarray of shape (num_samples,). + The train/test split is applied after this function returns. + If None, uses the default log-likelihood ratio score. + """ if power < 0: raise ValueError("Power used for score function must be non-negative") @@ -85,6 +112,7 @@ def __init__( self._timer_stats: dict[str, float] = {} self._power = power self._use_fnr_and_tnr = use_fnr_and_tnr + self.score_computation_function = score_computation_function def compute_scores(self, i: int) -> Tuple[torch.Tensor, torch.Tensor]: """ @@ -97,20 +125,24 @@ def compute_scores(self, i: int) -> Tuple[torch.Tensor, torch.Tensor]: Tuple[torch.Tensor, torch.Tensor]: scores for samples with training labels and reconstructed labels """ - true_bits = self._analysis_input.true_bits[i] - received_labels = self._analysis_input.received_labels[i] y1_probs = self._analysis_input.predictions_y1_generation predictions = self._analysis_input.predictions - prob_train = np.where(received_labels == 1, predictions, 1 - predictions) - prob_reconstruct = np.where(received_labels == 1, y1_probs, 1 - y1_probs) - prob_diff_label = np.where(received_labels == 1, 1 - y1_probs, y1_probs) + if self.score_computation_function is not None: + scores = self.score_computation_function( + received_labels, y1_probs, predictions + ) + else: + prob_train = np.where(received_labels == 1, predictions, 1 - predictions) + prob_reconstruct = np.where(received_labels == 1, y1_probs, 1 - y1_probs) + prob_diff_label = np.where(received_labels == 1, 1 - y1_probs, y1_probs) - scores = ( - np.log(prob_train + 1e-8) - np.log(prob_reconstruct + 1e-8) - ) * prob_diff_label**self._power + scores = ( + np.log(prob_train + 1e-8) - np.log(prob_reconstruct + 1e-8) + ) * prob_diff_label**self._power + true_bits = self._analysis_input.true_bits[i] scores_train = torch.tensor(scores[true_bits == 0]) scores_test = torch.tensor(scores[true_bits == 1]) diff --git a/privacy_guard/analysis/tests/test_lia_analysis_node.py b/privacy_guard/analysis/tests/test_lia_analysis_node.py index 79a04c6..a5d154e 100644 --- a/privacy_guard/analysis/tests/test_lia_analysis_node.py +++ b/privacy_guard/analysis/tests/test_lia_analysis_node.py @@ -512,3 +512,98 @@ def test_miscalibration_statistics_computation(self) -> None: places=10, msg="prediction_y1_generation_mean should match the mean of predictions_y1_generation", ) + + def test_score_computation_function_is_called(self) -> None: + """Test that a provided score_computation_function is used for score computation.""" + + def constant_scores( + received_labels: np.ndarray, + y1_probs: np.ndarray, + predictions: np.ndarray, + ) -> np.ndarray: + """Return constant scores to verify the function is invoked.""" + return np.ones(len(received_labels)) * 42.0 + + analysis_node = LIAAnalysisNode( + analysis_input=self.analysis_input, + delta=1e-6, + num_bootstrap_resampling_times=2, + score_computation_function=constant_scores, + ) + + scores_train, scores_test = analysis_node.compute_scores(0) + + # Verify the function's output is used and splitting is done correctly + expected_train_count = int(np.sum(self.true_bits[0] == 0)) + expected_test_count = int(np.sum(self.true_bits[0] == 1)) + self.assertEqual(scores_train.shape[0], expected_train_count) + self.assertEqual(scores_test.shape[0], expected_test_count) + self.assertTrue(torch.all(scores_train == 42.0)) + self.assertTrue(torch.all(scores_test == 42.0)) + + def test_score_computation_function_receives_correct_args(self) -> None: + """Test that score_computation_function receives correct arguments.""" + captured_args: dict[str, object] = {} + + def capturing_scores( + received_labels: np.ndarray, + y1_probs: np.ndarray, + predictions: np.ndarray, + ) -> np.ndarray: + """Capture arguments for verification.""" + captured_args["received_labels"] = received_labels + captured_args["y1_probs"] = y1_probs + captured_args["predictions"] = predictions + return np.zeros(len(received_labels)) + + analysis_node = LIAAnalysisNode( + analysis_input=self.analysis_input, + delta=1e-6, + num_bootstrap_resampling_times=2, + score_computation_function=capturing_scores, + ) + + analysis_node.compute_scores(0) + + # Verify the function received the correct arguments + np.testing.assert_array_equal( + captured_args["received_labels"], self.received_labels[0] + ) + np.testing.assert_array_equal(captured_args["y1_probs"], self.y1_preds) + np.testing.assert_array_equal(captured_args["predictions"], self.predictions) + + def test_default_score_computation_function_is_none(self) -> None: + """Test that score_computation_function defaults to None.""" + analysis_node = LIAAnalysisNode( + analysis_input=self.analysis_input, + delta=1e-6, + num_bootstrap_resampling_times=2, + ) + + self.assertIsNone(analysis_node.score_computation_function) + + def test_score_computation_function_end_to_end(self) -> None: + """Test that score_computation_function integrates with run_analysis.""" + + def constant_scores( + received_labels: np.ndarray, + y1_probs: np.ndarray, + predictions: np.ndarray, + ) -> np.ndarray: + """Return constant scores for deterministic analysis output.""" + return np.ones(len(received_labels)) * 42.0 + + analysis_node = LIAAnalysisNode( + analysis_input=self.analysis_input, + delta=1e-6, + num_bootstrap_resampling_times=2, + score_computation_function=constant_scores, + ) + + outputs = analysis_node.compute_outputs() + + # Analysis should complete successfully with the provided function + self.assertIsInstance(outputs, dict) + self.assertIsInstance(outputs["eps"], float) + self.assertIsInstance(outputs["accuracy"], float) + self.assertIsInstance(outputs["auc"], float) diff --git a/privacy_guard/attacks/lia_attack.py b/privacy_guard/attacks/lia_attack.py index 164412a..5d7327e 100644 --- a/privacy_guard/attacks/lia_attack.py +++ b/privacy_guard/attacks/lia_attack.py @@ -14,7 +14,7 @@ # pyre-strict -from typing import Dict, List +from typing import Callable, Dict, List import numpy as np import pandas as pd @@ -120,6 +120,8 @@ def __init__( attack_input: Dict[str, pd.DataFrame], row_aggregation: AggregationType, y1_generation: str = "calibration", + y1_generation_function: Callable[[np.ndarray, np.ndarray, int], np.ndarray] + | None = None, num_resampling_times: int = 100, ) -> None: """ @@ -127,12 +129,22 @@ def __init__( attack_input: dictionary containing dataframes for the attack, must contain keys "df_train_and_calib" and "df_aggregated" row_aggregation: specifies aggregation strategy for aggregating rows for each user y1_generation: strategy for generating the labels y1 (reconstructed labels) + y1_generation_function: optional function to generate synthetic y1 labels. + Signature: (predictions_y1_generation, labels, num_resampling_times) -> y1_labels + predictions_y1_generation: predictions used for synthetic label generation, + np.ndarray of shape (num_samples,) + labels: true training labels (y0), + np.ndarray of shape (num_samples,) + num_resampling_times: number of independent resampling iterations, int + Returns np.ndarray of shape (num_resampling_times, num_samples). + If None, uses Binomial sampling from predictions_y1_generation. num_resampling_times: Number of times to instantiate the LIA game (for confidence interval estimation) """ self.attack_input = attack_input self.row_aggregation = row_aggregation self.y1_generation = y1_generation self.num_resampling_times = num_resampling_times + self.y1_generation_function = y1_generation_function def get_y1_predictions(self, df_attack: pd.DataFrame) -> np.ndarray: """ @@ -177,6 +189,31 @@ def get_y1_predictions(self, df_attack: pd.DataFrame) -> np.ndarray: return predictions_y1_generation + def _generate_y1_labels( + self, predictions_y1_generation: np.ndarray, labels: np.ndarray + ) -> np.ndarray: + """ + Generate y1 labels for the attack. + args: + predictions_y1_generation: predictions used for generating y1 + labels: true labels from the attack dataframe + returns: + y1: y1 labels + """ + + if self.y1_generation_function is None: + # generate binary labels using Binomial distribution + random_floats = np.random.rand( + self.num_resampling_times, len(predictions_y1_generation) + ) + y1_all_reps = (random_floats < predictions_y1_generation).astype(int) + else: + y1_all_reps = self.y1_generation_function( + predictions_y1_generation, labels, self.num_resampling_times + ) + + return y1_all_reps + def run_attack(self) -> LIAAnalysisInput: """ Run LIA attack. @@ -189,12 +226,13 @@ def run_attack(self) -> LIAAnalysisInput: y0 = np.asarray(df_attack["label"].values) predictions = np.asarray(df_attack["predictions"].values) - predictions_y1_generation = np.asarray(self.get_y1_predictions(df_attack)) true_bits_all_reps = np.random.randint( 2, size=(self.num_resampling_times, len(df_attack)) ) - random_floats = np.random.rand(self.num_resampling_times, len(df_attack)) - y1_all_reps = (random_floats < predictions_y1_generation).astype(int) + predictions_y1_generation = np.asarray(self.get_y1_predictions(df_attack)) + y1_all_reps = self._generate_y1_labels( + predictions_y1_generation, df_attack["label"].values + ) received_labels_all_reps = np.where(true_bits_all_reps == 0, y0, y1_all_reps) # Create analysis input object diff --git a/privacy_guard/attacks/tests/test_lia_attack.py b/privacy_guard/attacks/tests/test_lia_attack.py index 86b24b2..83c53ac 100644 --- a/privacy_guard/attacks/tests/test_lia_attack.py +++ b/privacy_guard/attacks/tests/test_lia_attack.py @@ -478,3 +478,96 @@ def test_run_attack_analysis_input_structure(self) -> None: self.assertEqual( analysis_input.received_labels[i, j], analysis_input.y1[i, j] ) + + def test_y1_generation_function_is_called(self) -> None: + """Test that a provided y1_generation_function is used for label generation.""" + + def deterministic_y1_generation( + predictions_y1: np.ndarray, + labels: np.ndarray, + num_resampling_times: int, + ) -> np.ndarray: + """Return all-ones labels to verify the function is invoked.""" + return np.ones((num_resampling_times, len(labels)), dtype=int) + + lia_attack = LIAAttack( + attack_input=self.attack_input, + row_aggregation=AggregationType.MAX, + y1_generation="calibration", + y1_generation_function=deterministic_y1_generation, + num_resampling_times=5, + ) + + analysis_input = lia_attack.run_attack() + + # All y1 values should be 1 since our function returns all-ones + self.assertTrue(np.all(analysis_input.y1 == 1)) + self.assertEqual( + analysis_input.y1.shape, (5, len(self.attack_input["df_aggregated"])) + ) + + def test_y1_generation_function_receives_correct_args(self) -> None: + """Test that y1_generation_function receives correct arguments.""" + captured_args: dict[str, object] = {} + + def capturing_y1_generation( + predictions_y1: np.ndarray, + labels: np.ndarray, + num_resampling_times: int, + ) -> np.ndarray: + """Capture arguments for verification.""" + captured_args["predictions_y1"] = predictions_y1 + captured_args["labels"] = labels + captured_args["num_resampling_times"] = num_resampling_times + return np.zeros((num_resampling_times, len(labels)), dtype=int) + + num_resampling = 7 + lia_attack = LIAAttack( + attack_input=self.attack_input, + row_aggregation=AggregationType.MAX, + y1_generation="calibration", + y1_generation_function=capturing_y1_generation, + num_resampling_times=num_resampling, + ) + + df_attack = self.attack_input["df_aggregated"] + lia_attack.run_attack() + + # Verify the function received the correct arguments + self.assertEqual(captured_args["num_resampling_times"], num_resampling) + assert_array_equal( + captured_args["predictions_y1"], + np.asarray(df_attack["predictions_calib"].values), + ) + assert_array_equal( + captured_args["labels"], + df_attack["label"], + ) + + def test_default_y1_generation_function_is_none(self) -> None: + """Test that y1_generation_function defaults to None.""" + lia_attack = LIAAttack( + attack_input=self.attack_input, + row_aggregation=AggregationType.MAX, + ) + + self.assertIsNone(lia_attack.y1_generation_function) + + def test_generate_y1_labels_without_function(self) -> None: + """Test that _generate_y1_labels uses Binomial sampling when no function is provided.""" + lia_attack = LIAAttack( + attack_input=self.attack_input, + row_aggregation=AggregationType.MAX, + y1_generation="calibration", + num_resampling_times=1000, + ) + + df_attack = self.attack_input["df_aggregated"] + predictions_y1 = np.asarray(lia_attack.get_y1_predictions(df_attack)) + y1_all_reps = lia_attack._generate_y1_labels( + predictions_y1, df_attack["label"].values + ) + + # Should produce binary values + self.assertTrue(np.all(np.isin(y1_all_reps, [0, 1]))) + self.assertEqual(y1_all_reps.shape, (1000, len(df_attack)))