Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions extropy/core/models/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ class ReasoningContext(BaseModel):
background_context: str | None = Field(
default=None, description="Scenario-level background context"
)
identity_threat_summary: str | None = Field(
default=None,
description="Deterministic identity-relevance framing when scenario content threatens group identity",
)
agent_names: dict[str, str] = Field(
default_factory=dict,
description="Mapping of agent_id → first name for resolving peer references",
Expand Down Expand Up @@ -357,6 +361,10 @@ class ReasoningResponse(BaseModel):
position: str | None = Field(
default=None, description="Classified position (filled by Pass 2)"
)
public_position: str | None = Field(
default=None,
description="Public-facing position when THINK/SAY diverges (high fidelity)",
)
sentiment: float | None = Field(
default=None, description="Sentiment value (-1 to 1)"
)
Expand Down
202 changes: 198 additions & 4 deletions extropy/simulation/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
_BOUNDED_CONFIDENCE_RHO = 0.35
_PRIVATE_ADJUSTMENT_RHO = 0.12
_PRIVATE_FLIP_CONVICTION = CONVICTION_MAP[ConvictionLevel.FIRM]
_IDENTITY_VALUE_SENTINELS = {"", "unknown", "none", "n/a", "na", "null"}


class _StateTimelineAdapter:
Expand Down Expand Up @@ -926,24 +927,25 @@ def _process_reasoning_chunk(
public_conviction = max(0.0, min(1.0, public_conviction))

public_will_share = response.will_share
public_position = response.position
candidate_public_position = response.public_position or response.position
public_position = candidate_public_position

if (
old_public_conviction is not None
and old_public_conviction >= _FIRM_CONVICTION
):
if (
old_public_position is not None
and response.position is not None
and old_public_position != response.position
and candidate_public_position is not None
and old_public_position != candidate_public_position
):
new_conviction = (
public_conviction if public_conviction is not None else 0.0
)
if new_conviction < _MODERATE_CONVICTION:
logger.info(
f"[CONVICTION] Agent {agent_id}: public flip from {old_public_position} "
f"to {response.position} rejected (old conviction={float_to_conviction(old_public_conviction)}, "
f"to {candidate_public_position} rejected (old conviction={float_to_conviction(old_public_conviction)}, "
f"new conviction={float_to_conviction(public_conviction)})"
)
public_position = old_public_position
Expand Down Expand Up @@ -1447,6 +1449,9 @@ def _build_reasoning_context(
ctx.macro_summary = macro_summary
ctx.local_mood_summary = local_mood_summary
ctx.background_context = self.scenario.background_context
ctx.identity_threat_summary = self._render_identity_threat_context(
agent, timestep
)
ctx.agent_names = self._agent_names

# Populate Phase C fields
Expand Down Expand Up @@ -1476,6 +1481,194 @@ def _build_reasoning_context(

return ctx

@staticmethod
def _identity_value(agent: dict[str, Any], keys: tuple[str, ...]) -> str | None:
"""Return the first meaningful identity value from candidate attribute keys."""
for key in keys:
value = agent.get(key)
if value is None:
continue
if isinstance(value, bool):
if value:
return "yes"
continue
if isinstance(value, (list, tuple, set)):
if not value:
continue
return ", ".join(str(v) for v in value)
text = str(value).strip()
if text.lower() in _IDENTITY_VALUE_SENTINELS:
continue
return text.replace("_", " ")
return None

def _render_identity_threat_context(
self,
agent: dict[str, Any],
timestep: int,
) -> str | None:
"""Render deterministic identity-relevance framing from scenario + agent data."""
corpus_parts = [
self.scenario.meta.description,
self.scenario.event.content,
self.scenario.background_context,
]
if self.scenario.timeline:
for timeline_event in self.scenario.timeline:
if timeline_event.timestep <= timestep:
corpus_parts.append(timeline_event.event.content)
if timeline_event.description:
corpus_parts.append(timeline_event.description)
corpus = " ".join(str(part).lower() for part in corpus_parts if part).strip()
if not corpus:
return None

def scenario_mentions(keywords: tuple[str, ...]) -> bool:
return any(keyword in corpus for keyword in keywords)

identity_dimensions: list[str] = []

political_value = self._identity_value(
agent,
("political_orientation", "political_ideology", "party_affiliation"),
)
if political_value and scenario_mentions(
(
"liberal",
"conservative",
"left",
"right",
"republican",
"democrat",
"politic",
"ideolog",
"culture war",
"censorship",
"book ban",
"school board",
" ban ",
)
):
identity_dimensions.append(f"political orientation ({political_value})")

religious_value = self._identity_value(
agent, ("religious_affiliation", "religion", "faith_tradition")
)
if religious_value and scenario_mentions(
(
"religio",
"faith",
"church",
"mosque",
"temple",
"christian",
"muslim",
"jewish",
"moral",
)
):
identity_dimensions.append(f"religious affiliation ({religious_value})")

race_value = self._identity_value(agent, ("race_ethnicity", "race", "ethnicity"))
if race_value and scenario_mentions(
(
"race",
"racial",
"ethnic",
"minority",
"majority",
"immigra",
"inclusion",
"diversity",
"equity",
"civil rights",
"discrimination",
)
):
identity_dimensions.append(f"race/ethnicity ({race_value})")

gender_value = self._identity_value(
agent,
("gender_identity", "gender", "sexual_orientation"),
)
if gender_value and scenario_mentions(
(
"gender",
"women",
"woman",
"men",
"man",
"lgbt",
"lgbtq",
"queer",
"gay",
"lesbian",
"trans",
"transgender",
"sexual orientation",
"sexuality",
"pronoun",
)
):
identity_dimensions.append(f"gender/sexual identity ({gender_value})")

parental_value = self._identity_value(
agent, ("parental_status", "household_role", "family_role")
)
has_dependents = bool(agent.get("dependents"))
has_children_flag = bool(agent.get("has_children"))
if (
scenario_mentions(
(
"parent",
"parents",
"kid",
"kids",
"children",
"school",
"curriculum",
"classroom",
"book",
"library",
"parental rights",
)
)
and (parental_value or has_dependents or has_children_flag)
):
if parental_value:
identity_dimensions.append(f"parent/family role ({parental_value})")
elif has_dependents:
identity_dimensions.append("parent/family role (caregiver)")
else:
identity_dimensions.append("parent/family role")

citizenship_value = self._identity_value(
agent, ("citizenship_status", "nationality", "country_of_origin")
)
if citizenship_value and scenario_mentions(
(
"immigration",
"immigrant",
"border",
"citizenship",
"deport",
"national identity",
"patriot",
)
):
identity_dimensions.append(f"citizenship/national identity ({citizenship_value})")

if not identity_dimensions:
return None

framed_dimensions = "; ".join(identity_dimensions)
return (
"This development can feel identity-relevant, not just practical. "
f"Parts of who I am that may feel implicated: {framed_dimensions}. "
"If it feels personal, acknowledge that in both your internal reaction "
"and what you choose to say publicly."
)

def _get_peer_opinions(self, agent_id: str) -> list[PeerOpinion]:
"""Get opinions of connected peers who have visibly shared.

Expand Down Expand Up @@ -2041,6 +2234,7 @@ def _export_results(self) -> None:
"population_size": len(self.agents),
"strong_model": self.config.strong,
"fast_model": self.config.fast,
"fidelity": self.config.fidelity,
"seed": self.seed,
"multi_touch_threshold": self.config.multi_touch_threshold,
"completed_at": datetime.now().isoformat(),
Expand Down
Loading
Loading