Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pyrit/backend/services/attack_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,20 @@ async def _update_attack_after_message_async(
if ar.atomic_attack_identifier:
atomic = ComponentIdentifier.from_dict(ar.atomic_attack_identifier.to_dict())
atomic_children = dict(atomic.children)
atomic_children["attack"] = new_aid
# Navigate into attack_technique child to update the nested attack child.
technique = atomic_children.get("attack_technique")
if isinstance(technique, ComponentIdentifier):
tech_children = dict(technique.children)
tech_children["attack"] = new_aid
atomic_children["attack_technique"] = ComponentIdentifier(
class_name=technique.class_name,
class_module=technique.class_module,
params=dict(technique.params),
children=tech_children,
)
else:
# Fallback for pre-nesting rows with children["attack"] directly.
atomic_children["attack"] = new_aid
new_atomic = ComponentIdentifier(
class_name=atomic.class_name,
class_module=atomic.class_module,
Expand Down
2 changes: 1 addition & 1 deletion pyrit/identifiers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
__all__ = [
"AtomicAttackEvaluationIdentifier",
"build_atomic_attack_identifier",
"ChildEvalRule",
"build_seed_identifier",
"ChildEvalRule",
"class_name_to_snake_case",
"ComponentIdentifier",
"compute_eval_hash",
Expand Down
75 changes: 49 additions & 26 deletions pyrit/identifiers/atomic_attack_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@
by combining the attack strategy's identity with the seed identifiers from
the dataset.

The composite identifier always has the same shape:
class_name = "AtomicAttack"
children["attack"] = attack strategy's ComponentIdentifier
children["seeds"] = list of seed ComponentIdentifiers
(may be empty when no seeds are present)
The composite identifier has this shape::

AtomicAttack
├── attack_technique (class_name="AttackTechnique")
│ ├── attack (attack strategy's ComponentIdentifier)
│ └── technique_seeds (optional, list of seed ComponentIdentifiers)
└── seed_group (list of ALL seed ComponentIdentifiers, for traceability)
"""

import logging
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any

from pyrit.identifiers.component_identifier import ComponentIdentifier

Expand All @@ -30,6 +32,9 @@
_ATOMIC_ATTACK_CLASS_NAME = "AtomicAttack"
_ATOMIC_ATTACK_CLASS_MODULE = "pyrit.scenario.core.atomic_attack"

_ATTACK_TECHNIQUE_CLASS_NAME = "AttackTechnique"
_ATTACK_TECHNIQUE_CLASS_MODULE = "pyrit.scenario.core.attack_technique"


def build_seed_identifier(seed: "Seed") -> ComponentIdentifier:
"""
Expand All @@ -40,10 +45,10 @@ def build_seed_identifier(seed: "Seed") -> ComponentIdentifier:
always produces the same identifier.

Args:
seed (Seed): The seed to build an identifier for.
seed: The seed to build an identifier for.

Returns:
ComponentIdentifier: An identifier capturing the seed's behavioral properties.
An identifier capturing the seed's behavioral properties.
"""
params: dict[str, Any] = {
"value": seed.value,
Expand All @@ -61,39 +66,57 @@ def build_seed_identifier(seed: "Seed") -> ComponentIdentifier:

def build_atomic_attack_identifier(
*,
attack_identifier: ComponentIdentifier,
seed_group: Optional["SeedGroup"] = None,
technique_identifier: ComponentIdentifier | None = None,
attack_identifier: ComponentIdentifier | None = None,
seed_group: "SeedGroup | None" = None,
) -> ComponentIdentifier:
"""
Build a composite ComponentIdentifier for an atomic attack.

Combines the attack strategy's identity with identifiers for all seeds
from the seed group. Every seed in the group is included in the identity;
each seed's ``is_general_technique`` flag is captured as a param so that
downstream consumers (e.g., evaluation identity) can filter as needed.
The identifier places the attack technique in ``children["attack_technique"]``
and all seeds from the seed group in ``children["seed_group"]`` for traceability.

When no seed_group is provided, the resulting identifier has an empty
``seeds`` children list, but still has the standard ``AtomicAttack``
shape for consistent querying.
Callers that have an ``AttackTechnique`` object should pass
``technique_identifier=attack_technique.get_identifier()``.
Callers that only have a raw attack strategy identifier (e.g. legacy
backward-compat paths) can pass ``attack_identifier`` instead, which is
wrapped in a minimal technique node automatically.

Args:
attack_identifier (ComponentIdentifier): The attack strategy's identifier
(from ``attack.get_identifier()``).
seed_group (Optional[SeedGroup]): The seed group to extract seeds from.
If None, the identifier has an empty seeds list.
technique_identifier: Pre-built technique identifier from
``AttackTechnique.get_identifier()``. Mutually exclusive with
``attack_identifier``.
attack_identifier: Raw attack strategy identifier. Used when no
``AttackTechnique`` instance is available. Mutually exclusive
with ``technique_identifier``.
seed_group: The seed group to extract all seeds from.

Returns:
ComponentIdentifier: A composite identifier with class_name="AtomicAttack",
the attack as a child, and seed identifiers as children.
A composite ComponentIdentifier with class_name="AtomicAttack".

Raises:
ValueError: If both or neither of ``technique_identifier`` and
``attack_identifier`` are provided.
"""
seed_identifiers: list[ComponentIdentifier] = []
if technique_identifier is not None and attack_identifier is not None:
raise ValueError("Provide technique_identifier or attack_identifier, not both")

if technique_identifier is None:
if attack_identifier is None:
raise ValueError("Either technique_identifier or attack_identifier must be provided")
technique_identifier = ComponentIdentifier(
class_name=_ATTACK_TECHNIQUE_CLASS_NAME,
class_module=_ATTACK_TECHNIQUE_CLASS_MODULE,
children={"attack": attack_identifier},
)

seed_identifiers: list[ComponentIdentifier] = []
if seed_group is not None:
seed_identifiers.extend(build_seed_identifier(seed) for seed in seed_group.seeds)

children: dict[str, Any] = {
"attack": attack_identifier,
"seeds": seed_identifiers,
"attack_technique": technique_identifier,
"seed_group": seed_identifiers,
}

return ComponentIdentifier(
Expand Down
17 changes: 10 additions & 7 deletions pyrit/identifiers/evaluation_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,17 @@ class AtomicAttackEvaluationIdentifier(EvaluationIdentifier):

Per-child rules:

* ``seed_group`` — excluded entirely (present for traceability only).
* ``attack_technique`` — not listed, so fully included by default.
Its nested children (``objective_target``, ``adversarial_chat``,
``objective_scorer``, ``technique_seeds``) are processed recursively
using the same rules dict, so the rules below apply at any depth.
* ``objective_target`` — include only ``temperature``.
* ``adversarial_chat`` — include ``model_name``, ``temperature``, ``top_p``.
* ``objective_scorer`` — excluded entirely.
* ``seeds`` — include only items where ``is_general_technique=True``.

Non-target children (e.g., ``request_converters``, ``response_converters``)
receive full recursive eval treatment, meaning they fully contribute to
the hash.
Non-target children (e.g., ``request_converters``, ``response_converters``,
``technique_seeds``) receive full recursive eval treatment.
"""

CHILD_EVAL_RULES: ClassVar[dict[str, ChildEvalRule]] = {
Expand All @@ -238,7 +241,7 @@ class AtomicAttackEvaluationIdentifier(EvaluationIdentifier):
included_params=frozenset({"model_name", "temperature", "top_p"}),
),
"objective_scorer": ChildEvalRule(exclude=True),
"seeds": ChildEvalRule(
included_item_values={"is_general_technique": True},
),
"seed_group": ChildEvalRule(exclude=True),
# attack_technique: intentionally omitted — fully included in eval hash.
# technique_seeds (inside attack_technique): also omitted — fully included.
}
8 changes: 4 additions & 4 deletions pyrit/memory/azure_sql_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,19 +483,19 @@ def get_unique_attack_class_names(self) -> list[str]:
rows = session.execute(
text(
"""SELECT DISTINCT JSON_VALUE(atomic_attack_identifier,
'$.children.attack.class_name') AS cls
'$.children.attack_technique.children.attack.class_name') AS cls
FROM "AttackResultEntries"
WHERE ISJSON(atomic_attack_identifier) = 1
AND JSON_VALUE(atomic_attack_identifier,
'$.children.attack.class_name') IS NOT NULL"""
'$.children.attack_technique.children.attack.class_name') IS NOT NULL"""
)
).fetchall()
return sorted(row[0] for row in rows)

def get_unique_converter_class_names(self) -> list[str]:
"""
Azure SQL implementation: extract unique converter class_name values
from the children.attack.children.request_converters array
from the children.attack_technique.children.attack.children.request_converters array
in the atomic_attack_identifier JSON column.

Returns:
Expand All @@ -507,7 +507,7 @@ def get_unique_converter_class_names(self) -> list[str]:
"""SELECT DISTINCT JSON_VALUE(c.value, '$.class_name') AS cls
FROM "AttackResultEntries"
CROSS APPLY OPENJSON(JSON_QUERY(atomic_attack_identifier,
'$.children.attack.children.request_converters')) AS c
'$.children.attack_technique.children.attack.children.request_converters')) AS c
WHERE ISJSON(atomic_attack_identifier) = 1
AND JSON_VALUE(c.value, '$.class_name') IS NOT NULL"""
)
Expand Down
4 changes: 2 additions & 2 deletions pyrit/memory/memory_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,7 @@ def get_attack_results(
conditions.append(
self._get_condition_json_property_match(
json_column=AttackResultEntry.atomic_attack_identifier,
property_path="$.children.attack.class_name",
property_path="$.children.attack_technique.children.attack.class_name",
value=attack_class,
case_sensitive=True,
)
Expand All @@ -1549,7 +1549,7 @@ def get_attack_results(
conditions.append(
self._get_condition_json_array_match(
json_column=AttackResultEntry.atomic_attack_identifier,
property_path="$.children.attack.children.request_converters",
property_path="$.children.attack_technique.children.attack.children.request_converters",
array_element_path="$.class_name",
array_to_match=converter_classes,
)
Expand Down
9 changes: 5 additions & 4 deletions pyrit/memory/sqlite_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,16 +622,17 @@ def get_unique_attack_class_names(self) -> list[str]:
"""
with closing(self.get_session()) as session:
class_name_expr = func.json_extract(
AttackResultEntry.atomic_attack_identifier, "$.children.attack.class_name"
AttackResultEntry.atomic_attack_identifier,
"$.children.attack_technique.children.attack.class_name",
)
rows = session.query(class_name_expr).filter(class_name_expr.isnot(None)).distinct().all()
return sorted(row[0] for row in rows)

def get_unique_converter_class_names(self) -> list[str]:
"""
SQLite implementation: extract unique converter class_name values
from the children.attack.children.request_converters array in the
atomic_attack_identifier JSON column.
from the children.attack_technique.children.attack.children.request_converters
array in the atomic_attack_identifier JSON column.

Returns:
Sorted list of unique converter class name strings.
Expand All @@ -643,7 +644,7 @@ def get_unique_converter_class_names(self) -> list[str]:
FROM "AttackResultEntries",
json_each(
json_extract("AttackResultEntries".atomic_attack_identifier,
'$.children.attack.children.request_converters')
'$.children.attack_technique.children.attack.children.request_converters')
) AS j
WHERE cls IS NOT NULL"""
)
Expand Down
12 changes: 10 additions & 2 deletions pyrit/models/attack_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,23 @@ def get_attack_strategy_identifier(self) -> Optional[ComponentIdentifier]:
Return the attack strategy identifier from the composite atomic identifier.

This is the non-deprecated replacement for the ``attack_identifier`` property.
Extracts and returns the ``"attack"`` child from ``atomic_attack_identifier``.
Extracts the ``"attack"`` child from the nested ``"attack_technique"`` child
of ``atomic_attack_identifier``.

Falls back to ``children["attack"]`` for rows created before the nested
structure was introduced.

Returns:
Optional[ComponentIdentifier]: The attack strategy identifier, or ``None`` if
``atomic_attack_identifier`` is not set.
``atomic_attack_identifier`` is not set or the expected children are missing.

"""
if self.atomic_attack_identifier is None:
return None
technique = self.atomic_attack_identifier.get_child("attack_technique")
if technique is not None:
return technique.get_child("attack")
# Fallback for pre-nesting rows that had children["attack"] directly.
return self.atomic_attack_identifier.get_child("attack")

def get_conversations_by_type(self, conversation_type: ConversationType) -> list[ConversationReference]:
Expand Down
27 changes: 27 additions & 0 deletions pyrit/models/seeds/seed_attack_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from collections.abc import Sequence

from pyrit.models.seeds.seed import Seed
from pyrit.models.seeds.seed_attack_technique_group import SeedAttackTechniqueGroup


class SeedAttackGroup(SeedGroup):
Expand Down Expand Up @@ -96,3 +97,29 @@ def objective(self) -> SeedObjective:
obj = self._get_objective()
assert obj is not None, "SeedAttackGroup should always have an objective"
return obj

def with_technique(self, *, technique: SeedAttackTechniqueGroup) -> SeedAttackGroup:
"""
Return a new SeedAttackGroup with technique seeds merged in.

The original group is not mutated. Technique seeds are inserted at
``technique.insertion_index`` (or appended at the end when ``None``).

Args:
technique: A validated SeedAttackTechniqueGroup whose seeds will be merged.

Returns:
A new SeedAttackGroup with the merged seeds.
"""
base = list(self.seeds)
idx = technique.insertion_index
technique_seeds = list(technique.seeds)
merged_seeds = base + technique_seeds if idx is None else base[:idx] + technique_seeds + base[idx:]

# Clear group IDs so the new group assigns a fresh one.
# This mutates the seed objects, but _enforce_consistent_group_id
# in the constructor will immediately overwrite with a new UUID.
for seed in merged_seeds:
seed.prompt_group_id = None

return SeedAttackGroup(seeds=merged_seeds)
Loading
Loading