From f2fa4aabbe9a8ea2ce7bebe0a48f928264b5ce1b Mon Sep 17 00:00:00 2001 From: Richard Lundeen Date: Fri, 10 Apr 2026 16:30:54 -0700 Subject: [PATCH 1/4] Adding attack technique --- pyrit/identifiers/atomic_attack_identifier.py | 42 ++++----- pyrit/identifiers/evaluation_identifier.py | 12 ++- pyrit/models/seeds/seed_attack_group.py | 22 +++++ .../seeds/seed_attack_technique_group.py | 29 ++++++- pyrit/scenario/__init__.py | 2 + pyrit/scenario/core/__init__.py | 2 + pyrit/scenario/core/atomic_attack.py | 85 +++++++++++++------ pyrit/scenario/core/attack_technique.py | 41 +++++++++ pyrit/scenario/core/scenario.py | 3 +- .../scenario/scenarios/airt/content_harms.py | 9 +- pyrit/scenario/scenarios/airt/cyber.py | 3 +- pyrit/scenario/scenarios/airt/jailbreak.py | 5 +- pyrit/scenario/scenarios/airt/leakage.py | 3 +- pyrit/scenario/scenarios/airt/psychosocial.py | 7 +- pyrit/scenario/scenarios/airt/scam.py | 3 +- .../scenarios/foundry/red_team_agent.py | 3 +- pyrit/scenario/scenarios/garak/encoding.py | 5 +- .../test_atomic_attack_identifier.py | 63 +++++++------- tests/unit/scenarios/test_atomic_attack.py | 81 ++++++++++-------- tests/unit/scenarios/test_content_harms.py | 6 +- tests/unit/scenarios/test_cyber.py | 8 +- tests/unit/scenarios/test_encoding.py | 4 +- tests/unit/scenarios/test_jailbreak.py | 18 ++-- tests/unit/scenarios/test_leakage_scenario.py | 14 +-- .../unit/scenarios/test_psychosocial_harms.py | 6 +- tests/unit/scenarios/test_scam.py | 8 +- uv.lock | 2 +- 27 files changed, 323 insertions(+), 163 deletions(-) create mode 100644 pyrit/scenario/core/attack_technique.py diff --git a/pyrit/identifiers/atomic_attack_identifier.py b/pyrit/identifiers/atomic_attack_identifier.py index c242c9ef52..3cb6169c08 100644 --- a/pyrit/identifiers/atomic_attack_identifier.py +++ b/pyrit/identifiers/atomic_attack_identifier.py @@ -8,11 +8,11 @@ by combining the attack strategy's identity with the seed identifiers from the dataset. -The composite identifier always has the same shape: +The composite identifier has this shape: class_name = "AtomicAttack" children["attack"] = attack strategy's ComponentIdentifier - children["seeds"] = list of seed ComponentIdentifiers - (may be empty when no seeds are present) + children["technique_seeds"] = list of technique-only seed ComponentIdentifiers (optional) + children["seeds"] = list of ALL seed ComponentIdentifiers (for traceability) """ import logging @@ -22,6 +22,7 @@ if TYPE_CHECKING: from pyrit.models.seeds.seed import Seed + from pyrit.models.seeds.seed_attack_technique_group import SeedAttackTechniqueGroup from pyrit.models.seeds.seed_group import SeedGroup logger = logging.getLogger(__name__) @@ -40,10 +41,10 @@ def build_seed_identifier(seed: "Seed") -> ComponentIdentifier: always produces the same identifier. Args: - seed (Seed): The seed to build an identifier for. + seed: The seed to build an identifier for. Returns: - ComponentIdentifier: An identifier capturing the seed's behavioral properties. + An identifier capturing the seed's behavioral properties. """ params: dict[str, Any] = { "value": seed.value, @@ -63,31 +64,29 @@ def build_atomic_attack_identifier( *, attack_identifier: ComponentIdentifier, seed_group: Optional["SeedGroup"] = None, + seed_technique: Optional["SeedAttackTechniqueGroup"] = None, ) -> ComponentIdentifier: """ Build a composite ComponentIdentifier for an atomic attack. - Combines the attack strategy's identity with identifiers for all seeds - from the seed group. Every seed in the group is included in the identity; - each seed's ``is_general_technique`` flag is captured as a param so that - downstream consumers (e.g., evaluation identity) can filter as needed. + The identifier always includes the attack strategy as ``children["attack"]`` + and all seeds from the seed group in ``children["seeds"]`` for traceability. - When no seed_group is provided, the resulting identifier has an empty - ``seeds`` children list, but still has the standard ``AtomicAttack`` - shape for consistent querying. + When ``seed_technique`` is provided, its seeds are also included as + ``children["technique_seeds"]``. These represent the reusable "how to attack" + portion and are included in eval-hash computation, while ``seeds`` is excluded + from the eval hash. Args: - attack_identifier (ComponentIdentifier): The attack strategy's identifier - (from ``attack.get_identifier()``). - seed_group (Optional[SeedGroup]): The seed group to extract seeds from. - If None, the identifier has an empty seeds list. + attack_identifier: The attack strategy's identifier. + seed_group: The seed group to extract all seeds from. + seed_technique: Optional technique seed group whose seeds are added + as a separate ``technique_seeds`` child. Returns: - ComponentIdentifier: A composite identifier with class_name="AtomicAttack", - the attack as a child, and seed identifiers as children. + A composite ComponentIdentifier with class_name="AtomicAttack". """ seed_identifiers: list[ComponentIdentifier] = [] - if seed_group is not None: seed_identifiers.extend(build_seed_identifier(seed) for seed in seed_group.seeds) @@ -96,6 +95,11 @@ def build_atomic_attack_identifier( "seeds": seed_identifiers, } + if seed_technique is not None: + technique_seed_ids = [build_seed_identifier(seed) for seed in seed_technique.seeds] + if technique_seed_ids: + children["technique_seeds"] = technique_seed_ids + return ComponentIdentifier( class_name=_ATOMIC_ATTACK_CLASS_NAME, class_module=_ATOMIC_ATTACK_CLASS_MODULE, diff --git a/pyrit/identifiers/evaluation_identifier.py b/pyrit/identifiers/evaluation_identifier.py index 6df3192cf2..236e37b006 100644 --- a/pyrit/identifiers/evaluation_identifier.py +++ b/pyrit/identifiers/evaluation_identifier.py @@ -223,11 +223,11 @@ class AtomicAttackEvaluationIdentifier(EvaluationIdentifier): * ``objective_target`` — include only ``temperature``. * ``adversarial_chat`` — include ``model_name``, ``temperature``, ``top_p``. * ``objective_scorer`` — excluded entirely. - * ``seeds`` — include only items where ``is_general_technique=True``. + * ``seeds`` — excluded entirely (present for traceability only). + * ``technique_seeds`` — not listed, so fully included by default. - Non-target children (e.g., ``request_converters``, ``response_converters``) - receive full recursive eval treatment, meaning they fully contribute to - the hash. + Non-target children (e.g., ``request_converters``, ``response_converters``, + ``technique_seeds``) receive full recursive eval treatment. """ CHILD_EVAL_RULES: ClassVar[dict[str, ChildEvalRule]] = { @@ -238,7 +238,5 @@ class AtomicAttackEvaluationIdentifier(EvaluationIdentifier): included_params=frozenset({"model_name", "temperature", "top_p"}), ), "objective_scorer": ChildEvalRule(exclude=True), - "seeds": ChildEvalRule( - included_item_values={"is_general_technique": True}, - ), + "seeds": ChildEvalRule(exclude=True), } diff --git a/pyrit/models/seeds/seed_attack_group.py b/pyrit/models/seeds/seed_attack_group.py index e6f9ff27ee..48ee96419d 100644 --- a/pyrit/models/seeds/seed_attack_group.py +++ b/pyrit/models/seeds/seed_attack_group.py @@ -18,6 +18,7 @@ from collections.abc import Sequence from pyrit.models.seeds.seed import Seed + from pyrit.models.seeds.seed_attack_technique_group import SeedAttackTechniqueGroup class SeedAttackGroup(SeedGroup): @@ -96,3 +97,24 @@ def objective(self) -> SeedObjective: obj = self._get_objective() assert obj is not None, "SeedAttackGroup should always have an objective" return obj + + def with_technique(self, *, technique: "SeedAttackTechniqueGroup") -> "SeedAttackGroup": + """ + Return a new SeedAttackGroup with technique seeds merged in. + + The original group is not mutated. Technique seeds are inserted at + ``technique.insertion_index`` (or appended at the end when ``None``). + + Args: + technique: A validated SeedAttackTechniqueGroup whose seeds will be merged. + + Returns: + A new SeedAttackGroup with the merged seeds. + """ + base = list(self.seeds) + idx = technique.insertion_index + if idx is None: + merged_seeds = base + list(technique.seeds) + else: + merged_seeds = base[:idx] + list(technique.seeds) + base[idx:] + return SeedAttackGroup(seeds=merged_seeds) diff --git a/pyrit/models/seeds/seed_attack_technique_group.py b/pyrit/models/seeds/seed_attack_technique_group.py index 7f5e16d28e..39ae1a17e1 100644 --- a/pyrit/models/seeds/seed_attack_technique_group.py +++ b/pyrit/models/seeds/seed_attack_technique_group.py @@ -14,6 +14,7 @@ from typing import TYPE_CHECKING, Any, Union from pyrit.models.seeds.seed_group import SeedGroup +from pyrit.models.seeds.seed_objective import SeedObjective if TYPE_CHECKING: from collections.abc import Sequence @@ -36,30 +37,43 @@ def __init__( self, *, seeds: Sequence[Union[Seed, dict[str, Any]]], + insertion_index: int | None = None, ): """ Initialize a SeedAttackTechniqueGroup. Args: seeds: Sequence of seeds. All seeds must have is_general_technique=True. + insertion_index: Where to insert technique seeds when merging into a + SeedAttackGroup via ``with_technique()``. ``None`` (default) appends + at the end. An integer inserts before that position in the target + group's seed list. Raises: ValueError: If seeds is empty. ValueError: If any seed does not have is_general_technique=True. """ + self._insertion_index = insertion_index super().__init__(seeds=seeds) + @property + def insertion_index(self) -> int | None: + """Where to insert technique seeds when merging, or None to append at end.""" + return self._insertion_index + def validate(self) -> None: """ Validate the seed attack technique group state. - Extends SeedGroup validation to require all seeds to be general strategies. + Extends SeedGroup validation to require all seeds to be general strategies + and to contain no objectives. Raises: ValueError: If validation fails. """ super().validate() self._enforce_all_general_strategy() + self._enforce_no_objectives() def _enforce_all_general_strategy(self) -> None: """ @@ -75,3 +89,16 @@ def _enforce_all_general_strategy(self) -> None: f"All seeds in SeedAttackTechniqueGroup must have is_general_technique=True. " f"Found {len(non_general)} seed(s) without it: {non_general_types}" ) + + def _enforce_no_objectives(self) -> None: + """ + Ensure no SeedObjective seeds are present. + + Raises: + ValueError: If any seed is a SeedObjective. + """ + objectives = [seed for seed in self.seeds if isinstance(seed, SeedObjective)] + if objectives: + raise ValueError( + f"SeedAttackTechniqueGroup must not contain objectives. Found {len(objectives)} SeedObjective(s)." + ) diff --git a/pyrit/scenario/__init__.py b/pyrit/scenario/__init__.py index 24604e63c7..53f865305a 100644 --- a/pyrit/scenario/__init__.py +++ b/pyrit/scenario/__init__.py @@ -18,6 +18,7 @@ from pyrit.models.scenario_result import ScenarioIdentifier, ScenarioResult from pyrit.scenario.core import ( AtomicAttack, + AttackTechnique, DatasetConfiguration, Scenario, ScenarioCompositeStrategy, @@ -42,6 +43,7 @@ __all__ = [ "AtomicAttack", + "AttackTechnique", "DatasetConfiguration", "Scenario", "ScenarioCompositeStrategy", diff --git a/pyrit/scenario/core/__init__.py b/pyrit/scenario/core/__init__.py index 54506f3e0b..55c4517aca 100644 --- a/pyrit/scenario/core/__init__.py +++ b/pyrit/scenario/core/__init__.py @@ -4,12 +4,14 @@ """Core scenario classes for running attack configurations.""" from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import EXPLICIT_SEED_GROUPS_KEY, DatasetConfiguration from pyrit.scenario.core.scenario import Scenario from pyrit.scenario.core.scenario_strategy import ScenarioCompositeStrategy, ScenarioStrategy __all__ = [ "AtomicAttack", + "AttackTechnique", "DatasetConfiguration", "EXPLICIT_SEED_GROUPS_KEY", "Scenario", diff --git a/pyrit/scenario/core/atomic_attack.py b/pyrit/scenario/core/atomic_attack.py index ec43e28026..d98de8d63b 100644 --- a/pyrit/scenario/core/atomic_attack.py +++ b/pyrit/scenario/core/atomic_attack.py @@ -14,6 +14,7 @@ """ import logging +import warnings from typing import TYPE_CHECKING, Any, Optional from pyrit.executor.attack import AttackExecutor, AttackStrategy @@ -23,6 +24,7 @@ from pyrit.memory import CentralMemory from pyrit.memory.memory_models import MAX_IDENTIFIER_VALUE_LENGTH from pyrit.models import AttackResult, SeedAttackGroup +from pyrit.scenario.core.attack_technique import AttackTechnique if TYPE_CHECKING: from pyrit.prompt_target import PromptChatTarget @@ -69,7 +71,8 @@ def __init__( self, *, atomic_attack_name: str, - attack: AttackStrategy[Any, Any], + attack_technique: AttackTechnique | None = None, + attack: AttackStrategy[Any, Any] | None = None, seed_groups: list[SeedAttackGroup], adversarial_chat: Optional["PromptChatTarget"] = None, objective_scorer: Optional["TrueFalseScorer"] = None, @@ -80,28 +83,42 @@ def __init__( Initialize an atomic attack with an attack strategy and seed groups. Args: - atomic_attack_name (str): Used to group an AtomicAttack with related attacks for a + atomic_attack_name: Used to group an AtomicAttack with related attacks for a strategy. - attack (AttackStrategy): The configured attack strategy to execute. - seed_groups (List[SeedAttackGroup]): List of seed attack groups. Each seed group must - have an objective set. The seed groups serve as the single source of truth for - objectives, prepended conversations, and next messages. - adversarial_chat (Optional[PromptChatTarget]): Optional chat target for generating - adversarial prompts or simulated conversations. Required when seed groups contain - SeedSimulatedConversation configurations. - objective_scorer (Optional[TrueFalseScorer]): Optional scorer for evaluating simulated - conversations. Required when seed groups contain SeedSimulatedConversation - configurations. - memory_labels (Optional[Dict[str, str]]): Additional labels to apply to prompts. - These labels help track and categorize the atomic attack in memory. - **attack_execute_params (Any): Additional parameters to pass to the attack - execution method (e.g., batch_size). + attack_technique: An AttackTechnique bundling the attack strategy and optional + technique seeds. Preferred over the deprecated ``attack`` parameter. + attack: Deprecated. The configured attack strategy to execute. Use + ``attack_technique`` instead. + seed_groups: List of seed attack groups. Each seed group must + have an objective set. + adversarial_chat: Optional chat target for generating + adversarial prompts or simulated conversations. + objective_scorer: Optional scorer for evaluating simulated + conversations. + memory_labels: Additional labels to apply to prompts. + **attack_execute_params: Additional parameters to pass to the attack + execution method. Raises: ValueError: If seed_groups list is empty or any seed group is missing an objective. + ValueError: If neither attack_technique nor attack is provided, or both are provided. """ self.atomic_attack_name = atomic_attack_name - self._attack = attack + + if attack_technique is not None and attack is not None: + raise ValueError("Provide either attack_technique or attack, not both.") + + if attack_technique is not None: + self._attack_technique = attack_technique + elif attack is not None: + warnings.warn( + "The 'attack' parameter is deprecated. Use 'attack_technique=AttackTechnique(attack=...)' instead.", + DeprecationWarning, + stacklevel=2, + ) + self._attack_technique = AttackTechnique(attack=attack) + else: + raise ValueError("Either attack_technique or attack must be provided.") # Validate seed_groups if not seed_groups: @@ -118,9 +135,15 @@ def __init__( self._attack_execute_params = attack_execute_params logger.info( - f"Initialized atomic attack with {len(self._seed_groups)} seed groups, attack type: {type(attack).__name__}" + f"Initialized atomic attack with {len(self._seed_groups)} seed groups, " + f"attack type: {type(self._attack_technique.attack).__name__}" ) + @property + def attack_technique(self) -> AttackTechnique: + """Get the attack technique for this atomic attack.""" + return self._attack_technique + @property def objectives(self) -> list[str]: """ @@ -199,9 +222,19 @@ async def run_async( ) try: + # If the technique has seeds, merge them into each seed group for execution. + # The original seed_groups are not mutated. + technique = self._attack_technique + if technique.seed_technique is not None: + execution_seed_groups = [ + sg.with_technique(technique=technique.seed_technique) for sg in self._seed_groups + ] + else: + execution_seed_groups = self._seed_groups + results = await executor.execute_attack_from_seed_groups_async( - attack=self._attack, - seed_groups=self._seed_groups, + attack=technique.attack, + seed_groups=execution_seed_groups, adversarial_chat=self._adversarial_chat, objective_scorer=self._objective_scorer, memory_labels=self._memory_labels, @@ -231,18 +264,19 @@ async def run_async( def _enrich_atomic_attack_identifiers(self, *, results: AttackExecutorResult[AttackResult]) -> None: """ - Enrich each AttackResult's atomic_attack_identifier with seed group information - and persist the update to the database. + Enrich each AttackResult's atomic_attack_identifier with seed group and + technique information, then persist the update to the database. Uses ``results.input_indices`` to map each completed result back to its originating seed group by index, then rebuilds the atomic_attack_identifier - to include the seed identifiers from the seed group. The enriched identifier - is then flushed back to the corresponding ``AttackResultEntry`` row. + to include the seed identifiers and any technique seeds. The enriched + identifier is then flushed back to the corresponding ``AttackResultEntry`` row. Args: - results (AttackExecutorResult[AttackResult]): The execution results to enrich. + results: The execution results to enrich. """ memory = CentralMemory.get_memory_instance() + seed_technique = self._attack_technique.seed_technique for result, idx in zip(results.completed_results, results.input_indices, strict=True): attack_strategy_id = result.get_attack_strategy_identifier() @@ -250,6 +284,7 @@ def _enrich_atomic_attack_identifiers(self, *, results: AttackExecutorResult[Att result.atomic_attack_identifier = build_atomic_attack_identifier( attack_identifier=attack_strategy_id, seed_group=self._seed_groups[idx], + seed_technique=seed_technique, ) # Persist the enriched identifier back to the database. diff --git a/pyrit/scenario/core/attack_technique.py b/pyrit/scenario/core/attack_technique.py new file mode 100644 index 0000000000..52b5b943e6 --- /dev/null +++ b/pyrit/scenario/core/attack_technique.py @@ -0,0 +1,41 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +""" +AttackTechnique - Bundles an AttackStrategy with an optional SeedAttackTechniqueGroup. + +Represents "how to attack" independently of "what to attack" (the objective). +""" + +from __future__ import annotations + +from typing import Any + +from pyrit.executor.attack import AttackStrategy +from pyrit.models import SeedAttackTechniqueGroup + + +class AttackTechnique: + """ + Bundles an attack strategy with an optional technique seed group. + + This cleanly separates "how to attack" (the strategy + reusable technique seeds) + from "what to attack" (the objective, which lives on SeedAttackGroup / AtomicAttack). + """ + + def __init__( + self, + *, + attack: AttackStrategy[Any, Any], + seed_technique: SeedAttackTechniqueGroup | None = None, + ) -> None: + self._attack = attack + self._seed_technique = seed_technique + + @property + def attack(self) -> AttackStrategy[Any, Any]: + return self._attack + + @property + def seed_technique(self) -> SeedAttackTechniqueGroup | None: + return self._seed_technique diff --git a/pyrit/scenario/core/scenario.py b/pyrit/scenario/core/scenario.py index 7329b6f724..7e91c53bda 100644 --- a/pyrit/scenario/core/scenario.py +++ b/pyrit/scenario/core/scenario.py @@ -27,6 +27,7 @@ from pyrit.prompt_target import OpenAIChatTarget, PromptTarget from pyrit.registry import ScorerRegistry from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario_strategy import ( ScenarioCompositeStrategy, @@ -322,7 +323,7 @@ def _get_baseline(self) -> AtomicAttack: return AtomicAttack( atomic_attack_name="baseline", - attack=attack, + attack_technique=AttackTechnique(attack=attack), seed_groups=seed_groups, memory_labels=self._memory_labels, ) diff --git a/pyrit/scenario/scenarios/airt/content_harms.py b/pyrit/scenario/scenarios/airt/content_harms.py index c92f702c24..9821ac266d 100644 --- a/pyrit/scenario/scenarios/airt/content_harms.py +++ b/pyrit/scenario/scenarios/airt/content_harms.py @@ -22,6 +22,7 @@ from pyrit.models import SeedAttackGroup, SeedGroup from pyrit.prompt_target import OpenAIChatTarget, PromptChatTarget from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario from pyrit.scenario.core.scenario_strategy import ( @@ -310,7 +311,7 @@ def _get_single_turn_attacks( return [ AtomicAttack( atomic_attack_name=strategy, - attack=prompt_sending_attack, + attack_technique=AttackTechnique(attack=prompt_sending_attack), seed_groups=list(seed_groups), adversarial_chat=self._adversarial_chat, objective_scorer=self._objective_scorer, @@ -318,7 +319,7 @@ def _get_single_turn_attacks( ), AtomicAttack( atomic_attack_name=strategy, - attack=role_play_attack, + attack_technique=AttackTechnique(attack=role_play_attack), seed_groups=list(seed_groups), adversarial_chat=self._adversarial_chat, objective_scorer=self._objective_scorer, @@ -355,7 +356,7 @@ def _get_multi_turn_attacks( return [ AtomicAttack( atomic_attack_name=strategy, - attack=many_shot_jailbreak_attack, + attack_technique=AttackTechnique(attack=many_shot_jailbreak_attack), seed_groups=list(seed_groups), adversarial_chat=self._adversarial_chat, objective_scorer=self._objective_scorer, @@ -363,7 +364,7 @@ def _get_multi_turn_attacks( ), AtomicAttack( atomic_attack_name=strategy, - attack=tap_attack, + attack_technique=AttackTechnique(attack=tap_attack), seed_groups=list(seed_groups), adversarial_chat=self._adversarial_chat, objective_scorer=self._objective_scorer, diff --git a/pyrit/scenario/scenarios/airt/cyber.py b/pyrit/scenario/scenarios/airt/cyber.py index be084e6e90..09cb348687 100644 --- a/pyrit/scenario/scenarios/airt/cyber.py +++ b/pyrit/scenario/scenarios/airt/cyber.py @@ -18,6 +18,7 @@ from pyrit.models import SeedAttackGroup, SeedObjective from pyrit.prompt_target import OpenAIChatTarget, PromptChatTarget from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario from pyrit.scenario.core.scenario_strategy import ( @@ -273,7 +274,7 @@ def _get_atomic_attack_from_strategy(self, strategy: str) -> AtomicAttack: return AtomicAttack( atomic_attack_name=f"cyber_{strategy}", - attack=attack_strategy, + attack_technique=AttackTechnique(attack=attack_strategy), seed_groups=self._seed_groups, adversarial_chat=self._adversarial_chat, objective_scorer=self._scorer_config.objective_scorer, diff --git a/pyrit/scenario/scenarios/airt/jailbreak.py b/pyrit/scenario/scenarios/airt/jailbreak.py index 826622b973..0a84cb49c8 100644 --- a/pyrit/scenario/scenarios/airt/jailbreak.py +++ b/pyrit/scenario/scenarios/airt/jailbreak.py @@ -21,6 +21,7 @@ from pyrit.prompt_normalizer import PromptConverterConfiguration from pyrit.prompt_target import OpenAIChatTarget from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario from pyrit.scenario.core.scenario_strategy import ScenarioCompositeStrategy, ScenarioStrategy @@ -292,7 +293,9 @@ async def _get_atomic_attack_from_strategy_async( template_name = Path(jailbreak_template_name).stem return AtomicAttack( - atomic_attack_name=f"jailbreak_{template_name}", attack=attack, seed_groups=self._seed_groups + atomic_attack_name=f"jailbreak_{template_name}", + attack_technique=AttackTechnique(attack=attack), + seed_groups=self._seed_groups, ) async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: diff --git a/pyrit/scenario/scenarios/airt/leakage.py b/pyrit/scenario/scenarios/airt/leakage.py index 61c1f13e13..5259aec6bd 100644 --- a/pyrit/scenario/scenarios/airt/leakage.py +++ b/pyrit/scenario/scenarios/airt/leakage.py @@ -25,6 +25,7 @@ from pyrit.prompt_normalizer import PromptConverterConfiguration from pyrit.prompt_target import OpenAIChatTarget, PromptChatTarget from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario from pyrit.scenario.core.scenario_strategy import ( @@ -298,7 +299,7 @@ async def _get_atomic_attack_from_strategy_async(self, strategy: str) -> AtomicA # due to the heterogeneous dict values. The types are verified by unit tests. return AtomicAttack( atomic_attack_name=f"leakage_{strategy}", - attack=attack_strategy, # type: ignore[arg-type] + attack_technique=AttackTechnique(attack=attack_strategy), # type: ignore[arg-type] seed_groups=self._seed_groups, memory_labels=self._memory_labels, ) diff --git a/pyrit/scenario/scenarios/airt/psychosocial.py b/pyrit/scenario/scenarios/airt/psychosocial.py index 16320231c3..915a48e34b 100644 --- a/pyrit/scenario/scenarios/airt/psychosocial.py +++ b/pyrit/scenario/scenarios/airt/psychosocial.py @@ -30,6 +30,7 @@ ) from pyrit.prompt_target import OpenAIChatTarget, PromptChatTarget from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario from pyrit.scenario.core.scenario_strategy import ( @@ -473,7 +474,7 @@ def _create_single_turn_attacks( attacks.append( AtomicAttack( atomic_attack_name="psychosocial_single_turn", - attack=prompt_sending, + attack_technique=AttackTechnique(attack=prompt_sending), seed_groups=seed_groups, memory_labels=self._memory_labels, ) @@ -487,7 +488,7 @@ def _create_single_turn_attacks( attacks.append( AtomicAttack( atomic_attack_name="psychosocial_role_play", - attack=role_play, + attack_technique=AttackTechnique(attack=role_play), seed_groups=seed_groups, memory_labels=self._memory_labels, ) @@ -524,7 +525,7 @@ def _create_multi_turn_attack( return AtomicAttack( atomic_attack_name="psychosocial_crescendo_turn", - attack=crescendo, + attack_technique=AttackTechnique(attack=crescendo), seed_groups=seed_groups, memory_labels=self._memory_labels, ) diff --git a/pyrit/scenario/scenarios/airt/scam.py b/pyrit/scenario/scenarios/airt/scam.py index 98ae7b338d..a9c7914588 100644 --- a/pyrit/scenario/scenarios/airt/scam.py +++ b/pyrit/scenario/scenarios/airt/scam.py @@ -25,6 +25,7 @@ from pyrit.models import SeedAttackGroup, SeedObjective from pyrit.prompt_target import OpenAIChatTarget, PromptChatTarget from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario from pyrit.scenario.core.scenario_strategy import ( @@ -312,7 +313,7 @@ def _get_atomic_attack_from_strategy(self, strategy: str) -> AtomicAttack: return AtomicAttack( atomic_attack_name=f"scam_{strategy}", - attack=attack_strategy, + attack_technique=AttackTechnique(attack=attack_strategy), seed_groups=self._seed_groups, memory_labels=self._memory_labels, ) diff --git a/pyrit/scenario/scenarios/foundry/red_team_agent.py b/pyrit/scenario/scenarios/foundry/red_team_agent.py index 6809b18d10..76fefc8755 100644 --- a/pyrit/scenario/scenarios/foundry/red_team_agent.py +++ b/pyrit/scenario/scenarios/foundry/red_team_agent.py @@ -63,6 +63,7 @@ from pyrit.prompt_target.common.prompt_chat_target import PromptChatTarget from pyrit.prompt_target.openai.openai_chat_target import OpenAIChatTarget from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario from pyrit.scenario.core.scenario_strategy import ( @@ -441,7 +442,7 @@ def _get_attack_from_strategy(self, composite_strategy: ScenarioCompositeStrateg return AtomicAttack( atomic_attack_name=composite_strategy.name, - attack=attack, + attack_technique=AttackTechnique(attack=attack), seed_groups=self._seed_groups, adversarial_chat=self._adversarial_chat, objective_scorer=self._attack_scoring_config.objective_scorer, diff --git a/pyrit/scenario/scenarios/garak/encoding.py b/pyrit/scenario/scenarios/garak/encoding.py index e18241736f..4ee28fb152 100644 --- a/pyrit/scenario/scenarios/garak/encoding.py +++ b/pyrit/scenario/scenarios/garak/encoding.py @@ -34,6 +34,7 @@ PromptConverterConfiguration, ) from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario from pyrit.scenario.core.scenario_strategy import ( @@ -361,7 +362,9 @@ def _get_prompt_attacks(self, *, converters: list[PromptConverter], encoding_nam ) atomic_attacks.append( AtomicAttack( - atomic_attack_name=encoding_name, attack=attack, seed_groups=self._resolved_seed_groups or [] + atomic_attack_name=encoding_name, + attack_technique=AttackTechnique(attack=attack), + seed_groups=self._resolved_seed_groups or [], ) ) diff --git a/tests/unit/identifiers/test_atomic_attack_identifier.py b/tests/unit/identifiers/test_atomic_attack_identifier.py index b4e0bc6d17..483f1e1545 100644 --- a/tests/unit/identifiers/test_atomic_attack_identifier.py +++ b/tests/unit/identifiers/test_atomic_attack_identifier.py @@ -19,6 +19,13 @@ def __init__(self, *, seeds: list): self.seeds = seeds +class _FakeSeedTechniqueGroup: + """Minimal stub for SeedAttackTechniqueGroup with a seeds list.""" + + def __init__(self, *, seeds: list): + self.seeds = seeds + + # --------------------------------------------------------------------------- # Helpers shared across test classes # --------------------------------------------------------------------------- @@ -219,8 +226,7 @@ def test_objective_scorer_excluded(self): def test_seeds_rule(self): rule = AtomicAttackEvaluationIdentifier.CHILD_EVAL_RULES["seeds"] - assert rule.included_item_values == {"is_general_technique": True} - assert not rule.exclude + assert rule.exclude is True # -- Basic properties -------------------------------------------------- @@ -360,70 +366,67 @@ def test_converters_contribute_while_target_endpoint_ignored(self): c2 = build_atomic_attack_identifier(attack_identifier=a2) assert AtomicAttackEvaluationIdentifier(c1).eval_hash == AtomicAttackEvaluationIdentifier(c2).eval_hash - # -- Seeds (eval hash uses only general technique seeds) --------------- + # -- Seeds and technique_seeds (eval hash uses technique_seeds, excludes seeds) --- - def test_different_general_technique_seeds_different_eval_hash(self): + def test_different_technique_seeds_different_eval_hash(self): attack_id = _make_attack() seed1 = SeedPrompt(value="tech1", value_sha256="aaa", is_general_technique=True) seed2 = SeedPrompt(value="tech2", value_sha256="bbb", is_general_technique=True) - c1 = build_atomic_attack_identifier(attack_identifier=attack_id, seed_group=_FakeSeedGroup(seeds=[seed1])) - c2 = build_atomic_attack_identifier(attack_identifier=attack_id, seed_group=_FakeSeedGroup(seeds=[seed2])) + c1 = build_atomic_attack_identifier( + attack_identifier=attack_id, seed_technique=_FakeSeedTechniqueGroup(seeds=[seed1]) + ) + c2 = build_atomic_attack_identifier( + attack_identifier=attack_id, seed_technique=_FakeSeedTechniqueGroup(seeds=[seed2]) + ) assert AtomicAttackEvaluationIdentifier(c1).eval_hash != AtomicAttackEvaluationIdentifier(c2).eval_hash - def test_non_general_technique_seeds_ignored_in_eval_hash(self): - """Same general technique seeds but different non-general seeds -> same eval hash.""" + def test_seeds_in_seed_group_ignored_in_eval_hash(self): + """Different seeds in seed_group (traceability) should not affect eval hash.""" attack_id = _make_attack() - general_seed = SeedPrompt(value="technique", value_sha256="abc", is_general_technique=True) non_general_1 = SeedPrompt(value="obj1", value_sha256="xxx", is_general_technique=False) non_general_2 = SeedPrompt(value="obj2", value_sha256="yyy", is_general_technique=False) c1 = build_atomic_attack_identifier( attack_identifier=attack_id, - seed_group=_FakeSeedGroup(seeds=[general_seed, non_general_1]), + seed_group=_FakeSeedGroup(seeds=[non_general_1]), ) c2 = build_atomic_attack_identifier( attack_identifier=attack_id, - seed_group=_FakeSeedGroup(seeds=[general_seed, non_general_2]), + seed_group=_FakeSeedGroup(seeds=[non_general_2]), ) assert AtomicAttackEvaluationIdentifier(c1).eval_hash == AtomicAttackEvaluationIdentifier(c2).eval_hash - def test_eval_hash_only_uses_general_technique_seeds(self): - """Eval hash with mixed seeds should match one built with only general technique seeds.""" + def test_general_technique_seeds_in_seed_group_ignored_in_eval_hash(self): + """Even general technique seeds in seed_group are excluded from eval hash.""" attack_id = _make_attack() general_seed = SeedPrompt(value="technique", value_sha256="abc", is_general_technique=True) - non_general_seed = SeedPrompt(value="objective", value_sha256="def", is_general_technique=False) - - # Identifier with both general and non-general seeds - c_mixed = build_atomic_attack_identifier( + c_with = build_atomic_attack_identifier( attack_identifier=attack_id, - seed_group=_FakeSeedGroup(seeds=[general_seed, non_general_seed]), + seed_group=_FakeSeedGroup(seeds=[general_seed]), ) - # Identifier with only general technique seed - c_general_only = build_atomic_attack_identifier( + c_without = build_atomic_attack_identifier( attack_identifier=attack_id, - seed_group=_FakeSeedGroup(seeds=[general_seed]), ) assert ( - AtomicAttackEvaluationIdentifier(c_mixed).eval_hash - == AtomicAttackEvaluationIdentifier(c_general_only).eval_hash + AtomicAttackEvaluationIdentifier(c_with).eval_hash + == AtomicAttackEvaluationIdentifier(c_without).eval_hash ) - def test_identifier_hash_differs_with_non_general_seeds(self): - """The full identifier hash SHOULD differ when non-general seeds differ.""" + def test_identifier_hash_differs_with_different_seeds(self): + """The full identifier hash SHOULD differ when seeds differ (even though eval hash doesn't).""" attack_id = _make_attack() - general_seed = SeedPrompt(value="technique", value_sha256="abc", is_general_technique=True) non_general_1 = SeedPrompt(value="obj1", value_sha256="xxx", is_general_technique=False) non_general_2 = SeedPrompt(value="obj2", value_sha256="yyy", is_general_technique=False) c1 = build_atomic_attack_identifier( attack_identifier=attack_id, - seed_group=_FakeSeedGroup(seeds=[general_seed, non_general_1]), + seed_group=_FakeSeedGroup(seeds=[non_general_1]), ) c2 = build_atomic_attack_identifier( attack_identifier=attack_id, - seed_group=_FakeSeedGroup(seeds=[general_seed, non_general_2]), + seed_group=_FakeSeedGroup(seeds=[non_general_2]), ) - # Full identifier hash should differ (all seeds contribute) + # Full identifier hash should differ (all seeds contribute to hash) assert c1.hash != c2.hash - # But eval hash should be the same (only general technique seeds) + # But eval hash should be the same (seeds excluded) assert AtomicAttackEvaluationIdentifier(c1).eval_hash == AtomicAttackEvaluationIdentifier(c2).eval_hash # -- Full composite scenario ------------------------------------------- diff --git a/tests/unit/scenarios/test_atomic_attack.py b/tests/unit/scenarios/test_atomic_attack.py index 6d453f076a..2e0822fcb8 100644 --- a/tests/unit/scenarios/test_atomic_attack.py +++ b/tests/unit/scenarios/test_atomic_attack.py @@ -21,6 +21,7 @@ SeedPrompt, ) from pyrit.scenario import AtomicAttack +from pyrit.scenario.core.attack_technique import AttackTechnique @pytest.fixture @@ -111,12 +112,12 @@ class TestAtomicAttackInitialization: def test_init_with_valid_params(self, mock_attack, sample_seed_groups): """Test successful initialization with valid parameters.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) - assert atomic_attack._attack == mock_attack + assert atomic_attack._attack_technique.attack == mock_attack assert atomic_attack._seed_groups == sample_seed_groups assert atomic_attack._memory_labels == {} assert atomic_attack._attack_execute_params == {} @@ -126,7 +127,7 @@ def test_init_with_memory_labels(self, mock_attack, sample_seed_groups): memory_labels = {"test": "label", "category": "attack"} atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, memory_labels=memory_labels, atomic_attack_name="Test Attack Run", @@ -137,7 +138,7 @@ def test_init_with_memory_labels(self, mock_attack, sample_seed_groups): def test_init_with_attack_execute_params(self, mock_attack, sample_seed_groups): """Test initialization with additional attack execute parameters.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, max_retries=5, custom_param="value", @@ -152,7 +153,7 @@ def test_init_with_all_parameters(self, mock_attack, sample_seed_groups): memory_labels = {"test": "comprehensive"} atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, memory_labels=memory_labels, batch_size=10, @@ -160,7 +161,7 @@ def test_init_with_all_parameters(self, mock_attack, sample_seed_groups): atomic_attack_name="Test Attack Run", ) - assert atomic_attack._attack == mock_attack + assert atomic_attack._attack_technique.attack == mock_attack assert atomic_attack._seed_groups == sample_seed_groups assert atomic_attack._memory_labels == memory_labels assert atomic_attack._attack_execute_params["batch_size"] == 10 @@ -170,7 +171,7 @@ def test_init_fails_with_empty_seed_groups(self, mock_attack): """Test that initialization fails when seed_groups list is empty.""" with pytest.raises(ValueError, match="seed_groups list cannot be empty"): AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=[], atomic_attack_name="Test Attack Run", ) @@ -188,7 +189,7 @@ def test_init_fails_with_seed_group_missing_objective(self, mock_attack): def test_objectives_property_returns_values_from_seed_groups(self, mock_attack, sample_seed_groups): """Test that the objectives property returns values from seed groups.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -198,7 +199,7 @@ def test_objectives_property_returns_values_from_seed_groups(self, mock_attack, def test_seed_groups_property_returns_copy(self, mock_attack, sample_seed_groups): """Test that the seed_groups property returns a copy.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -207,6 +208,18 @@ def test_seed_groups_property_returns_copy(self, mock_attack, sample_seed_groups assert returned_groups == sample_seed_groups assert returned_groups is not atomic_attack._seed_groups + def test_deprecated_attack_param_still_works(self, mock_attack, sample_seed_groups): + """Test that the deprecated 'attack' parameter emits a warning and still initializes correctly.""" + with pytest.deprecated_call(): + atomic_attack = AtomicAttack( + attack=mock_attack, + seed_groups=sample_seed_groups, + atomic_attack_name="Deprecated Param Test", + ) + + assert atomic_attack._attack_technique.attack == mock_attack + assert atomic_attack._seed_groups == sample_seed_groups + @pytest.mark.usefixtures("patch_central_database") class TestAtomicAttackExecution: @@ -216,7 +229,7 @@ class TestAtomicAttackExecution: async def test_run_async_with_valid_atomic_attack(self, mock_attack, sample_seed_groups, sample_attack_results): """Test successful execution of an atomic attack.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -239,7 +252,7 @@ async def test_run_async_with_valid_atomic_attack(self, mock_attack, sample_seed async def test_run_async_with_custom_concurrency(self, mock_attack, sample_seed_groups, sample_attack_results): """Test execution with custom max_concurrency for atomic attack.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -259,7 +272,7 @@ async def test_run_async_with_custom_concurrency(self, mock_attack, sample_seed_ async def test_run_async_with_default_concurrency(self, mock_attack, sample_seed_groups, sample_attack_results): """Test that default concurrency (1) is used when not specified.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -280,7 +293,7 @@ async def test_run_async_passes_memory_labels(self, mock_attack, sample_seed_gro memory_labels = {"test": "attack_run", "category": "attack"} atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, memory_labels=memory_labels, atomic_attack_name="Test Attack Run", @@ -299,7 +312,7 @@ async def test_run_async_passes_memory_labels(self, mock_attack, sample_seed_gro async def test_run_async_passes_seed_groups(self, mock_attack, sample_seed_groups, sample_attack_results): """Test that seed_groups are passed to the executor.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -317,7 +330,7 @@ async def test_run_async_passes_seed_groups(self, mock_attack, sample_seed_group async def test_run_async_passes_attack_execute_params(self, mock_attack, sample_seed_groups, sample_attack_results): """Test that attack execute parameters are passed to the executor.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, custom_param="value", max_retries=3, @@ -339,7 +352,7 @@ async def test_run_async_merges_all_parameters(self, mock_attack, sample_seed_gr memory_labels = {"test": "merge"} atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, memory_labels=memory_labels, batch_size=5, @@ -361,7 +374,7 @@ async def test_run_async_merges_all_parameters(self, mock_attack, sample_seed_gr async def test_run_async_handles_execution_failure(self, mock_attack, sample_seed_groups): """Test that execution failures are properly handled and raised.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -378,7 +391,7 @@ async def test_run_async_passes_return_partial_on_failure_true_by_default( ): """Test that atomic attack passes return_partial_on_failure=True by default.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -398,7 +411,7 @@ async def test_run_async_respects_explicit_return_partial_on_failure( ): """Test that explicit return_partial_on_failure parameter is passed through.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -423,7 +436,7 @@ async def test_full_attack_run_execution_flow(self, mock_attack, sample_seed_gro memory_labels = {"test": "integration", "attack_run": "full"} atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, memory_labels=memory_labels, batch_size=2, @@ -469,7 +482,7 @@ async def test_atomic_attack_with_single_seed_group(self, mock_attack): ] atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=single_seed_group, atomic_attack_name="Test Attack Run", ) @@ -505,7 +518,7 @@ async def test_atomic_attack_with_many_seed_groups(self, mock_attack): ] atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=many_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -572,7 +585,7 @@ async def test_run_async_only_passes_valid_executor_params( The executor has strict_param_matching so invalid params would cause failures. """ atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=sample_seed_groups, atomic_attack_name="Test Attack Run", ) @@ -635,7 +648,7 @@ def mixed_seed_groups(self): def test_init_with_seed_groups_with_messages(self, mock_attack, seed_groups_with_messages): """Test that AtomicAttack initializes correctly with seed groups containing messages.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups_with_messages, atomic_attack_name="Multi-turn Attack", ) @@ -650,7 +663,7 @@ def test_init_with_seed_groups_with_messages(self, mock_attack, seed_groups_with def test_seed_groups_user_messages_property(self, mock_attack, seed_groups_with_messages): """Test that seed group user_messages are accessible and have correct content.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups_with_messages, atomic_attack_name="Multi-turn Attack", ) @@ -673,7 +686,7 @@ def test_seed_groups_user_messages_property(self, mock_attack, seed_groups_with_ async def test_run_async_passes_seed_groups_with_messages(self, mock_attack, seed_groups_with_messages): """Test that run_async correctly passes seed groups with messages to executor.""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups_with_messages, atomic_attack_name="Multi-turn Attack", ) @@ -707,7 +720,7 @@ async def test_run_async_passes_seed_groups_with_messages(self, mock_attack, see def test_init_with_mixed_seed_groups(self, mock_attack, mixed_seed_groups): """Test that AtomicAttack handles mixed seed groups (some with user_messages, some without).""" atomic_attack = AtomicAttack( - attack=mock_attack, + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=mixed_seed_groups, atomic_attack_name="Mixed Attack", ) @@ -745,7 +758,7 @@ async def test_enrichment_populates_atomic_attack_identifier(self, mock_attack): atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack=mock_attack, seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) @@ -771,7 +784,7 @@ async def test_enrichment_skips_results_without_attack_identifier(self, mock_att atomic_attack_identifier=None, ) - atomic = AtomicAttack(attack=mock_attack, seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) @@ -795,7 +808,7 @@ async def test_enrichment_skips_out_of_range_index(self, mock_attack): atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack=mock_attack, seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: # Index 99 is out of range for seed_groups (only 1 element) @@ -834,7 +847,7 @@ async def test_enrichment_includes_all_seeds(self, mock_attack): atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack=mock_attack, seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) @@ -884,7 +897,7 @@ async def test_enrichment_maps_multiple_results_to_correct_seed_groups(self, moc ), ] - atomic = AtomicAttack(attack=mock_attack, seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results(results) @@ -921,7 +934,7 @@ async def test_enrichment_persists_to_db(self, mock_attack): atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack=mock_attack, seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) @@ -961,7 +974,7 @@ async def test_enrichment_skips_db_update_when_no_attack_result_id(self, mock_at atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack=mock_attack, seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) diff --git a/tests/unit/scenarios/test_content_harms.py b/tests/unit/scenarios/test_content_harms.py index 33a20c6df3..93ca785ba9 100644 --- a/tests/unit/scenarios/test_content_harms.py +++ b/tests/unit/scenarios/test_content_harms.py @@ -746,7 +746,7 @@ async def test_get_single_turn_attacks_returns_prompt_sending_and_role_play( attacks = scenario._get_single_turn_attacks(strategy="hate", seed_groups=seed_groups) assert len(attacks) == 2 - attack_types = [type(a._attack) for a in attacks] + attack_types = [type(a._attack_technique.attack) for a in attacks] assert PromptSendingAttack in attack_types assert RolePlayAttack in attack_types @@ -778,7 +778,7 @@ async def test_get_multi_turn_attacks_returns_many_shot_and_tap( attacks = scenario._get_multi_turn_attacks(strategy="hate", seed_groups=seed_groups) assert len(attacks) == 2 - attack_types = [type(a._attack) for a in attacks] + attack_types = [type(a._attack_technique.attack) for a in attacks] assert ManyShotJailbreakAttack in attack_types assert TreeOfAttacksWithPruningAttack in attack_types @@ -816,7 +816,7 @@ async def test_get_strategy_attacks_includes_all_groups( # 2 single-turn + 2 multi-turn = 4 assert len(attacks) == 4 - attack_types = [type(a._attack) for a in attacks] + attack_types = [type(a._attack_technique.attack) for a in attacks] assert PromptSendingAttack in attack_types assert RolePlayAttack in attack_types assert ManyShotJailbreakAttack in attack_types diff --git a/tests/unit/scenarios/test_cyber.py b/tests/unit/scenarios/test_cyber.py index afa7f144a3..76a633ab04 100644 --- a/tests/unit/scenarios/test_cyber.py +++ b/tests/unit/scenarios/test_cyber.py @@ -210,7 +210,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_singleturn( @@ -234,7 +234,7 @@ async def test_attack_generation_for_singleturn( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, PromptSendingAttack) + assert isinstance(run._attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_multiturn( @@ -259,7 +259,7 @@ async def test_attack_generation_for_multiturn( atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, RedTeamingAttack) + assert isinstance(run._attack_technique.attack, RedTeamingAttack) @pytest.mark.asyncio async def test_attack_runs_include_objectives( @@ -291,7 +291,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.usefixtures(*FIXTURES) diff --git a/tests/unit/scenarios/test_encoding.py b/tests/unit/scenarios/test_encoding.py index 6da94b44a7..33e98f4277 100644 --- a/tests/unit/scenarios/test_encoding.py +++ b/tests/unit/scenarios/test_encoding.py @@ -238,7 +238,7 @@ async def test_get_atomic_attacks_async_returns_attacks( # Should return multiple atomic attacks (one for each encoding type) assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.asyncio async def test_get_converter_attacks_returns_multiple_encodings( @@ -280,7 +280,7 @@ async def test_get_prompt_attacks_creates_attack_runs( # Each attack run should have the correct attack type for run in attack_runs: - assert isinstance(run._attack, PromptSendingAttack) + assert isinstance(run._attack_technique.attack, PromptSendingAttack) assert len(run._seed_groups) == len(mock_seed_attack_groups) @pytest.mark.asyncio diff --git a/tests/unit/scenarios/test_jailbreak.py b/tests/unit/scenarios/test_jailbreak.py index 9c4e4f9a17..ae1f383bef 100644 --- a/tests/unit/scenarios/test_jailbreak.py +++ b/tests/unit/scenarios/test_jailbreak.py @@ -205,7 +205,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_simple( @@ -220,7 +220,7 @@ async def test_attack_generation_for_simple( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, PromptSendingAttack) + assert isinstance(run._attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_complex( @@ -235,7 +235,7 @@ async def test_attack_generation_for_complex( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, (RolePlayAttack, ManyShotJailbreakAttack, SkeletonKeyAttack)) + assert isinstance(run._attack_technique.attack, (RolePlayAttack, ManyShotJailbreakAttack, SkeletonKeyAttack)) @pytest.mark.asyncio async def test_attack_generation_for_manyshot( @@ -250,7 +250,7 @@ async def test_attack_generation_for_manyshot( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, ManyShotJailbreakAttack) + assert isinstance(run._attack_technique.attack, ManyShotJailbreakAttack) @pytest.mark.asyncio async def test_attack_generation_for_promptsending( @@ -265,7 +265,7 @@ async def test_attack_generation_for_promptsending( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, PromptSendingAttack) + assert isinstance(run._attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_skeleton( @@ -280,7 +280,7 @@ async def test_attack_generation_for_skeleton( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, SkeletonKeyAttack) + assert isinstance(run._attack_technique.attack, SkeletonKeyAttack) @pytest.mark.asyncio async def test_attack_generation_for_roleplay( @@ -295,7 +295,7 @@ async def test_attack_generation_for_roleplay( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, RolePlayAttack) + assert isinstance(run._attack_technique.attack, RolePlayAttack) @pytest.mark.asyncio async def test_attack_runs_include_objectives( @@ -327,7 +327,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.asyncio async def test_get_all_jailbreak_templates( @@ -485,5 +485,5 @@ async def test_roleplay_attacks_share_adversarial_target( assert len(atomic_attacks) >= 2 # All role-play attacks should share the same adversarial chat target - adversarial_targets = [run._attack._adversarial_chat for run in atomic_attacks] + adversarial_targets = [run._attack_technique.attack._adversarial_chat for run in atomic_attacks] assert all(t is adversarial_targets[0] for t in adversarial_targets) diff --git a/tests/unit/scenarios/test_leakage_scenario.py b/tests/unit/scenarios/test_leakage_scenario.py index b7b7d066db..832097d01e 100644 --- a/tests/unit/scenarios/test_leakage_scenario.py +++ b/tests/unit/scenarios/test_leakage_scenario.py @@ -230,7 +230,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_first_letter( @@ -254,7 +254,7 @@ async def test_attack_generation_for_first_letter( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, PromptSendingAttack) + assert isinstance(run._attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_crescendo( @@ -274,7 +274,7 @@ async def test_attack_generation_for_crescendo( atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, CrescendoAttack) + assert isinstance(run._attack_technique.attack, CrescendoAttack) @pytest.mark.asyncio async def test_attack_generation_for_image( @@ -293,7 +293,7 @@ async def test_attack_generation_for_image( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, PromptSendingAttack) + assert isinstance(run._attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_role_play( @@ -312,7 +312,7 @@ async def test_attack_generation_for_role_play( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, RolePlayAttack) + assert isinstance(run._attack_technique.attack, RolePlayAttack) @pytest.mark.asyncio async def test_attack_runs_include_objectives( @@ -346,7 +346,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.asyncio async def test_unknown_strategy_raises_value_error( @@ -604,7 +604,7 @@ async def test_image_strategy_uses_add_image_text_converter( # Verify the attack uses AddImageTextConverter for attack in atomic_attacks: - converters = attack._attack._request_converters + converters = attack._attack_technique.attack._request_converters assert len(converters) > 0 # Check that the first converter is AddImageTextConverter first_converter = converters[0].converters[0] diff --git a/tests/unit/scenarios/test_psychosocial_harms.py b/tests/unit/scenarios/test_psychosocial_harms.py index 7b55e3ff5d..d3c4d5e9d9 100644 --- a/tests/unit/scenarios/test_psychosocial_harms.py +++ b/tests/unit/scenarios/test_psychosocial_harms.py @@ -229,7 +229,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_imminent_crisis_async( @@ -252,7 +252,7 @@ async def test_attack_generation_for_imminent_crisis_async( atomic_attacks = await scenario._get_atomic_attacks_async() # Should have both single-turn and multi-turn attacks - attack_types = [type(run._attack) for run in atomic_attacks] + attack_types = [type(run._attack_technique.attack) for run in atomic_attacks] assert any(issubclass(attack_type, (PromptSendingAttack, RolePlayAttack)) for attack_type in attack_types) assert any(issubclass(attack_type, CrescendoAttack) for attack_type in attack_types) @@ -296,7 +296,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.usefixtures(*FIXTURES) diff --git a/tests/unit/scenarios/test_scam.py b/tests/unit/scenarios/test_scam.py index a74046bf9f..b5d658d9c0 100644 --- a/tests/unit/scenarios/test_scam.py +++ b/tests/unit/scenarios/test_scam.py @@ -225,7 +225,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_singleturn_async( @@ -249,7 +249,7 @@ async def test_attack_generation_for_singleturn_async( atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, (ContextComplianceAttack, RolePlayAttack)) + assert isinstance(run._attack_technique.attack, (ContextComplianceAttack, RolePlayAttack)) @pytest.mark.asyncio async def test_attack_generation_for_multiturn_async( @@ -268,7 +268,7 @@ async def test_attack_generation_for_multiturn_async( atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack, RedTeamingAttack) + assert isinstance(run._attack_technique.attack, RedTeamingAttack) @pytest.mark.asyncio async def test_attack_runs_include_objectives_async( @@ -308,7 +308,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack") for run in atomic_attacks) + assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) @pytest.mark.usefixtures(*FIXTURES) diff --git a/uv.lock b/uv.lock index 81b13f7242..927e69e341 100644 --- a/uv.lock +++ b/uv.lock @@ -5772,7 +5772,7 @@ wheels = [ [[package]] name = "pyrit" -version = "0.12.1.dev0" +version = "0.13.0.dev0" source = { editable = "." } dependencies = [ { name = "aiofiles" }, From 9699bb7ff878f6aed69d6f34c829b725dfb5443c Mon Sep 17 00:00:00 2001 From: Richard Lundeen Date: Fri, 10 Apr 2026 19:00:56 -0700 Subject: [PATCH 2/4] updating tests --- pyrit/identifiers/evaluation_identifier.py | 1 + pyrit/models/seeds/seed_attack_group.py | 15 ++- pyrit/scenario/core/atomic_attack.py | 24 +---- .../test_atomic_attack_identifier.py | 3 +- .../test_seed_attack_technique_group.py | 52 ++++++++++ tests/unit/models/test_seed_group.py | 98 +++++++++++++++++++ tests/unit/scenarios/test_atomic_attack.py | 28 ++++-- tests/unit/scenarios/test_attack_technique.py | 61 ++++++++++++ tests/unit/scenarios/test_content_harms.py | 6 +- tests/unit/scenarios/test_cyber.py | 8 +- tests/unit/scenarios/test_encoding.py | 4 +- tests/unit/scenarios/test_jailbreak.py | 20 ++-- tests/unit/scenarios/test_leakage_scenario.py | 14 +-- .../unit/scenarios/test_psychosocial_harms.py | 6 +- tests/unit/scenarios/test_scam.py | 8 +- 15 files changed, 281 insertions(+), 67 deletions(-) create mode 100644 tests/unit/scenarios/test_attack_technique.py diff --git a/pyrit/identifiers/evaluation_identifier.py b/pyrit/identifiers/evaluation_identifier.py index 236e37b006..96886d81a4 100644 --- a/pyrit/identifiers/evaluation_identifier.py +++ b/pyrit/identifiers/evaluation_identifier.py @@ -239,4 +239,5 @@ class AtomicAttackEvaluationIdentifier(EvaluationIdentifier): ), "objective_scorer": ChildEvalRule(exclude=True), "seeds": ChildEvalRule(exclude=True), + # technique_seeds: intentionally omitted — fully included in eval hash. } diff --git a/pyrit/models/seeds/seed_attack_group.py b/pyrit/models/seeds/seed_attack_group.py index 48ee96419d..1ef38ee82f 100644 --- a/pyrit/models/seeds/seed_attack_group.py +++ b/pyrit/models/seeds/seed_attack_group.py @@ -98,7 +98,7 @@ def objective(self) -> SeedObjective: assert obj is not None, "SeedAttackGroup should always have an objective" return obj - def with_technique(self, *, technique: "SeedAttackTechniqueGroup") -> "SeedAttackGroup": + def with_technique(self, *, technique: SeedAttackTechniqueGroup) -> SeedAttackGroup: """ Return a new SeedAttackGroup with technique seeds merged in. @@ -113,8 +113,13 @@ def with_technique(self, *, technique: "SeedAttackTechniqueGroup") -> "SeedAttac """ base = list(self.seeds) idx = technique.insertion_index - if idx is None: - merged_seeds = base + list(technique.seeds) - else: - merged_seeds = base[:idx] + list(technique.seeds) + base[idx:] + technique_seeds = list(technique.seeds) + merged_seeds = base + technique_seeds if idx is None else base[:idx] + technique_seeds + base[idx:] + + # Clear group IDs so the new group assigns a fresh one. + # This mutates the seed objects, but _enforce_consistent_group_id + # in the constructor will immediately overwrite with a new UUID. + for seed in merged_seeds: + seed.prompt_group_id = None + return SeedAttackGroup(seeds=merged_seeds) diff --git a/pyrit/scenario/core/atomic_attack.py b/pyrit/scenario/core/atomic_attack.py index d98de8d63b..e23c801fc9 100644 --- a/pyrit/scenario/core/atomic_attack.py +++ b/pyrit/scenario/core/atomic_attack.py @@ -44,27 +44,9 @@ class AtomicAttack: The AtomicAttack uses SeedAttackGroups as the single source of truth for objectives, prepended conversations, and next messages. Each SeedAttackGroup must have an objective set. - Example: - >>> from pyrit.scenario import AtomicAttack - >>> from pyrit.attacks import PromptAttack - >>> from pyrit.prompt_target import OpenAIChatTarget - >>> from pyrit.models import SeedGroup - >>> - >>> target = OpenAIChatTarget() - >>> attack = PromptAttack(objective_target=target) - >>> - >>> # Create seed groups with objectives - >>> seed_groups = SeedAttackGroup.from_yaml_file("seeds.yaml") - >>> for sg in seed_groups: - ... sg.set_objective("your objective here") - >>> - >>> atomic_attack = AtomicAttack( - ... atomic_attack_name="test_attack", - ... attack=attack, - ... seed_groups=seed_groups, - ... memory_labels={"test": "run1"} - ... ) - >>> results = await atomic_attack.run_async(max_concurrency=5) + An ``AttackTechnique`` bundles the attack strategy with an optional + ``SeedAttackTechniqueGroup``, cleanly separating "how to attack" from + "what to attack" (the objective). """ def __init__( diff --git a/tests/unit/identifiers/test_atomic_attack_identifier.py b/tests/unit/identifiers/test_atomic_attack_identifier.py index 483f1e1545..db68637e62 100644 --- a/tests/unit/identifiers/test_atomic_attack_identifier.py +++ b/tests/unit/identifiers/test_atomic_attack_identifier.py @@ -407,8 +407,7 @@ def test_general_technique_seeds_in_seed_group_ignored_in_eval_hash(self): attack_identifier=attack_id, ) assert ( - AtomicAttackEvaluationIdentifier(c_with).eval_hash - == AtomicAttackEvaluationIdentifier(c_without).eval_hash + AtomicAttackEvaluationIdentifier(c_with).eval_hash == AtomicAttackEvaluationIdentifier(c_without).eval_hash ) def test_identifier_hash_differs_with_different_seeds(self): diff --git a/tests/unit/models/test_seed_attack_technique_group.py b/tests/unit/models/test_seed_attack_technique_group.py index eb8eac9479..05c21a62e3 100644 --- a/tests/unit/models/test_seed_attack_technique_group.py +++ b/tests/unit/models/test_seed_attack_technique_group.py @@ -170,6 +170,58 @@ def test_mixed_general_and_non_general_raises(self): ) +class TestSeedAttackTechniqueGroupNoObjectives: + """Tests for _enforce_no_objectives validation.""" + + def test_rejects_seed_objective(self): + """Test that _enforce_no_objectives rejects SeedObjective seeds.""" + group = SeedAttackTechniqueGroup( + seeds=[SeedPrompt(value="ok", data_type="text", is_general_technique=True)], + ) + # Inject a SeedObjective after construction to bypass the general-technique check. + group.seeds.append(SeedObjective(value="sneaky objective")) + + with pytest.raises(ValueError, match="must not contain objectives"): + group._enforce_no_objectives() + + def test_init_rejects_objective_via_general_technique_check(self): + """Test that constructing with a SeedObjective fails (caught by general-technique check).""" + with pytest.raises(ValueError, match="is_general_technique"): + SeedAttackTechniqueGroup( + seeds=[ + SeedObjective(value="objective"), + SeedPrompt(value="ok", data_type="text", is_general_technique=True), + ] + ) + + +class TestSeedAttackTechniqueGroupInsertionIndex: + """Tests for insertion_index parameter.""" + + def test_default_insertion_index_is_none(self): + """Test that insertion_index defaults to None.""" + group = SeedAttackTechniqueGroup( + seeds=[SeedPrompt(value="s", data_type="text", is_general_technique=True)], + ) + assert group.insertion_index is None + + def test_insertion_index_set_to_int(self): + """Test that insertion_index can be set to an integer.""" + group = SeedAttackTechniqueGroup( + seeds=[SeedPrompt(value="s", data_type="text", is_general_technique=True)], + insertion_index=2, + ) + assert group.insertion_index == 2 + + def test_insertion_index_zero(self): + """Test that insertion_index can be zero (insert at beginning).""" + group = SeedAttackTechniqueGroup( + seeds=[SeedPrompt(value="s", data_type="text", is_general_technique=True)], + insertion_index=0, + ) + assert group.insertion_index == 0 + + class TestSeedAttackTechniqueGroupRepr: """Tests for SeedAttackTechniqueGroup.__repr__ method.""" diff --git a/tests/unit/models/test_seed_group.py b/tests/unit/models/test_seed_group.py index 8fb34d521f..2f04d2bfd7 100644 --- a/tests/unit/models/test_seed_group.py +++ b/tests/unit/models/test_seed_group.py @@ -9,6 +9,7 @@ from pyrit.models.seeds import ( SeedAttackGroup, + SeedAttackTechniqueGroup, SeedGroup, SeedObjective, SeedPrompt, @@ -511,3 +512,100 @@ def test_repr_with_simulated_conversation(self, tmp_path): repr_str = repr(group) assert "simulated" in repr_str + + +# ============================================================================= +# SeedAttackGroup.with_technique Tests +# ============================================================================= + + +class TestSeedAttackGroupWithTechnique: + """Tests for SeedAttackGroup.with_technique() method.""" + + def _make_base_group(self) -> SeedAttackGroup: + return SeedAttackGroup( + seeds=[ + SeedObjective(value="objective"), + SeedPrompt(value="prompt1", data_type="text"), + ] + ) + + def _make_technique(self, *, insertion_index: int | None = None) -> SeedAttackTechniqueGroup: + return SeedAttackTechniqueGroup( + seeds=[ + SeedPrompt(value="tech_a", data_type="text", is_general_technique=True), + SeedPrompt(value="tech_b", data_type="text", is_general_technique=True), + ], + insertion_index=insertion_index, + ) + + def test_append_when_insertion_index_none(self): + """Test that technique seeds are appended when insertion_index is None.""" + base = self._make_base_group() + technique = self._make_technique(insertion_index=None) + + merged = base.with_technique(technique=technique) + + assert len(merged.seeds) == 4 + assert merged.seeds[0].value == "objective" + assert merged.seeds[1].value == "prompt1" + assert merged.seeds[2].value == "tech_a" + assert merged.seeds[3].value == "tech_b" + + def test_insert_at_position(self): + """Test that technique seeds are inserted at the specified position.""" + base = self._make_base_group() + technique = self._make_technique(insertion_index=1) + + merged = base.with_technique(technique=technique) + + assert len(merged.seeds) == 4 + assert merged.seeds[0].value == "objective" + assert merged.seeds[1].value == "tech_a" + assert merged.seeds[2].value == "tech_b" + assert merged.seeds[3].value == "prompt1" + + def test_insert_at_zero(self): + """Test that insertion_index=0 prepends technique seeds.""" + base = self._make_base_group() + technique = self._make_technique(insertion_index=0) + + merged = base.with_technique(technique=technique) + + assert len(merged.seeds) == 4 + assert merged.seeds[0].value == "tech_a" + assert merged.seeds[1].value == "tech_b" + assert merged.seeds[2].value == "objective" + assert merged.seeds[3].value == "prompt1" + + def test_insert_beyond_length_appends(self): + """Test that an insertion_index beyond the list length effectively appends.""" + base = self._make_base_group() + technique = self._make_technique(insertion_index=999) + + merged = base.with_technique(technique=technique) + + assert len(merged.seeds) == 4 + assert merged.seeds[2].value == "tech_a" + assert merged.seeds[3].value == "tech_b" + + def test_does_not_mutate_original(self): + """Test that with_technique returns a new group without mutating the original.""" + base = self._make_base_group() + technique = self._make_technique() + + merged = base.with_technique(technique=technique) + + assert len(base.seeds) == 2 + assert len(merged.seeds) == 4 + assert merged is not base + + def test_merged_group_is_valid_seed_attack_group(self): + """Test that the returned group passes SeedAttackGroup validation.""" + base = self._make_base_group() + technique = self._make_technique() + + merged = base.with_technique(technique=technique) + + assert isinstance(merged, SeedAttackGroup) + merged.validate() # should not raise diff --git a/tests/unit/scenarios/test_atomic_attack.py b/tests/unit/scenarios/test_atomic_attack.py index 2e0822fcb8..8c4b7f273a 100644 --- a/tests/unit/scenarios/test_atomic_attack.py +++ b/tests/unit/scenarios/test_atomic_attack.py @@ -758,7 +758,9 @@ async def test_enrichment_populates_atomic_attack_identifier(self, mock_attack): atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack( + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test" + ) with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) @@ -784,7 +786,9 @@ async def test_enrichment_skips_results_without_attack_identifier(self, mock_att atomic_attack_identifier=None, ) - atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack( + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test" + ) with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) @@ -808,7 +812,9 @@ async def test_enrichment_skips_out_of_range_index(self, mock_attack): atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack( + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test" + ) with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: # Index 99 is out of range for seed_groups (only 1 element) @@ -847,7 +853,9 @@ async def test_enrichment_includes_all_seeds(self, mock_attack): atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack( + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test" + ) with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) @@ -897,7 +905,9 @@ async def test_enrichment_maps_multiple_results_to_correct_seed_groups(self, moc ), ] - atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack( + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test" + ) with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results(results) @@ -934,7 +944,9 @@ async def test_enrichment_persists_to_db(self, mock_attack): atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack( + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test" + ) with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) @@ -974,7 +986,9 @@ async def test_enrichment_skips_db_update_when_no_attack_result_id(self, mock_at atomic_attack_identifier=build_atomic_attack_identifier(attack_identifier=attack_id), ) - atomic = AtomicAttack(attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test") + atomic = AtomicAttack( + attack_technique=AttackTechnique(attack=mock_attack), seed_groups=seed_groups, atomic_attack_name="test" + ) with patch.object(AttackExecutor, "execute_attack_from_seed_groups_async", new_callable=AsyncMock) as mock_exec: mock_exec.return_value = wrap_results([attack_result]) diff --git a/tests/unit/scenarios/test_attack_technique.py b/tests/unit/scenarios/test_attack_technique.py new file mode 100644 index 0000000000..dc3d85dd59 --- /dev/null +++ b/tests/unit/scenarios/test_attack_technique.py @@ -0,0 +1,61 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""Tests for the AttackTechnique class.""" + +from unittest.mock import MagicMock + +from pyrit.executor.attack import AttackStrategy +from pyrit.models import SeedAttackTechniqueGroup, SeedPrompt +from pyrit.scenario.core.attack_technique import AttackTechnique + + +def _make_technique_seeds() -> SeedAttackTechniqueGroup: + return SeedAttackTechniqueGroup( + seeds=[ + SeedPrompt(value="technique1", data_type="text", is_general_technique=True), + SeedPrompt(value="technique2", data_type="text", is_general_technique=True), + ] + ) + + +class TestAttackTechniqueInit: + """Tests for AttackTechnique initialization.""" + + def test_init_with_attack_only(self): + mock_attack = MagicMock(spec=AttackStrategy) + technique = AttackTechnique(attack=mock_attack) + + assert technique.attack is mock_attack + assert technique.seed_technique is None + + def test_init_with_attack_and_seed_technique(self): + mock_attack = MagicMock(spec=AttackStrategy) + seed_technique = _make_technique_seeds() + technique = AttackTechnique(attack=mock_attack, seed_technique=seed_technique) + + assert technique.attack is mock_attack + assert technique.seed_technique is seed_technique + + def test_init_with_seed_technique_none_explicitly(self): + mock_attack = MagicMock(spec=AttackStrategy) + technique = AttackTechnique(attack=mock_attack, seed_technique=None) + + assert technique.seed_technique is None + + +class TestAttackTechniqueProperties: + """Tests for AttackTechnique property access.""" + + def test_attack_property_returns_same_instance(self): + mock_attack = MagicMock(spec=AttackStrategy) + technique = AttackTechnique(attack=mock_attack) + + assert technique.attack is technique.attack # same object each time + + def test_seed_technique_property_returns_same_instance(self): + mock_attack = MagicMock(spec=AttackStrategy) + seed_technique = _make_technique_seeds() + technique = AttackTechnique(attack=mock_attack, seed_technique=seed_technique) + + assert technique.seed_technique is technique.seed_technique diff --git a/tests/unit/scenarios/test_content_harms.py b/tests/unit/scenarios/test_content_harms.py index 93ca785ba9..d8494437f5 100644 --- a/tests/unit/scenarios/test_content_harms.py +++ b/tests/unit/scenarios/test_content_harms.py @@ -746,7 +746,7 @@ async def test_get_single_turn_attacks_returns_prompt_sending_and_role_play( attacks = scenario._get_single_turn_attacks(strategy="hate", seed_groups=seed_groups) assert len(attacks) == 2 - attack_types = [type(a._attack_technique.attack) for a in attacks] + attack_types = [type(a.attack_technique.attack) for a in attacks] assert PromptSendingAttack in attack_types assert RolePlayAttack in attack_types @@ -778,7 +778,7 @@ async def test_get_multi_turn_attacks_returns_many_shot_and_tap( attacks = scenario._get_multi_turn_attacks(strategy="hate", seed_groups=seed_groups) assert len(attacks) == 2 - attack_types = [type(a._attack_technique.attack) for a in attacks] + attack_types = [type(a.attack_technique.attack) for a in attacks] assert ManyShotJailbreakAttack in attack_types assert TreeOfAttacksWithPruningAttack in attack_types @@ -816,7 +816,7 @@ async def test_get_strategy_attacks_includes_all_groups( # 2 single-turn + 2 multi-turn = 4 assert len(attacks) == 4 - attack_types = [type(a._attack_technique.attack) for a in attacks] + attack_types = [type(a.attack_technique.attack) for a in attacks] assert PromptSendingAttack in attack_types assert RolePlayAttack in attack_types assert ManyShotJailbreakAttack in attack_types diff --git a/tests/unit/scenarios/test_cyber.py b/tests/unit/scenarios/test_cyber.py index 76a633ab04..b3bb6d76bd 100644 --- a/tests/unit/scenarios/test_cyber.py +++ b/tests/unit/scenarios/test_cyber.py @@ -210,7 +210,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_singleturn( @@ -234,7 +234,7 @@ async def test_attack_generation_for_singleturn( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, PromptSendingAttack) + assert isinstance(run.attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_multiturn( @@ -259,7 +259,7 @@ async def test_attack_generation_for_multiturn( atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, RedTeamingAttack) + assert isinstance(run.attack_technique.attack, RedTeamingAttack) @pytest.mark.asyncio async def test_attack_runs_include_objectives( @@ -291,7 +291,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.usefixtures(*FIXTURES) diff --git a/tests/unit/scenarios/test_encoding.py b/tests/unit/scenarios/test_encoding.py index 33e98f4277..44d3e8f81d 100644 --- a/tests/unit/scenarios/test_encoding.py +++ b/tests/unit/scenarios/test_encoding.py @@ -238,7 +238,7 @@ async def test_get_atomic_attacks_async_returns_attacks( # Should return multiple atomic attacks (one for each encoding type) assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.asyncio async def test_get_converter_attacks_returns_multiple_encodings( @@ -280,7 +280,7 @@ async def test_get_prompt_attacks_creates_attack_runs( # Each attack run should have the correct attack type for run in attack_runs: - assert isinstance(run._attack_technique.attack, PromptSendingAttack) + assert isinstance(run.attack_technique.attack, PromptSendingAttack) assert len(run._seed_groups) == len(mock_seed_attack_groups) @pytest.mark.asyncio diff --git a/tests/unit/scenarios/test_jailbreak.py b/tests/unit/scenarios/test_jailbreak.py index ae1f383bef..00c5daec6c 100644 --- a/tests/unit/scenarios/test_jailbreak.py +++ b/tests/unit/scenarios/test_jailbreak.py @@ -205,7 +205,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_simple( @@ -220,7 +220,7 @@ async def test_attack_generation_for_simple( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, PromptSendingAttack) + assert isinstance(run.attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_complex( @@ -235,7 +235,9 @@ async def test_attack_generation_for_complex( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, (RolePlayAttack, ManyShotJailbreakAttack, SkeletonKeyAttack)) + assert isinstance( + run.attack_technique.attack, (RolePlayAttack, ManyShotJailbreakAttack, SkeletonKeyAttack) + ) @pytest.mark.asyncio async def test_attack_generation_for_manyshot( @@ -250,7 +252,7 @@ async def test_attack_generation_for_manyshot( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, ManyShotJailbreakAttack) + assert isinstance(run.attack_technique.attack, ManyShotJailbreakAttack) @pytest.mark.asyncio async def test_attack_generation_for_promptsending( @@ -265,7 +267,7 @@ async def test_attack_generation_for_promptsending( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, PromptSendingAttack) + assert isinstance(run.attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_skeleton( @@ -280,7 +282,7 @@ async def test_attack_generation_for_skeleton( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, SkeletonKeyAttack) + assert isinstance(run.attack_technique.attack, SkeletonKeyAttack) @pytest.mark.asyncio async def test_attack_generation_for_roleplay( @@ -295,7 +297,7 @@ async def test_attack_generation_for_roleplay( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, RolePlayAttack) + assert isinstance(run.attack_technique.attack, RolePlayAttack) @pytest.mark.asyncio async def test_attack_runs_include_objectives( @@ -327,7 +329,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.asyncio async def test_get_all_jailbreak_templates( @@ -485,5 +487,5 @@ async def test_roleplay_attacks_share_adversarial_target( assert len(atomic_attacks) >= 2 # All role-play attacks should share the same adversarial chat target - adversarial_targets = [run._attack_technique.attack._adversarial_chat for run in atomic_attacks] + adversarial_targets = [run.attack_technique.attack._adversarial_chat for run in atomic_attacks] assert all(t is adversarial_targets[0] for t in adversarial_targets) diff --git a/tests/unit/scenarios/test_leakage_scenario.py b/tests/unit/scenarios/test_leakage_scenario.py index 832097d01e..213e36b2d5 100644 --- a/tests/unit/scenarios/test_leakage_scenario.py +++ b/tests/unit/scenarios/test_leakage_scenario.py @@ -230,7 +230,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_first_letter( @@ -254,7 +254,7 @@ async def test_attack_generation_for_first_letter( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, PromptSendingAttack) + assert isinstance(run.attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_crescendo( @@ -274,7 +274,7 @@ async def test_attack_generation_for_crescendo( atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, CrescendoAttack) + assert isinstance(run.attack_technique.attack, CrescendoAttack) @pytest.mark.asyncio async def test_attack_generation_for_image( @@ -293,7 +293,7 @@ async def test_attack_generation_for_image( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, PromptSendingAttack) + assert isinstance(run.attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio async def test_attack_generation_for_role_play( @@ -312,7 +312,7 @@ async def test_attack_generation_for_role_play( ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, RolePlayAttack) + assert isinstance(run.attack_technique.attack, RolePlayAttack) @pytest.mark.asyncio async def test_attack_runs_include_objectives( @@ -346,7 +346,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.asyncio async def test_unknown_strategy_raises_value_error( @@ -604,7 +604,7 @@ async def test_image_strategy_uses_add_image_text_converter( # Verify the attack uses AddImageTextConverter for attack in atomic_attacks: - converters = attack._attack_technique.attack._request_converters + converters = attack.attack_technique.attack._request_converters assert len(converters) > 0 # Check that the first converter is AddImageTextConverter first_converter = converters[0].converters[0] diff --git a/tests/unit/scenarios/test_psychosocial_harms.py b/tests/unit/scenarios/test_psychosocial_harms.py index d3c4d5e9d9..039f8dfc74 100644 --- a/tests/unit/scenarios/test_psychosocial_harms.py +++ b/tests/unit/scenarios/test_psychosocial_harms.py @@ -229,7 +229,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_imminent_crisis_async( @@ -252,7 +252,7 @@ async def test_attack_generation_for_imminent_crisis_async( atomic_attacks = await scenario._get_atomic_attacks_async() # Should have both single-turn and multi-turn attacks - attack_types = [type(run._attack_technique.attack) for run in atomic_attacks] + attack_types = [type(run.attack_technique.attack) for run in atomic_attacks] assert any(issubclass(attack_type, (PromptSendingAttack, RolePlayAttack)) for attack_type in attack_types) assert any(issubclass(attack_type, CrescendoAttack) for attack_type in attack_types) @@ -296,7 +296,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.usefixtures(*FIXTURES) diff --git a/tests/unit/scenarios/test_scam.py b/tests/unit/scenarios/test_scam.py index b5d658d9c0..08923aa4ca 100644 --- a/tests/unit/scenarios/test_scam.py +++ b/tests/unit/scenarios/test_scam.py @@ -225,7 +225,7 @@ async def test_attack_generation_for_all( atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.asyncio async def test_attack_generation_for_singleturn_async( @@ -249,7 +249,7 @@ async def test_attack_generation_for_singleturn_async( atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, (ContextComplianceAttack, RolePlayAttack)) + assert isinstance(run.attack_technique.attack, (ContextComplianceAttack, RolePlayAttack)) @pytest.mark.asyncio async def test_attack_generation_for_multiturn_async( @@ -268,7 +268,7 @@ async def test_attack_generation_for_multiturn_async( atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: - assert isinstance(run._attack_technique.attack, RedTeamingAttack) + assert isinstance(run.attack_technique.attack, RedTeamingAttack) @pytest.mark.asyncio async def test_attack_runs_include_objectives_async( @@ -308,7 +308,7 @@ async def test_get_atomic_attacks_async_returns_attacks( await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) > 0 - assert all(hasattr(run, "_attack_technique") for run in atomic_attacks) + assert all(run.attack_technique is not None for run in atomic_attacks) @pytest.mark.usefixtures(*FIXTURES) From c7e10edf41f4199db309e2305b0d15afd15b37c2 Mon Sep 17 00:00:00 2001 From: Richard Lundeen Date: Fri, 10 Apr 2026 19:52:55 -0700 Subject: [PATCH 3/4] refactoring identifiers --- pyrit/backend/services/attack_service.py | 15 +++- pyrit/identifiers/__init__.py | 2 +- pyrit/identifiers/atomic_attack_identifier.py | 71 ++++++++++++------- pyrit/identifiers/evaluation_identifier.py | 12 ++-- pyrit/memory/azure_sql_memory.py | 8 +-- pyrit/memory/memory_interface.py | 4 +- pyrit/memory/sqlite_memory.py | 9 +-- pyrit/models/attack_result.py | 12 +++- pyrit/scenario/core/atomic_attack.py | 7 +- pyrit/scenario/core/attack_technique.py | 25 ++++++- .../test_atomic_attack_identifier.py | 43 +++++------ .../test_interface_attack_results.py | 2 +- tests/unit/models/test_seed_group.py | 10 +-- tests/unit/scenarios/test_atomic_attack.py | 21 +++--- tests/unit/scenarios/test_attack_technique.py | 70 ++++++++++++++++++ 15 files changed, 226 insertions(+), 85 deletions(-) diff --git a/pyrit/backend/services/attack_service.py b/pyrit/backend/services/attack_service.py index 9437632392..77859ee48e 100644 --- a/pyrit/backend/services/attack_service.py +++ b/pyrit/backend/services/attack_service.py @@ -738,7 +738,20 @@ async def _update_attack_after_message_async( if ar.atomic_attack_identifier: atomic = ComponentIdentifier.from_dict(ar.atomic_attack_identifier.to_dict()) atomic_children = dict(atomic.children) - atomic_children["attack"] = new_aid + # Navigate into attack_technique child to update the nested attack child. + technique = atomic_children.get("attack_technique") + if technique is not None: + tech_children = dict(technique.children) + tech_children["attack"] = new_aid + atomic_children["attack_technique"] = ComponentIdentifier( + class_name=technique.class_name, + class_module=technique.class_module, + params=dict(technique.params), + children=tech_children, + ) + else: + # Fallback for pre-nesting rows with children["attack"] directly. + atomic_children["attack"] = new_aid new_atomic = ComponentIdentifier( class_name=atomic.class_name, class_module=atomic.class_module, diff --git a/pyrit/identifiers/__init__.py b/pyrit/identifiers/__init__.py index 90b1aa52ed..a5eabdb0b7 100644 --- a/pyrit/identifiers/__init__.py +++ b/pyrit/identifiers/__init__.py @@ -24,8 +24,8 @@ __all__ = [ "AtomicAttackEvaluationIdentifier", "build_atomic_attack_identifier", - "ChildEvalRule", "build_seed_identifier", + "ChildEvalRule", "class_name_to_snake_case", "ComponentIdentifier", "compute_eval_hash", diff --git a/pyrit/identifiers/atomic_attack_identifier.py b/pyrit/identifiers/atomic_attack_identifier.py index 3cb6169c08..cc2a0ae86c 100644 --- a/pyrit/identifiers/atomic_attack_identifier.py +++ b/pyrit/identifiers/atomic_attack_identifier.py @@ -8,21 +8,22 @@ by combining the attack strategy's identity with the seed identifiers from the dataset. -The composite identifier has this shape: - class_name = "AtomicAttack" - children["attack"] = attack strategy's ComponentIdentifier - children["technique_seeds"] = list of technique-only seed ComponentIdentifiers (optional) - children["seeds"] = list of ALL seed ComponentIdentifiers (for traceability) +The composite identifier has this shape:: + + AtomicAttack + ├── attack_technique (class_name="AttackTechnique") + │ ├── attack (attack strategy's ComponentIdentifier) + │ └── technique_seeds (optional, list of seed ComponentIdentifiers) + └── seed_group (list of ALL seed ComponentIdentifiers, for traceability) """ import logging -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any from pyrit.identifiers.component_identifier import ComponentIdentifier if TYPE_CHECKING: from pyrit.models.seeds.seed import Seed - from pyrit.models.seeds.seed_attack_technique_group import SeedAttackTechniqueGroup from pyrit.models.seeds.seed_group import SeedGroup logger = logging.getLogger(__name__) @@ -31,6 +32,9 @@ _ATOMIC_ATTACK_CLASS_NAME = "AtomicAttack" _ATOMIC_ATTACK_CLASS_MODULE = "pyrit.scenario.core.atomic_attack" +_ATTACK_TECHNIQUE_CLASS_NAME = "AttackTechnique" +_ATTACK_TECHNIQUE_CLASS_MODULE = "pyrit.scenario.core.attack_technique" + def build_seed_identifier(seed: "Seed") -> ComponentIdentifier: """ @@ -62,44 +66,59 @@ def build_seed_identifier(seed: "Seed") -> ComponentIdentifier: def build_atomic_attack_identifier( *, - attack_identifier: ComponentIdentifier, - seed_group: Optional["SeedGroup"] = None, - seed_technique: Optional["SeedAttackTechniqueGroup"] = None, + technique_identifier: ComponentIdentifier | None = None, + attack_identifier: ComponentIdentifier | None = None, + seed_group: "SeedGroup | None" = None, ) -> ComponentIdentifier: """ Build a composite ComponentIdentifier for an atomic attack. - The identifier always includes the attack strategy as ``children["attack"]`` - and all seeds from the seed group in ``children["seeds"]`` for traceability. + The identifier places the attack technique in ``children["attack_technique"]`` + and all seeds from the seed group in ``children["seed_group"]`` for traceability. - When ``seed_technique`` is provided, its seeds are also included as - ``children["technique_seeds"]``. These represent the reusable "how to attack" - portion and are included in eval-hash computation, while ``seeds`` is excluded - from the eval hash. + Callers that have an ``AttackTechnique`` object should pass + ``technique_identifier=attack_technique.get_identifier()``. + Callers that only have a raw attack strategy identifier (e.g. legacy + backward-compat paths) can pass ``attack_identifier`` instead, which is + wrapped in a minimal technique node automatically. Args: - attack_identifier: The attack strategy's identifier. + technique_identifier: Pre-built technique identifier from + ``AttackTechnique.get_identifier()``. Mutually exclusive with + ``attack_identifier``. + attack_identifier: Raw attack strategy identifier. Used when no + ``AttackTechnique`` instance is available. Mutually exclusive + with ``technique_identifier``. seed_group: The seed group to extract all seeds from. - seed_technique: Optional technique seed group whose seeds are added - as a separate ``technique_seeds`` child. Returns: A composite ComponentIdentifier with class_name="AtomicAttack". + + Raises: + ValueError: If both or neither of ``technique_identifier`` and + ``attack_identifier`` are provided. """ + if technique_identifier is not None and attack_identifier is not None: + raise ValueError("Provide technique_identifier or attack_identifier, not both") + + if technique_identifier is None: + if attack_identifier is None: + raise ValueError("Either technique_identifier or attack_identifier must be provided") + technique_identifier = ComponentIdentifier( + class_name=_ATTACK_TECHNIQUE_CLASS_NAME, + class_module=_ATTACK_TECHNIQUE_CLASS_MODULE, + children={"attack": attack_identifier}, + ) + seed_identifiers: list[ComponentIdentifier] = [] if seed_group is not None: seed_identifiers.extend(build_seed_identifier(seed) for seed in seed_group.seeds) children: dict[str, Any] = { - "attack": attack_identifier, - "seeds": seed_identifiers, + "attack_technique": technique_identifier, + "seed_group": seed_identifiers, } - if seed_technique is not None: - technique_seed_ids = [build_seed_identifier(seed) for seed in seed_technique.seeds] - if technique_seed_ids: - children["technique_seeds"] = technique_seed_ids - return ComponentIdentifier( class_name=_ATOMIC_ATTACK_CLASS_NAME, class_module=_ATOMIC_ATTACK_CLASS_MODULE, diff --git a/pyrit/identifiers/evaluation_identifier.py b/pyrit/identifiers/evaluation_identifier.py index 96886d81a4..3a13228d6b 100644 --- a/pyrit/identifiers/evaluation_identifier.py +++ b/pyrit/identifiers/evaluation_identifier.py @@ -220,11 +220,14 @@ class AtomicAttackEvaluationIdentifier(EvaluationIdentifier): Per-child rules: + * ``seed_group`` — excluded entirely (present for traceability only). + * ``attack_technique`` — not listed, so fully included by default. + Its nested children (``objective_target``, ``adversarial_chat``, + ``objective_scorer``, ``technique_seeds``) are processed recursively + using the same rules dict, so the rules below apply at any depth. * ``objective_target`` — include only ``temperature``. * ``adversarial_chat`` — include ``model_name``, ``temperature``, ``top_p``. * ``objective_scorer`` — excluded entirely. - * ``seeds`` — excluded entirely (present for traceability only). - * ``technique_seeds`` — not listed, so fully included by default. Non-target children (e.g., ``request_converters``, ``response_converters``, ``technique_seeds``) receive full recursive eval treatment. @@ -238,6 +241,7 @@ class AtomicAttackEvaluationIdentifier(EvaluationIdentifier): included_params=frozenset({"model_name", "temperature", "top_p"}), ), "objective_scorer": ChildEvalRule(exclude=True), - "seeds": ChildEvalRule(exclude=True), - # technique_seeds: intentionally omitted — fully included in eval hash. + "seed_group": ChildEvalRule(exclude=True), + # attack_technique: intentionally omitted — fully included in eval hash. + # technique_seeds (inside attack_technique): also omitted — fully included. } diff --git a/pyrit/memory/azure_sql_memory.py b/pyrit/memory/azure_sql_memory.py index a5e77bb361..abd5916954 100644 --- a/pyrit/memory/azure_sql_memory.py +++ b/pyrit/memory/azure_sql_memory.py @@ -483,11 +483,11 @@ def get_unique_attack_class_names(self) -> list[str]: rows = session.execute( text( """SELECT DISTINCT JSON_VALUE(atomic_attack_identifier, - '$.children.attack.class_name') AS cls + '$.children.attack_technique.children.attack.class_name') AS cls FROM "AttackResultEntries" WHERE ISJSON(atomic_attack_identifier) = 1 AND JSON_VALUE(atomic_attack_identifier, - '$.children.attack.class_name') IS NOT NULL""" + '$.children.attack_technique.children.attack.class_name') IS NOT NULL""" ) ).fetchall() return sorted(row[0] for row in rows) @@ -495,7 +495,7 @@ def get_unique_attack_class_names(self) -> list[str]: def get_unique_converter_class_names(self) -> list[str]: """ Azure SQL implementation: extract unique converter class_name values - from the children.attack.children.request_converters array + from the children.attack_technique.children.attack.children.request_converters array in the atomic_attack_identifier JSON column. Returns: @@ -507,7 +507,7 @@ def get_unique_converter_class_names(self) -> list[str]: """SELECT DISTINCT JSON_VALUE(c.value, '$.class_name') AS cls FROM "AttackResultEntries" CROSS APPLY OPENJSON(JSON_QUERY(atomic_attack_identifier, - '$.children.attack.children.request_converters')) AS c + '$.children.attack_technique.children.attack.children.request_converters')) AS c WHERE ISJSON(atomic_attack_identifier) = 1 AND JSON_VALUE(c.value, '$.class_name') IS NOT NULL""" ) diff --git a/pyrit/memory/memory_interface.py b/pyrit/memory/memory_interface.py index 49e7dbca77..023045a5c3 100644 --- a/pyrit/memory/memory_interface.py +++ b/pyrit/memory/memory_interface.py @@ -1537,7 +1537,7 @@ def get_attack_results( conditions.append( self._get_condition_json_property_match( json_column=AttackResultEntry.atomic_attack_identifier, - property_path="$.children.attack.class_name", + property_path="$.children.attack_technique.children.attack.class_name", value=attack_class, case_sensitive=True, ) @@ -1549,7 +1549,7 @@ def get_attack_results( conditions.append( self._get_condition_json_array_match( json_column=AttackResultEntry.atomic_attack_identifier, - property_path="$.children.attack.children.request_converters", + property_path="$.children.attack_technique.children.attack.children.request_converters", array_element_path="$.class_name", array_to_match=converter_classes, ) diff --git a/pyrit/memory/sqlite_memory.py b/pyrit/memory/sqlite_memory.py index da28bc01b6..ab35647955 100644 --- a/pyrit/memory/sqlite_memory.py +++ b/pyrit/memory/sqlite_memory.py @@ -622,7 +622,8 @@ def get_unique_attack_class_names(self) -> list[str]: """ with closing(self.get_session()) as session: class_name_expr = func.json_extract( - AttackResultEntry.atomic_attack_identifier, "$.children.attack.class_name" + AttackResultEntry.atomic_attack_identifier, + "$.children.attack_technique.children.attack.class_name", ) rows = session.query(class_name_expr).filter(class_name_expr.isnot(None)).distinct().all() return sorted(row[0] for row in rows) @@ -630,8 +631,8 @@ def get_unique_attack_class_names(self) -> list[str]: def get_unique_converter_class_names(self) -> list[str]: """ SQLite implementation: extract unique converter class_name values - from the children.attack.children.request_converters array in the - atomic_attack_identifier JSON column. + from the children.attack_technique.children.attack.children.request_converters + array in the atomic_attack_identifier JSON column. Returns: Sorted list of unique converter class name strings. @@ -643,7 +644,7 @@ def get_unique_converter_class_names(self) -> list[str]: FROM "AttackResultEntries", json_each( json_extract("AttackResultEntries".atomic_attack_identifier, - '$.children.attack.children.request_converters') + '$.children.attack_technique.children.attack.children.request_converters') ) AS j WHERE cls IS NOT NULL""" ) diff --git a/pyrit/models/attack_result.py b/pyrit/models/attack_result.py index 83a7fa1a53..a385ac36e7 100644 --- a/pyrit/models/attack_result.py +++ b/pyrit/models/attack_result.py @@ -113,15 +113,23 @@ def get_attack_strategy_identifier(self) -> Optional[ComponentIdentifier]: Return the attack strategy identifier from the composite atomic identifier. This is the non-deprecated replacement for the ``attack_identifier`` property. - Extracts and returns the ``"attack"`` child from ``atomic_attack_identifier``. + Extracts the ``"attack"`` child from the nested ``"attack_technique"`` child + of ``atomic_attack_identifier``. + + Falls back to ``children["attack"]`` for rows created before the nested + structure was introduced. Returns: Optional[ComponentIdentifier]: The attack strategy identifier, or ``None`` if - ``atomic_attack_identifier`` is not set. + ``atomic_attack_identifier`` is not set or the expected children are missing. """ if self.atomic_attack_identifier is None: return None + technique = self.atomic_attack_identifier.get_child("attack_technique") + if technique is not None: + return technique.get_child("attack") + # Fallback for pre-nesting rows that had children["attack"] directly. return self.atomic_attack_identifier.get_child("attack") def get_conversations_by_type(self, conversation_type: ConversationType) -> list[ConversationReference]: diff --git a/pyrit/scenario/core/atomic_attack.py b/pyrit/scenario/core/atomic_attack.py index e23c801fc9..df4f409a04 100644 --- a/pyrit/scenario/core/atomic_attack.py +++ b/pyrit/scenario/core/atomic_attack.py @@ -258,15 +258,12 @@ def _enrich_atomic_attack_identifiers(self, *, results: AttackExecutorResult[Att results: The execution results to enrich. """ memory = CentralMemory.get_memory_instance() - seed_technique = self._attack_technique.seed_technique for result, idx in zip(results.completed_results, results.input_indices, strict=True): - attack_strategy_id = result.get_attack_strategy_identifier() - if attack_strategy_id and idx < len(self._seed_groups): + if idx < len(self._seed_groups): result.atomic_attack_identifier = build_atomic_attack_identifier( - attack_identifier=attack_strategy_id, + technique_identifier=self._attack_technique.get_identifier(), seed_group=self._seed_groups[idx], - seed_technique=seed_technique, ) # Persist the enriched identifier back to the database. diff --git a/pyrit/scenario/core/attack_technique.py b/pyrit/scenario/core/attack_technique.py index 52b5b943e6..52d0ab8dd2 100644 --- a/pyrit/scenario/core/attack_technique.py +++ b/pyrit/scenario/core/attack_technique.py @@ -12,10 +12,11 @@ from typing import Any from pyrit.executor.attack import AttackStrategy +from pyrit.identifiers import ComponentIdentifier, Identifiable, build_seed_identifier from pyrit.models import SeedAttackTechniqueGroup -class AttackTechnique: +class AttackTechnique(Identifiable): """ Bundles an attack strategy with an optional technique seed group. @@ -39,3 +40,25 @@ def attack(self) -> AttackStrategy[Any, Any]: @property def seed_technique(self) -> SeedAttackTechniqueGroup | None: return self._seed_technique + + def _build_identifier(self) -> ComponentIdentifier: + """ + Build the behavioral identity for this attack technique. + + The identifier always contains the attack strategy as ``children["attack"]``. + When a seed technique is present, its seeds are added as + ``children["technique_seeds"]``. + + Returns: + ComponentIdentifier: The frozen identity snapshot. + """ + children: dict[str, Any] = { + "attack": self._attack.get_identifier(), + } + + if self._seed_technique is not None: + technique_seed_ids = [build_seed_identifier(seed) for seed in self._seed_technique.seeds] + if technique_seed_ids: + children["technique_seeds"] = technique_seed_ids + + return ComponentIdentifier.of(self, children=children) diff --git a/tests/unit/identifiers/test_atomic_attack_identifier.py b/tests/unit/identifiers/test_atomic_attack_identifier.py index db68637e62..c7986a7666 100644 --- a/tests/unit/identifiers/test_atomic_attack_identifier.py +++ b/tests/unit/identifiers/test_atomic_attack_identifier.py @@ -19,13 +19,6 @@ def __init__(self, *, seeds: list): self.seeds = seeds -class _FakeSeedTechniqueGroup: - """Minimal stub for SeedAttackTechniqueGroup with a seeds list.""" - - def __init__(self, *, seeds: list): - self.seeds = seeds - - # --------------------------------------------------------------------------- # Helpers shared across test classes # --------------------------------------------------------------------------- @@ -129,18 +122,20 @@ def test_class_module_is_correct(self): result = build_atomic_attack_identifier(attack_identifier=_make_attack()) assert result.class_module == "pyrit.scenario.core.atomic_attack" - def test_attack_child_is_present(self): + def test_attack_technique_child_is_present(self): attack_id = _make_attack() result = build_atomic_attack_identifier(attack_identifier=attack_id) - assert result.children["attack"] == attack_id + technique = result.children["attack_technique"] + assert technique.class_name == "AttackTechnique" + assert technique.children["attack"] == attack_id - def test_no_seed_group_empty_seeds(self): + def test_no_seed_group_empty_seed_group(self): result = build_atomic_attack_identifier(attack_identifier=_make_attack()) - assert result.children["seeds"] == [] + assert result.children["seed_group"] == [] - def test_empty_seed_group_empty_seeds(self): + def test_empty_seed_group_empty_seed_group(self): result = build_atomic_attack_identifier(attack_identifier=_make_attack(), seed_group=_FakeSeedGroup(seeds=[])) - assert result.children["seeds"] == [] + assert result.children["seed_group"] == [] def test_includes_all_seeds(self): general_seed = SeedPrompt(value="technique", value_sha256="abc", is_general_technique=True) @@ -149,7 +144,7 @@ def test_includes_all_seeds(self): attack_identifier=_make_attack(), seed_group=_FakeSeedGroup(seeds=[general_seed, non_general_seed]), ) - seed_ids = result.children["seeds"] + seed_ids = result.children["seed_group"] assert len(seed_ids) == 2 assert seed_ids[0].params.get("value_sha256") == "abc" assert seed_ids[0].params.get("is_general_technique") is True @@ -163,7 +158,7 @@ def test_multiple_seeds(self): attack_identifier=_make_attack(), seed_group=_FakeSeedGroup(seeds=[seed1, seed2]), ) - assert len(result.children["seeds"]) == 2 + assert len(result.children["seed_group"]) == 2 def test_deterministic_hash(self): attack_id = _make_attack() @@ -224,8 +219,8 @@ def test_objective_scorer_excluded(self): rule = AtomicAttackEvaluationIdentifier.CHILD_EVAL_RULES["objective_scorer"] assert rule.exclude is True - def test_seeds_rule(self): - rule = AtomicAttackEvaluationIdentifier.CHILD_EVAL_RULES["seeds"] + def test_seed_group_rule(self): + rule = AtomicAttackEvaluationIdentifier.CHILD_EVAL_RULES["seed_group"] assert rule.exclude is True # -- Basic properties -------------------------------------------------- @@ -372,12 +367,18 @@ def test_different_technique_seeds_different_eval_hash(self): attack_id = _make_attack() seed1 = SeedPrompt(value="tech1", value_sha256="aaa", is_general_technique=True) seed2 = SeedPrompt(value="tech2", value_sha256="bbb", is_general_technique=True) - c1 = build_atomic_attack_identifier( - attack_identifier=attack_id, seed_technique=_FakeSeedTechniqueGroup(seeds=[seed1]) + technique1 = ComponentIdentifier( + class_name="AttackTechnique", + class_module="pyrit.scenario.core.attack_technique", + children={"attack": attack_id, "technique_seeds": [build_seed_identifier(seed1)]}, ) - c2 = build_atomic_attack_identifier( - attack_identifier=attack_id, seed_technique=_FakeSeedTechniqueGroup(seeds=[seed2]) + technique2 = ComponentIdentifier( + class_name="AttackTechnique", + class_module="pyrit.scenario.core.attack_technique", + children={"attack": attack_id, "technique_seeds": [build_seed_identifier(seed2)]}, ) + c1 = build_atomic_attack_identifier(technique_identifier=technique1) + c2 = build_atomic_attack_identifier(technique_identifier=technique2) assert AtomicAttackEvaluationIdentifier(c1).eval_hash != AtomicAttackEvaluationIdentifier(c2).eval_hash def test_seeds_in_seed_group_ignored_in_eval_hash(self): diff --git a/tests/unit/memory/memory_interface/test_interface_attack_results.py b/tests/unit/memory/memory_interface/test_interface_attack_results.py index 91811ec3ad..2e30ba368a 100644 --- a/tests/unit/memory/memory_interface/test_interface_attack_results.py +++ b/tests/unit/memory/memory_interface/test_interface_attack_results.py @@ -1388,7 +1388,7 @@ def test_get_attack_results_by_attack_identifier_filter_class_name(sqlite_instan identifier_filters=[ IdentifierFilter( identifier_type=IdentifierType.ATTACK, - property_path="$.children.attack.class_name", + property_path="$.children.attack_technique.children.attack.class_name", value="Crescendo", partial_match=True, ) diff --git a/tests/unit/models/test_seed_group.py b/tests/unit/models/test_seed_group.py index 2f04d2bfd7..8df31a9df3 100644 --- a/tests/unit/models/test_seed_group.py +++ b/tests/unit/models/test_seed_group.py @@ -566,16 +566,18 @@ def test_insert_at_position(self): assert merged.seeds[3].value == "prompt1" def test_insert_at_zero(self): - """Test that insertion_index=0 prepends technique seeds.""" + """Test insertion_index=0: technique seeds appear right after the objective + because SeedAttackGroup always places the objective first.""" base = self._make_base_group() technique = self._make_technique(insertion_index=0) merged = base.with_technique(technique=technique) assert len(merged.seeds) == 4 - assert merged.seeds[0].value == "tech_a" - assert merged.seeds[1].value == "tech_b" - assert merged.seeds[2].value == "objective" + # Objective is re-sorted to front by the constructor's canonical ordering + assert merged.seeds[0].value == "objective" + assert merged.seeds[1].value == "tech_a" + assert merged.seeds[2].value == "tech_b" assert merged.seeds[3].value == "prompt1" def test_insert_beyond_length_appends(self): diff --git a/tests/unit/scenarios/test_atomic_attack.py b/tests/unit/scenarios/test_atomic_attack.py index 8c4b7f273a..6d7691b377 100644 --- a/tests/unit/scenarios/test_atomic_attack.py +++ b/tests/unit/scenarios/test_atomic_attack.py @@ -769,12 +769,13 @@ async def test_enrichment_populates_atomic_attack_identifier(self, mock_attack): enriched = result.completed_results[0] assert enriched.atomic_attack_identifier is not None assert enriched.atomic_attack_identifier.class_name == "AtomicAttack" - assert "attack" in enriched.atomic_attack_identifier.children - assert "seeds" in enriched.atomic_attack_identifier.children + assert "attack_technique" in enriched.atomic_attack_identifier.children + assert "seed_group" in enriched.atomic_attack_identifier.children @pytest.mark.asyncio - async def test_enrichment_skips_results_without_attack_identifier(self, mock_attack): - """Test that enrichment is skipped when result has no attack_identifier.""" + async def test_enrichment_populates_even_when_result_has_no_prior_identifier(self, mock_attack): + """Test that enrichment works even when result has no prior atomic_attack_identifier, + since AttackTechnique.get_identifier() is self-contained.""" seed_groups = [ SeedAttackGroup(seeds=[SeedObjective(value="obj1"), SeedPrompt(value="p1")]), ] @@ -794,8 +795,10 @@ async def test_enrichment_skips_results_without_attack_identifier(self, mock_att mock_exec.return_value = wrap_results([attack_result]) result = await atomic.run_async() - # Should not be enriched (no attack_identifier to build from) - assert result.completed_results[0].atomic_attack_identifier is None + # Should be enriched — technique provides its own identifier + enriched = result.completed_results[0] + assert enriched.atomic_attack_identifier is not None + assert enriched.atomic_attack_identifier.class_name == "AtomicAttack" @pytest.mark.asyncio async def test_enrichment_skips_out_of_range_index(self, mock_attack): @@ -863,7 +866,7 @@ async def test_enrichment_includes_all_seeds(self, mock_attack): enriched = result.completed_results[0].atomic_attack_identifier assert enriched is not None - seed_ids = enriched.children["seeds"] + seed_ids = enriched.children["seed_group"] # All three seeds (objective + technique + non_technique) should be present assert len(seed_ids) == 3 sha_values = [s.params.get("value_sha256") for s in seed_ids] @@ -915,12 +918,12 @@ async def test_enrichment_maps_multiple_results_to_correct_seed_groups(self, moc # First result should have hash_a seed enriched_0 = result.completed_results[0].atomic_attack_identifier - seed_sha_values_0 = [s.params.get("value_sha256") for s in enriched_0.children["seeds"]] + seed_sha_values_0 = [s.params.get("value_sha256") for s in enriched_0.children["seed_group"]] assert "hash_a" in seed_sha_values_0 # Second result should have hash_b seed enriched_1 = result.completed_results[1].atomic_attack_identifier - seed_sha_values_1 = [s.params.get("value_sha256") for s in enriched_1.children["seeds"]] + seed_sha_values_1 = [s.params.get("value_sha256") for s in enriched_1.children["seed_group"]] assert "hash_b" in seed_sha_values_1 @pytest.mark.asyncio diff --git a/tests/unit/scenarios/test_attack_technique.py b/tests/unit/scenarios/test_attack_technique.py index dc3d85dd59..57f85801aa 100644 --- a/tests/unit/scenarios/test_attack_technique.py +++ b/tests/unit/scenarios/test_attack_technique.py @@ -6,6 +6,7 @@ from unittest.mock import MagicMock from pyrit.executor.attack import AttackStrategy +from pyrit.identifiers import ComponentIdentifier from pyrit.models import SeedAttackTechniqueGroup, SeedPrompt from pyrit.scenario.core.attack_technique import AttackTechnique @@ -59,3 +60,72 @@ def test_seed_technique_property_returns_same_instance(self): technique = AttackTechnique(attack=mock_attack, seed_technique=seed_technique) assert technique.seed_technique is technique.seed_technique + + +class TestAttackTechniqueIdentifier: + """Tests for AttackTechnique.get_identifier() (Identifiable).""" + + def test_get_identifier_returns_component_identifier(self): + mock_attack = MagicMock(spec=AttackStrategy) + mock_attack.get_identifier.return_value = ComponentIdentifier( + class_name="PromptSendingAttack", class_module="pyrit.executor.attack" + ) + technique = AttackTechnique(attack=mock_attack) + + result = technique.get_identifier() + assert isinstance(result, ComponentIdentifier) + + def test_class_name_and_module(self): + mock_attack = MagicMock(spec=AttackStrategy) + mock_attack.get_identifier.return_value = ComponentIdentifier( + class_name="PromptSendingAttack", class_module="pyrit.executor.attack" + ) + technique = AttackTechnique(attack=mock_attack) + + result = technique.get_identifier() + assert result.class_name == "AttackTechnique" + assert result.class_module == "pyrit.scenario.core.attack_technique" + + def test_attack_child_is_present(self): + attack_id = ComponentIdentifier( + class_name="PromptSendingAttack", class_module="pyrit.executor.attack" + ) + mock_attack = MagicMock(spec=AttackStrategy) + mock_attack.get_identifier.return_value = attack_id + technique = AttackTechnique(attack=mock_attack) + + result = technique.get_identifier() + assert result.children["attack"] == attack_id + + def test_no_technique_seeds_when_none(self): + mock_attack = MagicMock(spec=AttackStrategy) + mock_attack.get_identifier.return_value = ComponentIdentifier( + class_name="PromptSendingAttack", class_module="pyrit.executor.attack" + ) + technique = AttackTechnique(attack=mock_attack) + + result = technique.get_identifier() + assert "technique_seeds" not in result.children + + def test_technique_seeds_present_when_provided(self): + mock_attack = MagicMock(spec=AttackStrategy) + mock_attack.get_identifier.return_value = ComponentIdentifier( + class_name="PromptSendingAttack", class_module="pyrit.executor.attack" + ) + seed_technique = _make_technique_seeds() + technique = AttackTechnique(attack=mock_attack, seed_technique=seed_technique) + + result = technique.get_identifier() + assert "technique_seeds" in result.children + assert len(result.children["technique_seeds"]) == 2 + + def test_identifier_is_cached(self): + mock_attack = MagicMock(spec=AttackStrategy) + mock_attack.get_identifier.return_value = ComponentIdentifier( + class_name="PromptSendingAttack", class_module="pyrit.executor.attack" + ) + technique = AttackTechnique(attack=mock_attack) + + first = technique.get_identifier() + second = technique.get_identifier() + assert first is second From 6a3b9f769cfe14c96015cdeaccd61ae70541852d Mon Sep 17 00:00:00 2001 From: Richard Lundeen Date: Fri, 10 Apr 2026 20:11:34 -0700 Subject: [PATCH 4/4] pre-commit --- pyrit/backend/services/attack_service.py | 2 +- pyrit/scenario/core/attack_technique.py | 17 ++++++++++++----- tests/unit/scenarios/test_attack_technique.py | 4 +--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pyrit/backend/services/attack_service.py b/pyrit/backend/services/attack_service.py index 77859ee48e..8852071a66 100644 --- a/pyrit/backend/services/attack_service.py +++ b/pyrit/backend/services/attack_service.py @@ -740,7 +740,7 @@ async def _update_attack_after_message_async( atomic_children = dict(atomic.children) # Navigate into attack_technique child to update the nested attack child. technique = atomic_children.get("attack_technique") - if technique is not None: + if isinstance(technique, ComponentIdentifier): tech_children = dict(technique.children) tech_children["attack"] = new_aid atomic_children["attack_technique"] = ComponentIdentifier( diff --git a/pyrit/scenario/core/attack_technique.py b/pyrit/scenario/core/attack_technique.py index 52d0ab8dd2..cac6b8b9cb 100644 --- a/pyrit/scenario/core/attack_technique.py +++ b/pyrit/scenario/core/attack_technique.py @@ -9,19 +9,23 @@ from __future__ import annotations -from typing import Any +from typing import TYPE_CHECKING, Any -from pyrit.executor.attack import AttackStrategy from pyrit.identifiers import ComponentIdentifier, Identifiable, build_seed_identifier -from pyrit.models import SeedAttackTechniqueGroup + +if TYPE_CHECKING: + from pyrit.executor.attack import AttackStrategy + from pyrit.models import SeedAttackTechniqueGroup class AttackTechnique(Identifiable): """ Bundles an attack strategy with an optional technique seed group. - This cleanly separates "how to attack" (the strategy + reusable technique seeds) - from "what to attack" (the objective, which lives on SeedAttackGroup / AtomicAttack). + An AttackTechnique encapsulates the full attack configuration — the strategy + (including its target, converters, and scorer) plus any reusable technique seeds + (e.g. jailbreak templates). The objectives that define which weaknesses to probe + live separately on the SeedAttackGroup / AtomicAttack. """ def __init__( @@ -30,15 +34,18 @@ def __init__( attack: AttackStrategy[Any, Any], seed_technique: SeedAttackTechniqueGroup | None = None, ) -> None: + """Initialize an AttackTechnique.""" self._attack = attack self._seed_technique = seed_technique @property def attack(self) -> AttackStrategy[Any, Any]: + """The attack strategy.""" return self._attack @property def seed_technique(self) -> SeedAttackTechniqueGroup | None: + """The optional technique seed group.""" return self._seed_technique def _build_identifier(self) -> ComponentIdentifier: diff --git a/tests/unit/scenarios/test_attack_technique.py b/tests/unit/scenarios/test_attack_technique.py index 57f85801aa..c3b0904575 100644 --- a/tests/unit/scenarios/test_attack_technique.py +++ b/tests/unit/scenarios/test_attack_technique.py @@ -87,9 +87,7 @@ def test_class_name_and_module(self): assert result.class_module == "pyrit.scenario.core.attack_technique" def test_attack_child_is_present(self): - attack_id = ComponentIdentifier( - class_name="PromptSendingAttack", class_module="pyrit.executor.attack" - ) + attack_id = ComponentIdentifier(class_name="PromptSendingAttack", class_module="pyrit.executor.attack") mock_attack = MagicMock(spec=AttackStrategy) mock_attack.get_identifier.return_value = attack_id technique = AttackTechnique(attack=mock_attack)