Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions privacy_guard/analysis/lia/lia_analysis_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import logging
from dataclasses import dataclass
from typing import List, Tuple
from typing import Callable, List, Tuple

import numpy as np
import torch
Expand Down Expand Up @@ -72,7 +72,34 @@ def __init__(
with_timer: bool = False,
power: float = 0.0,
use_fnr_and_tnr: bool = False,
score_computation_function: Callable[
[np.ndarray, np.ndarray, np.ndarray],
np.ndarray,
]
| None = None,
) -> None:
"""
Args:
analysis_input: LIA analysis input data
delta: privacy parameter delta for epsilon computation
num_bootstrap_resampling_times: number of bootstrap samples for CI estimation
cap_eps: whether to cap epsilon at a finite upper bound
show_progress: whether to show a progress bar
with_timer: whether to record timing statistics
power: exponent applied to prob_diff_label in the score function
use_fnr_and_tnr: whether to use FNR/TNR in addition to FPR/TPR thresholds
score_computation_function: optional function to compute per-sample scores.
Signature: (received_labels, y1_probs, predictions) -> scores
received_labels: labels received by the adversary,
np.ndarray of shape (num_samples,)
y1_probs: predictions used for generating synthetic labels y1,
np.ndarray of shape (num_samples,)
predictions: target model predictions,
np.ndarray of shape (num_samples,)
Returns np.ndarray of shape (num_samples,).
The train/test split is applied after this function returns.
If None, uses the default log-likelihood ratio score.
"""
if power < 0:
raise ValueError("Power used for score function must be non-negative")

Expand All @@ -85,6 +112,7 @@ def __init__(
self._timer_stats: dict[str, float] = {}
self._power = power
self._use_fnr_and_tnr = use_fnr_and_tnr
self.score_computation_function = score_computation_function

def compute_scores(self, i: int) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Expand All @@ -97,20 +125,24 @@ def compute_scores(self, i: int) -> Tuple[torch.Tensor, torch.Tensor]:
Tuple[torch.Tensor, torch.Tensor]: scores for samples with training labels and reconstructed labels
"""

true_bits = self._analysis_input.true_bits[i]

received_labels = self._analysis_input.received_labels[i]
y1_probs = self._analysis_input.predictions_y1_generation
predictions = self._analysis_input.predictions

prob_train = np.where(received_labels == 1, predictions, 1 - predictions)
prob_reconstruct = np.where(received_labels == 1, y1_probs, 1 - y1_probs)
prob_diff_label = np.where(received_labels == 1, 1 - y1_probs, y1_probs)
if self.score_computation_function is not None:
scores = self.score_computation_function(
received_labels, y1_probs, predictions
)
else:
prob_train = np.where(received_labels == 1, predictions, 1 - predictions)
prob_reconstruct = np.where(received_labels == 1, y1_probs, 1 - y1_probs)
prob_diff_label = np.where(received_labels == 1, 1 - y1_probs, y1_probs)

scores = (
np.log(prob_train + 1e-8) - np.log(prob_reconstruct + 1e-8)
) * prob_diff_label**self._power
scores = (
np.log(prob_train + 1e-8) - np.log(prob_reconstruct + 1e-8)
) * prob_diff_label**self._power

true_bits = self._analysis_input.true_bits[i]
scores_train = torch.tensor(scores[true_bits == 0])
scores_test = torch.tensor(scores[true_bits == 1])

Expand Down
95 changes: 95 additions & 0 deletions privacy_guard/analysis/tests/test_lia_analysis_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,3 +512,98 @@ def test_miscalibration_statistics_computation(self) -> None:
places=10,
msg="prediction_y1_generation_mean should match the mean of predictions_y1_generation",
)

def test_score_computation_function_is_called(self) -> None:
"""Test that a provided score_computation_function is used for score computation."""

def constant_scores(
received_labels: np.ndarray,
y1_probs: np.ndarray,
predictions: np.ndarray,
) -> np.ndarray:
"""Return constant scores to verify the function is invoked."""
return np.ones(len(received_labels)) * 42.0

analysis_node = LIAAnalysisNode(
analysis_input=self.analysis_input,
delta=1e-6,
num_bootstrap_resampling_times=2,
score_computation_function=constant_scores,
)

scores_train, scores_test = analysis_node.compute_scores(0)

# Verify the function's output is used and splitting is done correctly
expected_train_count = int(np.sum(self.true_bits[0] == 0))
expected_test_count = int(np.sum(self.true_bits[0] == 1))
self.assertEqual(scores_train.shape[0], expected_train_count)
self.assertEqual(scores_test.shape[0], expected_test_count)
self.assertTrue(torch.all(scores_train == 42.0))
self.assertTrue(torch.all(scores_test == 42.0))

def test_score_computation_function_receives_correct_args(self) -> None:
"""Test that score_computation_function receives correct arguments."""
captured_args: dict[str, object] = {}

def capturing_scores(
received_labels: np.ndarray,
y1_probs: np.ndarray,
predictions: np.ndarray,
) -> np.ndarray:
"""Capture arguments for verification."""
captured_args["received_labels"] = received_labels
captured_args["y1_probs"] = y1_probs
captured_args["predictions"] = predictions
return np.zeros(len(received_labels))

analysis_node = LIAAnalysisNode(
analysis_input=self.analysis_input,
delta=1e-6,
num_bootstrap_resampling_times=2,
score_computation_function=capturing_scores,
)

analysis_node.compute_scores(0)

# Verify the function received the correct arguments
np.testing.assert_array_equal(
captured_args["received_labels"], self.received_labels[0]
)
np.testing.assert_array_equal(captured_args["y1_probs"], self.y1_preds)
np.testing.assert_array_equal(captured_args["predictions"], self.predictions)

def test_default_score_computation_function_is_none(self) -> None:
"""Test that score_computation_function defaults to None."""
analysis_node = LIAAnalysisNode(
analysis_input=self.analysis_input,
delta=1e-6,
num_bootstrap_resampling_times=2,
)

self.assertIsNone(analysis_node.score_computation_function)

def test_score_computation_function_end_to_end(self) -> None:
"""Test that score_computation_function integrates with run_analysis."""

def constant_scores(
received_labels: np.ndarray,
y1_probs: np.ndarray,
predictions: np.ndarray,
) -> np.ndarray:
"""Return constant scores for deterministic analysis output."""
return np.ones(len(received_labels)) * 42.0

analysis_node = LIAAnalysisNode(
analysis_input=self.analysis_input,
delta=1e-6,
num_bootstrap_resampling_times=2,
score_computation_function=constant_scores,
)

outputs = analysis_node.compute_outputs()

# Analysis should complete successfully with the provided function
self.assertIsInstance(outputs, dict)
self.assertIsInstance(outputs["eps"], float)
self.assertIsInstance(outputs["accuracy"], float)
self.assertIsInstance(outputs["auc"], float)
46 changes: 42 additions & 4 deletions privacy_guard/attacks/lia_attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# pyre-strict

from typing import Dict, List
from typing import Callable, Dict, List

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -120,19 +120,31 @@ def __init__(
attack_input: Dict[str, pd.DataFrame],
row_aggregation: AggregationType,
y1_generation: str = "calibration",
y1_generation_function: Callable[[np.ndarray, np.ndarray, int], np.ndarray]
| None = None,
num_resampling_times: int = 100,
) -> None:
"""
args:
attack_input: dictionary containing dataframes for the attack, must contain keys "df_train_and_calib" and "df_aggregated"
row_aggregation: specifies aggregation strategy for aggregating rows for each user
y1_generation: strategy for generating the labels y1 (reconstructed labels)
y1_generation_function: optional function to generate synthetic y1 labels.
Signature: (predictions_y1_generation, labels, num_resampling_times) -> y1_labels
predictions_y1_generation: predictions used for synthetic label generation,
np.ndarray of shape (num_samples,)
labels: true training labels (y0),
np.ndarray of shape (num_samples,)
num_resampling_times: number of independent resampling iterations, int
Returns np.ndarray of shape (num_resampling_times, num_samples).
If None, uses Binomial sampling from predictions_y1_generation.
num_resampling_times: Number of times to instantiate the LIA game (for confidence interval estimation)
"""
self.attack_input = attack_input
self.row_aggregation = row_aggregation
self.y1_generation = y1_generation
self.num_resampling_times = num_resampling_times
self.y1_generation_function = y1_generation_function

def get_y1_predictions(self, df_attack: pd.DataFrame) -> np.ndarray:
"""
Expand Down Expand Up @@ -177,6 +189,31 @@ def get_y1_predictions(self, df_attack: pd.DataFrame) -> np.ndarray:

return predictions_y1_generation

def _generate_y1_labels(
self, predictions_y1_generation: np.ndarray, labels: np.ndarray
) -> np.ndarray:
"""
Generate y1 labels for the attack.
args:
predictions_y1_generation: predictions used for generating y1
labels: true labels from the attack dataframe
returns:
y1: y1 labels
"""

if self.y1_generation_function is None:
# generate binary labels using Binomial distribution
random_floats = np.random.rand(
self.num_resampling_times, len(predictions_y1_generation)
)
y1_all_reps = (random_floats < predictions_y1_generation).astype(int)
else:
y1_all_reps = self.y1_generation_function(
predictions_y1_generation, labels, self.num_resampling_times
)

return y1_all_reps

def run_attack(self) -> LIAAnalysisInput:
"""
Run LIA attack.
Expand All @@ -189,12 +226,13 @@ def run_attack(self) -> LIAAnalysisInput:

y0 = np.asarray(df_attack["label"].values)
predictions = np.asarray(df_attack["predictions"].values)
predictions_y1_generation = np.asarray(self.get_y1_predictions(df_attack))
true_bits_all_reps = np.random.randint(
2, size=(self.num_resampling_times, len(df_attack))
)
random_floats = np.random.rand(self.num_resampling_times, len(df_attack))
y1_all_reps = (random_floats < predictions_y1_generation).astype(int)
predictions_y1_generation = np.asarray(self.get_y1_predictions(df_attack))
y1_all_reps = self._generate_y1_labels(
predictions_y1_generation, df_attack["label"].values
)
received_labels_all_reps = np.where(true_bits_all_reps == 0, y0, y1_all_reps)

# Create analysis input object
Expand Down
93 changes: 93 additions & 0 deletions privacy_guard/attacks/tests/test_lia_attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,96 @@ def test_run_attack_analysis_input_structure(self) -> None:
self.assertEqual(
analysis_input.received_labels[i, j], analysis_input.y1[i, j]
)

def test_y1_generation_function_is_called(self) -> None:
"""Test that a provided y1_generation_function is used for label generation."""

def deterministic_y1_generation(
predictions_y1: np.ndarray,
labels: np.ndarray,
num_resampling_times: int,
) -> np.ndarray:
"""Return all-ones labels to verify the function is invoked."""
return np.ones((num_resampling_times, len(labels)), dtype=int)

lia_attack = LIAAttack(
attack_input=self.attack_input,
row_aggregation=AggregationType.MAX,
y1_generation="calibration",
y1_generation_function=deterministic_y1_generation,
num_resampling_times=5,
)

analysis_input = lia_attack.run_attack()

# All y1 values should be 1 since our function returns all-ones
self.assertTrue(np.all(analysis_input.y1 == 1))
self.assertEqual(
analysis_input.y1.shape, (5, len(self.attack_input["df_aggregated"]))
)

def test_y1_generation_function_receives_correct_args(self) -> None:
"""Test that y1_generation_function receives correct arguments."""
captured_args: dict[str, object] = {}

def capturing_y1_generation(
predictions_y1: np.ndarray,
labels: np.ndarray,
num_resampling_times: int,
) -> np.ndarray:
"""Capture arguments for verification."""
captured_args["predictions_y1"] = predictions_y1
captured_args["labels"] = labels
captured_args["num_resampling_times"] = num_resampling_times
return np.zeros((num_resampling_times, len(labels)), dtype=int)

num_resampling = 7
lia_attack = LIAAttack(
attack_input=self.attack_input,
row_aggregation=AggregationType.MAX,
y1_generation="calibration",
y1_generation_function=capturing_y1_generation,
num_resampling_times=num_resampling,
)

df_attack = self.attack_input["df_aggregated"]
lia_attack.run_attack()

# Verify the function received the correct arguments
self.assertEqual(captured_args["num_resampling_times"], num_resampling)
assert_array_equal(
captured_args["predictions_y1"],
np.asarray(df_attack["predictions_calib"].values),
)
assert_array_equal(
captured_args["labels"],
df_attack["label"],
)

def test_default_y1_generation_function_is_none(self) -> None:
"""Test that y1_generation_function defaults to None."""
lia_attack = LIAAttack(
attack_input=self.attack_input,
row_aggregation=AggregationType.MAX,
)

self.assertIsNone(lia_attack.y1_generation_function)

def test_generate_y1_labels_without_function(self) -> None:
"""Test that _generate_y1_labels uses Binomial sampling when no function is provided."""
lia_attack = LIAAttack(
attack_input=self.attack_input,
row_aggregation=AggregationType.MAX,
y1_generation="calibration",
num_resampling_times=1000,
)

df_attack = self.attack_input["df_aggregated"]
predictions_y1 = np.asarray(lia_attack.get_y1_predictions(df_attack))
y1_all_reps = lia_attack._generate_y1_labels(
predictions_y1, df_attack["label"].values
)

# Should produce binary values
self.assertTrue(np.all(np.isin(y1_all_reps, [0, 1])))
self.assertEqual(y1_all_reps.shape, (1000, len(df_attack)))
Loading